메시지 변환과 라우팅
기업 애플리케이션 통합 과정에서 가장 중요한 두 가지 작업은 메시지 변환(Transformation)과 라우팅(Routing)입니다. 다양한 시스템 간의 통합에서 데이터 형식을 일치시키고, 각 메시지를 적절한 대상으로 전달하는 것은 복잡한 통합 솔루션의 핵심입니다.
메시지 변환(Transformation)
메시지 변환은 수신된 메시지를 다른 형식이나 구조로 변환하는 과정입니다. 변환의 목적은 다양합니다:
외부 시스템의 데이터 형식을 내부 형식으로 변환(예: XML -> Java 객체)
메시지 페이로드 변환(예: 문자열 -> JSON 객체)
메시지 보강(Enrichment)을 통한 추가 정보 제공
메시지 필터링 또는 내용 수정
기본 트랜스포머 구현
가장 간단한 형태의 트랜스포머는 메시지를 받아 새로운 페이로드로 변환하는 것입니다:
@Bean
public IntegrationFlow simpleTransformFlow() {
return IntegrationFlows
.from("inputChannel")
.transform(payload -> {
String input = (String) payload;
return input.toUpperCase();
})
.channel("outputChannel")
.get();
}
표준 트랜스포머
Spring Integration은 일반적인 변환 작업을 위한 다양한 표준 트랜스포머를 제공합니다:
객체-JSON 변환
@Bean
public IntegrationFlow jsonTransformFlow() {
return IntegrationFlows
.from("orderChannel")
.transform(new ObjectToJsonTransformer())
.channel("jsonOrdersChannel")
.get();
}
@Bean
public IntegrationFlow jsonToObjectFlow() {
return IntegrationFlows
.from("jsonOrdersChannel")
.transform(new JsonToObjectTransformer(Order.class))
.channel("processedOrdersChannel")
.get();
}
XML 변환
@Bean
public IntegrationFlow xmlTransformFlow() {
return IntegrationFlows
.from("orderChannel")
.transform(new ObjectToXmlTransformer(marshaller))
.channel("xmlOrdersChannel")
.get();
}
@Bean
public IntegrationFlow xmlToObjectFlow() {
return IntegrationFlows
.from("xmlOrdersChannel")
.transform(new XmlToObjectTransformer(unmarshaller))
.channel("processedOrdersChannel")
.get();
}
파일 내용 변환
@Bean
public IntegrationFlow fileContentTransformFlow() {
return IntegrationFlows
.from("fileInputChannel")
.transform(Files.toStringTransformer(StandardCharsets.UTF_8))
.channel("fileContentChannel")
.get();
}
어노테이션 기반 트랜스포머
자바 메서드에 @Transformer
어노테이션을 사용하여 트랜스포머를 구현할 수 있습니다:
@Component
public class OrderTransformer {
@Autowired
private CustomerRepository customerRepository;
@Transformer(inputChannel = "rawOrderChannel", outputChannel = "enrichedOrderChannel")
public EnrichedOrder enrichOrder(Order order) {
Customer customer = customerRepository.findById(order.getCustomerId())
.orElseThrow(() -> new RuntimeException("Customer not found"));
return new EnrichedOrder(
order.getId(),
order.getItems(),
order.getTotalAmount(),
customer.getName(),
customer.getEmail(),
customer.getShippingAddress()
);
}
}
메시지 헤더 조작
트랜스포머는 메시지 페이로드뿐만 아니라 헤더도 변경할 수 있습니다:
@Bean
public IntegrationFlow headerEnrichmentFlow() {
return IntegrationFlows
.from("inputChannel")
.enrichHeaders(h -> h
.header("processedTimestamp", new Date())
.headerExpression("uppercasePayload", "payload.toUpperCase()")
.headerFunction("payloadLength",
message -> ((String) message.getPayload()).length()))
.channel("outputChannel")
.get();
}
콘텐츠 보강기(Content Enricher)
메시지를 외부 시스템에서 가져온 추가 데이터로 보강할 수 있습니다:
@Bean
public IntegrationFlow orderEnrichmentFlow() {
return IntegrationFlows
.from("newOrderChannel")
.enrich(e -> e
// 고객 정보 보강
.requestChannel("customerLookupChannel")
.requestPayloadExpression("payload.customerId")
.replyChannel("customerResponseChannel")
.propertyFunction("customer", (order, customer) -> {
Order enrichedOrder = (Order) order;
enrichedOrder.setCustomerDetails((Customer) customer);
return enrichedOrder;
})
// 배송 정보 보강
.requestChannel("shippingInfoChannel")
.requestPayload(Message::getPayload)
.replyChannel("shippingInfoResponseChannel")
.propertyFunction("shippingInfo", (order, shippingInfo) -> {
Order enrichedOrder = (Order) order;
enrichedOrder.setShippingInfo((ShippingInfo) shippingInfo);
return enrichedOrder;
})
// 헤더 추가
.header("enriched", true)
.headerExpression("priority",
"payload.totalAmount > 1000 ? 'HIGH' : 'NORMAL'"))
.channel("enrichedOrderChannel")
.get();
}
분할기(Splitter)와 집계기(Aggregator)
복잡한 메시지를 여러 개의 작은 메시지로 분할하거나, 여러 메시지를 하나로 집계하는 변환 작업도 가능합니다:
// 분할기: 주문을 개별 주문 항목으로 분할
@Bean
public IntegrationFlow orderSplitterFlow() {
return IntegrationFlows
.from("bulkOrderChannel")
.split(Order.class, order -> {
// 각 주문 항목에 대해 별도의 처리 메시지 생성
return order.getItems().stream()
.map(item -> new OrderItemProcessor(
order.getId(),
item,
order.getCustomerId()))
.collect(Collectors.toList());
})
.channel("orderItemsChannel")
.get();
}
// 집계기: 처리된 주문 항목을 원래 주문으로 다시 집계
@Bean
public IntegrationFlow orderAggregatorFlow() {
return IntegrationFlows
.from("processedItemsChannel")
.aggregate(a -> a
.correlationStrategy(message -> {
// 주문 ID로 상관관계 설정
OrderItemProcessor item = (OrderItemProcessor) message.getPayload();
return item.getOrderId();
})
.releaseStrategy(group -> {
// 모든 항목이 처리되었는지 확인
OrderItemProcessor firstItem = (OrderItemProcessor) group.getMessages()
.get(0).getPayload();
String orderId = firstItem.getOrderId();
Order originalOrder = orderRepository.findById(orderId);
int expectedItems = originalOrder.getItems().size();
return group.size() >= expectedItems;
})
.outputProcessor(group -> {
// 처리된 항목을 주문으로 변환
List<Message<?>> messages = group.getMessages();
List<OrderItemProcessor> processedItems = messages.stream()
.map(message -> (OrderItemProcessor) message.getPayload())
.collect(Collectors.toList());
String orderId = processedItems.get(0).getOrderId();
Order originalOrder = orderRepository.findById(orderId);
// 처리 결과를 원래 주문에 반영
processedItems.forEach(item ->
originalOrder.updateItemStatus(item.getItemId(), item.getStatus()));
return originalOrder;
}))
.channel("completedOrdersChannel")
.get();
}
메시지 라우팅(Routing)
라우팅은 메시지를 특정 조건에 따라 적절한 채널로 전달하는 과정입니다. Spring Integration은 다양한 라우팅 전략을 지원합니다:
1. 내용 기반 라우터(Content-based Router)
메시지 페이로드의 내용에 따라 라우팅합니다:
@Bean
public IntegrationFlow orderRoutingFlow() {
return IntegrationFlows
.from("newOrdersChannel")
.<Order, OrderType>route(Order::getType,
mapping -> mapping
.subFlowMapping(OrderType.RETAIL, sf -> sf.channel("retailOrdersChannel"))
.subFlowMapping(OrderType.WHOLESALE, sf -> sf.channel("wholesaleOrdersChannel"))
.subFlowMapping(OrderType.INTERNATIONAL, sf -> sf.channel("internationalOrdersChannel"))
.defaultSubFlowMapping(sf -> sf.channel("defaultOrdersChannel")))
.get();
}
좀 더 복잡한 라우팅 로직을 구현할 수도 있습니다:
@Bean
public IntegrationFlow complexRoutingFlow() {
return IntegrationFlows
.from("ordersChannel")
.<Order, String>route(order -> {
if (order.getTotalAmount().compareTo(new BigDecimal("10000")) > 0) {
return "highValue";
} else if (order.getItems().size() > 10) {
return "bulkOrder";
} else if (order.getCustomerType() == CustomerType.VIP) {
return "vipOrder";
} else {
return "standardOrder";
}
}, mapping -> mapping
.subFlowMapping("highValue", sf -> sf
.enrichHeaders(h -> h.header("priority", "HIGH"))
.channel("highValueOrdersChannel"))
.subFlowMapping("bulkOrder", sf -> sf
.enrichHeaders(h -> h.header("batchProcessing", true))
.channel("bulkOrdersChannel"))
.subFlowMapping("vipOrder", sf -> sf
.enrichHeaders(h -> h.header("expedite", true))
.channel("vipOrdersChannel"))
.defaultSubFlowMapping(sf -> sf
.channel("standardOrdersChannel")))
.get();
}
2. 헤더 값 라우터(Header Value Router)
메시지 헤더의 값을 기준으로 라우팅합니다:
@Bean
public IntegrationFlow headerValueRoutingFlow() {
return IntegrationFlows
.from("incomingChannel")
.<Message<?>, Object>route(message -> message.getHeaders().get("routeKey"),
mapping -> mapping
.channelMapping("A", "channelA")
.channelMapping("B", "channelB")
.channelMapping("C", "channelC")
.defaultOutputChannel("defaultChannel"))
.get();
}
3. 페이로드 타입 라우터(Payload Type Router)
메시지 페이로드의 타입에 따라 라우팅합니다:
@Bean
public IntegrationFlow payloadTypeRoutingFlow() {
return IntegrationFlows
.from("inputChannel")
.route(new PayloadTypeRouter()
.channelMapping(String.class.getName(), "stringChannel")
.channelMapping(Integer.class.getName(), "integerChannel")
.channelMapping(Order.class.getName(), "orderChannel")
.defaultOutputChannel("defaultChannel"))
.get();
}
4. 수신자 목록 라우터(Recipient List Router)
하나의 메시지를 여러 채널로 동시에 라우팅합니다:
@Bean
public IntegrationFlow recipientListRoutingFlow() {
return IntegrationFlows
.from("inputChannel")
.routeToRecipients(r -> r
.recipient("loggingChannel") // 모든 메시지를
.recipient("auditChannel") // 모든 채널로 전송
.recipient("channelA", m -> m.getHeaders().get("toA") != null) // 조건부 라우팅
.recipient("channelB", m -> m.getHeaders().get("toB") != null)
.recipient("channelC", m ->
((String) m.getPayload()).contains("urgent")))
.get();
}
5. 어노테이션 기반 라우터
@Router
어노테이션을 사용하여 라우터를 구현할 수 있습니다:
@Component
public class OrderRouter {
@Router(inputChannel = "newOrdersChannel")
public String routeOrder(Order order) {
switch (order.getRegion()) {
case "NORTH_AMERICA":
return "naOrdersChannel";
case "EUROPE":
return "euOrdersChannel";
case "ASIA_PACIFIC":
return "apacOrdersChannel";
default:
return "internationalOrdersChannel";
}
}
// 또는 여러 채널을 반환
@Router(inputChannel = "criticalOrdersChannel")
public List<String> routeCriticalOrder(Order order) {
List<String> channels = new ArrayList<>();
// 모든 중요 주문은 로깅 및 알림
channels.add("loggingChannel");
channels.add("notificationChannel");
// 금액에 따라 추가 채널 선택
if (order.getTotalAmount().compareTo(new BigDecimal("50000")) > 0) {
channels.add("managerApprovalChannel");
channels.add("financeNotificationChannel");
}
return channels;
}
}
고급 메시지 변환 패턴
실제 기업 환경에서는 더 복잡한 변환 및 라우팅 패턴이 필요합니다. 몇 가지 고급 패턴을 살펴보겠습니다:
1. 정규화기(Normalizer)
다양한 형식의 입력을 표준 형식으로 변환합니다:
@Bean
public IntegrationFlow orderNormalizerFlow() {
return IntegrationFlows
.from("multiFormatOrdersChannel")
.route(Message.class, message -> {
Object payload = message.getPayload();
if (payload instanceof String) {
// 문자열 페이로드 검사
String content = (String) payload;
if (content.startsWith("<")) {
return "xmlOrdersChannel";
} else if (content.startsWith("{")) {
return "jsonOrdersChannel";
} else if (content.contains(",")) {
return "csvOrdersChannel";
}
} else if (payload instanceof byte[]) {
return "binaryOrdersChannel";
} else if (payload instanceof Order) {
return "standardOrdersChannel";
}
return "unknownFormatChannel";
})
// XML 형식 처리
.subFlowMapping("xmlOrdersChannel", sf -> sf
.transform(new XmlToObjectTransformer(orderUnmarshaller))
.channel("standardOrdersChannel"))
// JSON 형식 처리
.subFlowMapping("jsonOrdersChannel", sf -> sf
.transform(new JsonToObjectTransformer(Order.class))
.channel("standardOrdersChannel"))
// CSV 형식 처리
.subFlowMapping("csvOrdersChannel", sf -> sf
.transform(this::convertCsvToOrder)
.channel("standardOrdersChannel"))
// 바이너리 형식 처리
.subFlowMapping("binaryOrdersChannel", sf -> sf
.transform(this::deserializeBinaryOrder)
.channel("standardOrdersChannel"))
// 알 수 없는 형식 처리
.subFlowMapping("unknownFormatChannel", sf -> sf
.handle(message -> {
log.error("Unknown order format: {}", message.getPayload());
throw new IllegalArgumentException("Unsupported order format");
}))
.get();
}
2. 클레임 체크 트랜스포머(Claim Check Transformer)
큰 페이로드를 일시적으로 저장소에 보관하고 참조만 전달합니다:
@Bean
public IntegrationFlow claimCheckFlow() {
return IntegrationFlows
.from("largePayloadChannel")
// 체크인: 페이로드 저장 및 ID 반환
.claimCheckIn(messageStore())
// 저장된 메시지 ID만 가지고 경량 처리
.handle((payload, headers) -> {
String messageId = (String) payload;
log.info("처리 중인 메시지 ID: {}", messageId);
// 가벼운 처리 로직
return messageId;
})
// 체크아웃: ID로 원본 페이로드 복원
.claimCheckOut(messageStore())
.channel("processedChannel")
.get();
}
@Bean
public MessageStore messageStore() {
return new SimpleMessageStore(100); // 최대 100개 메시지 저장
}
통합 주문 처리 파이프라인
통합적인 주문 처리 시스템의 변환 및 라우팅 로직을 구현한 예제입니다:
@Configuration
@EnableIntegration
public class OrderProcessingIntegrationConfig {
@Autowired
private CustomerRepository customerRepository;
@Autowired
private ProductRepository productRepository;
@Autowired
private InventoryService inventoryService;
@Autowired
private PricingService pricingService;
// 1. 주문 입력 및 정규화
@Bean
public IntegrationFlow orderInputFlow() {
return IntegrationFlows
.from("rawOrdersChannel")
// 형식에 따라 라우팅
.<Object, String>route(payload -> {
if (payload instanceof WebOrderDTO) {
return "webOrder";
} else if (payload instanceof MobileOrderDTO) {
return "mobileOrder";
} else if (payload instanceof String) {
String content = (String) payload;
if (content.startsWith("<")) return "xmlOrder";
if (content.startsWith("{")) return "jsonOrder";
}
return "unknownOrder";
}, mapping -> mapping
.subFlowMapping("webOrder", sf -> sf
.transform(this::convertWebOrder))
.subFlowMapping("mobileOrder", sf -> sf
.transform(this::convertMobileOrder))
.subFlowMapping("xmlOrder", sf -> sf
.transform(new XmlToObjectTransformer(orderUnmarshaller)))
.subFlowMapping("jsonOrder", sf -> sf
.transform(new JsonToObjectTransformer(PartnerOrderDTO.class))
.transform(this::convertPartnerOrder)))
.channel("normalizedOrderChannel")
.get();
}
// 2. 주문 보강 및 검증
@Bean
public IntegrationFlow orderEnrichmentFlow() {
return IntegrationFlows
.from("normalizedOrderChannel")
// 고객 정보 보강
.enrich(e -> e
.requestChannel("customerLookupChannel")
.requestPayloadExpression("payload.customerId")
.replyChannel("customerResponseChannel")
.propertyFunction("customer", (order, customer) -> {
Order enrichedOrder = (Order) order;
enrichedOrder.setCustomer((Customer) customer);
return enrichedOrder;
}))
// 제품 정보 보강
.enrich(e -> e
.requestChannel("productLookupChannel")
.requestPayload(message -> {
Order order = (Order) message.getPayload();
return order.getItems().stream()
.map(OrderItem::getProductId)
.collect(Collectors.toList());
})
.replyChannel("productResponseChannel")
.propertyFunction("products", (order, products) -> {
Order enrichedOrder = (Order) order;
Map<String, Product> productMap = ((List<Product>) products).stream()
.collect(Collectors.toMap(Product::getId, p -> p));
// 제품 정보로 주문 항목 보강
enrichedOrder.getItems().forEach(item -> {
Product product = productMap.get(item.getProductId());
item.setProductName(product.getName());
item.setUnitPrice(product.getPrice());
});
return enrichedOrder;
}))
// 주문 유효성 검사
.filter(this::validateOrder, f -> f.discardChannel("invalidOrdersChannel"))
.channel("validatedOrderChannel")
.get();
}
// 3. 주문 라우팅 및 처리
@Bean
public IntegrationFlow orderRoutingFlow() {
return IntegrationFlows
.from("validatedOrderChannel")
.<Order, String>route(order -> {
// 재고 확인
boolean allItemsInStock = order.getItems().stream()
.allMatch(item -> inventoryService.checkAvailability(
item.getProductId(), item.getQuantity()));
if (!allItemsInStock) {
return "backorderFlow";
}
// 주문 금액에 따른 처리
if (order.getTotalAmount().compareTo(new BigDecimal("10000")) > 0) {
return "highValueFlow";
}
// 고객 유형에 따른 처리
if (order.getCustomer().getType() == CustomerType.VIP) {
return "vipFlow";
}
// 국제 주문 처리
if (!order.getShippingAddress().getCountry().equals("KR")) {
return "internationalFlow";
}
// 표준 주문 처리
return "standardFlow";
}, mapping -> mapping
.subFlowMapping("backorderFlow", sf -> sf
.channel("backorderChannel"))
.subFlowMapping("highValueFlow", sf -> sf
.enrichHeaders(h -> h.header("requiresApproval", true))
.channel("approvalRequiredChannel"))
.subFlowMapping("vipFlow", sf -> sf
.enrichHeaders(h -> h.header("expediteDelivery", true))
.channel("vipOrderChannel"))
.subFlowMapping("internationalFlow", sf -> sf
.channel("internationalOrderChannel"))
.defaultSubFlowMapping(sf -> sf
.channel("standardOrderChannel")))
.get();
}
// 고객 조회 서브플로우 구현
@Bean
public IntegrationFlow customerLookupFlow() {
return IntegrationFlows
.from("customerLookupChannel")
.handle(message -> {
String customerId = (String) message.getPayload();
return customerRepository.findById(customerId)
.orElseThrow(() -> new IllegalArgumentException("Customer not found: " + customerId));
})
.channel("customerResponseChannel")
.get();
}
}
Spring Integration의 변환 및 라우팅 기능을 활용하면 복잡한 기업 통합 시나리오를 선언적이고 유연한 방식으로 구현할 수 있습니다. 다양한 시스템에서 오는 메시지를 표준화하고, 메시지의 특성에 따라 적절한 처리 경로로 라우팅함으로써 확장 가능하고 유지보수가 용이한 통합 솔루션을 개발할 수 있습니다.
Last updated