Producer

Kafka ํ”„๋กœ๋“€์„œ๋ž€?

Kafka ํ”„๋กœ๋“€์„œ๋Š” Kafka ์‹œ์Šคํ…œ์—์„œ ๋ฐ์ดํ„ฐ๋ฅผ ์ƒ์„ฑํ•˜์—ฌ Kafka ํ† ํ”ฝ์œผ๋กœ ์ „์†กํ•˜๋Š” ์—ญํ• ์„ ํ•ฉ๋‹ˆ๋‹ค. ํ”„๋กœ๋“€์„œ๋Š” ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜์—์„œ ๋ฐœ์ƒํ•œ ์ด๋ฒคํŠธ๋‚˜ ๋ฉ”์‹œ์ง€๋ฅผ Kafka ํด๋Ÿฌ์Šคํ„ฐ๋กœ ๋ณด๋‚ด๋ฉฐ, ์ด ๋ฉ”์‹œ์ง€๋Š” ๋‚˜์ค‘์— ์ปจ์Šˆ๋จธ์— ์˜ํ•ด ์ฒ˜๋ฆฌ๋ฉ๋‹ˆ๋‹ค. Kafka์˜ ๋น„๋™๊ธฐ ์ „์†ก ๋ฐฉ์‹๊ณผ ๊ณ ์„ฑ๋Šฅ ๋ฐฐ์น˜ ์ฒ˜๋ฆฌ ๋•๋ถ„์—, ํ”„๋กœ๋“€์„œ๋Š” ๋Œ€๊ทœ๋ชจ ๋ฐ์ดํ„ฐ ์ŠคํŠธ๋ฆฌ๋ฐ ์‹œ์Šคํ…œ์˜ ํ•ต์‹ฌ ์š”์†Œ์ž…๋‹ˆ๋‹ค.

Kafka ํ”„๋กœ๋“€์„œ๋Š” ๋ฐ์ดํ„ฐ๋ฅผ ์ƒ์„ฑํ•˜๋Š” ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜๊ณผ Kafka ๋ธŒ๋กœ์ปค ๊ฐ„์˜ ํ†ต์‹ ์„ ๋‹ด๋‹นํ•ฉ๋‹ˆ๋‹ค. ํ”„๋กœ๋“€์„œ๋Š” ๋ฉ”์‹œ์ง€๋ฅผ ํ† ํ”ฝ์œผ๋กœ ์ „์†กํ•˜๊ณ , ๊ฐ ๋ฉ”์‹œ์ง€๋Š” ํŒŒํ‹ฐ์…˜์— ์ €์žฅ๋ฉ๋‹ˆ๋‹ค.

๋ฉ”์‹œ์ง€์˜ ์ˆœ์„œ์™€ ๋ถ„์‚ฐ ์ฒ˜๋ฆฌ๋ฅผ ํšจ๊ณผ์ ์œผ๋กœ ๊ด€๋ฆฌํ•˜๊ธฐ ์œ„ํ•ด ํ‚ค(Key)์™€ ์˜คํ”„์…‹(Offset)์„ ์‚ฌ์šฉํ•ฉ๋‹ˆ๋‹ค.

Kafka ํ”„๋กœ๋“€์„œ๋Š” ๋น„๋™๊ธฐ ๋ฐฉ์‹์œผ๋กœ ๋ฐ์ดํ„ฐ๋ฅผ ์ „์†กํ•˜๋ฉฐ, ๋ธŒ๋กœ์ปค์™€ ์ƒํ˜ธ์ž‘์šฉํ•˜๋Š” ๋™์•ˆ ๋ณต์ œ ์„ค์ • ๋ฐ ํ™•์ธ ์‘๋‹ต์„ ํ†ตํ•ด ๋ฉ”์‹œ์ง€ ์ „์†ก์˜ ์„ฑ๊ณต ์—ฌ๋ถ€๋ฅผ ์ œ์–ดํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

Drawing

๋™์ž‘ ์›๋ฆฌ

๋ฐ์ดํ„ฐ ์ƒ์„ฑ ๋ฐ ๋ฉ”์‹œ์ง€ ๊ตฌ์„ฑ: ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜์—์„œ ๋ฐœ์ƒํ•œ ๋ฐ์ดํ„ฐ๋ฅผ ๋ ˆ์ฝ”๋“œ(Record) ํ˜•ํƒœ๋กœ Kafka ํ”„๋กœ๋“€์„œ๊ฐ€ ์ƒ์„ฑํ•ฉ๋‹ˆ๋‹ค. ๋ ˆ์ฝ”๋“œ๋Š” ํ‚ค์™€ ๊ฐ’์œผ๋กœ ๊ตฌ์„ฑ๋ฉ๋‹ˆ๋‹ค.

