고급 기능

고급 에러 처리 및 복원성 패턴

전역 오류 처리 전략

@Configuration
public class GlobalErrorHandlingConfig {
    @Bean
    public IntegrationFlow globalErrorHandlingFlow() {
        return IntegrationFlows
            .from(ErrorChannel.class)
            .handle((message, headers) -> {
                MessagingException exception = (MessagingException) message.getPayload();
                
                // 고급 오류 로깅
                ErrorDetails errorDetails = new ErrorDetails(
                    exception.getFailedMessage(),
                    LocalDateTime.now(),
                    determineErrorSeverity(exception)
                );
                
                // 오류 분류 및 처리
                return routeError(errorDetails);
            })
            .channel("errorProcessingChannel")
            .get();
    }

    private ErrorSeverity determineErrorSeverity(MessagingException exception) {
        // 예외 유형에 따른 심각도 분류 로직
        if (exception.getCause() instanceof NetworkException) {
            return ErrorSeverity.CRITICAL;
        }
        // 다른 예외 유형에 대한 심각도 분류
        return ErrorSeverity.MINOR;
    }

    private ErrorRouteResult routeError(ErrorDetails errorDetails) {
        // 오류 유형에 따른 라우팅 및 복구 전략
        switch (errorDetails.getSeverity()) {
            case CRITICAL:
                return new ErrorRouteResult(
                    ErrorAction.RETRY_WITH_BACKOFF, 
                    errorDetails
                );
            case MAJOR:
                return new ErrorRouteResult(
                    ErrorAction.NOTIFY_SUPPORT, 
                    errorDetails
                );
            default:
                return new ErrorRouteResult(
                    ErrorAction.LOG_AND_CONTINUE, 
                    errorDetails
                );
        }
    }
}

재시도 및 폴백 메커니즘

@Configuration
public class RetryConfig {
    @Bean
    public RetryTemplate retryTemplate() {
        return RetryTemplate.builder()
            .maxAttempts(3)
            .retryPolicy(new SimpleRetryPolicy(3, 
                Map.of(
                    NetworkException.class, true,
                    ServiceUnavailableException.class, true
                )))
            .recoveryPolicy(new RecoveryCallback<Object>() {
                @Override
                public Object recover(RetryContext context) throws Exception {
                    // 모든 재시도 실패 후 폴백 로직
                    Exception lastException = context.getLastThrowable();
                    return handleFinalFailure(lastException);
                }
            })
            .exponentialBackOffPolicy()
            .build();
    }

    private Object handleFinalFailure(Exception exception) {
        // 최종 실패 처리 로직
        errorNotificationService.sendCriticalErrorAlert(exception);
        return ErrorAction.ABORT;
    }
}

고급 라우팅 전략

동적 라우팅

@Configuration
public class DynamicRoutingConfig {
    @Bean
    public IntegrationFlow dynamicRoutingFlow() {
        return IntegrationFlows
            .from("dynamicRoutingChannel")
            .route(Message<?>::getPayload, 
                mapping -> mapping
                    .subFlowMapping(Order.class, this::processOrderFlow)
                    .subFlowMapping(Payment.class, this::processPaymentFlow)
                    .subFlowMapping(User.class, this::processUserFlow)
                    .defaultSubFlowMapping(sf -> sf
                        .handle(message -> {
                            // 알 수 없는 메시지 유형 처리
                            log.warn("Unknown message type: {}", 
                                message.getPayload().getClass());
                        })
                    )
            )
            .get();
    }

    private IntegrationFlowDefinition<?> processOrderFlow(IntegrationFlowDefinition<?> flow) {
        return flow
            .transform(this::enrichOrder)
            .handle("orderProcessingService", "process")
            .channel("processedOrdersChannel");
    }

