Multi Thread Consumer

Kafka์—์„œ ๋ฉ€ํ‹ฐ ์Šค๋ ˆ๋“œ ์ปจ์Šˆ๋จธ๋ฅผ ์‚ฌ์šฉํ•˜๋Š” ๊ฒƒ์€ ๋ณ‘๋ ฌ ์ฒ˜๋ฆฌ๋ฅผ ์œ„ํ•ด ๋งค์šฐ ์ค‘์š”ํ•ฉ๋‹ˆ๋‹ค. Kafka๋Š” ๊ธฐ๋ณธ์ ์œผ๋กœ ๊ฐ ์ปจ์Šˆ๋จธ๊ฐ€ ํ•˜๋‚˜์˜ ์Šค๋ ˆ๋“œ์—์„œ ์‹คํ–‰๋˜๋„๋ก ์„ค๊ณ„๋˜์—ˆ๊ธฐ ๋•Œ๋ฌธ์—, ์—ฌ๋Ÿฌ ๊ฐœ์˜ ํŒŒํ‹ฐ์…˜์„ ๋ณ‘๋ ฌ๋กœ ์ฒ˜๋ฆฌํ•˜๋ ค๋ฉด ๋ฉ€ํ‹ฐ ์Šค๋ ˆ๋“œ ๋ฐฉ์‹์„ ์ ์šฉํ•ด์•ผ ํ•ฉ๋‹ˆ๋‹ค.

ํ•˜์ง€๋งŒ Kafka๋Š” ์Šค๋ ˆ๋“œ ์•ˆ์ „์„ฑ์„ ์ œ๊ณตํ•˜์ง€ ์•Š๊ธฐ ๋•Œ๋ฌธ์—, ๋ฉ€ํ‹ฐ ์Šค๋ ˆ๋“œ๋ฅผ ์ ์šฉํ•  ๋•Œ ์ฃผ์˜ํ•  ์‚ฌํ•ญ๊ณผ ํŠน์ • ํŒจํ„ด์ด ํ•„์š”ํ•ฉ๋‹ˆ๋‹ค.

๋ฉ€ํ‹ฐ ์Šค๋ ˆ๋“œ ์ปจ์Šˆ๋จธ์—์„œ ๊ณ ๋ คํ•ด์•ผ ํ•  ์‚ฌํ•ญ

Kafka์—์„œ ์ปจ์Šˆ๋จธ๋Š” ์Šค๋ ˆ๋“œ ์•ˆ์ „ํ•˜์ง€ ์•Š์œผ๋ฏ€๋กœ, ๊ฐ ์Šค๋ ˆ๋“œ์—์„œ ๋…๋ฆฝ๋œ KafkaConsumer ์ธ์Šคํ„ด์Šค๋ฅผ ์‚ฌ์šฉํ•˜๋Š” ๊ฒƒ์ด ์ค‘์š”ํ•ฉ๋‹ˆ๋‹ค. ๋”ฐ๋ผ์„œ, ๋ฉ€ํ‹ฐ ์Šค๋ ˆ๋“œ๋ฅผ ๊ตฌํ˜„ํ•  ๋•Œ๋Š” ์ผ๋ฐ˜์ ์œผ๋กœ ๋‹ค์Œ๊ณผ ๊ฐ™์€ ๋ฐฉ์‹์ด ์‚ฌ์šฉ๋ฉ๋‹ˆ๋‹ค:

  1. ์ปจ์Šˆ๋จธ ์Šค๋ ˆ๋“œ๋ฅผ ์—ฌ๋Ÿฌ ๊ฐœ ์ƒ์„ฑํ•˜์—ฌ ๊ฐ ์Šค๋ ˆ๋“œ์—์„œ ๋…๋ฆฝ์ ์œผ๋กœ KafkaConsumer๋ฅผ ์‹คํ–‰.

  2. ํ•˜๋‚˜์˜ ์ปจ์Šˆ๋จธ ์Šค๋ ˆ๋“œ์—์„œ ๋ฐ์ดํ„ฐ๋ฅผ ๊ฐ€์ ธ์™€ ์—ฌ๋Ÿฌ ๊ฐœ์˜ ์›Œ์ปค ์Šค๋ ˆ๋“œ๊ฐ€ ์ฒ˜๋ฆฌํ•˜๋Š” ๋ฐฉ์‹.

๋‹ค์ค‘ ์ปจ์Šˆ๋จธ ์Šค๋ ˆ๋“œ (Multiple Consumer Threads)