ํ† ํ”ฝ ๋ฐ ํŒŒํ‹ฐ์…˜ ์ง€์ •: ํ”„๋กœ๋“€์„œ๋Š” ๋ฉ”์‹œ์ง€๋ฅผ ์ „์†กํ•  ํ† ํ”ฝ์„ ์ง€์ •ํ•˜๊ณ , Kafka๋Š” ๋ฉ”์‹œ์ง€์˜ ํ‚ค๋ฅผ ๊ธฐ๋ฐ˜์œผ๋กœ ์–ด๋А ํŒŒํ‹ฐ์…˜์— ์ €์žฅํ• ์ง€ ๊ฒฐ์ •ํ•ฉ๋‹ˆ๋‹ค. ํ‚ค๊ฐ€ ์—†์œผ๋ฉด Kafka๋Š” ๋ผ์šด๋“œ ๋กœ๋นˆ ๋ฐฉ์‹์œผ๋กœ ํŒŒํ‹ฐ์…˜์„ ์„ ํƒํ•ฉ๋‹ˆ๋‹ค.

๋ธŒ๋กœ์ปค๋กœ ๋ฉ”์‹œ์ง€ ์ „์†ก: ํ”„๋กœ๋“€์„œ๋Š” ์ง€์ •๋œ ํ† ํ”ฝ๊ณผ ํŒŒํ‹ฐ์…˜์œผ๋กœ ๋ธŒ๋กœ์ปค์— ๋ฉ”์‹œ์ง€๋ฅผ ์ „์†กํ•ฉ๋‹ˆ๋‹ค. ๋ฉ”์‹œ์ง€๊ฐ€ ์„ฑ๊ณต์ ์œผ๋กœ ์ „์†ก๋˜๋ฉด, ๋ธŒ๋กœ์ปค๋Š” ์„ค์ •๋œ ํ™•์ธ ์‘๋‹ต(acks) ๋ฐฉ์‹์— ๋”ฐ๋ผ ํ”„๋กœ๋“€์„œ์— ์‘๋‹ต์„ ๋ณด๋ƒ…๋‹ˆ๋‹ค.

์˜คํ”„์…‹ ๊ด€๋ฆฌ: ๋ฉ”์‹œ์ง€๊ฐ€ ๋ธŒ๋กœ์ปค์— ์ €์žฅ๋  ๋•Œ ์˜คํ”„์…‹์ด ๋ถ€์—ฌ๋˜๋ฉฐ, ์ด๋ฅผ ํ†ตํ•ด ๋ฉ”์‹œ์ง€์˜ ์ˆœ์„œ๊ฐ€ ๊ด€๋ฆฌ๋ฉ๋‹ˆ๋‹ค. ์ดํ›„, ์ปจ์Šˆ๋จธ๋Š” ์ด ์˜คํ”„์…‹์„ ๊ธฐ๋ฐ˜์œผ๋กœ ๋ฉ”์‹œ์ง€๋ฅผ ์ฝ์Šต๋‹ˆ๋‹ค.

class KafkaClientTest {
    val topic = "test"

    @Test
    fun sendTest() {
        val producer = KafkaProducer<String, String>(ProducerConfiguration().config())
        producer.send(ProducerRecord(topic, "key", "value"))

        producer.flush()
        producer.close()
    }

    class ProducerConfiguration {
        fun config() = Properties().apply {
            put(BOOTSTRAP_SERVERS_CONFIG, "{host}")
            put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java.name)
            put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java.name)
        }
    }
}

์˜ต์…˜

acks (Acknowledgment)

  • acks=0: ๋ธŒ๋กœ์ปค์˜ ์‘๋‹ต์„ ๊ธฐ๋‹ค๋ฆฌ์ง€ ์•Š๊ณ  ๋ฉ”์‹œ์ง€๋ฅผ ์ „์†กํ•ฉ๋‹ˆ๋‹ค. ์„ฑ๋Šฅ์€ ๋†’์ง€๋งŒ, ๋ฐ์ดํ„ฐ ์†์‹ค ์œ„ํ—˜์ด ์žˆ์Šต๋‹ˆ๋‹ค.

  • acks=1: ๋ฉ”์‹œ์ง€๊ฐ€ ๋ฆฌ๋” ํŒŒํ‹ฐ์…˜์— ์ €์žฅ๋˜๋ฉด ํ™•์ธ ์‘๋‹ต์„ ๋ฐ›์Šต๋‹ˆ๋‹ค. ๋ฆฌ๋”๋งŒ ์ €์žฅํ•œ ๊ฒฝ์šฐ ์„ฑ๋Šฅ๊ณผ ์•ˆ์ •์„ฑ ์‚ฌ์ด์˜ ์ ˆ์ถฉ์•ˆ์ž…๋‹ˆ๋‹ค.

  • acks=all: ๋ฉ”์‹œ์ง€๊ฐ€ ๋ฆฌ๋”์™€ ๋ชจ๋“  ํŒ”๋กœ์›Œ ํŒŒํ‹ฐ์…˜์— ์ €์žฅ๋˜๋ฉด ์‘๋‹ต์„ ๋ฐ›์Šต๋‹ˆ๋‹ค. ์„ฑ๋Šฅ์€ ๋‹ค์†Œ ๋А๋ฆฌ์ง€๋งŒ, ๋ฐ์ดํ„ฐ ๋‚ด๊ตฌ์„ฑ์„ ๋ณด์žฅํ•ฉ๋‹ˆ๋‹ค.

