메시징 엔드포인트

메시징 엔드포인트의 개념

메시징 엔드포인트는 메시지 채널과 실제 비즈니스 로직 사이의 연결점 역할을 합니다. Spring Integration은 다양한 유형의 메시징 엔드포인트를 제공하여 메시지의 변환, 라우팅, 분할, 집계 등 여러 통합 패턴을 구현할 수 있게 합니다.

채널 어댑터(Channel Adapters)

채널 어댑터는 Spring Integration 시스템과 외부 시스템 사이의 연결을 담당합니다.

인바운드 채널 어댑터(Inbound Channel Adapters)

외부 시스템에서 메시지를 수신하여 Spring Integration 채널로 전달합니다.

@Bean
public IntegrationFlow fileReadingFlow() {
    return IntegrationFlows
        .from(Files.inboundAdapter(new File("/path/to/input/directory"))
                .patternFilter("*.txt")
                .preventDuplicates(true),
              e -> e.poller(Pollers.fixedRate(5000)))
        .transform(Files.toStringTransformer())
        .channel("fileContentChannel")
        .get();
}

아웃바운드 채널 어댑터(Outbound Channel Adapters)

Spring Integration 채널에서 메시지를 수신하여 외부 시스템으로 전달합니다.

@Bean
public IntegrationFlow fileWritingFlow() {
    return IntegrationFlows
        .from("fileOutputChannel")
        .handle(Files.outboundAdapter(new File("/path/to/output/directory"))
                .fileNameGenerator(message -> "output_" + System.currentTimeMillis() + ".txt")
                .autoCreateDirectory(true))
        .get();
}

메시지 핸들러(Message Handlers)

메시지 핸들러는 채널에서 메시지를 수신하여 처리하는 컴포넌트입니다.

서비스 액티베이터(Service Activators)

비즈니스 로직을 수행하는 서비스를 호출하는 엔드포인트입니다.

@Bean
public IntegrationFlow serviceActivatorFlow() {
    return IntegrationFlows
        .from("inputChannel")
        .handle("orderService", "processOrder")
        .channel("outputChannel")
        .get();
}

// 또는 람다 표현식 사용
@Bean
public IntegrationFlow lambdaServiceActivatorFlow() {
    return IntegrationFlows
        .from("inputChannel")
        .handle((payload, headers) -> {
            Order order = (Order) payload;
            return orderProcessor.process(order);
        })
        .channel("outputChannel")
        .get();
}

트랜스포머(Transformers)

메시지의 내용이나 구조를 변환하는 엔드포인트입니다.

@Bean
public IntegrationFlow transformerFlow() {
    return IntegrationFlows
        .from("inputChannel")
        // 객체를 JSON으로 변환
        .transform(new ObjectToJsonTransformer())
        // 헤더 추가
        .enrichHeaders(h -> h
            .header("Content-Type", "application/json")
            .headerExpression("timestamp", "T(java.lang.System).currentTimeMillis()"))
        .channel("outputChannel")
        .get();
}

필터(Filters)

조건에 맞는 메시지만 다음 단계로 전달하는 엔드포인트입니다.

@Bean
public IntegrationFlow filterFlow() {
    return IntegrationFlows
        .from("inputChannel")
        .filter(payload -> {
            Order order = (Order) payload;
            // 금액이 1000 이상인 주문만 처리
            return order.getAmount().compareTo(new BigDecimal("1000")) >= 0;
        }, f -> f.discardChannel("smallOrdersChannel"))
        .channel("largeOrdersChannel")
        .get();
}

라우터(Routers)

조건에 따라 메시지를 다른 채널로 라우팅하는 엔드포인트입니다.

@Bean
public IntegrationFlow routerFlow() {
    return IntegrationFlows
        .from("inputChannel")
        .<Order, String>route(order -> {
            if (order.getType() == OrderType.RETAIL) {
                return "retailChannel";
            } else if (order.getType() == OrderType.WHOLESALE) {
                return "wholesaleChannel";
            } else {
                return "otherOrdersChannel";
            }
        }, mapping -> mapping
            .subFlowMapping("retailChannel", sf -> sf
                .handle("retailOrderProcessor", "process")
                .channel("processedRetailChannel"))
            .subFlowMapping("wholesaleChannel", sf -> sf
                .handle("wholesaleOrderProcessor", "process")
                .channel("processedWholesaleChannel"))
            .defaultSubFlowMapping(sf -> sf
                .handle("defaultOrderProcessor", "process")
                .channel("processedOtherChannel")))
        .get();
}

스플리터(Splitters)

하나의 메시지를 여러 개의 메시지로 분할하는 엔드포인트입니다.

