Spring Kafka - Producer
[Overview]
The Spring Kafka producer is used through a class called KafkaTemplate.
A KafkaTemplate is created from a ProducerFactory.
There are two ways to get one: use the "basic template" that Spring Boot auto-configures, or build a "custom template" from your own ProducerFactory.
[Basic KafkaTemplate]
With the basic template, you only need to put the producer options into application.yaml and they are applied automatically.
spring:
  kafka:
    producer:
      bootstrap-servers: localhost:9092
      acks: all
      retries: 3
      properties:
        enable.idempotence: true
      # ...
Note: enable.idempotence has no dedicated Spring Boot key, so it must sit under properties:. Placed directly under producer: it is silently ignored, which is why the ProducerConfig dump below still shows enable.idempotence = false.
Example
Code that sends messages to a topic named "jaeshim-test" using the basic template.
@SpringBootApplication
public class SpringProducerApplication implements CommandLineRunner {

    private static final String TOPIC_NAME = "jaeshim-test";

    @Autowired
    private KafkaTemplate<String, String> template;

    public static void main(String[] args) {
        System.out.println("RUN");
        SpringApplication application = new SpringApplication(SpringProducerApplication.class);
        application.run(args);
    }

    // Simple send
    @Override
    public void run(String... args) {
        System.out.println("running");
        for (int i = 0; i < 10; i++) {
            template.send(TOPIC_NAME, "test" + i);
            System.out.println(TOPIC_NAME + ": " + i + " has been sent");
        }
        System.exit(0);
    }
}

Alternatively, attach a callback to be notified of the send result asynchronously. This run() replaces the one above; a class can only have one run() implementation.

    @Override
    public void run(String... args) {
        System.out.println("running");
        ListenableFuture<SendResult<String, String>> future = template.send(TOPIC_NAME, "basic");
        future.addCallback(new KafkaSendCallback<String, String>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                System.out.println("sent topic: " + result.getRecordMetadata().topic());
                System.out.println("sent offset: " + result.getRecordMetadata().offset());
                System.out.println("sent partition: " + result.getRecordMetadata().partition());
            }

            @Override
            public void onFailure(KafkaProducerException ex) {
                ex.printStackTrace();
            }
        });
        System.exit(0);
    }
Result
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.4.9)
...
2021-07-26 11:49:54.158 INFO 35684 --- [ main] o.a.k.clients.producer.ProducerConfig : ProducerConfig values:
acks = -1
batch.size = 16384
bootstrap.servers = [localhost:9092]
buffer.memory = 33554432
client.dns.lookup = use_all_dns_ips
client.id = producer-1
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = false
interceptor.classes = []
internal.auto.downgrade.txn.commit = true
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metadata.max.idle.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
2021-07-26 11:49:54.205 INFO 35684 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.6.2
2021-07-26 11:49:54.205 INFO 35684 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: da65af02e5856e34
2021-07-26 11:49:54.205 INFO 35684 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1627267794205
2021-07-26 11:49:54.346 INFO 35684 --- [ad | producer-1] org.apache.kafka.clients.Metadata : [Producer clientId=producer-1] Cluster ID: ZL-0SzhnRwCYxBnL69ZofA
2021-07-26 11:49:54.361 INFO 35684 --- [extShutdownHook] o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 30000 ms.
sent topic: jaeshim-test
sent offset: 95
sent partition: 1
Process finished with exit code 0
send() overloads (usage sketch below)
- send(String topic, K key, V data): send a key and value
- send(String topic, Integer partition, K key, V data): send a key and value to a specific partition
- send(String topic, Integer partition, Long timestamp, K key, V data): additionally attach a timestamp
- send(ProducerRecord<K,V> record): send a prebuilt ProducerRecord
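A usage sketch of these overloads, assuming the String/String template and TOPIC_NAME from the basic example above; the key "order-1", partition 0, and the timestamp are illustrative values, not from the original.

void sendOverloadExamples(KafkaTemplate<String, String> template) throws Exception {
    // key + value: records with the same key always land on the same partition
    template.send(TOPIC_NAME, "order-1", "payload");
    // explicit partition (0) + key + value
    template.send(TOPIC_NAME, 0, "order-1", "payload");
    // explicit partition + timestamp + key + value
    template.send(TOPIC_NAME, 0, System.currentTimeMillis(), "order-1", "payload");
    // a prebuilt ProducerRecord
    template.send(new ProducerRecord<>(TOPIC_NAME, "order-1", "payload"));
    // send() is asynchronous; blocking on the returned future makes it effectively synchronous
    SendResult<String, String> result = template.send(TOPIC_NAME, "sync").get();
    System.out.println("synced offset: " + result.getRecordMetadata().offset());
}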
[Custom KafkaTemplate]
A custom KafkaTemplate means building a KafkaTemplate through a ProducerFactory yourself and registering it as a bean.
When a single application needs several different templates, they are typically defined as custom beans.
ex) When one application must produce to two clusters, create multiple custom templates and inject whichever one fits the use case (a two-cluster sketch follows the example below).
Example
@Configuration
public class KafkaTemplateConfiguration {

    // Returns a KafkaTemplate bean
    @Bean
    public KafkaTemplate<String, String> customKafkaTemplate() {
        Map<String, Object> props = new HashMap<>();
        // Enter the required configs directly
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        // Create the KafkaTemplate through a factory
        ProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(props);
        return new KafkaTemplate<>(pf);
    }
}
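A sketch of the two-cluster scenario mentioned above; the bean names and the second bootstrap address (cluster-b:9092) are hypothetical.

@Configuration
public class MultiClusterTemplateConfiguration {

    private ProducerFactory<String, String> factory(String bootstrapServers) {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    // One template per cluster; inject by bean name, e.g. @Qualifier("clusterATemplate")
    @Bean
    public KafkaTemplate<String, String> clusterATemplate() {
        return new KafkaTemplate<>(factory("localhost:9092"));
    }

    @Bean
    public KafkaTemplate<String, String> clusterBTemplate() {
        return new KafkaTemplate<>(factory("cluster-b:9092"));
    }
}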
@SpringBootApplication
public class SpringProducerCustomApplication implements CommandLineRunner {

    private static final String TOPIC_NAME = "jaeshim-test";

    @Autowired
    private KafkaTemplate<String, String> customKafkaTemplate;

    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(SpringProducerCustomApplication.class);
        application.run(args);
    }

    @Override
    public void run(String... args) {
        // Use the custom-defined KafkaTemplate
        ListenableFuture<SendResult<String, String>> future = customKafkaTemplate.send(TOPIC_NAME, "custom");
        System.out.println("RUN CUSTOM APPLICATION");
        // The callback confirms whether the broker accepted the record
        future.addCallback(new KafkaSendCallback<String, String>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                System.out.println("sent topic: " + result.getRecordMetadata().topic());
                System.out.println("sent offset: " + result.getRecordMetadata().offset());
                System.out.println("sent partition: " + result.getRecordMetadata().partition());
            }

            @Override
            public void onFailure(KafkaProducerException ex) {
                ex.printStackTrace();
            }
        });
    }
}
Result
:: Spring Boot :: (v2.4.9)
...
2021-07-26 11:45:27.408 INFO 51268 --- [ main] o.a.k.clients.producer.ProducerConfig : ProducerConfig values:
... (identical to the ProducerConfig dump in the basic template result above)
2021-07-26 11:45:27.455 INFO 51268 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.6.2
2021-07-26 11:45:27.455 INFO 51268 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: da65af02e5856e34
2021-07-26 11:45:27.455 INFO 51268 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1627267527455
2021-07-26 11:45:27.596 INFO 51268 --- [ad | producer-1] org.apache.kafka.clients.Metadata : [Producer clientId=producer-1] Cluster ID: ZL-0SzhnRwCYxBnL69ZofA
RUN CUSTOM APPLICATION
2021-07-26 11:45:27.611 INFO 51268 --- [extShutdownHook] o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 30000 ms.
sent topic: jaeshim-test
sent offset: 92
sent partition: 0
[Object-level events with a JSON serializer]
Reference: https://sup2is.github.io/2020/06/03/spring-boot-with-kafka-cluster.html
The goal is to apply a JSON serializer so that events are published as whole objects and the listener can likewise read and process them as objects.
When messages are published as objects, C3 (Confluent Control Center) displays each record's fields in a structured way.
Example: define an OrderEvent class and publish events as OrderEvent instances.
Assume an OrderEvent consists of just an order number.
OrderEvent
@Value
@Builder
@JsonDeserialize(builder = OrderEvent.OrderEventBuilder.class)
public class OrderEvent {

    @NotBlank(message = "orderNo is empty")
    private String orderNo;

    // withPrefix = "" matches Lombok's builder setters, which carry no "with" prefix
    @JsonPOJOBuilder(withPrefix = "")
    public static class OrderEventBuilder {
    }
}
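A quick round-trip sketch (not from the original): @Value removes setters and the no-arg constructor, so @JsonDeserialize routes Jackson through the builder instead. This verifies both directions work.

void jsonRoundTrip() throws Exception {
    ObjectMapper mapper = new ObjectMapper();
    OrderEvent event = OrderEvent.builder().orderNo("211225").build();
    String json = mapper.writeValueAsString(event);   // {"orderNo":"211225"}
    OrderEvent restored = mapper.readValue(json, OrderEvent.class);
    System.out.println(restored.getOrderNo());        // prints 211225
}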
ProducerConfig
@Configuration
public class KafkaProducerTemplateConfiguration {

    @Value("${spring.kafka.producer.bootstrap-servers}")
    public String servers;

    @Bean
    public ProducerFactory<String, OrderEvent> producerFactory() {
        Map<String, Object> configProps = producerFactoryConfig();
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    private Map<String, Object> producerFactoryConfig() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // JsonSerializer from spring-kafka serializes the value object to JSON
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return configProps;
    }

    @Bean
    public KafkaTemplate<String, OrderEvent> customJsontemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
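If you prefer the basic template, the same serializer can also be selected purely in application.yaml (a sketch; this replaces the explicit factory above rather than complementing it):

spring:
  kafka:
    producer:
      bootstrap-servers: localhost:9092
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer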
Producer
@SpringBootApplication
public class SpringProducerWithSerialize implements CommandLineRunner {

    private static final String TOPIC_NAME = "jaeshim-test-json";

    @Autowired
    private KafkaTemplate<String, OrderEvent> customJsontemplate;

    public static void main(String[] args) {
        System.out.println("RUN");
        SpringApplication application = new SpringApplication(SpringProducerWithSerialize.class);
        application.run(args);
    }

    @Override
    public void run(String... args) {
        // Sample data
        final var orderNo = "211225";
        // Send using the template declared above
        ListenableFuture<SendResult<String, OrderEvent>> future = customJsontemplate.send(
                MessageBuilder.withPayload(OrderEvent.builder().orderNo(orderNo).build())
                        // setting a header to null removes it, so the record is sent without a key
                        .setHeader(KafkaHeaders.MESSAGE_KEY, null)
                        .setHeader(KafkaHeaders.TOPIC, TOPIC_NAME)
                        .build());
        // Receive the result through a callback
        future.addCallback(new ListenableFutureCallback<SendResult<String, OrderEvent>>() {
            @Override
            public void onSuccess(SendResult<String, OrderEvent> result) {
                System.out.println("sent topic: " + result.getRecordMetadata().topic());
                System.out.println("sent offset: " + result.getRecordMetadata().offset());
                System.out.println("sent partition: " + result.getRecordMetadata().partition());
            }

            @Override
            public void onFailure(Throwable ex) {
                System.err.println(ex.getMessage());
            }
        });
        System.out.println("message processed");
    }
}
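Because the template's value type is already OrderEvent, the MessageBuilder detour is optional; a plain send is equivalent when no extra headers are needed (sketch):

// equivalent direct send: topic and payload, no message headers
customJsontemplate.send(TOPIC_NAME, OrderEvent.builder().orderNo("211225").build());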
Result
You can see that value.serializer = class org.springframework.kafka.support.serializer.JsonSerializer is now applied.
:: Spring Boot :: (v2.4.9)
...
2021-08-05 16:00:07.412 INFO 2056 --- [ main] o.a.k.clients.producer.ProducerConfig : ProducerConfig values:
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
value.serializer = class org.springframework.kafka.support.serializer.JsonSerializer
... (remaining values identical to the dumps above)
2021-08-05 16:00:07.474 INFO 2056 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.6.2
2021-08-05 16:00:07.474 INFO 2056 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: da65af02e5856e34
2021-08-05 16:00:07.474 INFO 2056 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1628146807474
2021-08-05 16:00:07.599 INFO 2056 --- [ad | producer-1] org.apache.kafka.clients.Metadata : [Producer clientId=producer-1] Cluster ID: ZL-0SzhnRwCYxBnL69ZofA
message processed
2021-08-05 16:00:07.631 INFO 2056 --- [extShutdownHook] o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 30000 ms.
sent topic: jaeshim-test-json
sent offset: 18
sent partition: 0
Process finished with exit code 0