retries (์žฌ์‹œ๋„)

๋ฉ”์‹œ์ง€ ์ „์†ก ์‹คํŒจ ์‹œ ์žฌ์‹œ๋„ ํšŸ์ˆ˜๋ฅผ ์„ค์ •ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ์ด ๊ธฐ๋Šฅ์„ ํ†ตํ•ด ๋„คํŠธ์›Œํฌ ์˜ค๋ฅ˜๋‚˜ ์ผ์‹œ์ ์ธ ์žฅ์• ๋กœ ์ธํ•ด ๋ฐ์ดํ„ฐ๊ฐ€ ์†์‹ค๋˜์ง€ ์•Š๋„๋ก ๋ฐฉ์ง€ํ•ฉ๋‹ˆ๋‹ค.

compression.type (์••์ถ• ๋ฐฉ์‹)

๋ฉ”์‹œ์ง€๋ฅผ ์••์ถ•ํ•˜์—ฌ ์ „์†กํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. gzip, snappy, lz4 ๋“ฑ์˜ ์••์ถ• ๋ฐฉ์‹์„ ์ง€์›ํ•˜๋ฉฐ, ์ด๋ฅผ ํ†ตํ•ด ๋„คํŠธ์›Œํฌ ๋Œ€์—ญํญ ์ ˆ์•ฝ๊ณผ ์ „์†ก ์†๋„ ํ–ฅ์ƒ์„ ๊ธฐ๋Œ€ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

linger.ms

๋ฉ”์‹œ์ง€๋ฅผ ๋ฐ”๋กœ ์ „์†กํ•˜์ง€ ์•Š๊ณ , ์ผ์ • ์‹œ๊ฐ„ ๋™์•ˆ ๋Œ€๊ธฐํ•˜์—ฌ ๋” ๋งŽ์€ ๋ฉ”์‹œ์ง€๋ฅผ ๋ชจ์•„ ๋ฐฐ์น˜(batch)๋กœ ์ „์†กํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ์ด๋ฅผ ํ†ตํ•ด ๋„คํŠธ์›Œํฌ ํšจ์œจ์„ฑ์„ ๋†’์ด๊ณ  ์ „์†ก ์„ฑ๋Šฅ์„ ํ–ฅ์ƒ์‹œํ‚ฌ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

batch.size

ํ•œ ๋ฒˆ์— ์ „์†กํ•  ๋ฐฐ์น˜ ํฌ๊ธฐ๋ฅผ ์„ค์ •ํ•ฉ๋‹ˆ๋‹ค. ํฌ๊ธฐ๊ฐ€ ํด์ˆ˜๋ก ๋” ๋งŽ์€ ๋ฐ์ดํ„ฐ๋ฅผ ํ•œ ๋ฒˆ์— ์ „์†กํ•˜์—ฌ ์„ฑ๋Šฅ ์ตœ์ ํ™”๋ฅผ ํ•  ์ˆ˜ ์žˆ์ง€๋งŒ, ๋ฉ”๋ชจ๋ฆฌ ์‚ฌ์šฉ๋Ÿ‰์ด ์ฆ๊ฐ€ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

๋งž์Šต๋‹ˆ๋‹ค. Kafka์—์„œ ack (acknowledgment)๋Š” ๋งค์šฐ ์ค‘์š”ํ•œ ๊ฐœ๋…์ด๋ฉฐ, ๋ฉ”์‹œ์ง€๊ฐ€ ์•ˆ์ „ํ•˜๊ฒŒ ์ „์†ก๋˜๊ณ  ์ฒ˜๋ฆฌ๋˜์—ˆ๋Š”์ง€ ํ™•์ธํ•˜๋Š” ๋ฐ ์‚ฌ์šฉ๋ฉ๋‹ˆ๋‹ค. ํ”„๋กœ๋“€์„œ๊ฐ€ ๋ฉ”์‹œ์ง€๋ฅผ ๋ธŒ๋กœ์ปค์— ์ „์†กํ•œ ํ›„, ๋ธŒ๋กœ์ปค๋Š” ํ”„๋กœ๋“€์„œ์—๊ฒŒ ๋ฉ”์‹œ์ง€๊ฐ€ ์„ฑ๊ณต์ ์œผ๋กœ ์ฒ˜๋ฆฌ๋˜์—ˆ๋Š”์ง€ ์‘๋‹ต(acknowledgment)์„ ๋ณด๋ƒ…๋‹ˆ๋‹ค. ์ด๋ฅผ ํ†ตํ•ด ๋ฐ์ดํ„ฐ ๋‚ด๊ตฌ์„ฑ๊ณผ ์‹ ๋ขฐ์„ฑ์„ ๋ณด์žฅํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

