개요
Spring Integration은 엔터프라이즈 통합 패턴(Enterprise Integration Patterns)을 구현한 Spring 프레임워크의 확장 모듈입니다. 이 모듈은 Gregor Hohpe와 Bobby Woolf의 "Enterprise Integration Patterns" 책에서 설명된 패턴들을 Java 개발자들이 쉽게 적용할 수 있도록 합니다.
Spring Integration의 주요 목표:
시스템 간의 느슨한 결합(loose coupling) 제공
비동기 메시징 지원
다양한 전송 프로토콜 및 엔드포인트 연결
메시지 변환 및 라우팅 단순화
선언적 어댑터를 통한 외부 시스템 통합
핵심 개념
메시지(Message)
메시지는 Spring Integration의 기본 데이터 전송 단위입니다.
public interface Message<T> {
T getPayload();
MessageHeaders getHeaders();
}
메시지는 두 가지 주요 부분으로 구성됩니다:
페이로드(Payload): 실제 전송되는 데이터
헤더(Headers): 메타데이터와 라우팅 정보를 담고 있는 이름-값 쌍의 컬렉션
메시지 생성은 MessageBuilder 클래스를 사용하여 수행할 수 있습니다:
Message<String> message = MessageBuilder.withPayload("주문 접수됨")
.setHeader("orderId", "12345")
.setHeader("timestamp", System.currentTimeMillis())
.build();
메시지 채널(Message Channel)
메시지 채널은 메시지 생산자와 소비자 사이의 파이프라인 역할을 합니다.
public interface MessageChannel {
boolean send(Message<?> message);
boolean send(Message<?> message, long timeout);
}
Spring Integration에서 제공하는 주요 채널 유형:
DirectChannel
가장 기본적인 채널 유형으로, 동기식 포인트-투-포인트 메시징을 지원합니다:
@Bean
public DirectChannel orderProcessingChannel() {
return new DirectChannel();
}
특징:
메시지를 버퍼링하지 않고 즉시 핸들러에게 전달
발신자의 스레드에서 실행
단일 메시지 핸들러를 호출 (여러 핸들러가 있을 경우 로드 밸런싱)
트랜잭션 컨텍스트 전파 지원
QueueChannel
메시지를 내부 큐에 저장하는 채널로, 비동기 통신에 적합합니다:
@Bean
public QueueChannel orderBacklogChannel() {
return new QueueChannel(100); // 용량 지정 가능
}
특징:
내부 메시지 큐를 사용하여 메시지 버퍼링
발신자와 수신자 간의 완전한 결합 해제
수신자는 명시적으로 메시지를 폴링해야 함
선입선출(FIFO) 방식의 메시지 전달
PublishSubscribeChannel
메시지를 여러 구독자에게 브로드캐스트하는 채널:
@Bean
public PublishSubscribeChannel orderEventChannel() {
return new PublishSubscribeChannel();
}
특징:
등록된 모든 구독자에게 메시지 전달
기본적으로 발신자의 스레드에서 동기적으로 실행
이벤트 기반 시스템에서 상태 변경 알림에 유용
메시지 엔드포인트(Message Endpoint)
메시지 엔드포인트는 메시지를 실제로 처리하는 컴포넌트입니다.
서비스 액티베이터(Service Activator)
메시지를 수신하여 비즈니스 로직을 실행합니다:
@ServiceActivator(inputChannel = "orderChannel")
public Order processOrder(Order order) {
// 주문 처리 로직
order.setStatus(OrderStatus.PROCESSING);
return order;
}
트랜스포머(Transformer)
메시지 페이로드를 변환합니다:
@Transformer(inputChannel = "newOrderChannel", outputChannel = "enrichedOrderChannel")
public EnrichedOrder enrichOrder(Order order) {
// 주문 데이터 보강 로직
return new EnrichedOrder(order);
}
필터(Filter)
특정 조건에 따라 메시지를 필터링합니다:
@Filter(inputChannel = "allOrdersChannel", outputChannel = "highValueOrdersChannel")
public boolean isHighValueOrder(Order order) {
return order.getTotalAmount().compareTo(new BigDecimal("10000")) > 0;
}
라우터(Router)
메시지를 특정 조건에 따라 다른 채널로 라우팅합니다:
@Router(inputChannel = "newOrdersChannel")
public String routeByOrderType(Order order) {
switch (order.getType()) {
case RETAIL: return "retailOrdersChannel";
case WHOLESALE: return "wholesaleOrdersChannel";
default: return "defaultOrdersChannel";
}
}
스플리터(Splitter)
하나의 메시지를 여러 개로 분할합니다:
@Splitter(inputChannel = "bulkOrderChannel", outputChannel = "orderItemsChannel")
public List<OrderItem> splitOrderIntoItems(Order order) {
return order.getItems();
}
어그리게이터(Aggregator)
여러 개의 관련 메시지를 하나로 결합합니다:
@Aggregator(inputChannel = "processedItemsChannel", outputChannel = "shipmentsChannel")
public Shipment createShipment(List<ProcessedItem> items) {
return new Shipment(items);
}
@CorrelationStrategy
public Object correlateByOrderId(ProcessedItem item) {
return item.getOrderId();
}
@ReleaseStrategy
public boolean isComplete(List<Message<?>> messages) {
if (messages.isEmpty()) return false;
Message<?> first = messages.get(0);
Integer expectedItems = first.getHeaders().get("expectedItemCount", Integer.class);
return expectedItems != null && messages.size() >= expectedItems;
}
채널 어댑터와 게이트웨이
채널 어댑터(Channel Adapter)
채널 어댑터는 메시지 채널과 외부 시스템 사이의 단방향 통신을 제공합니다:
인바운드 채널 어댑터(Inbound Channel Adapter): 외부 시스템에서 메시지를 수신하여 채널로 전송
아웃바운드 채널 어댑터(Outbound Channel Adapter): 채널의 메시지를 외부 시스템으로 전송
예: 파일 시스템 어댑터
@Bean
public MessageSource<File> fileReadingMessageSource() {
FileReadingMessageSource source = new FileReadingMessageSource();
source.setDirectory(new File("/path/to/input"));
source.setFilter(new SimplePatternFileListFilter("*.csv"));
return source;
}
@Bean
public IntegrationFlow fileReadingFlow() {
return IntegrationFlows
.from(fileReadingMessageSource(),
c -> c.poller(Pollers.fixedDelay(5000)))
.channel("fileContentChannel")
.get();
}
메시징 게이트웨이(Messaging Gateway)
게이트웨이는 양방향 통신(요청-응답)을 위한 인터페이스를 제공합니다:
@MessagingGateway(defaultRequestChannel = "orderProcessingChannel",
defaultReplyChannel = "orderConfirmationChannel")
public interface OrderService {
OrderConfirmation processOrder(Order order);
@Gateway(requestChannel = "priorityOrdersChannel")
OrderConfirmation processHighPriorityOrder(Order order);
@Async
Future<OrderConfirmation> processOrderAsync(Order order);
}
Java DSL을 사용한 통합 플로우 구성
Spring Integration 5.0부터는 Java DSL을 사용하여 통합 플로우를 정의할 수 있습니다:
@Configuration
@EnableIntegration
public class OrderProcessingIntegrationConfig {
@Bean
public IntegrationFlow orderProcessingFlow() {
return IntegrationFlows
.from("inputChannel")
.filter(this::isValidOrder)
.<Order, OrderType>route(Order::getType,
mapping -> mapping
.subFlowMapping(OrderType.RETAIL, sf -> sf
.channel("retailOrdersChannel")
.handle(orderService(), "processRetailOrder"))
.subFlowMapping(OrderType.WHOLESALE, sf -> sf
.channel("wholesaleOrdersChannel")
.handle(orderService(), "processWholesaleOrder")))
.get();
}
private boolean isValidOrder(Order order) {
return order != null && !order.getItems().isEmpty();
}
}
Last updated