    private IntegrationFlowDefinition<?> processPaymentFlow(IntegrationFlowDefinition<?> flow) {
        return flow
            .handle("paymentValidationService", "validate")
            .route(Payment::getStatus, 
                mapping -> mapping
                    .subFlowMapping(PaymentStatus.APPROVED, 
                        sf -> sf.handle("paymentProcessingService", "processApproved"))
                    .subFlowMapping(PaymentStatus.DECLINED, 
                        sf -> sf.handle("paymentProcessingService", "processDeclined"))
            );
    }

    private IntegrationFlowDefinition<?> processUserFlow(IntegrationFlowDefinition<?> flow) {
        return flow
            .handle("userManagementService", "processUser")
            .channel("userProcessedChannel");
    }
}

고급 변환 및 보강 패턴

맞춤형 변환기

@Component
public class CustomOrderTransformer implements Transformer {
    @Override
    public Object transform(Message<?> message) {
        Order originalOrder = (Order) message.getPayload();
        
        // 복잡한 변환 로직
        EnrichedOrder enrichedOrder = new EnrichedOrder(originalOrder);
        
        // 비즈니스 규칙 적용
        applyBusinessRules(enrichedOrder);
        
        // 추가 메타데이터 보강
        enrichedOrder.setTransformationTimestamp(LocalDateTime.now());
        
        return enrichedOrder;
    }

    private void applyBusinessRules(EnrichedOrder order) {
        // 주문에 대한 고급 비즈니스 규칙 적용
        if (order.getTotalAmount().compareTo(BULK_ORDER_THRESHOLD) > 0) {
            order.setOrderType(OrderType.WHOLESALE);
            applyWholesaleDiscounts(order);
        }
    }
}

메시지 보강 패턴

@Configuration
public class MessageEnrichmentConfig {
    @Bean
    public IntegrationFlow orderEnrichmentFlow() {
        return IntegrationFlows
            .from("rawOrderChannel")
            .transform(new CustomOrderTransformer())
            .enrichHeaders(h -> h
                .header("processingStartTime", System.currentTimeMillis())
                .headerExpression("correlationId", 
                    "T(java.util.UUID).randomUUID().toString()")
                .header("orderSource", "ONLINE_CHANNEL")
            )
            .handle(new MessageEnricher())
            .channel("enrichedOrderChannel");
    }

    public class MessageEnricher implements MessageHandler {
        @Override
        public void handleMessage(Message<?> message) throws MessagingException {
            EnrichedOrder order = (EnrichedOrder) message.getPayload();
            
            // 외부 서비스 호출을 통한 추가 정보 보강
            CustomerProfile customerProfile = 
                customerService.getProfileById(order.getCustomerId());
            
            order.setCustomerTier(customerProfile.getTier());
            order.setCustomerSegment(customerProfile.getSegment());
            
            // 재고 정보 확인
            InventoryStatus inventoryStatus = 
                inventoryService.checkInventory(order.getLineItems());
            
            order.setFullyAvailable(inventoryStatus.isFullyAvailable());
            order.setAvailableItems(inventoryStatus.getAvailableItems());
        }
    }
}

고급 분할 및 집계 패턴

복잡한 분할 전략

@Configuration
public class AdvancedSplitterConfig {
    @Bean
    public IntegrationFlow complexSplitterFlow() {
        return IntegrationFlows
            .from("bulkOrderChannel")
            .split(new OrderBatchSplitter())
            .channel(c -> c.executor(splitTaskExecutor()))
            .aggregate(new CustomOrderAggregator())
            .channel("processedBatchOrderChannel")
            .get();
    }

    public class OrderBatchSplitter implements Splitter {
        @Override
        public List<?> split(Message<?> message) {
            OrderBatch batch = (OrderBatch) message.getPayload();
            
            return batch.getOrders().stream()
                .collect(Collectors.groupingBy(
                    this::determineProcessingGroup,
                    Collectors.toList()
                ))
                .values().stream()
                .flatMap(List::stream)
                .collect(Collectors.toList());
        }

