메시지 흐름 제어와 오류 처리

기업 애플리케이션에서는 메시지 처리 과정에서 다양한 상황이 발생할 수 있습니다. 처리량이 갑자기 증가하거나, 외부 시스템이 응답하지 않거나, 예상치 못한 데이터가 유입될 수 있습니다. Spring Integration은 이러한 상황을 효과적으로 처리하기 위한 다양한 메커니즘을 제공합니다.

메시지 흐름 제어 메커니즘

폴러(Poller) 구성

폴러는 채널에서 메시지를 가져오는 빈도와 방식을 제어합니다. 특히 과 같은 폴링 채널을 사용할 때 중요합니다. QueueChannel

@Bean
public IntegrationFlow pollingFlow() {
    return IntegrationFlows
        .from("inputChannel")
        .channel(c -> c.queue("processingQueue"))
        .handle("orderProcessor", "processOrder", e -> e.poller(
            Pollers.fixedRate(1000)  // 1초마다 폴링
                .maxMessagesPerPoll(10)  // 한 번에 최대 10개 메시지 처리
                .errorChannel("pollingErrorChannel")  // 폴링 중 오류 발생시 전달할 채널
                .taskExecutor(taskExecutor())  // 폴링 작업에 사용할 스레드 풀
        ))
        .channel("outputChannel")
        .get();
}

@Bean
public TaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(5);
    executor.setMaxPoolSize(10);
    executor.setQueueCapacity(25);
    executor.setThreadNamePrefix("order-processor-");
    executor.initialize();
    return executor;
}

스로틀링(Throttling)

메시지 처리 속도를 제한하여 시스템 과부하를 방지합니다:

@Bean
public MessageChannelInterceptor throttlingInterceptor() {
    ChannelInterceptorAdapter interceptor = new ChannelInterceptorAdapter() {
        private final RateLimiter rateLimiter = RateLimiter.create(10); // 초당 10개 메시지로 제한
        
        @Override
        public Message<?> preSend(Message<?> message, MessageChannel channel) {
            rateLimiter.acquire(); // 토큰 획득 전까지 블로킹
            return super.preSend(message, channel);
        }
    };
    return interceptor;
}

@Bean
public DirectChannel throttledChannel() {
    DirectChannel channel = new DirectChannel();
    channel.addInterceptor(throttlingInterceptor());
    return channel;
}

회로 차단기(Circuit Breaker)

외부 시스템에 문제가 발생했을 때 지속적인 연결 시도를 방지합니다:

@Bean
public IntegrationFlow circuitBreakerFlow() {
    return IntegrationFlows
        .from("requestChannel")
        .handle("externalService", "process", 
                e -> e.advice(circuitBreakerAdvice()))
        .channel("responseChannel")
        .get();
}

@Bean
public RequestHandlerCircuitBreakerAdvice circuitBreakerAdvice() {
    RequestHandlerCircuitBreakerAdvice advice = new RequestHandlerCircuitBreakerAdvice();
    advice.setThreshold(5);  // 5번 실패하면 회로 개방
    advice.setHalfOpenAfter(30000);  // 30초 후 반개방 상태로 변경
    return advice;
}

흐름 동적 제어

조건에 따라 메시지 흐름을 동적으로 제어할 수 있습니다:

@Bean
public IntegrationFlow dynamicControlFlow() {
    return IntegrationFlows
        .from("inputChannel")
        // 조건부 게이트웨이: 처리량이 임계값 이하일 때만 처리
        .gateway(m -> metricsService.getCurrentThroughput() <= maxThroughput,
                 g -> g.transform(this::processMessage)
                       .channel("outputChannel"),
                 // 처리량 초과시 지연 메시지로 변환
                 g -> g.transform(m -> {
                          DelayedMessage delayed = new DelayedMessage(m);
                          log.info("Rate limiting applied, message delayed: {}", m.getHeaders().getId());
                          return delayed;
                      })
                       .channel("delayedMessagesChannel"))
        .get();
}

