본문 바로가기
JAVA

R2DBC-Proxy(DB Connection Proxy) 적용하기

by 열정적인 이찬형 2024. 7. 22.

적용 목적.

 

현재 WebFlux + R2DBC 환경에서 Join이 필요할 때 DatabaseClient을 이용한 Native Query으로 작성해서 DB에서 데이터를 가져오고 있습니다.

Query를 작성할 때의 Human Issue가 발생하거나, ParameterBinding이 올바르게 진행되어 DB에 요청이 가는지 확인하기가 어렵습니다.

    public Flux<?> findTableT1(String id) {

		    return databaseClient.sql(queryStore.queryJoinTable2ByT1Id())
            .bind("id", id)
            .map((row, rowMetadata) -> findTableT1Response.ofFindTableT1Row(row))
            .all()
            .map(findTableT1Response::toDomain);
	  }
    
    
    public String queryJoinTable2ByT1Id() {
        return """
               SELECT T1.id, T1.name, ....
               FROM TABLE1 T1
                  JOIN TABLE2 T2 ON T1.id = T2.t1_id
                  JOIN TABLE3 T3 ON T2.id = T3.t2_id
                  ...
                WHERE T1.id = :id
                ...
            """;
    }

 

위와 같은 이유로, WebFlux + R2DBC 환경에서 DB에 요청되는 Query와 걸린 시간을 확인해서 Slow Query를 개발자가 즉각적으로 확인하여 성능적인 이슈가 발생하기 이전에 대처할 수 있는 방법이 필요하였습니다.

 

해당 기능을 제공하는 R2DBC-Proxy 찾게 되었습니다.

 

GitHub - r2dbc/r2dbc-proxy: R2DBC Proxying Framework

R2DBC Proxying Framework. Contribute to r2dbc/r2dbc-proxy development by creating an account on GitHub.

github.com


R2DBC - Proxy란?

 

R2DBC Proxy의 공식 문서에서는 아래와 같이 설명하고 있습니다.

R2DBC Proxy is a proxy framework providing callbacks for query executions, method invocations, and parameter bindings.

R2DBC Proxy는 Query 실행, Method 호출, Parameter 바인딩에 대한 CallBack을 제공하는 Proxy Framework 입니다.

 

R2DBC Proxy에서는 Listener을 제공하여, Application, Upper Layer Library에서 호출에 대한 CallBack을 Listener가 수신하게 됩니다.

 

Listener을 이용해서 CallBack을 받았을 때 아래와 같은 요소들을 적용시킬 수 있습니다.

  • 각 쿼리 실행에 대한 로깅
  • Slow Query 감지
  • Method 추적
  • Metric
  • 분산 추적
  • 주장과 검증
  • 자신의 행동

R2DBC Proxy는 상위 계층의 R2DBC SPI로 간주되어 사용되기 때문에, 기존 Connection Factory에 Wrapping을 진행한다면 쉽게 적용할 수 있습니다.

 

ProxyConnectionFactory을 통해서 CallBack Listener을 설정한 뒤 R2DBC SPI로 인식하도록 합니다.

 

- 핵심 개념

 

1) Proxies

 

R2DBC SPI Object(Connection, Statement, Batch, Result, Row)에 대해서 ProxyConnectionFactory으로 Wrapping 합니다.

 

해당 Proxy에 등록된 Listener을 통해서 Callback이 호출되면 정의된 작업을 진행하게 됩니다.

 

2) Listeners

 

CallBack 호출이 왔을 때 실질적으로 작업을 진행하는 역할입니다.

 

ProxyExecutionListener의 Interface을 통해서 정의 할 수 있습니다.

 

[before|after]Query : Query 처리 CallBack 정의

 

[before|after]Method : Method 처리 CallBack 정의

 

eachQueryResult : Query 결과인 Row.get()을 호출 CallBack 정의

 

Listener을 환경과 정책에 맞게 Custom해서 사용해서 적용하시면 됩니다.

 

3) Formatters

 

Listener가 CallBack으로 받은 Parameter(MethodExecutionInfo, QueryExecutionInfo)을 어떻게 Formating 할 지 결정하며, 대부분 Log을 찍기 위한 응답값을 Custom합니다.

 

기본적인 Utility 클래스로는 QueryExecutionInfoFormatter와 MethodExecutionInfoFormatter 가 존재합니다.

 

QueryExecutionInfoFormatter : QueryExecutionInfo의 정보를 String 형태로 Formatting 합니다.

 

MethodExecutionInfoFormatter : MethodExecutionInfo 의 정보를 String 형태로 Formatting 합니다.

 

→ 해당 결과는 적용하기에서 확인하실 수 있습니다.

 