        private String determineProcessingGroup(Order order) {
            // 주문 유형, 금액, 고객 세그먼트 등에 따른 그룹화
            if (order.getTotalAmount().compareTo(BULK_THRESHOLD) > 0) {
                return "LARGE_ORDER";
            }
            return "STANDARD_ORDER";
        }
    }

    public class CustomOrderAggregator implements Aggregator {
        @Override
        public Object aggregate(List<Message<?>> messages) {
            List<Order> processedOrders = messages.stream()
                .map(message -> (Order) message.getPayload())
                .collect(Collectors.toList());
            
            // 고급 집계 로직
            BatchProcessingResult result = new BatchProcessingResult(
                processedOrders,
                calculateOverallStatus(processedOrders)
            );
            
            return result;
        }

        private BatchProcessStatus calculateOverallStatus(List<Order> orders) {
            // 배치 처리 상태 계산 로직
            long successCount = orders.stream()
                .filter(Order::isProcessed)
                .count();
            
            double successRate = (double) successCount / orders.size();
            
            return successRate >= 0.9 ? 
                BatchProcessStatus.MOSTLY_SUCCESSFUL : 
                BatchProcessStatus.PARTIAL_FAILURE;
        }
    }
}

고급 게이트웨이 및 서비스 활성화기

비동기 게이트웨이

@MessagingGateway
public interface AsyncOrderProcessingGateway {
    @Gateway(
        requestChannel = "asyncOrderProcessingChannel", 
        replyChannel = "orderProcessingResultChannel",
        replyTimeout = 5000
    )
    CompletableFuture<OrderProcessingResult> processOrderAsync(Order order);

    @Gateway(
        requestChannel = "bulkOrderProcessingChannel",
        replyChannel = "bulkOrderResultChannel"
    )
    ListenableFuture<BatchProcessingResult> processBulkOrderAsync(List<Order> orders);
}

고급 서비스 활성화기

@Configuration
public class AdvancedServiceActivatorConfig {
    @Bean
    public IntegrationFlow advancedOrderProcessingFlow() {
        return IntegrationFlows
            .from("orderProcessingChannel")
            .handle(new ServiceActivator() {
                @Override
                public Object handle(Message<?> message) throws Exception {
                    Order order = (Order) message.getPayload();
                    
                    // 복잡한 주문 처리 로직
                    OrderProcessingContext context = 
                        createProcessingContext(order, message.getHeaders());
                    
                    OrderProcessingResult result = 
                        processOrder(context);
                    
                    // 후처리 및 이벤트 발행
                    publishOrderProcessingEvent(result);
                    
                    return result;
                }
            })
            .channel("orderResultChannel")
            .get();
}

워크플로우 및 상태 머신 통합

@Configuration
public class OrderWorkflowConfig {
    @Bean
    public IntegrationFlow orderWorkflowStateMachine() {
        return IntegrationFlows
            .from("orderInitiationChannel")
            .handle(new StateMachineHandler<OrderState, OrderEvent>() {
                @Override
                protected void configureStateMachine(
                    StateMachineConfigurer<OrderState, OrderEvent> config
                ) {
                    config
                        .withStates()
                            .initial(OrderState.CREATED)
                            .state(OrderState.VALIDATED)
                            .state(OrderState.PROCESSING)
                            .state(OrderState.SHIPPED)
                            .end(OrderState.COMPLETED)
                            .end(OrderState.CANCELLED)
                        .withTransitions()
                            .fromState(OrderState.CREATED)
                                .event(OrderEvent.VALIDATE)
                                .toState(OrderState.VALIDATED)
                            .fromState(OrderState.VALIDATED)
                                .event(OrderEvent.PROCESS)
                                .toState(OrderState.PROCESSING)
                            // 추가 상태 전이 정의
                }
            })
            .channel("orderWorkflowResultChannel")
            .get();
}

Last updated