์ด ๋ฐฉ์‹์€ ์—ฌ๋Ÿฌ ๊ฐœ์˜ ์Šค๋ ˆ๋“œ์—์„œ ๋…๋ฆฝ์ ์ธ Kafka ์ปจ์Šˆ๋จธ๋ฅผ ๊ฐ๊ฐ ์‹คํ–‰ํ•˜๋Š” ๋ฐฉ์‹์ž…๋‹ˆ๋‹ค. ๊ฐ ์ปจ์Šˆ๋จธ๋Š” ์ž์‹ ์—๊ฒŒ ํ• ๋‹น๋œ ํŒŒํ‹ฐ์…˜์—์„œ ๋ฉ”์‹œ์ง€๋ฅผ ์ฝ์–ด์˜ต๋‹ˆ๋‹ค. Kafka๋Š” ํŒŒํ‹ฐ์…˜ ๋‹น ํ•˜๋‚˜์˜ ์ปจ์Šˆ๋จธ๊ฐ€ ๋ฉ”์‹œ์ง€๋ฅผ ์†Œ๋น„ํ•˜๋„๋ก ์„ค๊ณ„๋˜์–ด ์žˆ๊ธฐ ๋•Œ๋ฌธ์—, ์ปจ์Šˆ๋จธ ๊ทธ๋ฃน ๋‚ด์—์„œ ๊ฐ ํŒŒํ‹ฐ์…˜์ด ๋‹ค๋ฅธ ์Šค๋ ˆ๋“œ์— ํ• ๋‹น๋ฉ๋‹ˆ๋‹ค.

class ConsumerRunnable(val topic: String, val consumerProps: Properties) : Runnable {
    override fun run() {
        val consumer = KafkaConsumer<String, String>(consumerProps)
        consumer.subscribe(listOf(topic))

        try {
            while (true) {
                val records = consumer.poll(Duration.ofMillis(100))
                records.forEach { record ->
                    println("Consumed message: ${record.value()} from partition ${record.partition()}")
                }
            }
        } catch (e: WakeupException) {
            println("Consumer is waking up...")
        } finally {
            consumer.close()
        }
    }
}

fun main() {
    val consumerProps = Properties().apply {
        put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
        put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-group")
        put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)
        put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)
    }

    // ์Šค๋ ˆ๋“œ ํ’€ ์ƒ์„ฑ
    val executor = Executors.newFixedThreadPool(3)
    
    for (i in 1..3) {
        // ๊ฐ ์Šค๋ ˆ๋“œ์— ๋…๋ฆฝ์ ์ธ KafkaConsumer ์ธ์Šคํ„ด์Šค ํ• ๋‹น
        executor.submit(ConsumerRunnable("my-topic", consumerProps))
    }

    // ์ผ์ • ์‹œ๊ฐ„ ํ›„์— ์ปจ์Šˆ๋จธ๋ฅผ ์ข…๋ฃŒํ•˜๋Š” ๋กœ์ง ์ถ”๊ฐ€ ๊ฐ€๋Šฅ
}

์žฅ์ 

๋…๋ฆฝ์ ์ธ ์ปจ์Šˆ๋จธ: ๊ฐ ์Šค๋ ˆ๋“œ๊ฐ€ ์ž์ฒด KafkaConsumer๋ฅผ ์‚ฌ์šฉํ•˜๋ฏ€๋กœ, ์Šค๋ ˆ๋“œ ๊ฐ„ ์ถฉ๋Œ์ด ๋ฐœ์ƒํ•˜์ง€ ์•Š์Œ.

๋ณ‘๋ ฌ ์ฒ˜๋ฆฌ: ๊ฐ ์ปจ์Šˆ๋จธ๊ฐ€ ํ• ๋‹น๋œ ํŒŒํ‹ฐ์…˜์—์„œ ๋ณ‘๋ ฌ๋กœ ๋ฐ์ดํ„ฐ๋ฅผ ์ฒ˜๋ฆฌํ•  ์ˆ˜ ์žˆ์Œ.

๋‹จ์ 

์Šค๋ ˆ๋“œ ์ˆ˜๊ฐ€ ๋งŽ์•„์งˆ ๊ฒฝ์šฐ: ์ปจ์Šˆ๋จธ ์ˆ˜์™€ ํŒŒํ‹ฐ์…˜ ์ˆ˜๊ฐ€ ๋งž์ง€ ์•Š์œผ๋ฉด ๋น„ํšจ์œจ์ ์ธ ๋ฆฌ์†Œ์Šค ์‚ฌ์šฉ์ด ๋ฐœ์ƒํ•  ์ˆ˜ ์žˆ์Œ. ๊ฐ ํŒŒํ‹ฐ์…˜์— ํ•˜๋‚˜์˜ ์ปจ์Šˆ๋จธ๋งŒ ํ• ๋‹น๋˜๋ฏ€๋กœ, ํŒŒํ‹ฐ์…˜ ์ˆ˜๋ณด๋‹ค ๋งŽ์€ ์ปจ์Šˆ๋จธ๊ฐ€ ์žˆ์„ ๋•Œ๋Š” ์ปจ์Šˆ๋จธ๊ฐ€ ๋†€๊ฒŒ ๋จ.


