Kafka/Consumer

Auto Commit 사용시 메시지 유실 테스트

재심 2023. 2. 4. 23:03

목차

    [목표]

    Auto-Commit을 할 때 방금 가져온 메시지 오프셋을 커밋하는지 마지막에 처리한 오프셋을 커밋하는지 확인.

    Consumer에서 Auto-Commit 을 사용할 때 메시지 유실 가능성이 있는지 확인. 

     

    [메시지 유실될 가능성?]

    구글링을 해보면 아래같은 케이스에서 메시지 유실 가능성이 있다고 한다. 

    이러한 상황이 발생하려면 auto commit을 켰을 때 방금 읽어온 메시지 오프셋을 커밋해야 한다.  

    [테스트 환경]

    Java - zulu JDK11

    Apache Kafka Client - 2.8.1

    Spring Kafka - 2.6.13

    [테스트 방법]

    Consumer Properties

    • enable.auto.commit = true
    • max.poll.records = 10
    • auto.commit.interval.ms = 1000 (1초)

    메시지 처리방식

    메시지는 처리는 샘플 API를 호출하되, API에서 지연시간을 10초로 설정한다. (ex: polling 결과로 3개의 메시지를 처리해야 한다면 전체 메시지가 처리되는 시간은 30초이다) 

    메시지를 처리하는데 10초가 걸리므로 auto.commit.interval.ms 설정값인 1초를 당연히 초과할 것이다. 이런식으로 메시지를 처리하는 과정에서 커밋이 발생하는지 검증해본다. 

     

    소스코드

    Consumer

    public class ConsumerApplication {
    
    	private static final String TOPIC_NAME="jaeshim-topic-20220801";
    	private static final String GROUP_ID = "jaeshim-test-group2";
    
    	private static final String API_URI = "http://localhost:8080";
    
    	private static final String API_PATH = "/test/sleep";
    
    	public static void main(String[] args) throws URISyntaxException {
    
    		Logger logger = LoggerFactory.getLogger(ConsumerApplication.class);
    		Properties properties = new Properties();
    		properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"172.30.1.1:9092");
    		properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    		properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
    		properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5); //max.poll.records
    		properties.put(ConsumerConfig.GROUP_ID_CONFIG,GROUP_ID);
    		properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
    		properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000); //auto commit interval
    		properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, CooperativeStickyAssignor.class.getName());
    
    
    		KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);
    		consumer.subscribe(Collections.singletonList(TOPIC_NAME));
    
    		String message = null;
    
    		try{
    			int ct = 1;
    			do{
    				ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(100000L));
    				ct = 1;
    				logger.info("message consumed " +records.count());
    
    				for(ConsumerRecord<String,String> record : records){
    					//API 호출
    					URL url = new URL(API_URI+API_PATH);
    					HttpURLConnection con = (HttpURLConnection) url.openConnection();
    					con.setConnectTimeout(60000);
    					con.setReadTimeout(60000);
    					con.setRequestMethod("GET");
    					con.setDoOutput(false);
    					if(con.getResponseCode() == HttpURLConnection.HTTP_OK){
    						logger.info("api called");
    					}
    
    					//처리한 오프셋 번호 찍기
    					logger.info("offset: "+record.offset()+", count: "+(ct++)+" of "+records.count());
    				}
    
    			}while(true);
    		}
    		catch (Exception e){
    
    		}finally {
    			consumer.close();
    		}
    
    	}
    
    }

    API

    10초간 sleep

     

    [검증사항]

    검증1. 메시지 처리 과정에서 auto-commit이 발생하는가?

    1. console-producer로 메시지 3개를 발행한다.
    2. consumer에서 3개의 메시지를 polling하는지 확인한다.
    3. 메시지가 처리되는 과정에서 auto-commit-interval 시간이 무조건 경과할 것이고(1초), 이 때 auto commit이 발생하는지 확인해본다

     

    auto-commit이 발생하는 경우 아래 로그들이 보인다.

    [2022-07-29 19:33:32,133] DEBUG [Consumer clientId=consumer-jaeshim-test-group2-1, groupId=jaeshim-test-group2] Sending asynchronous auto-commit of offsets {sample-basic-topic-0=OffsetAndMetadata{offset=21396227, leaderEpoch=2409, metadata=''}}
    
    [2022-07-29 19:33:32,138] DEBUG [Consumer clientId=consumer-jaeshim-test-group2-1, groupId=jaeshim-test-group2] Committed offset 21396227 for partition sample-basic-topic-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)[2022-07-29 19:33:32,138] DEBUG [Consumer clientId=consumer-jaeshim-test-group2-1, groupId=jaeshim-test-group2] Completed asynchronous auto-commit of offsets {sample-basic-topic-0=OffsetAndMetadata{offset=21396227, leaderEpoch=2409, metadata=''}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)

    검증2.메시지 처리 후 auto-commit을 할 때 처리한 메시지의 offset을 커밋하는가? 

    1. console-producer로 메시지 3개를 발행한다.
    2. consumer에서 3개의 메시지를 polling하는지 확인한다.
    3. 처리하는 과정에서 추가로 5개의 메시지를 발행한다.
      1. 만약 방금 polling한 메시지를 commit한다면 3개의 메시지를 모두 처리한 후 5개의 메시지를 읽어올 때 commit을 쳐서 Lag이 바로 0이 될것이다.

    [검증과정 및 결과]

    검증1.메시지 처리 과정에서 auto-commit이 발생하는가?

    # 1. console-producer에서 메시지 3개를 발행
    >3
    >4
    >5
     
    #2. consumer에서 3개의 메시지 polling 확인
    [2022-07-29 19:33:02,107] INFO message consumed 3 (com.gmarket.kafka.consumer.ConsumerApplication)
     
    #3. 메시지가 처리되는 과정에서 auto-commit이 발생하는지 확인
    [2022-07-29 19:33:12,125] INFO api called (com.gmarket.kafka.consumer.ConsumerApplication)
    [2022-07-29 19:33:12,125] INFO offset=21396224, count: 1 of 3 (com.gmarket.kafka.consumer.ConsumerApplication)
    ...
    [2022-07-29 19:33:22,125] INFO api called (com.gmarket.kafka.consumer.ConsumerApplication)
    [2022-07-29 19:33:22,125] INFO offset=21396225, count: 2 of 3 (com.gmarket.kafka.consumer.ConsumerApplication)
    ...
    [2022-07-29 19:33:32,125] INFO api called (com.gmarket.kafka.consumer.ConsumerApplication)
    [2022-07-29 19:33:32,125] INFO offset=21396226, count: 3 of 3 (com.gmarket.kafka.consumer.ConsumerApplication)
     
    [2022-07-29 19:33:32,133] DEBUG [Consumer clientId=consumer-jaeshim-test-group2-1, groupId=jaeshim-test-group2] Sending asynchronous auto-commit of offsets {sample-basic-topic-0=OffsetAndMetadata{offset=21396227, leaderEpoch=2409, metadata=''}}
    [2022-07-29 19:33:32,138] DEBUG [Consumer clientId=consumer-jaeshim-test-group2-1, groupId=jaeshim-test-group2] Committed offset 21396227 for partition sample-basic-topic-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
    [2022-07-29 19:33:32,138] DEBUG [Consumer clientId=consumer-jaeshim-test-group2-1, groupId=jaeshim-test-group2] Completed asynchronous auto-commit of offsets {sample-basic-topic-0=OffsetAndMetadata{offset=21396227, leaderEpoch=2409, metadata=''}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
     
    #처리가 모두 끝난 후에는 auto-commit-interval에 따라서 1초마다 auto-commit을 시도한다
    메시지를 처리하지 않을 때는 1초마다 auto-commit을 시도한다.
     
    [2022-07-29 19:33:38,184] DEBUG [Consumer clientId=consumer-jaeshim-test-group2-1, groupId=jaeshim-test-group2] Sending asynchronous auto-commit of offsets {sample-basic-topic-0=OffsetAndMetadata{offset=21396227, leaderEpoch=2409, metadata=''}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
    [2022-07-29 19:33:38,191] DEBUG [Consumer clientId=consumer-jaeshim-test-group2-1, groupId=jaeshim-test-group2] Completed asynchronous auto-commit of offsets {sample-basic-topic-0=OffsetAndMetadata{offset=21396227, leaderEpoch=2409, metadata=''}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
     
    [2022-07-29 19:33:39,194] DEBUG [Consumer clientId=consumer-jaeshim-test-group2-1, groupId=jaeshim-test-group2] Sending asynchronous auto-commit of offsets {sample-basic-topic-0=OffsetAndMetadata{offset=21396227, leaderEpoch=2409, metadata=''}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
    [2022-07-29 19:33:39,198] DEBUG [Consumer clientId=consumer-jaeshim-test-group2-1, groupId=jaeshim-test-group2] Completed asynchronous auto-commit of offsets

    검증2.메시지 처리 후 auto-commit을 할 때 처리한 메시지의 offset을 커밋하는가? 

    #3개의 메시지 생성
    >1
    >2
    >3
     
    [2022-08-02 09:03:38,510] INFO message consumed 3 (com.gmarket.kafka.consumer.ConsumerApplication)
     
    [2022-08-02 09:03:48,612] INFO api called (com.gmarket.kafka.consumer.ConsumerApplication)
    [2022-08-02 09:03:48,620] INFO offset: 1, count: 1 of 3 (com.gmarket.kafka.consumer.ConsumerApplication)
     
    #처리 도중 5개의 메시지 생성
    >1
    >2
    >3
    >4
    >5
     
    [2022-08-02 09:03:58,629] INFO api called (com.gmarket.kafka.consumer.ConsumerApplication)
    [2022-08-02 09:03:58,629] INFO offset: 2, count: 2 of 3 (com.gmarket.kafka.consumer.ConsumerApplication)
    [2022-08-02 09:04:08,650] INFO api called (com.gmarket.kafka.consumer.ConsumerApplication)
    [2022-08-02 09:04:08,650] INFO offset: 3, count: 3 of 3 (com.gmarket.kafka.consumer.ConsumerApplication)
     
    //처리완료 후 5개 메시지 polling
    [2022-08-02 09:04:08,655] INFO message consumed 5 (com.gmarket.kafka.consumer.ConsumerApplication)
     
    //auto commit이 발생하는데, offset번호를 최근 처리한 오프셋 번호로 함.
    [2022-08-02 09:04:08,651] DEBUG [Consumer clientId=consumer-jaeshim-test-group2-1, groupId=jaeshim-test-group2] Sending asynchronous auto-commit of offsets {jaeshim-topic-20220801-0=OffsetAndMetadata{offset=4, leaderEpoch=0, metadata=''}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
    [2022-08-02 09:04:08,676] DEBUG [Consumer clientId=consumer-jaeshim-test-group2-1, groupId=jaeshim-test-group2] Committed offset 4 for partition jaeshim-topic-20220801-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)

     

    [결론]

    • Auto Commit을 켜도 메시지를 처리하는 과정에서는 commit이 발생하지 않는다.
    • 메시지를 모두 처리한 후 다음번 polling할 때 commit이 발생한다. 
      • 이 때 offset번호는 방금 처리한 오프셋번호이다.
    • 스프링 카프카의 경우에도 auto.commit을 켰을때 동일하게 동작한다. 
      • 다만 스프링의 경우 ackmode를 다양하게 제공하므로 각각의 ack모드에 따라 동작을 확인할 필요가 있다. 
    • 구글에는 생각보다 잘못된 정보가 많은 것 같다. 

     

    => 메시지를 처리하는 도중에는 auto-commit이 발생하지 않고, 메시지를 처리한 후에 방금 처리한 오프셋까지 커밋한다. 

    그러므로 우려와 달리 처리할 메시지를 먼저 커밋해서 처리하다가 죽는 유실케이스는 없을 것 같다. 

     

    [참조]

    Confluent 문서 발췌

    confluent 공식문서에서도 유실걱정이 없다고 한다. 

    By default, the consumer is configured to auto-commit offsets. Using auto-commit gives you 
    “at least once”
     delivery: Kafka guarantees that
     no messages will be misse
    d, but duplicates are possible. Auto-commit basically works as a cron with a period set through the 
    auto.commit.interval.ms
     configuration property. If the consumer crashes, then after a restart or a rebalance, the position of all partitions owned by the crashed consumer will be reset to the last committed offset. When this happens, the last committed position may be as old as the auto-commit interval itself. Any messages which have arrived since the last commit will have to be read again.
     
     
    Auto Commit 소스 코드 부분 발췌
    public boolean poll(Timer timer) {
        invokeCompletedOffsetCommitCallbacks();
      
        ...
      
        //폴링할 때 auto-commit async메서드가 호출된다.
        maybeAutoCommitOffsetsAsync(timer.currentTimeMs());
        return true;
      
    }
     
    //ConsumerCoordinator의 maybeAutoCommitOffsetAsync method에서 autoCommitEnabled일 경우 timer가 expire되었는지 여부를 확인하고 시간이 되면 async로 topic의 aprtition offset을 commit한다.
    public void maybeAutoCommitOffsetsAsync(long now) {
        if (autoCommitEnabled) {
            nextAutoCommitTimer.update(now);
            if (nextAutoCommitTimer.isExpired()) {
                nextAutoCommitTimer.reset(autoCommitIntervalMs);
                doAutoCommitOffsetsAsync();
            }
        }
    }

    'Kafka > Consumer' 카테고리의 다른 글

    Consumer Error Handling Patterns  (0) 2023.05.09
    Kafka Consumer 성능과 고려 요소들  (0) 2023.04.23
    Kafka Simple Consumer  (0) 2023.03.03
    Rebalancing  (0) 2023.03.03
    Consumer Assignment Strategies  (0) 2023.02.04