@Bean
public IntegrationFlow splitterFlow() {
    return IntegrationFlows
        .from("batchOrdersChannel")
        .split(orderBatch -> {
            // 주문 배치를 개별 주문으로 분할
            List<Order> orders = ((OrderBatch) orderBatch).getOrders();
            return orders;
        })
        .channel("individualOrdersChannel")
        .get();
}

애그리게이터(Aggregators)

여러 개의 메시지를 하나의 메시지로 결합하는 엔드포인트입니다.

@Bean
public IntegrationFlow aggregatorFlow() {
    return IntegrationFlows
        .from("individualResultsChannel")
        .<ProcessingResult, String>aggregate(aggregator -> aggregator
            .correlationStrategy(message -> {
                // 주문 ID로 상관관계 설정
                ProcessingResult result = (ProcessingResult) message.getPayload();
                return result.getOrderId();
            })
            .releaseStrategy(group -> {
                // 모든 아이템이 처리되면 릴리스
                return group.size() == group.getSequenceSize();
            })
            .expireGroupsUponCompletion(true)
            .groupTimeout(30000) // 30초 타임아웃
            .sendPartialResultOnExpiry(true)
            .outputProcessor(group -> {
                // 결과 합치기
                List<ProcessingResult> results = group.getMessages()
                    .stream()
                    .map(message -> (ProcessingResult) message.getPayload())
                    .collect(Collectors.toList());
                
                return new AggregatedResult(group.getGroupId(), results);
            })
        )
        .channel("aggregatedResultsChannel")
        .get();
}

게이트웨이(Gateways)

애플리케이션 코드에서 메시징 시스템에 접근할 수 있는 인터페이스를 제공합니다.

@MessagingGateway
public interface OrderService {
    @Gateway(requestChannel = "ordersChannel", replyChannel = "orderResultsChannel")
    OrderConfirmation submitOrder(Order order);
    
    @Gateway(requestChannel = "orderStatusRequestChannel", replyChannel = "orderStatusChannel")
    OrderStatus checkOrderStatus(String orderId);
}

사용 예:

@Service
public class OrderManagementService {
    
    private final OrderService orderService;
    
    @Autowired
    public OrderManagementService(OrderService orderService) {
        this.orderService = orderService;
    }
    
    public OrderConfirmation processNewOrder(Order order) {
        // 메시징 시스템을 통해 주문 처리
        return orderService.submitOrder(order);
    }
    
    public OrderStatus getOrderStatus(String orderId) {
        return orderService.checkOrderStatus(orderId);
    }
}

브릿지(Bridges)

서로 다른 채널 간에 메시지를 전달하는 간단한 엔드포인트입니다.

@Bean
public IntegrationFlow bridgeFlow() {
    return IntegrationFlows
        .from("inputChannel")
        .bridge(e -> e.poller(Pollers.fixedRate(1000)))
        .channel("outputChannel")
        .get();
}

채널 인터셉터(Channel Interceptors)

채널을 통과하는 메시지를 가로채어 추가 처리를 수행할 수 있습니다.

@Bean
public MessageChannel auditedChannel() {
    DirectChannel channel = new DirectChannel();
    channel.addInterceptor(loggingInterceptor());
    return channel;
}

@Bean
public ChannelInterceptor loggingInterceptor() {
    return new ChannelInterceptorAdapter() {
        @Override
        public Message<?> preSend(Message<?> message, MessageChannel channel) {
            // 전송 전 로깅
            log.info("Sending message: {}", message);
            return message;
        }
        
        @Override
        public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
            // 전송 후 로깅
            log.info("Message sent: {}, success: {}", message, sent);
        }
        
        @Override
        public void afterSendCompletion(Message<?> message, MessageChannel channel, 
                                       boolean sent, Exception ex) {
            if (ex != null) {
                // 예외 발생 시 로깅
                log.error("Error sending message: {}", message, ex);
            }
        }
    };
}

메시징 템플릿(Messaging Templates)

프로그래매틱하게 메시지를 보내고 받을 수 있는 편리한 방법을 제공합니다.

@Service
public class OrderNotificationService {
    
    private final MessagingTemplate messagingTemplate;
    
    @Autowired
    public OrderNotificationService(MessagingTemplate messagingTemplate) {
        this.messagingTemplate = messagingTemplate;
    }
    
    public void notifyOrderProcessed(Order order) {
        // 메시지 생성
        Message<Order> message = MessageBuilder
            .withPayload(order)
            .setHeader("notificationType", "ORDER_PROCESSED")
            .setHeader("timestamp", System.currentTimeMillis())
            .build();
        
        // 채널로 메시지 전송
        messagingTemplate.send("notificationChannel", message);
    }
    