또한 기본적인 Utility 클래스가 아닌 Custom해서 각 환경과 정책에 맞게 적용이 가능합니다.

 

4) Converters

 

Query 호출 결과를 변환하는 용도이며, 현재는 BindParameterConverterResultRowConverter을 지원하고 있습니다.

 

BindParameterConverter : Binding된 Parameter 정보를 변환합니다.

 

ResultRowConverter : Query 결과 Row을 변환합니다.

 


R2DBC-PROXY 적용하기

 

- Dependencies

 

[Maven]

<dependency>
  <groupId>io.r2dbc</groupId>
  <artifactId>r2dbc-proxy</artifactId>
  <version>${version}</version>
</dependency>

 

[Gradle]

dependencies {
		implementation("io.r2dbc:r2dbc-proxy")
}

 

- R2DBC - Proxy Connection SetUp

 

[URL-base]

# with driver
r2dbc:proxy:mariadb://localhost:3306/myDB?proxyListener=com.example.MyListener

# with pooling
r2dbc:proxy:pool:mariadb://localhost:3306/myDB?proxyListener=com.example.MyListener&maxIdleTime=PT60S

[Programmatic]

ConnectionFactory factory = ConnectionFactories.get(ConnectionFactoryOptions.builder()
   .option(ConnectionFactoryOptions.DRIVER, "proxy")
   .option(ConnectionFactoryOptions.PROTOCOL, "pool:mariadb")
   .option(ConnectionFactoryOptions.HOST, "localhost")
   .option(ConnectionFactoryOptions.PORT, 3306)
   .option(ConnectionFactoryOptions.DATABASE, "myDB")
   .option(ProxyConnectionFactoryProvider.PROXY_LISTENERS, myListener)
   .build());

 

proxyListener는 완전한 Listener을 설정해주어야 하며, ProxyConnectionFactory을 Wrapping할 때 구현할 것이라면 적용시키지 않고 이후에 따로 정의해주어야 합니다.

 

ConnectionFactory factory = ...

//ProxyConnectionFactory으로 Wrapping할 때 Listener 정의
ConnectionFactory connectionFactory = ProxyConnectionFactory.builder(factory)
    .onAfterQuery(queryInfo ->
        ...  // after query callback logic
    )
    .onBeforeMethod(methodInfo ->
        ...  // before method callback logic
    )
    .listener(...)  // add listener
    .build();
    
 ---------------------------------------------------------------------
 
 //proxy:pool:mariadb, pool을 사용할 때
 ConnectionFactory factory = ...
 
 ConnectionPoolConfiguration configuration = ...
 
 ConnectionPool connectionPool = new ConnectionPool(configuration);
 
 ConnectionFactory connectionFactory = ProxyConnectionFactory.builder(connectionPool)
    .onAfterQuery(queryInfo ->
        ...  // after query callback logic
    )
    .onBeforeMethod(methodInfo ->
        ...  // before method callback logic
    )
    .listener(...)  // add listener
    .build();

 

- Components

 

ProxyConnectionFactory

 

ConnectionFactory을 Wrapping하여 Proxy 형태로 만드는 요소입니다.

ConnectionFactory factory = ...

ConnectionFactory connectionFactory = ProxyConnectionFactory.builder(factory)
    .onAfterQuery(queryInfo ->
        ...  // Query 실행 이후, Callback Logic
    )
    .onBeforeMethod(methodInfo ->
        ...  // Query 실행 이전, Callback Logic
    )
    .listener(...)  // 정의한 Listener 추가
    .build();

 

ProxyConfig

 

미리 정의한 proxyExecutionListener, ProxyFactory, BindParameterConverter에 대한 Proxy 관련 Configuration을 설정하여 적용합니다.

 

만약, proxyConfig을 전달하지 않을 경우에는 기본 형식의 Listener, Converter 등이 자동으로 형성됩니다.

ProxyConfig proxyConfig = ...

ConnectionFactory connectionFactory = ProxyConnectionFactory.builder(original, proxyConfig)
    ...
    .build();
    
 ---------------------------------------------------------------------
 //Config을 지정하지 않을 경우 기본 구성으로 자동 생성
 ConnectionFactory connectionFactory = ProxyConnectionFactory.builder(original)
    ...
    .build();

 

 

ProxyFactory

 

프록시를 생성하기 위한 전략 인터페이스입니다.

 

기본적으로는 JdkProxyFactory으로 사용되며, JDK에 맞게 동적으로 만들어집니다.

 

또한, Custom을 제공하여, 다양한 프록시 메커니즘을 활용할 수 있습니다.

 

공식 문서에서 Custom한 예시는 아래와 같습니다.

 

