Kafka
์คํ๋ง ์นดํ์นด(Spring Kafka)๋ ์คํ๋ง ํ๋ ์์ํฌ์ Apache Kafka๋ฅผ ํตํฉํ์ฌ ๋ฉ์์ง ๊ธฐ๋ฐ ์ ํ๋ฆฌ์ผ์ด์ ์ ์ฝ๊ฒ ๊ฐ๋ฐํ ์ ์๋๋ก ์ง์ํ๋ ํ๋ก์ ํธ์ ๋๋ค. Apache Kafka๋ ๋์ฉ๋์ ์ค์๊ฐ ๋ฐ์ดํฐ ์คํธ๋ฆฌ๋ฐ์ ์ฒ๋ฆฌํ๊ธฐ ์ํ ๋ถ์ฐํ ๋ฉ์์ง ์์คํ ์ผ๋ก, ๋ฐ์ดํฐ ํ์ดํ๋ผ์ธ๊ณผ ์คํธ๋ฆฌ๋ฐ ์ ํ๋ฆฌ์ผ์ด์ ๊ตฌ์ถ์ ๋๋ฆฌ ์ฌ์ฉ๋ฉ๋๋ค.
๊ฐํธํ ์ค์ ๊ณผ ๊ตฌ์ฑ: ์คํ๋ง์ ์ค์ ๋ฐฉ์์ ํ์ฉํ์ฌ Kafka ํ๋ก๋์์ ์ปจ์๋จธ๋ฅผ ์์ฝ๊ฒ ๊ตฌ์ฑํ ์ ์์ต๋๋ค. YAML ๋๋ ํ๋กํผํฐ ํ์ผ์ ํตํด ํ์ํ ์ค์ ์ ์ง๊ด์ ์ผ๋ก ์ ์ฉํ ์ ์์ต๋๋ค.
KafkaTemplate ์ ๊ณต:
KafkaTemplate
ํด๋์ค๋ฅผ ํตํด ๋ฉ์์ง ๋ฐํ์ ๊ฐ์ํํ์ฌ ์์ฐ์ ์ฝ๋๋ฅผ ๋จ์ํํ ์ ์์ต๋๋ค.@KafkaListener ์ง์: ์ด๋ ธํ ์ด์ ๊ธฐ๋ฐ์ ๋ฆฌ์ค๋๋ฅผ ์ฌ์ฉํ์ฌ ์ปจ์๋จธ๋ฅผ ๊ตฌํํ๊ณ , ํน์ ํ ํฝ์ ๋ฉ์์ง๋ฅผ ๋น๋๊ธฐ๋ก ์์ ํ ์ ์์ต๋๋ค.
ํธ๋์ญ์ ๊ด๋ฆฌ: Kafka์ ๋ฐ์ดํฐ๋ฒ ์ด์ค ๊ฐ์ ํธ๋์ญ์ ์ ํตํฉํ์ฌ ๋ฐ์ดํฐ ์ผ๊ด์ฑ์ ๋ณด์ฅํ๊ณ , ๋ฉ์์ง ์ฒ๋ฆฌ์ ์์์ฑ์ ์ ์งํ ์ ์์ต๋๋ค.
์๋ฌ ์ฒ๋ฆฌ ๋ฐ ์ฌ์๋ ๋ฉ์ปค๋์ฆ: ๋ฉ์์ง ์ฒ๋ฆฌ ์ค ๋ฐ์ํ๋ ์์ธ ์ํฉ์ ๋ํ ์ฒด๊ณ์ ์ธ ์๋ฌ ์ฒ๋ฆฌ์ ์ฌ์๋ ๋ก์ง์ ์ ๊ณตํ์ฌ ์์ ์ฑ์ ๋์ ๋๋ค.
๋ชจ๋ํฐ๋ง๊ณผ ๊ด๋ฆฌ: ์คํ๋ง์ Actuator์ ํตํฉํ์ฌ Kafka ๊ด๋ จ ๋ฉํธ๋ฆญ์ ๋ชจ๋ํฐ๋งํ๊ณ ๊ด๋ฆฌํ ์ ์์ต๋๋ค.
ํ๋ก๋์ ์ค์
@EnableKafka
@Configuration
class KafkaConfiguration {
@Bean
fun kafkaTemplate():KafkaTemplate<String, Any> = KafkaTemplate(kafkaProperties())
@Bean
fun kafkaProperties(): DefaultKafkaProducerFactory<String, Any> = DefaultKafkaProducerFactory<String,Any>(mapOf(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "{IP}",
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to JsonDeserializer::class.java,
ProducerConfig.ACKS_CONFIG to "all",
ConsumerConfig.GROUP_ID_CONFIG to "kafka-group"
))
}
์ปจ์๋จธ ์ค์
@EnableKafka
@Configuration
class KafkaConsumerConfiguration {
@Bean
fun kafkaConsumerFactory(): ConcurrentKafkaListenerContainerFactory<String, Any> =
ConcurrentKafkaListenerContainerFactory<String, Any>()
.also {
it.setConcurrency(10)
it.consumerFactory = consumerFactory()
it.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL_IMMEDIATE
it.containerProperties.listenerTaskExecutor = executor()
}
@Bean
fun consumerFactory(): ConsumerFactory<String, Any> = DefaultKafkaConsumerFactory(consumerConfig())
@Bean
fun executor(): ThreadPoolTaskExecutor = ThreadPoolTaskExecutor()
.also {
it.corePoolSize = 10
it.maxPoolSize = 200
it.queueCapacity = 250
it.setThreadFactory(CustomizableThreadFactory("kafka-thread")) // ์ด๋ฆ prefix
}
@Bean
fun consumerConfig() = mapOf(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to "{IP}",
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to JsonDeserializer::class.java,
ConsumerConfig.GROUP_ID_CONFIG to "kafka-group",
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest",
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to "true",
ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG to "1000"
)
}
Last updated