ํ•˜๋‚˜์˜ ์ปจ์Šˆ๋จธ + ์›Œ์ปค ์Šค๋ ˆ๋“œ (Single Consumer with Worker Threads)

์ด ๋ฐฉ์‹์—์„œ๋Š” ํ•˜๋‚˜์˜ ์ปจ์Šˆ๋จธ ์Šค๋ ˆ๋“œ๊ฐ€ ๋ฉ”์‹œ์ง€๋ฅผ ๊ฐ€์ ธ์˜ค๊ณ , ๊ฐ€์ ธ์˜จ ๋ฉ”์‹œ์ง€๋ฅผ ์—ฌ๋Ÿฌ ์›Œ์ปค ์Šค๋ ˆ๋“œ๊ฐ€ ์ฒ˜๋ฆฌํ•˜๋Š” ๊ตฌ์กฐ์ž…๋‹ˆ๋‹ค.

KafkaConsumer๋Š” ์Šค๋ ˆ๋“œ ์•ˆ์ „ํ•˜์ง€ ์•Š์œผ๋ฏ€๋กœ, KafkaConsumer๋Š” ๋‹จ์ผ ์Šค๋ ˆ๋“œ์—์„œ ์‹คํ–‰๋˜๊ณ , ์›Œ์ปค ์Šค๋ ˆ๋“œ๊ฐ€ ๋ฐ์ดํ„ฐ๋ฅผ ์ฒ˜๋ฆฌํ•˜๋Š” ๋ฐฉ์‹์œผ๋กœ ๋ฉ€ํ‹ฐ ์Šค๋ ˆ๋“œ๋ฅผ ๊ตฌํ˜„ํ•ฉ๋‹ˆ๋‹ค

class WorkerRunnable(val record: ConsumerRecord<String, String>) : Runnable {
    override fun run() {
        println("Processing message: ${record.value()} from partition ${record.partition()}")
        // ๋ฉ”์‹œ์ง€ ์ฒ˜๋ฆฌ ๋กœ์ง ์ถ”๊ฐ€
    }
}

fun main() {
    val consumerProps = Properties().apply {
        put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
        put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-group")
        put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)
        put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)
    }

    val consumer = KafkaConsumer<String, String>(consumerProps)
    consumer.subscribe(listOf("my-topic"))

    // ์Šค๋ ˆ๋“œ ํ’€ ์ƒ์„ฑ (์›Œ์ปค ์Šค๋ ˆ๋“œ)
    val executor = Executors.newFixedThreadPool(10)

    try {
        while (true) {
            val records = consumer.poll(Duration.ofMillis(100))
            records.forEach { record ->
                // ์›Œ์ปค ์Šค๋ ˆ๋“œ์—์„œ ๋ฉ”์‹œ์ง€ ์ฒ˜๋ฆฌ
                executor.submit(WorkerRunnable(record))
            }
        }
    } catch (e: WakeupException) {
        println("Consumer is waking up...")
    } finally {
        consumer.close()
        executor.shutdown()  // ์›Œ์ปค ์Šค๋ ˆ๋“œ ์ข…๋ฃŒ
    }
}

์žฅ์ :

  • ์•ˆ์ „ํ•œ ๋ฉ”์‹œ์ง€ ์ฒ˜๋ฆฌ: KafkaConsumer๋Š” ๋‹จ์ผ ์Šค๋ ˆ๋“œ์—์„œ ๋ฉ”์‹œ์ง€๋ฅผ ๊ฐ€์ ธ์˜ค๊ณ , ์‹ค์ œ ๋ฉ”์‹œ์ง€ ์ฒ˜๋ฆฌ๋Š” ์—ฌ๋Ÿฌ ์›Œ์ปค ์Šค๋ ˆ๋“œ์—์„œ ์ง„ํ–‰๋ฉ๋‹ˆ๋‹ค. ์ด๋ฅผ ํ†ตํ•ด ์•ˆ์ „ํ•œ ์Šค๋ ˆ๋“œ ์ฒ˜๋ฆฌ๊ฐ€ ๊ฐ€๋Šฅํ•ฉ๋‹ˆ๋‹ค.

  • ๋ฉ”์‹œ์ง€ ์ฒ˜๋ฆฌ ๋ณ‘๋ ฌํ™”: ๋ฉ”์‹œ์ง€๋ฅผ ๊ฐ€์ ธ์˜ค๋Š” ์ž‘์—…๊ณผ ๋ฉ”์‹œ์ง€ ์ฒ˜๋ฆฌ ์ž‘์—…์„ ๋ถ„๋ฆฌํ•˜์—ฌ, ์ฒ˜๋ฆฌ ์ž‘์—…์„ ๋ณ‘๋ ฌํ™”ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