r2dbc-proxy-examples/java-agent-example/common/src/main/java/io/r2dbc/examples/agent/ByteBuddyProxyFactory.java at master · ttd

Samples for r2dbc-proxy. Contribute to ttddyy/r2dbc-proxy-examples development by creating an account on GitHub.

github.com

 

ProxyFactory을 Custom할 때, ProxyConfig을 등록할 수도 있습니다.

ProxyFactory customProxyFactory = ...

ProxyConfig proxyConfig = ProxyConfig.builder()
    .proxyFactoryFactory(() -> customProxyFactory)  // CustomProxyFactory 적용
    .build();

 

ProxyExecutionListener

 

Query 실행, Method호출, Query 결과에 따른 Root Listener 들입니다.

// Method 호출할 때 CallBack
void beforeMethod(MethodExecutionInfo executionInfo);

// Method 실행 이후 CallBack
void afterMethod(MethodExecutionInfo executionInfo);

// Query가 실행되기 이전 CallBack
void beforeQuery(QueryExecutionInfo execInfo);

// Query가 실행된 이후 CallBack
void afterQuery(QueryExecutionInfo execInfo);

// 각 Query로 매핑된 결과에 대한 CallBack
void eachQueryResult(QueryExecutionInfo execInfo);

 

위에서 설명한 것처럼 Parameter를 통해서 정보를 CallBack으로 전달받고 있습니다.

 

MethodExecutionInfo : Method 정보

 

QueryExecutionInfo : Query 정보

 

ProxyMethodExecutionListener

 

ConnectionFactory, Connection, Batch, Statement,  Result에 대한 Before/After Method을 정의할 수 있습니다.

 

[before|after]<method-name>On<class-name>

public class ConnectionStartToEndListener implements ProxyMethodExecutionListener {

  @Override
  public void beforeCreateOnConnectionFactory(MethodExecutionInfo methodExecutionInfo) {
    // called before ConnectionFactory#create()
  }

  @Override
  public void afterCloseOnConnection(MethodExecutionInfo methodExecutionInfo) {
    // called after  Connection#close()
  }

}

 

Formatters

 

- QueryExecutionInfoFormatter

QueryExecutionInfoFormatter queryFormatter = QueryExecutionInfoFormatter.showAll();
String queryInfo = queryFormatter.format(queryExecutionInfo);

2024-07-09T16:57:29.476+09:00  INFO 11941 --- [actor-tcp-nio-2] com.test.test  : Execute Query : Thread:reactor-tcp-nio-2(23) Connection:2 Transaction:{Create:0 Rollback:0 Commit:0} Success:True Time:23 Type:Statement BatchSize:0 BindingsSize:0 Query:["SELECT ... FROM ... JOIN ..."] Bindings:[]

 

- MethodExecutionInfoFormatter

MethodExecutionInfoFormatter methodFormatter = MethodExecutionInfoFormatter.withDefault();

ProxyConnectionFactoryBuilder.create(connectionFactory)
     .onBeforeMethod(methodExecutionInfo ->
           log.debug("Before ExecuteMethod : {}" , methodFormatter.format(methodExecutionInfo))
      )
      . onAfterMethod(methodExecutionInfo ->
          log.debug("After ExecuteMethod : {}", methodFormatter.format(methodExecutionInfo))
      )  
  .build();

2024-07-09T17:13:54.009+09:00  INFO 12363 --- [actor-tcp-nio-2] com.test.test  : Before Execute Method :  61: Thread:23 Connection:2 Time:0  PooledConnection#close()
2024-07-09T17:13:54.010+09:00  INFO 12363 --- [actor-tcp-nio-2] com.test.test  : After Execute Method :  62: Thread:23 Connection:2 Time:1  PooledConnection#close()

 

 

- Customizing Formatter

QueryExecutionInfoFormatter customFormatter = new QueryExecutionInfoFormatter();

customFormatter.addConsumer((queryInfo, stringbuilder) -> {
   stringbuilder.append("QUERY-EXECUTION : "); // set prefix
});

customFormatter.newLine();  // new line
customFormatter.showSuccess();

customFormatter.addConsumer((queryInfo, stringbuilder)  -> {
  stringbuilder.append("Connection-Info=").append(queryInfo.getConnectionInfo().getConnectionId());
});
customFormatter.showQuery();

String log = customFormatter.format(queryExecutionInfo);

2024-07-09T17:16:16.998+09:00  INFO 12380 --- [actor-tcp-nio-2] com.test.test : Execute Query : QUERY-EXECUTION :  
Success:True Connection-Info=2 Query:["SELECT ... FROM ... JOIN ..."]

 

- BindParameterConverter

 