acks๋ž€?

acks๋Š” Kafka ํ”„๋กœ๋“€์„œ๊ฐ€ ๋ฉ”์‹œ์ง€๋ฅผ Kafka ๋ธŒ๋กœ์ปค๋กœ ๋ณด๋‚ผ ๋•Œ, ๋ธŒ๋กœ์ปค๊ฐ€ ๋ฉ”์‹œ์ง€๋ฅผ ์ •์ƒ์ ์œผ๋กœ ์ˆ˜์‹ ํ–ˆ๋Š”์ง€ ํ™•์ธํ•˜๋Š” ๋ฐฉ์‹์„ ์„ค์ •ํ•˜๋Š” ์˜ต์…˜์ž…๋‹ˆ๋‹ค.

์ด ์„ค์ •์„ ํ†ตํ•ด ๋ฉ”์‹œ์ง€๊ฐ€ ๋ธŒ๋กœ์ปค์—์„œ ์•ˆ์ „ํ•˜๊ฒŒ ์ฒ˜๋ฆฌ๋˜์—ˆ๋Š”์ง€์— ๋Œ€ํ•œ ์‹ ๋ขฐ์„ฑ์„ ๋ณด์žฅํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

Kafka์—์„œ ์ œ๊ณตํ•˜๋Š” acks ์„ค์ •์€ ์„ธ ๊ฐ€์ง€ ์ˆ˜์ค€์œผ๋กœ ๋‚˜๋‰ฉ๋‹ˆ๋‹ค:

acks=0:

  • ๋ธŒ๋กœ์ปค๊ฐ€ ์‘๋‹ตํ•˜์ง€ ์•Š์Œ: ํ”„๋กœ๋“€์„œ๋Š” ๋ฉ”์‹œ์ง€๋ฅผ ๋ธŒ๋กœ์ปค๋กœ ๋ณด๋‚ธ ํ›„ ํ™•์ธ ์‘๋‹ต(ACK)์„ ๊ธฐ๋‹ค๋ฆฌ์ง€ ์•Š๊ณ  ๋ฐ”๋กœ ๋‹ค์Œ ๋ฉ”์‹œ์ง€๋ฅผ ๋ณด๋ƒ…๋‹ˆ๋‹ค.

  • ์žฅ์ : ๋งค์šฐ ๋น ๋ฅด๊ฒŒ ๋ฉ”์‹œ์ง€๋ฅผ ์ „์†กํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

  • ๋‹จ์ : ๋ฐ์ดํ„ฐ ์†์‹ค ๊ฐ€๋Šฅ์„ฑ์ด ์žˆ์Šต๋‹ˆ๋‹ค. ๋ธŒ๋กœ์ปค์— ์žฅ์• ๊ฐ€ ๋ฐœ์ƒํ•˜๊ฑฐ๋‚˜ ๋„คํŠธ์›Œํฌ ๋ฌธ์ œ๊ฐ€ ์ƒ๊ธฐ๋ฉด ๋ฉ”์‹œ์ง€๊ฐ€ ์œ ์‹ค๋  ์ˆ˜ ์žˆ์ง€๋งŒ, ํ”„๋กœ๋“€์„œ๋Š” ์ด๋ฅผ ์•Œ์ง€ ๋ชปํ•ฉ๋‹ˆ๋‹ค.