๋‹จ์ :

  • ๋‹จ์ผ ์ปจ์Šˆ๋จธ ๋ณ‘๋ชฉ: ๋ฉ”์‹œ์ง€๋ฅผ ๊ฐ€์ ธ์˜ค๋Š” ์ž‘์—…์ด ๋‹จ์ผ ์ปจ์Šˆ๋จธ์— ์˜์กดํ•˜๋ฏ€๋กœ, Kafka ํŒŒํ‹ฐ์…˜ ์ˆ˜๊ฐ€ ๋งŽ์„ ๊ฒฝ์šฐ ์ฒ˜๋ฆฌ ์†๋„๊ฐ€ ํ•œ์ •๋  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.


๋ฉ€ํ‹ฐ ์Šค๋ ˆ๋“œ ์ปจ์Šˆ๋จธ ์„ค๊ณ„ ์‹œ ๊ณ ๋ ค ์‚ฌํ•ญ

  1. ์Šค๋ ˆ๋“œ ์•ˆ์ „์„ฑ: KafkaConsumer๋Š” ์Šค๋ ˆ๋“œ ์•ˆ์ „ํ•˜์ง€ ์•Š๊ธฐ ๋•Œ๋ฌธ์—, ๊ฐ ์Šค๋ ˆ๋“œ๋Š” ๋…๋ฆฝ๋œ KafkaConsumer ์ธ์Šคํ„ด์Šค๋ฅผ ์‚ฌ์šฉํ•ด์•ผ ํ•ฉ๋‹ˆ๋‹ค. assign()์ด๋‚˜ subscribe()๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ํŒŒํ‹ฐ์…˜์„ ์ปจ์Šˆ๋จธ ๊ทธ๋ฃน์— ํ• ๋‹นํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

  2. ์˜คํ”„์…‹ ์ปค๋ฐ‹ ๊ด€๋ฆฌ: ๋ฉ€ํ‹ฐ ์Šค๋ ˆ๋“œ ํ™˜๊ฒฝ์—์„œ๋Š” ์˜คํ”„์…‹ ์ปค๋ฐ‹ ๊ด€๋ฆฌ๊ฐ€ ์ค‘์š”ํ•ฉ๋‹ˆ๋‹ค. ๋งŒ์•ฝ ์—ฌ๋Ÿฌ ์Šค๋ ˆ๋“œ์—์„œ ๋ฉ”์‹œ์ง€๋ฅผ ์ฒ˜๋ฆฌํ•˜๊ณ  ์žˆ๋‹ค๋ฉด, ์˜คํ”„์…‹์„ ์–ธ์ œ ์ปค๋ฐ‹ํ• ์ง€์— ๋Œ€ํ•œ ๋ช…ํ™•ํ•œ ์ „๋žต์ด ํ•„์š”ํ•ฉ๋‹ˆ๋‹ค. ํŠนํžˆ ๋ฉ”์‹œ์ง€๊ฐ€ ์ •์ƒ์ ์œผ๋กœ ์ฒ˜๋ฆฌ๋œ ํ›„์—๋งŒ ์ปค๋ฐ‹ํ•˜๋„๋ก ํ•ด์•ผ ๋ฐ์ดํ„ฐ ์œ ์‹ค์„ ๋ฐฉ์ง€ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

  3. ๋ฆฌ๋ฐธ๋Ÿฐ์‹ฑ: ์ปจ์Šˆ๋จธ ๊ทธ๋ฃน์—์„œ ์ปจ์Šˆ๋จธ๊ฐ€ ์ถ”๊ฐ€๋˜๊ฑฐ๋‚˜ ์ œ๊ฑฐ๋  ๋•Œ ๋ฆฌ๋ฐธ๋Ÿฐ์‹ฑ์ด ๋ฐœ์ƒํ•ฉ๋‹ˆ๋‹ค. ์ด๋•Œ ๊ฐ ์ปจ์Šˆ๋จธ์— ํ• ๋‹น๋œ ํŒŒํ‹ฐ์…˜์ด ๋ณ€๊ฒฝ๋  ์ˆ˜ ์žˆ์œผ๋ฏ€๋กœ, ๋ฉ€ํ‹ฐ ์Šค๋ ˆ๋“œ ํ™˜๊ฒฝ์—์„œ๋Š” ๋ฆฌ๋ฐธ๋Ÿฐ์‹ฑ์ด ์ผ์–ด๋‚  ๋•Œ ๊ฐ ์Šค๋ ˆ๋“œ๊ฐ€ ์ƒˆ๋กœ ํ• ๋‹น๋œ ํŒŒํ‹ฐ์…˜์„ ์ ์ ˆํžˆ ์ฒ˜๋ฆฌํ•˜๋„๋ก ํ•ด์•ผ ํ•ฉ๋‹ˆ๋‹ค.

Last updated