DB에 요청하는 Query에 대해서 Parameter가 Binding되는 작업할 때 발동합니다.

 

예를 들어, 바인딩되는 정보들에 대한 로그를 찍을 때 표현하는 방식을 수정할 수 있습니다.

 

- ResultRowConverter

 

Query 결과에 대한 Row을 가져올 때 CallBack으로 실행되는 Conveter 입니다.

Row.get()을 실행하기 이전에 작동합니다.

ResultRowConverter converter = ...
ProxyConfig proxyConfig = ProxyConfig.builder().resultRowConverter(converter).build();

// create a proxy ConnectionFactory
ConnectionFactory proxy = ProxyConnectionFactory.builder(connectionFactory, proxyConfig).build();

 

예를 들어, 데이터를 가져올 때 first_name, last_name이 존재한다면, full_name 형식의 Key에 대한 Row을 만들어서 가져올 수 있습니다.

ResultRowConverter converter = (proxyRow, method, args, getOperation) -> {
   if ("full_name".equals(args[0])) {
       String firstName = proxyRow.get("first_name", String.class);
       String lastName = proxyRow.get("last_name", String.class);
       return firstName + " " + lastName;
   }
   return getOperation.proceed();   // invoke original method
);

 


R2DBC-Proxy 사용 결과

 

Proxy을 이용해서 다양한 기능으로 사용할 수 있지만, 제가 주로 사용하는 기능에 대해서 보여드리겠습니다.

  • DB 요청 Query와 속도 측정
  • Slow Query 발생 시 해당 Query와 걸린 시간 로그

 

[R2dbcConfg.class]

ConnectionFactory을 만들 때 ProxyConnectionFactory로 Wrapping하도록 하였으며, 그에 따른 Query 처리 결과에 따른 Log를 찍고 있도록 Custom하였습니다.

Duration slowQueryLimit = Duration.ofSeconds(3L);
QueryExecutionInfoFormatter formatter = QueryExecutionInfoFormatter.showAll();
return ProxyConnectionFactory.builder(connectionPool)
     .onAfterQuery(queryInfo -> {
          if(slowQueryLimit.minus(queryInfo.getExecuteDuration()).isNegative()) {
               log.warn( String.format(
                   """
                          
                    ### Slow Query 탐지 ###
                    내용 : 실행된 Query 중 %s초 이상인 Query가 존재합니다.
                    실행시간: %s초
                    실행쿼리: %s

                    """
                    , slowQueryLimit.toSeconds()
                    , queryInfo.getExecuteDuration().toMillis() / 1000.0
                    , queryInfo.getQueries().get(0).getQuery()
                  ));
          }else{
              log.info("Execute Query : {}",  formatter.format(queryInfo));
          }
       )
.build();

 

[Query 실행하였을 때 Slow Query의 기준을 넘었을 때에는 아래와 같이 로그가 찍히게 됩니다.]

### Slow Query 탐지 ###
내용 : 실행된 Query 중 3초 이상인 Query가 존재합니다.
실행시간: 3.575초
실행쿼리: SELECT SLEEP(3.5)

 

또한, Slow Query가 아닌 경우 QueryExecutionInfoFormatter.showAll()을 사용하고 있기 때문에 기본 Fomatter으로 Query 요청 관련 정보들을 로그로 찍고 있습니다.

2024-07-09T16:57:29.476+09:00  INFO 11941 --- [actor-tcp-nio-2] com.test.test  : Execute Query : Thread:reactor-tcp-nio-2(23) Connection:2 Transaction:{Create:0 Rollback:0 Commit:0} Success:True Time:23 Type:Statement BatchSize:0 BindingsSize:0 Query:["SELECT ... FROM ... JOIN ..."] Bindings:[]

 

[추가적인 기능에 대해서 알아보고 싶으시다면, 공식 문서에서 확인하실 수 있습니다.]

 

R2DBC Proxy - R2DBC Proxying Framework

Name of the R2DBC query. (since the name contains %s the final value will be resolved at runtime)

r2dbc.io


정리

 

R2DBC을 사용하는 Reactive 환경에서 DB에 요청을 보낼 때 R2dbcRepository, DatabaseClient(NativeQuery) 등 Query을 호출하게 됩니다.

 

DB에 호출되는 Query가 올바르게 Binding되는 여부, Connection이 생긴 개수, 속도, Slow Query 감지 등을 시각적으로 확인할 수 있어야 성능 개선 및 생산성에 도움이 됩니다.(주관적 견해)

R2dbc을 사용하는 환경에서 API을 개발하면서 DB에 대한 Query 요청에 대한 정보를 Log로 실시간 확인하고 싶으시다면 R2DBC-Proxy을 사용해보시는 것을 추천드립니다.

댓글