Listener
Record Listener
ํน์ง
๊ฐ๋ณ ๋ฉ์์ง ์ฒ๋ฆฌ: ๊ฐ ๋ฉ์์ง(๋ ์ฝ๋)๋ฅผ ํ๋์ฉ ์ฒ๋ฆฌํฉ๋๋ค.
๊ฐ๋จํ ๊ตฌํ: ๊ธฐ๋ณธ์ ์ธ ๋ฉ์์ง ์ฒ๋ฆฌ๋ฅผ ์ํ ๊ฐ์ฅ ์ผ๋ฐ์ ์ธ ๋ฆฌ์ค๋์ ๋๋ค.
๋น๋๊ธฐ ์ฒ๋ฆฌ: ๋ฉ์์ง๋ฅผ ๋น๋๊ธฐ์ ์ผ๋ก ์์ ํ๊ณ ์ฒ๋ฆฌํฉ๋๋ค.
๊ตฌํ ๋ฐฉ๋ฒ
@KafkaListener
์ด๋ ธํ ์ด์ ์ ์ฌ์ฉํ์ฌ ๋ฆฌ์ค๋ ๋ฉ์๋๋ฅผ ์ ์ํฉ๋๋ค.๋ฉ์๋ ํ๋ผ๋ฏธํฐ๋ก ๋ฉ์์ง์ ๊ฐ ๋๋
ConsumerRecord
๋ฅผ ๋ฐ์ ์ ์์ต๋๋ค.
์ฝ๋ ์ํ
์ต์
topics: ๋ฆฌ์ค๋ํ ํ ํฝ์ ์ง์ ํฉ๋๋ค.
groupId: ์ปจ์๋จธ ๊ทธ๋ฃน ID๋ฅผ ์ค์ ํฉ๋๋ค.
concurrency: ๋์์ ์คํ๋ ์ค๋ ๋ ์๋ฅผ ์ค์ ํ์ฌ ๋ณ๋ ฌ ์ฒ๋ฆฌ๋ฅผ ์ง์ํฉ๋๋ค.
errorHandler: ๋ฉ์์ง ์ฒ๋ฆฌ ์ค ์์ธ ๋ฐ์ ์ ์ฌ์ฉํ ์๋ฌ ํธ๋ค๋ฌ๋ฅผ ์ง์ ํฉ๋๋ค.
Batch Listener
ํน์ง
์ฌ๋ฌ ๋ฉ์์ง ์ผ๊ด ์ฒ๋ฆฌ: ํ ๋ฒ์ ์ฌ๋ฌ ๋ฉ์์ง๋ฅผ ๋ฆฌ์คํธ๋ก ๋ฐ์ ์ฒ๋ฆฌํฉ๋๋ค.
์ฑ๋ฅ ํฅ์: ๋๋์ ๋ฉ์์ง๋ฅผ ํจ์จ์ ์ผ๋ก ์ฒ๋ฆฌํ ์ ์์ต๋๋ค.
๋ฐฐ์น ์์ ์ ์ ํฉ: ๋ฐ์ดํฐ๋ฒ ์ด์ค ์ผ๊ด ์ ๋ ฅ ๋ฑ ๋ฐฐ์น ์์ ์ ์ ์ฉํฉ๋๋ค.
๊ตฌํ ๋ฐฉ๋ฒ
ConcurrentKafkaListenerContainerFactory
์isBatchListener
๋ฅผtrue
๋ก ์ค์ ํฉ๋๋ค.๋ฆฌ์ค๋ ๋ฉ์๋์ ํ๋ผ๋ฏธํฐ๋ฅผ
List<ConsumerRecord<K, V>>
๋๋List<V>
๋ก ๋ฐ์ต๋๋ค.
์ฝ๋ ์ํ
์ค์ ํด๋์ค
๋ฐฐ์น ๋ฆฌ์ค๋ ๊ตฌํ
์ต์
batchSize: ํ ๋ฒ์ ๊ฐ์ ธ์ฌ ๋ฉ์์ง ์๋ฅผ ์ค์ ํฉ๋๋ค.
pollTimeout: ๋ฉ์์ง๋ฅผ ๊ฐ์ ธ์ฌ ๋ ๋๊ธฐํ ์ต๋ ์๊ฐ์ ์ง์ ํฉ๋๋ค.
concurrency: ๋ณ๋ ฌ๋ก ์คํ๋ ์ปจ์๋จธ ์ค๋ ๋ ์๋ฅผ ์ค์ ํฉ๋๋ค.
Acknowledgment Mode Listener
ํน์ง
์๋ ์คํ์ ์ปค๋ฐ: ๋ฉ์์ง ์ฒ๋ฆฌ๊ฐ ์๋ฃ๋ ํ ์๋์ผ๋ก ์คํ์ ์ ์ปค๋ฐํฉ๋๋ค.
์ ํํ ์ฒ๋ฆฌ ๋ณด์ฅ: ๋ฉ์์ง์ ์ค๋ณต ์ฒ๋ฆฌ๋ ์์ค์ ๋ฐฉ์งํ ์ ์์ต๋๋ค.
ํธ๋์ญ์ ๊ด๋ฆฌ: ๋ฐ์ดํฐ๋ฒ ์ด์ค ํธ๋์ญ์ ๊ณผ ์ฐ๊ณํ์ฌ ์ฌ์ฉ ๊ฐ๋ฅํฉ๋๋ค.
๊ตฌํ ๋ฐฉ๋ฒ
๋ฆฌ์ค๋ ๋ฉ์๋์ ํ๋ผ๋ฏธํฐ๋ก
Acknowledgment
๊ฐ์ฒด๋ฅผ ๋ฐ์ต๋๋ค.๋ฉ์์ง ์ฒ๋ฆฌ๊ฐ ์๋ฃ๋ ํ
acknowledge()
๋ฉ์๋๋ฅผ ํธ์ถํ์ฌ ์คํ์ ์ ์ปค๋ฐํฉ๋๋ค.
์ฝ๋ ์ํ
๋ฆฌ์ค๋ ๊ตฌํ
์ค์ ํด๋์ค
์ต์
ackMode: ์๋ ์ปค๋ฐ ๋ชจ๋๋ฅผ ์ค์ ํฉ๋๋ค (
MANUAL
,MANUAL_IMMEDIATE
๋ฑ).syncCommits: ์ปค๋ฐ์ ๋๊ธฐํํ ์ง ์ฌ๋ถ๋ฅผ ์ค์ ํฉ๋๋ค.
errorHandler: ์์ธ ๋ฐ์ ์ ์ฒ๋ฆฌ ๋ก์ง์ ์ ์ํฉ๋๋ค.
Consumer Aware Listener
ํน์ง
Consumer ๊ฐ์ฒด ์ ๊ทผ: ๋ฆฌ์ค๋ ๋ฉ์๋์์
Consumer
๊ฐ์ฒด๋ฅผ ๋ฐ์ ์ปจ์๋จธ์ ๋ํ ์ ์ด๊ฐ ๊ฐ๋ฅํฉ๋๋ค.๋์ ์ ์ด: ๋ฐํ์์ ์ปจ์๋จธ ์ค์ ๋ณ๊ฒฝ์ด๋ ํํฐ์ ํ ๋น ๋ฑ์ด ๊ฐ๋ฅํฉ๋๋ค.
๊ณ ๊ธ ๊ธฐ๋ฅ ํ์ฉ: ๋ฎ์ ์์ค์ Kafka API๋ฅผ ํ์ฉํ ์ ์์ต๋๋ค.
๊ตฌํ ๋ฐฉ๋ฒ
๋ฆฌ์ค๋ ๋ฉ์๋์ ํ๋ผ๋ฏธํฐ๋ก
Consumer
๊ฐ์ฒด๋ฅผ ๋ฐ์ต๋๋ค.Consumer
API๋ฅผ ์ฌ์ฉํ์ฌ ์ปจ์๋จธ๋ฅผ ์ ์ดํฉ๋๋ค.
์ฝ๋ ์ํ
์ต์
pollTimeout: ๋ฉ์์ง๋ฅผ ํด๋งํ ๋ ์ฌ์ฉํ ํ์์์์ ์ค์ ํฉ๋๋ค.
listenerType: ๋ฆฌ์ค๋์ ํ์ ์ ์ง์ ํ์ฌ ์ํ๋ ํํ์ ๋ฆฌ์ค๋๋ฅผ ๊ตฌํํ ์ ์์ต๋๋ค.
concurrency: ์ปจ์๋จธ์ ๋ณ๋ ฌ ์ฒ๋ฆฌ๋ฅผ ์ํ ์ค๋ ๋ ์๋ฅผ ์ง์ ํฉ๋๋ค.
Message Listener Container
ํน์ง
์ปค์คํ ๋ฆฌ์ค๋ ๊ตฌํ: ์ด๋ ธํ ์ด์ ์์ด ์ง์ ๋ฆฌ์ค๋ ์ปจํ ์ด๋๋ฅผ ์์ฑํ์ฌ ์ฌ์ฉํฉ๋๋ค.
๊ณ ๊ธ ์ค์ ์ง์: ์ธ๋ถ์ ์ธ ์ค์ ๊ณผ ์ ์ด๊ฐ ๊ฐ๋ฅํฉ๋๋ค.
์ ์ฐ์ฑ: ๋ณต์กํ ์๊ตฌ ์ฌํญ์ด๋ ํน์ํ ์ผ์ด์ค์ ์ ํฉํฉ๋๋ค.
๊ตฌํ ๋ฐฉ๋ฒ
MessageListenerContainer
๋ฅผ ์ง์ ์์ฑํ๊ณ ์ค์ ํฉ๋๋ค.ContainerProperties
๋ฅผ ํตํด ํ์ํ ์ค์ ์ ์ ์ฉํฉ๋๋ค.
์ฝ๋ ์ํ
์ต์
ContainerProperties: ์คํ์ ๊ด๋ฆฌ, ์๋ฌ ์ฒ๋ฆฌ, ์ค๋ ๋ ์ ๋ฑ ๋ค์ํ ์ค์ ์ด ๊ฐ๋ฅํฉ๋๋ค.
setAutoStartup: ์ปจํ ์ด๋์ ์๋ ์์ ์ฌ๋ถ๋ฅผ ์ค์ ํฉ๋๋ค.
setConcurrency: ์ปจ์๋จธ ์ค๋ ๋ ์๋ฅผ ์ง์ ํ์ฌ ๋ณ๋ ฌ ์ฒ๋ฆฌ๋ฅผ ์ง์ํฉ๋๋ค.
์ถ๊ฐ ์ต์
๋ฐ ๊ณ ๋ ค ์ฌํญ
์๋ฌ ์ฒ๋ฆฌ (Error Handling)
errorHandler
๋๋seekToCurrentErrorHandler
๋ฅผ ์ฌ์ฉํ์ฌ ์์ธ ๋ฐ์ ์ ์ฌ์๋ ๋ก์ง์ ๊ตฌํํฉ๋๋ค.๋ฐ๋ ๋ ํฐ ํ(DLQ)๋ฅผ ์ค์ ํ์ฌ ์ฒ๋ฆฌ ์คํจํ ๋ฉ์์ง๋ฅผ ๋ณ๋์ ํ ํฝ์ ์ ์ฅํ ์ ์์ต๋๋ค.
๋ณ๋ ฌ ์ฒ๋ฆฌ ๋ฐ ์ฑ๋ฅ ํ๋
concurrency: ๋ฆฌ์ค๋ ์ปจํ ์ด๋ ํฉํ ๋ฆฌ์์ ์ค์ ํ์ฌ ๋ณ๋ ฌ ์ฒ๋ฆฌ ์ค๋ ๋ ์๋ฅผ ๋๋ฆฝ๋๋ค.
ํํฐ์ ์ ์ฆ๊ฐ: ํ ํฝ์ ํํฐ์ ์๋ฅผ ๋๋ ค ๋ณ๋ ฌ ์ฒ๋ฆฌ๊ฐ ๊ฐ๋ฅํ๋๋ก ํฉ๋๋ค.
๋ณด์ ์ค์
SSL/TLS ์ํธํ:
ssl.keystore.location
๋ฑ์ ์ค์ ์ ํตํด ํต์ ์ ์ํธํํฉ๋๋ค.์ธ์ฆ ๋ฐ ๊ถํ ๋ถ์ฌ: SASL ๋๋ OAuth๋ฅผ ์ฌ์ฉํ์ฌ ์ธ์ฆ๊ณผ ๊ถํ ๊ด๋ฆฌ๋ฅผ ๊ตฌํํฉ๋๋ค.
ํธ๋์ญ์ ๊ด๋ฆฌ
ChainedKafkaTransactionManager
๋ฅผ ์ฌ์ฉํ์ฌ Kafka์ ๋ฐ์ดํฐ๋ฒ ์ด์ค ๊ฐ์ ํธ๋์ญ์ ์ ํตํฉํ ์ ์์ต๋๋ค.
๋ชจ๋ํฐ๋ง ๋ฐ ๋ก๊น
์คํ๋ง ์ก์ถ์์ดํฐ(Actuator)์ ํตํฉํ์ฌ ์ ํ๋ฆฌ์ผ์ด์ ์ํ๋ฅผ ๋ชจ๋ํฐ๋งํฉ๋๋ค.
Kafka์ JMX ๋ฉํธ๋ฆญ์ ํ์ฉํ์ฌ ์ฑ๋ฅ ๋ฐ ์ํ๋ฅผ ์ถ์ ํฉ๋๋ค.
Last updated
Was this helpful?