Kafka

์Šคํ”„๋ง ์นดํ”„์นด(Spring Kafka)๋Š” ์Šคํ”„๋ง ํ”„๋ ˆ์ž„์›Œํฌ์™€ Apache Kafka๋ฅผ ํ†ตํ•ฉํ•˜์—ฌ ๋ฉ”์‹œ์ง€ ๊ธฐ๋ฐ˜ ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜์„ ์‰ฝ๊ฒŒ ๊ฐœ๋ฐœํ•  ์ˆ˜ ์žˆ๋„๋ก ์ง€์›ํ•˜๋Š” ํ”„๋กœ์ ํŠธ์ž…๋‹ˆ๋‹ค. Apache Kafka๋Š” ๋Œ€์šฉ๋Ÿ‰์˜ ์‹ค์‹œ๊ฐ„ ๋ฐ์ดํ„ฐ ์ŠคํŠธ๋ฆฌ๋ฐ์„ ์ฒ˜๋ฆฌํ•˜๊ธฐ ์œ„ํ•œ ๋ถ„์‚ฐํ˜• ๋ฉ”์‹œ์ง• ์‹œ์Šคํ…œ์œผ๋กœ, ๋ฐ์ดํ„ฐ ํŒŒ์ดํ”„๋ผ์ธ๊ณผ ์ŠคํŠธ๋ฆฌ๋ฐ ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜ ๊ตฌ์ถ•์— ๋„๋ฆฌ ์‚ฌ์šฉ๋ฉ๋‹ˆ๋‹ค.

  1. ๊ฐ„ํŽธํ•œ ์„ค์ •๊ณผ ๊ตฌ์„ฑ: ์Šคํ”„๋ง์˜ ์„ค์ • ๋ฐฉ์‹์„ ํ™œ์šฉํ•˜์—ฌ Kafka ํ”„๋กœ๋“€์„œ์™€ ์ปจ์Šˆ๋จธ๋ฅผ ์†์‰ฝ๊ฒŒ ๊ตฌ์„ฑํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. YAML ๋˜๋Š” ํ”„๋กœํผํ‹ฐ ํŒŒ์ผ์„ ํ†ตํ•ด ํ•„์š”ํ•œ ์„ค์ •์„ ์ง๊ด€์ ์œผ๋กœ ์ ์šฉํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

  2. KafkaTemplate ์ œ๊ณต: KafkaTemplate ํด๋ž˜์Šค๋ฅผ ํ†ตํ•ด ๋ฉ”์‹œ์ง€ ๋ฐœํ–‰์„ ๊ฐ„์†Œํ™”ํ•˜์—ฌ ์ƒ์‚ฐ์ž ์ฝ”๋“œ๋ฅผ ๋‹จ์ˆœํ™”ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

  3. @KafkaListener ์ง€์›: ์–ด๋…ธํ…Œ์ด์…˜ ๊ธฐ๋ฐ˜์˜ ๋ฆฌ์Šค๋„ˆ๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ์ปจ์Šˆ๋จธ๋ฅผ ๊ตฌํ˜„ํ•˜๊ณ , ํŠน์ • ํ† ํ”ฝ์˜ ๋ฉ”์‹œ์ง€๋ฅผ ๋น„๋™๊ธฐ๋กœ ์ˆ˜์‹ ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

  4. ํŠธ๋žœ์žญ์…˜ ๊ด€๋ฆฌ: Kafka์™€ ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค ๊ฐ„์˜ ํŠธ๋žœ์žญ์…˜์„ ํ†ตํ•ฉํ•˜์—ฌ ๋ฐ์ดํ„ฐ ์ผ๊ด€์„ฑ์„ ๋ณด์žฅํ•˜๊ณ , ๋ฉ”์‹œ์ง€ ์ฒ˜๋ฆฌ์˜ ์›์ž์„ฑ์„ ์œ ์ง€ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

  5. ์—๋Ÿฌ ์ฒ˜๋ฆฌ ๋ฐ ์žฌ์‹œ๋„ ๋ฉ”์ปค๋‹ˆ์ฆ˜: ๋ฉ”์‹œ์ง€ ์ฒ˜๋ฆฌ ์ค‘ ๋ฐœ์ƒํ•˜๋Š” ์˜ˆ์™ธ ์ƒํ™ฉ์— ๋Œ€ํ•œ ์ฒด๊ณ„์ ์ธ ์—๋Ÿฌ ์ฒ˜๋ฆฌ์™€ ์žฌ์‹œ๋„ ๋กœ์ง์„ ์ œ๊ณตํ•˜์—ฌ ์•ˆ์ •์„ฑ์„ ๋†’์ž…๋‹ˆ๋‹ค.

  6. ๋ชจ๋‹ˆํ„ฐ๋ง๊ณผ ๊ด€๋ฆฌ: ์Šคํ”„๋ง์˜ Actuator์™€ ํ†ตํ•ฉํ•˜์—ฌ Kafka ๊ด€๋ จ ๋ฉ”ํŠธ๋ฆญ์„ ๋ชจ๋‹ˆํ„ฐ๋งํ•˜๊ณ  ๊ด€๋ฆฌํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

ํ”„๋กœ๋“€์„œ ์„ค์ •

@EnableKafka
@Configuration
class KafkaConfiguration {

    @Bean
    fun kafkaTemplate():KafkaTemplate<String, Any> = KafkaTemplate(kafkaProperties())

    @Bean
    fun kafkaProperties(): DefaultKafkaProducerFactory<String, Any> = DefaultKafkaProducerFactory<String,Any>(mapOf(
        ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "{IP}",
        ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to JsonDeserializer::class.java,
        ProducerConfig.ACKS_CONFIG to "all",
        ConsumerConfig.GROUP_ID_CONFIG to "kafka-group"
        ))
}

์ปจ์Šˆ๋จธ ์„ค์ •

@EnableKafka
@Configuration
class KafkaConsumerConfiguration {

    @Bean
    fun kafkaConsumerFactory(): ConcurrentKafkaListenerContainerFactory<String, Any> =
        ConcurrentKafkaListenerContainerFactory<String, Any>()
            .also {
                it.setConcurrency(10)
                it.consumerFactory = consumerFactory()
                it.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL_IMMEDIATE
                it.containerProperties.listenerTaskExecutor = executor()
            }

    @Bean
    fun consumerFactory(): ConsumerFactory<String, Any> = DefaultKafkaConsumerFactory(consumerConfig())

    @Bean
    fun executor(): ThreadPoolTaskExecutor = ThreadPoolTaskExecutor()
        .also {
            it.corePoolSize = 10
            it.maxPoolSize = 200
            it.queueCapacity = 250
            it.setThreadFactory(CustomizableThreadFactory("kafka-thread")) // ์ด๋ฆ„ prefix
        }


    @Bean
    fun consumerConfig() = mapOf(
        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to "{IP}",
        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to JsonDeserializer::class.java,
        ConsumerConfig.GROUP_ID_CONFIG to "kafka-group",
        ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest",
        ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to "true",
        ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG to "1000"
    )
}

Last updated