    public OrderStatus getOrderStatus(String orderId) {
        // 요청 메시지 생성
        Message<String> requestMessage = MessageBuilder
            .withPayload(orderId)
            .build();
        
        // 요청 전송 및 응답 수신
        Message<?> responseMessage = messagingTemplate.sendAndReceive(
            "orderStatusRequestChannel", requestMessage);
        
        return (OrderStatus) responseMessage.getPayload();
    }
}

채널 어댑터와 엔드포인트 종류

파일 시스템

파일 시스템과의 통합을 위한 어댑터를 제공합니다.

@Bean
public IntegrationFlow fileReadingFlow() {
    return IntegrationFlows
        .from(Files.inboundAdapter(new File("/path/to/input"))
                .patternFilter("*.csv")
                .preventDuplicates(true),
              e -> e.poller(Pollers.fixedRate(5000)))
        .transform(Files.toStringTransformer())
        .transform(new CsvToOrderTransformer())
        .channel("newOrdersChannel")
        .get();
}

@Bean
public IntegrationFlow fileWritingFlow() {
    return IntegrationFlows
        .from("processedOrdersChannel")
        .transform(new OrderToCsvTransformer())
        .handle(Files.outboundAdapter(new File("/path/to/output"))
                .fileNameGenerator(message -> {
                    Order order = (Order) message.getPayload();
                    return "order_" + order.getId() + ".csv";
                })
                .autoCreateDirectory(true))
        .get();
}

JMS

JMS 메시징 시스템과의 통합을 위한 어댑터를 제공합니다.

@Bean
public IntegrationFlow jmsInboundFlow() {
    return IntegrationFlows
        .from(Jms.messageDrivenChannelAdapter(connectionFactory)
                .destination("orderQueue")
                .configureListenerContainer(c -> c.sessionTransacted(true)))
        .transform(new JmsMessageToOrderTransformer())
        .channel("orderProcessingChannel")
        .get();
}

@Bean
public IntegrationFlow jmsOutboundFlow() {
    return IntegrationFlows
        .from("processedOrdersChannel")
        .handle(Jms.outboundAdapter(connectionFactory)
                .destination("processedOrderQueue"))
        .get();
}

AMQP (RabbitMQ)

AMQP 프로토콜을 지원하는 메시징 시스템과의 통합을 위한 어댑터를 제공합니다.

@Bean
public IntegrationFlow amqpInboundFlow() {
    return IntegrationFlows
        .from(Amqp.inboundAdapter(connectionFactory, "orderQueue")
                .configureContainer(c -> c.prefetchCount(10)))
        .transform(new AmqpMessageToOrderTransformer())
        .channel("orderProcessingChannel")
        .get();
}

@Bean
public IntegrationFlow amqpOutboundFlow() {
    return IntegrationFlows
        .from("processedOrdersChannel")
        .handle(Amqp.outboundAdapter(rabbitTemplate)
                .exchangeName("orders")
                .routingKey("processed"))
        .get();
}

HTTP

HTTP 기반 통신을 위한 어댑터를 제공합니다.

@Bean
public IntegrationFlow httpInboundFlow() {
    return IntegrationFlows
        .from(Http.inboundGateway("/orders")
                .requestMapping(r -> r.methods(HttpMethod.POST)
                                     .consumes("application/json"))
                .requestPayloadType(Order.class))
        .channel("orderProcessingChannel")
        .get();
}

@Bean
public IntegrationFlow httpOutboundFlow() {
    return IntegrationFlows
        .from("inventoryCheckChannel")
        .handle(Http.outboundGateway("https://inventory-service/check")
                .httpMethod(HttpMethod.POST)
                .expectedResponseType(InventoryStatus.class))
        .channel("inventoryStatusChannel")
        .get();
}

WebSocket

WebSocket 통신을 위한 어댑터를 제공합니다.

@Bean
public IntegrationFlow webSocketOutboundFlow() {
    return IntegrationFlows
        .from("notificationChannel")
        .transform(new OrderStatusToWebSocketMessageTransformer())
        .handle(new WebSocketHandler())
        .get();
}

public class WebSocketHandler implements MessageHandler {

    private final SimpMessagingTemplate messagingTemplate;
    
    @Autowired
    public WebSocketHandler(SimpMessagingTemplate messagingTemplate) {
        this.messagingTemplate = messagingTemplate;
    }
    
    @Override
    public void handleMessage(Message<?> message) {
        String payload = (String) message.getPayload();
        String userId = (String) message.getHeaders().get("userId");
        
        // WebSocket으로 메시지 전송
        messagingTemplate.convertAndSendToUser(
            userId, "/queue/notifications", payload);
    }
}

Last updated