Kafka/Spring Kafka

Spring Kafka - Producer

재심 2023. 5. 29. 12:16

목차

    [개요]

    스프링 카프카 프로듀서는 KafkaTemplate 이라는 클래스를 사용한다.

    KafkaTemplate은 ProducerFactory 클래스를 통해 생성된다.

    KafkaTemplate을 생성하는 방법은 "기본 템플릿" or "사용자 정의 템플릿"을 ProducerFactory로 생성하는 방법이다.

     

    [Basic KafkaTemplate]

    기본 카프카 템플릿은 application.yaml에 프로듀서 옵션을 넣기만 하면 옵션이 적용된다.

    spring:
        kafka:
            producer:
                bootstrap-servers: localhost:9092
                acks: all
                enable.idempotence: true
                retries: 3
                ....

    예제

    기본 템플릿을 통해 "jaeshim-test"라는 토픽에 메시지를 전송하는 코드.

    @SpringBootApplication
    public class SpringProducerApplication implements CommandLineRunner {
      private static String TOPIC_NAME = "jaeshim-test";
     
      @Autowired
      private KafkaTemplate<String, String> template;
     
      public static void main(String[] args){
        System.out.println("RUN");
        SpringApplication application=new SpringApplication(SpringProducerApplication.class);
        application.run(args);
      }
     
        //단순 send
      @Override
      public void run(String... args){
        System.out.println("running");
     
        for(int i=0;i<10;i++){
          template.send(TOPIC_NAME,"test"+i);
          System.out.println(TOPIC_NAME+":"+i+"has been sended");
        }
    //
        System.exit(0);
      }
     
     
        //콜백함수를 추가하여 전송여부를 비동기식으로 알 수도 있다.
      @Override
      public void run(String... args){
        System.out.println("running");
     
        ListenableFuture<SendResult<String,String>> future = template.send(TOPIC_NAME,"basic");
     
        future.addCallback(new KafkaSendCallback<String,String>(){
          @Override
          public void onSuccess(SendResult<String,String> result){
            System.out.println("sent topic: "+result.getRecordMetadata().topic());
            System.out.println("sent offset: "+result.getRecordMetadata().offset());
            System.out.println("sent partition: "+result.getRecordMetadata().partition());
          }
          @Override
          public void onFailure(KafkaProducerException ex){
     
          }
     
        });
        System.exit(0);
      }
    }

    실행결과

      .   ____          _            __ _ _
     /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
    ( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
     \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
      '  |____| .__|_| |_|_| |_\__, | / / / /
     =========|_|==============|___/=/_/_/_/
     :: Spring Boot ::                (v2.4.9)
     
    ...
    2021-07-26 11:49:54.158  INFO 35684 --- [           main] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values:
        acks = -1
        batch.size = 16384
        bootstrap.servers = [localhost:9092]
        buffer.memory = 33554432
        client.dns.lookup = use_all_dns_ips
        client.id = producer-1
        compression.type = none
        connections.max.idle.ms = 540000
        delivery.timeout.ms = 120000
        enable.idempotence = false
        interceptor.classes = []
        internal.auto.downgrade.txn.commit = true
        key.serializer = class org.apache.kafka.common.serialization.StringSerializer
        linger.ms = 0
        max.block.ms = 60000
        max.in.flight.requests.per.connection = 5
        max.request.size = 1048576
        metadata.max.age.ms = 300000
        metadata.max.idle.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
        receive.buffer.bytes = 32768
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retries = 2147483647
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        security.providers = null
        send.buffer.bytes = 131072
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
        ssl.endpoint.identification.algorithm = https
        ssl.engine.factory.class = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLSv1.3
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        transaction.timeout.ms = 60000
        transactional.id = null
        value.serializer = class org.apache.kafka.common.serialization.StringSerializer
     
    2021-07-26 11:49:54.205  INFO 35684 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.6.2
    2021-07-26 11:49:54.205  INFO 35684 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: da65af02e5856e34
    2021-07-26 11:49:54.205  INFO 35684 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1627267794205
    2021-07-26 11:49:54.346  INFO 35684 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-1] Cluster ID: ZL-0SzhnRwCYxBnL69ZofA
    2021-07-26 11:49:54.361  INFO 35684 --- [extShutdownHook] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 30000 ms.
    sent topic: jaeshim-test
    sent offset: 95
    sent partition: 1
     
    Process finished with exit code 0

    send 오버로딩 메서드들

    • send(String topic,K key, V data): 키, 값을 전달
    • send(String topic,Integer partition,K key, V data): 키, 값과 특정 파티션으로 전달
    • send(String topic,Integer partition, Long timestamp,K key, V data): 타임스탬프까지 포함
    • send(ProducerRecord<K,V> record): ProducerRecord 객체를 전송

    [Custom KafkaTemplate]

    커스텀 카프카 템플릿은 프로듀서 팩토리를 통해 만든 카프카 객체를 빈으로 등록하여 사용하는 것.

    하나의 애플리케이션 안에 여러 종류의 템플릿이 필요하다면 커스텀으로 정의해서 사용한다고 한다.

    ex) 하나의 애플리케이션에서 2개의 클러스터를 호출해야할 때 여러개의 커스텀을 만들고 용도에 맞게 주입받아 사용

     

    예제

    @Configuration
    public class KafkaTemplateConfiguration {
     
        //KafkaTemplate객체를 리턴
      @Bean
      public KafkaTemplate<String,String> customKafkaTemplate(){
        Map<String,Object> props = new HashMap<>();
     
        //필요한 config를 직접 입력
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
     
        props.put(ProducerConfig.ACKS_CONFIG,"all");
     
        //팩토리를 사용하여 KafkaTemplate을 생성한다.
        ProducerFactory<String,String> pf = new DefaultKafkaProducerFactory<>(props);
     
        return new KafkaTemplate<>(pf);
      }
    }
    @SpringBootApplication
    public class SpringProducerCustomApplication implements CommandLineRunner {
     
      private static String TOPIC_NAME = "jaeshim-test";
     
      @Autowired
      private KafkaTemplate<String,String> customKafkaTemplate;
     
      public static void main(String[] args){
        SpringApplication application = new SpringApplication(SpringProducerCustomApplication.class);
        application.run(args);
      }
     
      @Override
     
      public void run(String... args){
        //custom으로 정의한 카프카 템플릿을 이용한다.
        ListenableFuture<SendResult<String,String>> future = customKafkaTemplate.send(TOPIC_NAME,"custom");
     
        System.out.println("RUN CUSTOM APPLICATION");
     
        //콜백 함수를 통해 프로듀서가 보낸 데이터의 브로커 적재여부를 확인할 수 있다.
        future.addCallback(new KafkaSendCallback<String,String>(){
          @Override
          public void onSuccess(SendResult<String,String> result){
            System.out.println("sent topic: "+result.getRecordMetadata().topic());
            System.out.println("sent offset: "+result.getRecordMetadata().offset());
            System.out.println("sent partition: "+result.getRecordMetadata().partition());
          }
          @Override
          public void onFailure(KafkaProducerException ex){
     
          }
     
        });
      }
     
    }

    실행결과

      .   ____          _            __ _ _
     /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
    ( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
     \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
      '  |____| .__|_| |_|_| |_\__, | / / / /
     =========|_|==============|___/=/_/_/_/
     :: Spring Boot ::                (v2.4.9)
     
    ...
    2021-07-26 11:45:27.408  INFO 51268 --- [           main] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values:
        acks = -1
        batch.size = 16384
        bootstrap.servers = [localhost:9092]
        buffer.memory = 33554432
        client.dns.lookup = use_all_dns_ips
        client.id = producer-1
        compression.type = none
        connections.max.idle.ms = 540000
        delivery.timeout.ms = 120000
        enable.idempotence = false
        interceptor.classes = []
        internal.auto.downgrade.txn.commit = true
        key.serializer = class org.apache.kafka.common.serialization.StringSerializer
        linger.ms = 0
        max.block.ms = 60000
        max.in.flight.requests.per.connection = 5
        max.request.size = 1048576
        metadata.max.age.ms = 300000
        metadata.max.idle.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
        receive.buffer.bytes = 32768
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retries = 2147483647
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        security.providers = null
        send.buffer.bytes = 131072
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
        ssl.endpoint.identification.algorithm = https
        ssl.engine.factory.class = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLSv1.3
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        transaction.timeout.ms = 60000
        transactional.id = null
        value.serializer = class org.apache.kafka.common.serialization.StringSerializer
     
    2021-07-26 11:45:27.455  INFO 51268 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.6.2
    2021-07-26 11:45:27.455  INFO 51268 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: da65af02e5856e34
    2021-07-26 11:45:27.455  INFO 51268 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1627267527455
    2021-07-26 11:45:27.596  INFO 51268 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-1] Cluster ID: ZL-0SzhnRwCYxBnL69ZofA
    RUN CUSTOM APPLICATION
    2021-07-26 11:45:27.611  INFO 51268 --- [extShutdownHook] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 30000 ms.
    sent topic: jaeshim-test
    sent offset: 92
    sent partition: 0

     

    [JSON Serializer로 객체단위의 이벤트 처리]

    참조: https://sup2is.github.io/2020/06/03/spring-boot-with-kafka-cluster.html

     

    json serializer를 적용하여 객체단위로 이벤트를 발행하고, 리스너에서도 객체 단위로 읽고 처리할 수 있도록 하려는 목적.

    객체 단위로 메시지를 발행하면 C3에서 아래처럼 구분되어 조회된다.

     

    예제: OrderEvent 라는 객체를 구성하고 OrderEvent단위로 이벤트를 발행해본다

    OrderEvent는 주문번호로 구성된다고 가정한다.

     

    OrderEvent

    @Value
    @Builder
    @JsonDeserialize(builder = OrderEvent.OrderEventBuilder.class)
    public class OrderEvent {
      @NotBlank(message = "orderNo is empty")
      private String orderNo;
      
      @JsonPOJOBuilder(withPrefix = "")
      public static class OrderEventBuilder {
      }
    }

    ProducerConfig

    @Configuration
    public class KafkaProducerTemplateConfiguration {
      @Value("${spring.kafka.producer.bootstrap-servers}")
      public String servers;
     
      @Bean
      public ProducerFactory<String, OrderEvent> producerFactory(){
        Map<String,Object> configProps=producerFactoryConfig();
        return new DefaultKafkaProducerFactory<>(configProps);
      }
     
      private Map<String,Object> producerFactoryConfig(){
        Map<String,Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,servers);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
     
        return configProps;
      }
     
      @Bean
      public KafkaTemplate<String,OrderEvent> customJsontemplate(){
        return new KafkaTemplate<>(producerFactory());
      }
    }

    Producer

    @SpringBootApplication
    public class SpringProducerWithSerialize implements CommandLineRunner {
      private static String TOPIC_NAME = "jaeshim-test-json";
     
     
      @Autowired
      private KafkaTemplate<String,OrderEvent> customJsontemplate;
     
      public static void main(String[] args) {
        System.out.println("RUN");
        SpringApplication application = new SpringApplication(SpringProducerWithSerialize.class);
        application.run(args);
      }
     
      @Override
      public void run(String... args) {
        #샘플 데이터
        final var orderNo = "211225";    
     
        #위에서 선언한 템플릿을 사용하여 send
        ListenableFuture<SendResult<String, OrderEvent>> future = customJsontemplate.send(
            MessageBuilder.withPayload( OrderEvent.builder().orderNo(orderNo)
                .build())
                .setHeader(KafkaHeaders.MESSAGE_KEY, null)
                .setHeader(KafkaHeaders.TOPIC, TOPIC_NAME)
                .build());
     
        #콜백함수로 결과받기
        future.addCallback(new ListenableFutureCallback<SendResult<String, OrderEvent>>() {
          @Override
          public void onSuccess(SendResult<String, OrderEvent> result) {
            System.out.println("sent topic: " + result.getRecordMetadata().topic());
            System.out.println("sent offset: " + result.getRecordMetadata().offset());
            System.out.println("sent partition: " + result.getRecordMetadata().partition());
          }
     
          @Override
          public void onFailure(Throwable ex) {
            System.err.println(ex.getMessage());
          }
     
        });
        System.out.println("message processed");
      }
    }

    결과

    value.serializer = class org.springframework.kafka.support.serializer.JsonSerializer로 적용된 것을 알 수 있다.

     .   ____          _            __ _ _
     /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
    ( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
     \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
      '  |____| .__|_| |_|_| |_\__, | / / / /
     =========|_|==============|___/=/_/_/_/
     :: Spring Boot ::                (v2.4.9)
     
    ...
    2021-08-05 16:00:07.412  INFO 2056 --- [           main] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values:
        acks = -1
        batch.size = 16384
        bootstrap.servers = [localhost:9092]
        buffer.memory = 33554432
        client.dns.lookup = use_all_dns_ips
        client.id = producer-1
        compression.type = none
        connections.max.idle.ms = 540000
        delivery.timeout.ms = 120000
        enable.idempotence = false
        interceptor.classes = []
        internal.auto.downgrade.txn.commit = true
        key.serializer = class org.apache.kafka.common.serialization.StringSerializer
        linger.ms = 0
        max.block.ms = 60000
        max.in.flight.requests.per.connection = 5
        max.request.size = 1048576
        metadata.max.age.ms = 300000
        metadata.max.idle.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
        receive.buffer.bytes = 32768
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retries = 2147483647
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        security.providers = null
        send.buffer.bytes = 131072
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
        ssl.endpoint.identification.algorithm = https
        ssl.engine.factory.class = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLSv1.3
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        transaction.timeout.ms = 60000
        transactional.id = null
        value.serializer = class org.springframework.kafka.support.serializer.JsonSerializer
     
    2021-08-05 16:00:07.474  INFO 2056 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.6.2
    2021-08-05 16:00:07.474  INFO 2056 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: da65af02e5856e34
    2021-08-05 16:00:07.474  INFO 2056 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1628146807474
    2021-08-05 16:00:07.599  INFO 2056 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-1] Cluster ID: ZL-0SzhnRwCYxBnL69ZofA
    message processed
    2021-08-05 16:00:07.631  INFO 2056 --- [extShutdownHook] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 30000 ms.
    sent topic: jaeshim-test-json
    sent offset: 18
    sent partition: 0
     
    Process finished with exit code 0