// 지연된 메시지 재처리
@Bean
public IntegrationFlow delayedMessageFlow() {
    return IntegrationFlows
        .from(delayedMessagesSource())
        .transform(DelayedMessage::getOriginalMessage)
        .channel("inputChannel")  // 재시도를 위해 원래 채널로 전송
        .get();
}

@Bean
@InboundChannelAdapter(value = "delayedMessagesChannel", poller = @Poller(fixedDelay = "5000"))
public MessageSource<?> delayedMessagesSource() {
    return () -> {
        if (metricsService.getCurrentThroughput() <= maxThroughput * 0.8) {
            // 처리량이 충분히 낮아졌을 때 지연된 메시지 처리
            return delayedMessagesQueue.poll();
        }
        return null;
    };
}

오류 처리 전략

Spring Integration은 메시지 처리 과정에서 발생하는 다양한 오류를 처리하기 위한 여러 메커니즘을 제공합니다.

오류 채널(Error Channel)

기본적으로 모든 처리 오류는 errorChannel이라는 이름의 글로벌 오류 채널로 전송됩니다:

@Bean
public IntegrationFlow errorHandlingFlow() {
    return IntegrationFlows
        .from(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
        .handle(message -> {
            MessagingException exception = (MessagingException) message.getPayload();
            Message<?> failedMessage = exception.getFailedMessage();
            
            log.error("메시지 처리 오류: [{}], 메시지: [{}]", 
                      exception.getMessage(), 
                      failedMessage.getPayload(),
                      exception);
            
            // 오류 유형별 처리
            if (exception.getCause() instanceof DataAccessException) {
                // 데이터베이스 관련 오류
                dataAccessErrorHandler.handleError(failedMessage, exception);
            } else if (exception.getCause() instanceof HttpClientErrorException) {
                // HTTP 클라이언트 오류 (4xx)
                httpClientErrorHandler.handleError(failedMessage, exception);
            } else if (exception.getCause() instanceof HttpServerErrorException) {
                // HTTP 서버 오류 (5xx)
                httpServerErrorHandler.handleError(failedMessage, exception);
            } else {
                // 기타 일반 오류
                genericErrorHandler.handleError(failedMessage, exception);
            }
        })
        .get();
}

오류 채널 라우터(Error Channel Router)

오류를 유형별로 다른 채널로 라우팅할 수 있습니다:

@Bean
public IntegrationFlow errorRoutingFlow() {
    return IntegrationFlows
        .from(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
        .<MessagingException, Class<?>>route(
            exception -> exception.getCause().getClass(),
            mapping -> mapping
                .subFlowMapping(ValidationException.class, sf -> sf
                    .channel("validationErrorChannel"))
                .subFlowMapping(DataAccessException.class, sf -> sf
                    .channel("dbErrorChannel"))
                .subFlowMapping(HttpClientErrorException.class, sf -> sf
                    .channel("httpClientErrorChannel"))
                .subFlowMapping(TimeoutException.class, sf -> sf
                    .channel("timeoutErrorChannel"))
                .defaultSubFlowMapping(sf -> sf
                    .channel("genericErrorChannel")))
        .get();
}

메시지별 오류 채널 지정

특정 메시지 처리에 대한 오류 채널을 지정할 수 있습니다:

@Bean
public IntegrationFlow customErrorChannelFlow() {
    return IntegrationFlows
        .from("inputChannel")
        .enrichHeaders(h -> h
            .header(IntegrationMessageHeaderAccessor.ERROR_CHANNEL, "orderProcessingErrorChannel"))
        .handle("orderService", "processOrder")
        .channel("outputChannel")
        .get();
}

@Bean
public IntegrationFlow orderErrorHandlingFlow() {
    return IntegrationFlows
        .from("orderProcessingErrorChannel")
        .handle(message -> {
            MessagingException exception = (MessagingException) message.getPayload();
            Message<?> failedMessage = exception.getFailedMessage();
            Order order = (Order) failedMessage.getPayload();
            
            log.error("주문 처리 오류: 주문 ID [{}], 오류: [{}]", 
                      order.getId(), 
                      exception.getMessage());
            
            // 주문 상태 업데이트
            orderRepository.updateStatus(order.getId(), OrderStatus.PROCESSING_FAILED);
            
            // 고객에게 알림
            notificationService.sendOrderErrorNotification(order, exception.getMessage());
        })
        .get();
}

재시도 메커니즘(Retry Mechanism)

일시적인 오류에 대응하기 위해 재시도 전략을 구현할 수 있습니다:

@Bean
public RequestHandlerRetryAdvice retryAdvice() {
    RequestHandlerRetryAdvice advice = new RequestHandlerRetryAdvice();
    
    RetryTemplate retryTemplate = new RetryTemplate();
    
    // 지수 백오프 정책 설정
    ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
    backOffPolicy.setInitialInterval(1000);  // 첫 번째 재시도 전 1초 대기
    backOffPolicy.setMultiplier(2.0);        // 대기 시간을 2배씩 증가
    backOffPolicy.setMaxInterval(60000);     // 최대 대기 시간은 1분
    retryTemplate.setBackOffPolicy(backOffPolicy);
    
    // 재시도 정책 설정
    SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
    retryPolicy.setMaxAttempts(3);  // 최대 3번 시도
    
    // 특정 예외에 대해서만 재시도
    Map<Class<? extends Throwable>, Boolean> retryableExceptions = new HashMap<>();
    retryableExceptions.put(ResourceAccessException.class, true);
    retryableExceptions.put(HttpServerErrorException.class, true);
    retryableExceptions.put(JmsException.class, true);
    retryableExceptions.put(DataAccessException.class, false);  // DB 오류는 재시도하지 않음
    
    retryPolicy = new SimpleRetryPolicy(3, retryableExceptions);
    retryTemplate.setRetryPolicy(retryPolicy);
    
    advice.setRetryTemplate(retryTemplate);
    
    return advice;
}

@Bean
public IntegrationFlow retryableFlow() {
    return IntegrationFlows
        .from("inputChannel")
        .handle("externalServiceGateway", "process", 
                e -> e.advice(retryAdvice()))
        .channel("outputChannel")
        .get();
}

회로 차단기와 폴백(Fallback) 전략

외부 서비스 호출에 문제가 있을 때 대체 전략을 사용합니다:

@Bean
public IntegrationFlow serviceCallWithFallbackFlow() {
    return IntegrationFlows
        .from("requestChannel")
        .handle(message -> {
            try {
                // 주 서비스 호출 시도
                return primaryService.process(message.getPayload());
            } catch (Exception e) {
                // 실패 시 백업 서비스로 폴백
                log.warn("Primary service failed, using backup service: {}", e.getMessage());
                return backupService.process(message.getPayload());
            }
        })
        .channel("responseChannel")
        .get();
}

데드 레터 채널(Dead Letter Channel)

처리할 수 없는 메시지를 별도로 관리합니다:

@Bean
public IntegrationFlow deadLetterChannelFlow() {
    return IntegrationFlows
        .from("inputChannel")
        .filter(payload -> {
            try {
                // 메시지 검증
                validator.validate(payload);
                return true;
            } catch (Exception e) {
                // 검증 실패한 메시지는 데드 레터 채널로 전송
                Message<?> failedMessage = MessageBuilder.withPayload(payload)
                    .setHeader("failureReason", e.getMessage())
                    .setHeader("failureTimestamp", System.currentTimeMillis())
                    .build();
                
                deadLetterChannel().send(failedMessage);
                return false;
            }
        })
        .handle("processor", "process")
        .channel("outputChannel")
        .get();
}

@Bean
public QueueChannel deadLetterChannel() {
    return new QueueChannel();
}

// 데드 레터 처리 플로우
@Bean
public IntegrationFlow deadLetterProcessingFlow() {
    return IntegrationFlows
        .from(deadLetterChannel())
        .handle(message -> {
            // 데드 레터 처리 (로깅, 데이터베이스에 저장 등)
            log.error("Dead letter received: {}, reason: {}", 
                     message.getPayload(), 
                     message.getHeaders().get("failureReason"));
            
            deadLetterRepository.save(new DeadLetter(
                message.getPayload(), 
                message.getHeaders().get("failureReason", String.class),
                new Date(message.getHeaders().get("failureTimestamp", Long.class))
            ));
        })
        .get();
}

트랜잭션 관리

Spring Integration은 메시지 처리 흐름에서 트랜잭션을 지원합니다.

기본 트랜잭션 구성

@Bean
public IntegrationFlow transactionalFlow() {
    return IntegrationFlows
        .from("inputChannel")
        .transactional(transactionManager())
        .handle("orderService", "process")
        .channel("outputChannel")
        .get();
}

@Bean
public PlatformTransactionManager transactionManager() {
    return new DataSourceTransactionManager(dataSource);
}

다양한 트랜잭션 속성 설정

@Bean
public IntegrationFlow advancedTransactionalFlow() {
    return IntegrationFlows
        .from("inputChannel")
        .transactional(txManager -> txManager
            .propagation(Propagation.REQUIRED)
            .isolation(Isolation.READ_COMMITTED)
            .timeout(30)  // 30초 타임아웃
            .readOnly(false)
            .transactionManager(transactionManager()))
        .split()
        .channel(c -> c.executor(taskExecutor()))
        .handle("itemProcessor", "process")
        .aggregate()
        .channel("outputChannel")
        .get();
}

분산 트랜잭션 관리

여러 리소스(JMS, 데이터베이스 등)에 걸친 트랜잭션을 관리합니다:

@Bean
public PlatformTransactionManager jtaTransactionManager() {
    JtaTransactionManager transactionManager = new JtaTransactionManager();
    transactionManager.setTransactionManagerName("java:/TransactionManager");
    return transactionManager;
}

@Bean
public IntegrationFlow jtaTransactionalFlow() {
    return IntegrationFlows
        .from(Jms.messageDrivenChannelAdapter(connectionFactory)
              .destination("incomingOrders")
              .configureListenerContainer(c -> c.sessionTransacted(true)))
        .transactional(jtaTransactionManager())
        .transform(new JsonToObjectTransformer(Order.class))
        .handle("orderService", "process")
        .handle(Jms.outboundAdapter(connectionFactory)
               .destination("processedOrders"))
        .get();
}

메시지 만료 및 시간 제한

시간 제한을 설정하여 메시지 처리의 신뢰성을 높입니다.

메시지 헤더를 통한 만료 시간 설정

@Bean
public IntegrationFlow expiringMessageFlow() {
    return IntegrationFlows
        .from("inputChannel")
        .enrichHeaders(h -> h
            .headerExpression(IntegrationMessageHeaderAccessor.EXPIRATION_DATE, 
                               "T(java.lang.System).currentTimeMillis() + 60000")) // 1분 후 만료
        .channel("processChannel")  // 여기서 처리 지연 가능
        .handle((payload, headers) -> {
            Long expirationDate = headers.get(
                IntegrationMessageHeaderAccessor.EXPIRATION_DATE, Long.class);
            
            if (expirationDate != null && System.currentTimeMillis() > expirationDate) {
                // 만료된 메시지 처리
                log.warn("Message expired: {}", headers.getId());
                return MessageBuilder.withPayload(new ExpiredMessageResponse()).build();
            }
            
            // 정상 처리
            return processor.process(payload);
        })
        .channel("outputChannel")
        .get();
}

요청-응답 시나리오의 타임아웃 설정

@MessagingGateway(defaultRequestTimeout = 5000,  // 기본 5초 타임아웃
                 defaultReplyTimeout = 5000)
public interface OrderService {
    @Gateway(requestTimeout = 10000,  // 이 메서드는 10초 타임아웃
             replyTimeout = 10000)
    OrderConfirmation processOrder(Order order);
    
    // 기본 타임아웃(5초) 사용
    OrderStatus checkOrderStatus(String orderId);
}

실제 비즈니스 시나리오: 종합 오류 처리 전략

복잡한 주문 처리 시스템의 오류 처리와 흐름 제어를 종합적으로 구현해 보겠습니다.

@Configuration
@EnableIntegration
public class OrderProcessingIntegrationConfig {

    // 주문 처리 메인 플로우
    @Bean
    public IntegrationFlow orderProcessingFlow() {
        return IntegrationFlows
            .from("newOrdersChannel")
            // 메시지 ID 로깅
            .wireTap(flow -> flow.handle(message -> 
                log.info("Processing order: {}", message.getHeaders().getId())))
            // 트랜잭션 시작
            .transactional(txManager())
            // 주문 검증
            .filter(this::validateOrder, 
                   f -> f.discardChannel("invalidOrdersChannel"))
            // 재고 확인
            .handle("inventoryService", "checkAndReserveStock",
                   e -> e.advice(retryAdvice()))
            // 결제 처리
            .handle(message -> {
                Order order = (Order) message.getPayload();
                // 결제 처리 자체는 별도 트랜잭션에서 수행하기 위해 gateway 사용
                return paymentGateway.processPayment(order);
            })
            // 결제 결과에 따른 라우팅
            .<PaymentResult, PaymentStatus>route(PaymentResult::getStatus,
                mapping -> mapping
                    .subFlowMapping(PaymentStatus.APPROVED, sf -> sf
                        .transform(PaymentResult::getOrder)
                        .handle("fulfillmentService", "scheduleFulfillment",
                               e -> e.advice(circuitBreakerAdvice()))
                        .channel("completedOrdersChannel"))
                    .subFlowMapping(PaymentStatus.DECLINED, sf -> sf
                        .transform(PaymentResult::getOrder)
                        .handle((order, headers) -> {
                            log.warn("Payment declined for order: {}", ((Order) order).getId());
                            orderRepository.updateStatus(
                                ((Order) order).getId(), OrderStatus.PAYMENT_DECLINED);
                            return order;
                        })
                        .channel("declinedOrdersChannel"))
                    .subFlowMapping(PaymentStatus.ERROR, sf -> sf
                        .transform(PaymentResult::getOrder)
                        .channel("paymentErrorChannel")))
            .get();
    }
    
    // 인벤토리 서비스 호출을 위한 재시도 어드바이스
    @Bean
    public RequestHandlerRetryAdvice retryAdvice() {
        RequestHandlerRetryAdvice advice = new RequestHandlerRetryAdvice();
        
        RetryTemplate retryTemplate = new RetryTemplate();
        
        // 지수 백오프 정책
        ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
        backOffPolicy.setInitialInterval(1000);
        backOffPolicy.setMultiplier(2.0);
        backOffPolicy.setMaxInterval(10000);
        retryTemplate.setBackOffPolicy(backOffPolicy);
        
        // 재시도 정책 (최대 3회)
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
        retryPolicy.setMaxAttempts(3);
        
        // 재시도 가능한 예외 목록
        Map<Class<? extends Throwable>, Boolean> retryableExceptions = new HashMap<>();
        retryableExceptions.put(InventoryUnavailableException.class, true);
        retryableExceptions.put(ResourceAccessException.class, true);
        retryableExceptions.put(DataAccessException.class, true);
        
        retryPolicy = new SimpleRetryPolicy(3, retryableExceptions);
        retryTemplate.setRetryPolicy(retryPolicy);
        
        advice.setRetryTemplate(retryTemplate);
        
        // 최종 실패 시 호출될 핸들러
        advice.setRecoveryCallback(context -> {
            Throwable lastThrowable = context.getLastThrowable();
            Order order = (Order) context.getAttribute("message");
            
            log.error("Retry exhausted for order: {}", order.getId(), lastThrowable);
            
            // 재고 부족 예외인 경우
            if (lastThrowable instanceof InventoryUnavailableException) {
                // 백오더로 처리
                return backOrderManager.createBackOrder(order);
            }
            
            // 기타 예외는 오류 처리
            throw new MessagingException("Failed to process order after retries", lastThrowable);
        });
        
        return advice;
    }
    
    // 서킷브레이커 어드바이스
    @Bean
    public RequestHandlerCircuitBreakerAdvice circuitBreakerAdvice() {
        RequestHandlerCircuitBreakerAdvice advice = new RequestHandlerCircuitBreakerAdvice();
        
        // 서킷 브레이커 구성
        advice.setThreshold(5);  // 5번 실패하면 회로 개방
        advice.setHalfOpenAfter(30000);  // 30초 후 반개방 상태로 변경
        
        return advice;
    }
    
    // 메인 오류 처리 플로우
    @Bean
    public IntegrationFlow errorHandlingFlow() {
        return IntegrationFlows
            .from(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
            .transform(MessagingException.class, exception -> {
                Message<?> failedMessage = exception.getFailedMessage();
                Throwable cause = exception.getCause();
                
                // 에러 정보 생성
                ErrorDetail errorDetail = new ErrorDetail();
                errorDetail.setMessageId(failedMessage.getHeaders().getId().toString());
                errorDetail.setTimestamp(new Date());
                errorDetail.setException(cause.getClass().getName());
                errorDetail.setMessage(cause.getMessage());
                errorDetail.setPayload(failedMessage.getPayload());
                
                // 중요 헤더 복사
                Map<String, Object> relevantHeaders = new HashMap<>();
                failedMessage.getHeaders().forEach((key, value) -> {
                    if (isRelevantHeader(key)) {
                        relevantHeaders.put(key, value);
                    }
                });
                errorDetail.setRelevantHeaders(relevantHeaders);
                
                return errorDetail;
            })
            .<ErrorDetail, String>route(errorDetail -> {
                // 에러 유형에 따라 라우팅
                String exceptionType = errorDetail.getException();
                
                if (exceptionType.contains("PaymentException")) {
                    return "payment";
                } else if (exceptionType.contains("InventoryException")) {
                    return "inventory";
                } else if (exceptionType.contains("FulfillmentException")) {
                    return "fulfillment";
                } else if (exceptionType.contains("DataAccessException")) {
                    return "database";
                } else if (exceptionType.contains("ValidationException")) {
                    return "validation";
                } else {
                    return "general";
                }
            }, mapping -> mapping
                .subFlowMapping("payment", sf -> sf.channel("paymentErrorChannel"))
                .subFlowMapping("inventory", sf -> sf.channel("inventoryErrorChannel"))
                .subFlowMapping("fulfillment", sf -> sf.channel("fulfillmentErrorChannel"))
                .subFlowMapping("database", sf -> sf.channel("databaseErrorChannel"))
                .subFlowMapping("validation", sf -> sf.channel("validationErrorChannel"))
                .defaultSubFlowMapping(sf -> sf.channel("generalErrorChannel")))
            .get();
    }
    
    // 결제 오류 처리 서브플로우
    @Bean
    public IntegrationFlow paymentErrorFlow() {
        return IntegrationFlows
            .from("paymentErrorChannel")
            .handle((errorDetail, headers) -> {
                log.error("Payment error: {}", errorDetail);
                
                // 원본 주문 추출
                Object payload = ((ErrorDetail) errorDetail).getPayload();
                Order order = null;
                
                if (payload instanceof Order) {
                    order = (Order) payload;
                } else if (payload instanceof PaymentResult) {
                    order = ((PaymentResult) payload).getOrder();
                }
                
                if (order != null) {
                    // 주문 상태 업데이트
                    orderRepository.updateStatus(order.getId(), OrderStatus.PAYMENT_ERROR);
                    
                    // 오류 로그 저장
                    paymentErrorRepository.save(new PaymentError(
                        order.getId(), 
                        ((ErrorDetail) errorDetail).getMessage(),
                        new Date()
                    ));
                    
                    // 알림 메시지 전송
                    notificationService.sendPaymentErrorNotification(
                        order, ((ErrorDetail) errorDetail).getMessage());
                }
                
                return null;
            })
            .get();
    }
}

Last updated