acks=1:

  • ๋ฆฌ๋” ํŒŒํ‹ฐ์…˜์—์„œ๋งŒ ํ™•์ธ: ํ”„๋กœ๋“€์„œ๋Š” ๋ฆฌ๋” ํŒŒํ‹ฐ์…˜์ด ๋ฉ”์‹œ์ง€๋ฅผ ์ˆ˜์‹ ํ•˜๊ณ  ๊ธฐ๋กํ•˜๋ฉด ํ™•์ธ ์‘๋‹ต(ACK)์„ ๋ฐ›์Šต๋‹ˆ๋‹ค. ํ•˜์ง€๋งŒ ํŒ”๋กœ์›Œ ํŒŒํ‹ฐ์…˜์ด ์ด ๋ฉ”์‹œ์ง€๋ฅผ ๋ณต์ œํ•˜์ง€ ์•Š์€ ์ƒํƒœ์—์„œ ๋ฆฌ๋”๊ฐ€ ์žฅ์• ๋ฅผ ์ผ์œผํ‚ค๋ฉด ๋ฐ์ดํ„ฐ๊ฐ€ ์†์‹ค๋  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

  • ์žฅ์ : ๋น ๋ฅธ ์„ฑ๋Šฅ๊ณผ ์–ด๋А ์ •๋„์˜ ์‹ ๋ขฐ์„ฑ์„ ๋ชจ๋‘ ์ œ๊ณตํ•ฉ๋‹ˆ๋‹ค.

  • ๋‹จ์ : ๋ฆฌ๋” ํŒŒํ‹ฐ์…˜์˜ ์žฅ์•  ์‹œ ๋ฐ์ดํ„ฐ ์†์‹ค ๊ฐ€๋Šฅ์„ฑ์ด ์žˆ์Šต๋‹ˆ๋‹ค.

acks=all (๋˜๋Š” acks=-1):

  • ๋ชจ๋“  ๋ณต์ œ๋ณธ์—์„œ ํ™•์ธ: ํ”„๋กœ๋“€์„œ๋Š” ๋ฆฌ๋”๋ฟ๋งŒ ์•„๋‹ˆ๋ผ ๋ชจ๋“  ํŒ”๋กœ์›Œ ํŒŒํ‹ฐ์…˜์ด ๋ฉ”์‹œ์ง€๋ฅผ ๋ณต์ œํ•˜๊ณ  ๊ธฐ๋กํ•œ ํ›„์— ํ™•์ธ ์‘๋‹ต(ACK)์„ ๋ฐ›์Šต๋‹ˆ๋‹ค. ์ด๋Š” ๊ฐ€์žฅ ๋†’์€ ์ˆ˜์ค€์˜ ๋‚ด๊ตฌ์„ฑ์„ ์ œ๊ณตํ•ฉ๋‹ˆ๋‹ค.

  • ์žฅ์ : ๋ฐ์ดํ„ฐ ์†์‹ค ๊ฐ€๋Šฅ์„ฑ์ด ๊ฑฐ์˜ ์—†์Šต๋‹ˆ๋‹ค. ๋ฆฌ๋”๊ฐ€ ์žฅ์• ๊ฐ€ ๋ฐœ์ƒํ•ด๋„, ํŒ”๋กœ์›Œ ํŒŒํ‹ฐ์…˜์ด ๋ฉ”์‹œ์ง€๋ฅผ ๋ณต์ œํ–ˆ๊ธฐ ๋•Œ๋ฌธ์— ๋ฐ์ดํ„ฐ๋Š” ์•ˆ์ „ํ•˜๊ฒŒ ์ €์žฅ๋ฉ๋‹ˆ๋‹ค.

  • ๋‹จ์ : ์„ฑ๋Šฅ์ด ๋А๋ ค์งˆ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ๋ชจ๋“  ํŒ”๋กœ์›Œ ํŒŒํ‹ฐ์…˜์ด ๋ฉ”์‹œ์ง€๋ฅผ ๋ณต์ œํ•  ๋•Œ๊นŒ์ง€ ๊ธฐ๋‹ค๋ ค์•ผ ํ•˜๊ธฐ ๋•Œ๋ฌธ์— ์ฒ˜๋ฆฌ ์†๋„๊ฐ€ ๋А๋ ค์งˆ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

Kafka ํ”„๋กœ๋“€์„œ ์„ค์ •์—์„œ acks ์˜ต์…˜์„ ์„ค์ •ํ•˜์—ฌ ๋ฉ”์‹œ์ง€ ์ „์†ก์˜ ์‹ ๋ขฐ์„ฑ ์ˆ˜์ค€์„ ์กฐ์ •ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ์˜ˆ๋ฅผ ๋“ค์–ด, ๋ฉ”์‹œ์ง€ ์†์‹ค์„ ๋ฐฉ์ง€ํ•˜๊ณ  ์‹ถ๋‹ค๋ฉด acks=all๋กœ ์„ค์ •ํ•˜๊ณ , ์„ฑ๋Šฅ์„ ์šฐ์„ ์‹œํ•  ๊ฒฝ์šฐ acks=0 ๋˜๋Š” acks=1์„ ์„ ํƒํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

๋‹ค์Œ์€ acks ์„ค์ •์„ ์‚ฌ์šฉํ•˜๋Š” ์˜ˆ์‹œ์ž…๋‹ˆ๋‹ค:

val props = Properties()
props[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = "localhost:9092"
props[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name
props[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name
props[ProducerConfig.ACKS_CONFIG] = "all"  // ๋ชจ๋“  ๋ณต์ œ๋ณธ์ด ๊ธฐ๋กํ•  ๋•Œ๊นŒ์ง€ ๊ธฐ๋‹ค๋ฆผ

val producer = KafkaProducer<String, String>(props)
val record = ProducerRecord("my-topic", "key", "value")

// ๋ฉ”์‹œ์ง€ ์ „์†ก ํ›„ ์ฝœ๋ฐฑ์œผ๋กœ ๊ฒฐ๊ณผ ์ฒ˜๋ฆฌ
producer.send(record) { metadata, exception ->
    if (exception != null) {
        println("Error sending message: ${exception.message}")
    } else {
        println("Message sent successfully to partition ${metadata.partition()}, offset ${metadata.offset()}")
    }
}

producer.close()

์œ„ ์ฝ”๋“œ์—์„œ acks=all๋กœ ์„ค์ •ํ•˜์˜€๊ธฐ ๋•Œ๋ฌธ์—, ํ”„๋กœ๋“€์„œ๋Š” ๋ชจ๋“  ๋ณต์ œ๋ณธ์ด ๋ฉ”์‹œ์ง€๋ฅผ ๊ธฐ๋กํ•œ ํ›„์—๋งŒ ๋ฉ”์‹œ์ง€ ์ „์†ก์ด ์™„๋ฃŒ๋ฉ๋‹ˆ๋‹ค. ์ด๋Š” ๋ฐ์ดํ„ฐ์˜ ๋‚ด๊ตฌ์„ฑ์„ ๋ณด์žฅํ•˜๋Š” ๋ฐฉ๋ฒ•์ž…๋‹ˆ๋‹ค.


acks์™€ ๋ฐ์ดํ„ฐ ๋‚ด๊ตฌ์„ฑ์˜ ๊ด€๊ณ„

  • ๋‚ด๊ตฌ์„ฑ ๋ณด์žฅ: acks=all์€ ๋ฐ์ดํ„ฐ ๋‚ด๊ตฌ์„ฑ์„ ๊ฐ€์žฅ ๊ฐ•๋ ฅํ•˜๊ฒŒ ๋ณด์žฅํ•ฉ๋‹ˆ๋‹ค. ํ”„๋กœ๋“€์„œ๋Š” ๋ฆฌ๋”์™€ ๋ชจ๋“  ํŒ”๋กœ์›Œ๊ฐ€ ๋ฉ”์‹œ์ง€๋ฅผ ๊ธฐ๋กํ•œ ํ›„์— ์‘๋‹ต์„ ๋ฐ›๊ธฐ ๋•Œ๋ฌธ์—, ์žฅ์• ๊ฐ€ ๋ฐœ์ƒํ•˜๋”๋ผ๋„ ๋ฐ์ดํ„ฐ๊ฐ€ ์†์‹ค๋˜์ง€ ์•Š์Šต๋‹ˆ๋‹ค.

  • ์„ฑ๋Šฅ๊ณผ ์‹ ๋ขฐ์„ฑ์˜ ๊ท ํ˜•: acks=1์€ ์„ฑ๋Šฅ๊ณผ ์‹ ๋ขฐ์„ฑ ์‚ฌ์ด์—์„œ ๊ท ํ˜•์„ ์ฐพ๋Š” ์„ค์ •์ž…๋‹ˆ๋‹ค. ๋ฆฌ๋”๊ฐ€ ๋ฉ”์‹œ์ง€๋ฅผ ์ˆ˜์‹ ํ•˜๊ณ  ๊ธฐ๋กํ•˜๋ฉด ๊ณง๋ฐ”๋กœ ์‘๋‹ต์„ ๋ฐ›์œผ๋ฏ€๋กœ, ํŒ”๋กœ์›Œ ๋ณต์ œ๊ฐ€ ์™„๋ฃŒ๋˜๊ธฐ ์ „์— ๋ฆฌ๋”๊ฐ€ ์žฅ์• ๊ฐ€ ๋ฐœ์ƒํ•˜๋ฉด ์ผ๋ถ€ ๋ฐ์ดํ„ฐ๊ฐ€ ์†์‹ค๋  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

  • ๋น ๋ฅธ ์„ฑ๋Šฅ, ๋‚ฎ์€ ์‹ ๋ขฐ์„ฑ: acks=0์€ ๋ฉ”์‹œ์ง€ ์ „์†ก ์†๋„๋ฅผ ๊ทน๋Œ€ํ™”ํ•˜์ง€๋งŒ, ๋ฉ”์‹œ์ง€๊ฐ€ ์œ ์‹ค๋  ์œ„ํ—˜์ด ์žˆ์Šต๋‹ˆ๋‹ค. ์ด ์˜ต์…˜์€ ๋งค์šฐ ๋น ๋ฅธ ์„ฑ๋Šฅ์„ ์š”๊ตฌํ•˜์ง€๋งŒ, ๋ฉ”์‹œ์ง€ ์†์‹ค์— ๋Œ€ํ•œ ์‹ ๋ขฐ์„ฑ์€ ํ•„์š”ํ•˜์ง€ ์•Š์€ ํ™˜๊ฒฝ์—์„œ ์ ํ•ฉํ•ฉ๋‹ˆ๋‹ค.

Partioner

Kafka Partitioner๋Š” Kafka ํ”„๋กœ๋“€์„œ๊ฐ€ ๋ฉ”์‹œ์ง€๋ฅผ ์ „์†กํ•  ๋•Œ, ํ•ด๋‹น ๋ฉ”์‹œ์ง€๋ฅผ ์–ด๋А ํŒŒํ‹ฐ์…˜์— ์ €์žฅํ• ์ง€ ๊ฒฐ์ •ํ•˜๋Š” ์—ญํ• ์„ ํ•ฉ๋‹ˆ๋‹ค.

ํ† ํ”ฝ(Topic)์€ ์—ฌ๋Ÿฌ ๊ฐœ์˜ ํŒŒํ‹ฐ์…˜(Partition)์œผ๋กœ ๋‚˜๋ˆ„์–ด์ง€๋ฉฐ, Partitioner๋Š” ๋ฉ”์‹œ์ง€๊ฐ€ ํŠน์ • ํŒŒํ‹ฐ์…˜์— ๋ถ„๋ฐฐ๋˜๋„๋ก ํ•ฉ๋‹ˆ๋‹ค. ์ด๋ฅผ ํ†ตํ•ด Kafka๋Š” ๋ฐ์ดํ„ฐ ๋ถ„์‚ฐ๊ณผ ํ™•์žฅ์„ฑ์„ ํšจ๊ณผ์ ์œผ๋กœ ๊ด€๋ฆฌํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

์ž‘๋™ ์›๋ฆฌ

2.1 ๋ฉ”์‹œ์ง€์— ํ‚ค(Key)๊ฐ€ ์žˆ๋Š” ๊ฒฝ์šฐ:

๋ฉ”์‹œ์ง€์— ํ‚ค๊ฐ€ ํฌํ•จ๋˜์–ด ์žˆ์œผ๋ฉด, Kafka๋Š” ์ด ํ‚ค๋ฅผ ๊ธฐ๋ฐ˜์œผ๋กœ ํ•ด์‹œ ํ•จ์ˆ˜๋ฅผ ์ ์šฉํ•˜์—ฌ ํŠน์ • ํŒŒํ‹ฐ์…˜์„ ์„ ํƒํ•ฉ๋‹ˆ๋‹ค. ๊ฐ™์€ ํ‚ค๋ฅผ ๊ฐ€์ง„ ๋ฉ”์‹œ์ง€๋Š” ํ•ญ์ƒ ๋™์ผํ•œ ํŒŒํ‹ฐ์…˜์— ์ €์žฅ๋˜๋ฏ€๋กœ, ํŒŒํ‹ฐ์…˜ ๋‚ด์—์„œ ๋ฉ”์‹œ์ง€ ์ˆœ์„œ๊ฐ€ ๋ณด์žฅ๋ฉ๋‹ˆ๋‹ค.

2.2 ๋ฉ”์‹œ์ง€์— ํ‚ค๊ฐ€ ์—†๋Š” ๊ฒฝ์šฐ:

๋ฉ”์‹œ์ง€์— ํ‚ค๊ฐ€ ์—†์„ ๋•Œ๋Š”, Kafka๋Š” ๋ผ์šด๋“œ ๋กœ๋นˆ ๋ฐฉ์‹ ๋˜๋Š” ๊ธฐํƒ€ ๊ธฐ๋ณธ ๋ถ„๋ฐฐ ๊ทœ์น™์„ ํ†ตํ•ด ๋ฉ”์‹œ์ง€๋ฅผ ๊ท ๋“ฑํ•˜๊ฒŒ ํŒŒํ‹ฐ์…˜์— ๋ถ„๋ฐฐํ•ฉ๋‹ˆ๋‹ค. ์ด๋Š” ๋ชจ๋“  ํŒŒํ‹ฐ์…˜์— ๋ฐ์ดํ„ฐ๊ฐ€ ๊ณ ๋ฅด๊ฒŒ ๋ถ„์‚ฐ๋˜๋„๋ก ํ•˜์—ฌ, Kafka ํด๋Ÿฌ์Šคํ„ฐ์˜ ๋ถ€ํ•˜ ๋ถ„์‚ฐ์„ ๋•์Šต๋‹ˆ๋‹ค. ์ด ๋ฐฉ์‹์—์„œ๋Š” ๋ฉ”์‹œ์ง€ ์ˆœ์„œ ๋ณด์žฅ์ด ํ•„์š”ํ•˜์ง€ ์•Š์€ ๊ฒฝ์šฐ์— ์ ํ•ฉํ•ฉ๋‹ˆ๋‹ค.

์ปค์Šคํ…€ ํŒŒํ‹ฐ์…”๋„ˆ(Custom Partitioner)

Kafka๋Š” ๊ธฐ๋ณธ ํŒŒํ‹ฐ์…”๋„ˆ ์™ธ์—๋„ ์‚ฌ์šฉ์ž ์ •์˜(Custom) ํŒŒํ‹ฐ์…”๋„ˆ๋ฅผ ๊ตฌํ˜„ํ•  ์ˆ˜ ์žˆ๋„๋ก ์ง€์›ํ•ฉ๋‹ˆ๋‹ค.

public class CustomPartitioner implements Partitioner {
    @Override
    public void configure(Map<String, ?> configs) {
        // ์„ค์ • ์ดˆ๊ธฐํ™”
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        int partitionCount = cluster.partitionCountForTopic(topic);
        
        // ํŠน์ • ํ‚ค๋ฅผ ๊ฐ€์ง„ ๋ฉ”์‹œ์ง€๋Š” ํŒŒํ‹ฐ์…˜ 0์— ํ• ๋‹น
        if (key.equals("VIP")) {
            return 0;
        } else {
            return (key.hashCode() & Integer.MAX_VALUE) % partitionCount;
        }
    }

    @Override
    public void close() {
        // ํŒŒํ‹ฐ์…”๋„ˆ ์ข…๋ฃŒ ์‹œ ํ•„์š”ํ•œ ์ž‘์—…
    }
}

Partitioner์˜ ํ•„์š”์„ฑ

Partitioner๋Š” ๋‹ค์Œ๊ณผ ๊ฐ™์€ ์ค‘์š”ํ•œ ์ด์œ ๋กœ Kafka์—์„œ ํ•„์ˆ˜์ ์ธ ์—ญํ• ์„ ํ•ฉ๋‹ˆ๋‹ค

๋ฐ์ดํ„ฐ ์ผ๊ด€์„ฑ ๋ฐ ์ˆœ์„œ ๋ณด์žฅ: ๊ฐ™์€ ํ‚ค๋ฅผ ๊ฐ€์ง„ ๋ฉ”์‹œ์ง€๋ฅผ ๋™์ผํ•œ ํŒŒํ‹ฐ์…˜์— ์ €์žฅํ•˜์—ฌ, ํŒŒํ‹ฐ์…˜ ๋‚ด์—์„œ ์ˆœ์„œ๋ฅผ ๋ณด์žฅํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ์ด๋Š” ์˜ˆ๋ฅผ ๋“ค์–ด ์‚ฌ์šฉ์ž ID๋ฅผ ๊ธฐ์ค€์œผ๋กœ ์‚ฌ์šฉ์ž์˜ ํ™œ๋™์„ ์ถ”์ ํ•˜๋Š” ๊ฒฝ์šฐ ์œ ์šฉํ•ฉ๋‹ˆ๋‹ค.

ํšจ์œจ์ ์ธ ๋ถ€ํ•˜ ๋ถ„์‚ฐ: ํ‚ค๊ฐ€ ์—†๋Š” ๋ฉ”์‹œ์ง€๋Š” ๊ท ๋“ฑํ•˜๊ฒŒ ๋ถ„์‚ฐ๋˜์–ด, ํด๋Ÿฌ์Šคํ„ฐ์˜ ๋ถ€ํ•˜๋ฅผ ๊ณ ๋ฅด๊ฒŒ ๋‚˜๋ˆ„์–ด ์ฒ˜๋ฆฌ ์„ฑ๋Šฅ์„ ์ตœ์ ํ™”ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

์ปค์Šคํ„ฐ๋งˆ์ด์ง• ๊ฐ€๋Šฅ: ๋น„์ฆˆ๋‹ˆ์Šค ์š”๊ตฌ์— ๋”ฐ๋ผ ํŠน์ • ์กฐ๊ฑด์— ๋งž๊ฒŒ ๋ฐ์ดํ„ฐ ๋ถ„๋ฐฐ ๋ฐฉ์‹์„ ์„ค์ •ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ์ด๋Š” ๋ฐ์ดํ„ฐ ์ฒ˜๋ฆฌ์˜ ์œ ์—ฐ์„ฑ์„ ๋†’์—ฌ์ค๋‹ˆ๋‹ค.

Last updated