Kafka/Producer

Sticky Partitioner

재심 2022. 12. 3. 22:06

[개요]

Apache Kafka 2.4 버전 이후부터 사용가능한 파티셔너이다.

기존에는 RR방식으로 동작했는데, 이 방식에 대한 단점이 있어 이를 보완한 방법

 

[파티셔닝?]

Kafka의 메시지는 key/value 의 형태로 구성된다. 이 때 key값이 있을 수도 있고, null값으로 존재할 수도 있다.

그리고 producer는 batch.size, linger.ms 옵션을 통해 메시지를 바로 보내지 않고 배치에 일정기간 저장했다가 보내는 특성을 가진다.

 

Key값이 있는 경우

modular 연산을 통해 어떤 파티션에 메시지가 적재될지 결정한다.

 

Key값이 없는 경우

기존에는 RR 방식을 사용하여 메시지를 파티션에 배분했다. 

아래 그림은 1~9 까지의 메시지를 생성했을 때 RR정책에 의해 파티셔닝 된 모습 

출처: Confluent

RoundRobin 파티셔너의 단점

위 그림에서 메시지 전송을 위해 3개의 배치가 사용되었다. 하지만 네트워크 관점에서 여러개의 배치를 보내는 것보다 배치 수를 줄일 수록 트래픽을 줄일 수 있고, 브로커에 대한 부하도 줄일 수 있게 될 것이다. 

 

[Sticky Partitioner의 도입]

출처: Confluent

 

RR의 단점을 개선한 방식으로 메시지당 RR을 적용하는 대신 하나의 배치가 전송될 때 까지 동일한 파티션에 메시지를 할당한다.

배치를 보낸 후 Sticky Partitoner는 다음 배치에 사용할 파티션값을 변경한다. 

 

[실제로 해보기]

Apache Kafka 2.4 이상 버전부터 적용되었다고하고, 디폴트로 적용되는듯 하다. 

# 파티션10개짜리의 토픽 생성
 
$ kafka-topics --create --zookeeper localhost:2181 --topic jaeshim-20220511 --replication-factor 2 --partitions 10
 
# key값 없이 메시지 전송
$ sudo kafka-console-producer --broker-list localhost:9092 --topic jaeshim-20220511
>1
>2
>3
>4
>5
 
# 확인
$ sudo kafka-run-class kafka.tools.GetOffsetShell   --broker-list localhost:9092 --topic jaeshim-20220511
jaeshim-20220511:0:0
jaeshim-20220511:1:5
jaeshim-20220511:2:0
jaeshim-20220511:3:0
jaeshim-20220511:4:0
 
# 추가생성
>6
>7
>8
>9
>10
 
# 확인
$ sudo kafka-run-class kafka.tools.GetOffsetShell   --broker-list localhost:9092 --topic jaeshim-20220511
jaeshim-20220511:0:5
jaeshim-20220511:1:5
jaeshim-20220511:2:0
jaeshim-20220511:3:0
jaeshim-20220511:4:0

 

이상한 점

이상한 점

위의 테스트 과정을 보면 linger.ms, batch.size 값을 별도로 지정해주지 않았다. 

그렇다면 기본값이 쓰였을텐데, linger.ms = 0, batch.size = 16384이다. 

 

즉, linger.ms = 0 에 의해 1개의 메시지가 하나의 배치에 담겨서 전송됐어야 하지 않나라는 의문이 들게됨. 

관련하여 문서를 찾아봄.

https://www.confluent.io/blog/apache-kafka-producer-improvements-sticky-partitioner/

 

Apache Kafka Producer Improvements: Sticky Partitioner

Apache Kafka 2.4 introduces sticky partitioning, allowing Kafka producers to assign keyless messages to partitions for data processing at lower latency.

www.confluent.io

 

A batch is completed either when it reaches a certain size (batch.size) or after a period of time (linger.ms) is up. Both batch.size and linger.ms are configured in the producer. The default for batch.size is 16,384 bytes, and the default of linger.ms is 0 milliseconds. Once batch.size is reached or at least linger.ms time has passed, the system will send the batch as soon as it is able.
At first glance, it might seem like setting linger.ms to 0 would only lead to the production of single-record batches. However, this is usually not the case. Even when linger.ms is 0, the producer will group records into batches when they are produced to the same partition around the same time. This is because the system needs a bit of time to handle each request, and batches form when the system cannot attend to them all right away.

#번역기
배치는 특정 크기(batch.size)에 도달하거나 일정 시간(linger.ms)이 지나면 완료됩니다. batch.size와 linger.ms는 모두 생산자에서 구성됩니다. batch.size의 기본값은 16,384바이트이고 linger.ms의 기본값은 0밀리초입니다. batch.size에 도달하거나 최소한 linger.ms 시간이 지나면 시스템은 가능한 한 빨리 배치를 보냅니다.

언뜻 보면 linger.ms를 0으로 설정하면 단일 레코드 배치만 생성되는 것처럼 보일 수 있습니다. 그러나 이것은 일반적으로 그렇지 않습니다. linger.ms가 0인 경우에도 동일한 시간에 동일한 파티션에 생성될 때 생산자는 레코드를 일괄 처리로 그룹화합니다. 이는 시스템이 각 요청을 처리하는 데 약간의 시간이 필요하고 시스템이 즉시 처리할 수 없을 때 배치가 형성되기 때문입니다.

 

=> linger.ms 값이 0이라도 배치를 생성하는 과정에서 메시지가 쌓이게 되므로 동일한 파티션에 생성될 수 있다는 뜻으로 보인다.

 

어느정도 시차를 두고 테스트.

 

#토픽 생성
$ sudo kafka-topics --create --zookeeper localhost:2181 --topic jaeshim-20220518 --replication-factor 2 --partitions 5 
 
#10초 간격으로 메시지 생성
$ sudo kafka-console-producer --broker-list localhost:9092 --topic jaeshim-20220518
>1
 
#확인
$ sudo kafka-run-class kafka.tools.GetOffsetShell   --broker-list localhost:9092 --topic jaeshim-20220518
jaeshim-20220518:0:1
jaeshim-20220518:1:0
jaeshim-20220518:2:0
jaeshim-20220518:3:0
jaeshim-20220518:4:0
>2
 
#확인
$ sudo kafka-run-class kafka.tools.GetOffsetShell   --broker-list localhost:9092 --topic jaeshim-20220518
jaeshim-20220518:0:1
jaeshim-20220518:1:0
jaeshim-20220518:2:0
jaeshim-20220518:3:0
jaeshim-20220518:4:1
>3
 
#확인
$ sudo kafka-run-class kafka.tools.GetOffsetShell   --broker-list localhost:9092 --topic jaeshim-20220518
jaeshim-20220518:0:1
jaeshim-20220518:1:1
jaeshim-20220518:2:0
jaeshim-20220518:3:0
jaeshim-20220518:4:1
>4
 
#확인
$ sudo kafka-run-class kafka.tools.GetOffsetShell   --broker-list localhost:9092 --topic jaeshim-20220518
jaeshim-20220518:0:1
jaeshim-20220518:1:1
jaeshim-20220518:2:1
jaeshim-20220518:3:0
jaeshim-20220518:4:1
 
>5
#확인
$ sudo kafka-run-class kafka.tools.GetOffsetShell   --broker-list localhost:9092 --topic jaeshim-20220518
jaeshim-20220518:0:1
jaeshim-20220518:1:1
jaeshim-20220518:2:1
jaeshim-20220518:3:1
jaeshim-20220518:4:1

 

라운드로빈처럼 생성되었음.

 

[소스코드 파헤치기]

문서로 최대한 작성해보려 하였으나, 문서만으로는 소스를 따라가기 힘든 것 같다. IDE를 열고 추적하는 것을 추천..!

UniformStickyPartitioner

//UniformStickyPartitioner
public class UniformStickyPartitioner implements Partitioner {
 
    private final StickyPartitionCache stickyPartitionCache = new StickyPartitionCache();
 
    public void configure(Map<String, ?> configs) {}
 
    /**
     * Compute the partition for the given record.
     *
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param keyBytes serialized key to partition on (or null if no key)
     * @param value The value to partition on or null
     * @param valueBytes serialized value to partition on or null
     * @param cluster The current cluster metadata
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        return stickyPartitionCache.partition(topic, cluster);
    }
 
    public void close() {}
     
    /**
     * If a batch completed for the current sticky partition, change the sticky partition.
     * Alternately, if no sticky partition has been determined, set one.
     */
    public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
        stickyPartitionCache.nextPartition(topic, cluster, prevPartition);
    }
}
 
//StickyPartitionCache
public class StickyPartitionCache {
    private final ConcurrentMap<String, Integer> indexCache;
    public StickyPartitionCache() {
        this.indexCache = new ConcurrentHashMap<>();
    }
 
    public int partition(String topic, Cluster cluster) {
        Integer part = indexCache.get(topic);
        if (part == null) {
            return nextPartition(topic, cluster, -1);
        }
        return part;
    }
 
    public int nextPartition(String topic, Cluster cluster, int prevPartition) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        Integer oldPart = indexCache.get(topic);
        Integer newPart = oldPart;
        // Check that the current sticky partition for the topic is either not set or that the partition that
        // triggered the new batch matches the sticky partition that needs to be changed.
        if (oldPart == null || oldPart == prevPartition) {
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() < 1) {
                Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
                newPart = random % partitions.size();
            } else if (availablePartitions.size() == 1) {
                newPart = availablePartitions.get(0).partition();
            } else {
                while (newPart == null || newPart.equals(oldPart)) {
                    int random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
                    newPart = availablePartitions.get(random % availablePartitions.size()).partition();
                }
            }
            // Only change the sticky partition if it is null or prevPartition matches the current sticky partition.
            if (oldPart == null) {
                indexCache.putIfAbsent(topic, newPart);
            } else {
                indexCache.replace(topic, prevPartition, newPart);
            }
            return indexCache.get(topic);
        }
        return indexCache.get(topic);
    }
 
}
  • UniformStickyPartitioner: StickyPartitioner의 구현.
  • StickyPartitonCache: 토픽별로 파티션 번호를 관리하기 위한 Map을 가지고 있다. 

 

KafkaProducer

메시지가 배치에 쌓이게되려면 doSend메서드를 거쳐야 하고, 이 메서드 내부에서 배치에 append 하는 코드가 구현되어 있다. 

이 과정에서 StickyPartitoner인 경우 어떻게 동작하는지 확인할 수 있다.

Producer.doSend에서 호출하는 메서드들을 순서대로 확인해보면서 로직을 파악해본다. 

 

Producer.doSend

private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
            ...생략
            //몇 번 파티션에 넣어야 하는지 정보를 받아온다.
            int partition = partition(record, serializedKey, serializedValue, cluster);
            tp = new TopicPartition(record.topic(), partition);
             
            ...생략
            //RecordAccumulator에 메시지를 추가하도록 요청하고 결과를 받는다.
            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                    serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);
 
            //배치를 새로 만들 필요가 있을 경우
            if (result.abortForNewBatch) {
                int prevPartition = partition;
                //onNewBatch를 호출하여 새로운 파티션번호가 StickyPartitionCache에 할당될 수 있도록 한다.
                partitioner.onNewBatch(record.topic(), cluster, prevPartition);
                //파티션 번호가 변경되었을 것이니 파티션 번호를 다시 받아온다.
                partition = partition(record, serializedKey, serializedValue, cluster);
                tp = new TopicPartition(record.topic(), partition);
                if (log.isTraceEnabled()) {
                    log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition);
                }
                // producer callback will make sure to call both 'callback' and interceptor callback
                interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
 
                result = accumulator.append(tp, timestamp, serializedKey,
                    serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs);
            }
 
            ...생략
}

Producer.partition

private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
    Integer partition = record.partition();
    if (partition != null) {
        return partition;
    }
 
    //파티셔너에 구현된 partition 메서드를 호출하여 파티션 번호를 받아온다.
    int customPartition = partitioner.partition(
            record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
    if (customPartition < 0) {
        throw new IllegalArgumentException(String.format(
                "The partitioner generated an invalid partition number: %d. Partition number should always be non-negative.", customPartition));
    }
    return customPartition;
}

여기서 partitioner.partition을 호출하여 해당되는 partitioner에서 구현한 partition을 호출하여 파티션번호를 받아온다.

UniformSticky의 partition을 다시 살펴보면 Map에 있는 파티션번호를 단순히 받아오기만 한다. (최초 메시지인 경우는 nextPartition을 호출하여 받아온다.)

 

UniformStikicyPartitioner.partiton

public int partition(String topic, Cluster cluster) {
    Integer part = indexCache.get(topic);
    if (part == null) {
        return nextPartition(topic, cluster, -1);
    }
    return part;
}

RecordAccumulator.append

public RecordAppendResult append(TopicPartition tp,
                                     long timestamp,
                                     byte[] key,
                                     byte[] value,
                                     Header[] headers,
                                     Callback callback,
                                     long maxTimeToBlock,
                                     boolean abortOnNewBatch,
                                     long nowMs) throws InterruptedException {
        // We keep track of the number of appending thread to make sure we do not miss batches in
        // abortIncompleteBatches().
        appendsInProgress.incrementAndGet();
        ByteBuffer buffer = null;
        if (headers == null) headers = Record.EMPTY_HEADERS;
        try {
            // check if we have an in-progress batch
            //진행중인 배치가 있다면 받아오고, 없다면 새로운 배치를 생성한다.
            Deque<ProducerBatch> dq = getOrCreateDeque(tp);
            synchronized (dq) {
                if (closed)
                    throw new KafkaException("Producer closed while send in progress");
                //배치에 새로운 레코드를 추가하려는 시도를 한다.
                RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);
                //배치에 메시지를 넣은 결과가 null이 아니면 배치에 추가 공간이 있어서 성공적으로 넣었다는 뜻이다. 잘 넣었으므로 그 결과를 그대로 리턴한다.
                if (appendResult != null)
                    return appendResult;
            }
 
            //여기까지 왔다는 것은 배치에 공간이 없다는 뜻이다. 새로운 배치가 필요하다는 결과를 리턴해준다.                
            // we don't have an in-progress record batch try to allocate a new batch
            if (abortOnNewBatch) {
                // Return a result that will cause another call to append.
                //생성자 설명 (future: null, batchIsFull: false, newBatchCreated: false, abortForNewBatch: true)
                return new RecordAppendResult(null, false, false, true);
            }
 
            byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
            int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
            log.trace("Allocating a new {} byte message buffer for topic {} partition {} with remaining timeout {}ms", size, tp.topic(), tp.partition(), maxTimeToBlock);
            buffer = free.allocate(size, maxTimeToBlock);
 
            // Update the current time in case the buffer allocation blocked above.
            nowMs = time.milliseconds();
            synchronized (dq) {
                // Need to check if producer is closed again after grabbing the dequeue lock.
                if (closed)
                    throw new KafkaException("Producer closed while send in progress");
 
                RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);
                if (appendResult != null) {
                    // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                    return appendResult;
                }
 
                MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
                ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, nowMs);
                FutureRecordMetadata future = Objects.requireNonNull(batch.tryAppend(timestamp, key, value, headers,
                        callback, nowMs));
 
                dq.addLast(batch);
                incomplete.add(batch);
 
                // Don't deallocate this buffer in the finally block as it's being used in the record batch
                buffer = null;
                return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true, false);
            }
        } finally {
            if (buffer != null)
                free.deallocate(buffer);
            appendsInProgress.decrementAndGet();
        }
    }
    //새로운 메시지를 배치에 넣으려고 시도한다.
    private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers,
                                         Callback callback, Deque<ProducerBatch> deque, long nowMs) {
        //큐의 마지막요소를 검색하고 큐가 비어있으면 null을 반환한다.
        ProducerBatch last = deque.peekLast();
        //null이 아니면 즉, 배치에 메시지가 이미 있는 경우
        if (last != null) {
            //배치에 메시지를 넣으려는 시도를 한다.
            FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, nowMs);
            //null이 리턴되었다는 것은 빈공간이 없다는 뜻이다.
            if (future == null)
                last.closeForRecordAppends();
            //배치에 빈공간이 있다면 AppendResult를 리턴해준다.         
            else
                //생성자 파라미터 (future, batchIsFull: dequeue.size()>1 || last.isFull(), newBatchCreated: false, abortForNewBatch: false)
                return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false, false);
        }
        return null;
    }

ProducerBatch.tryAppend

public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long now) {
    //배치에 빈 공간이 있는지 확인하고, 빈공간이 없다면 null을 리턴한다.
    if (!recordsBuilder.hasRoomFor(timestamp, key, value, headers)) {
        return null;
    } else {
        this.recordsBuilder.append(timestamp, key, value, headers);
        this.maxRecordSize = Math.max(this.maxRecordSize, AbstractRecords.estimateSizeInBytesUpperBound(magic(),
                recordsBuilder.compressionType(), key, value, headers));
        this.lastAppendTime = now;
        FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
                                                               timestamp,
                                                               key == null ? -1 : key.length,
                                                               value == null ? -1 : value.length,
                                                               Time.SYSTEM);
        // we have to keep every future returned to the users in case the batch needs to be
        // split to several new batches and resent.
        thunks.add(new Thunk(callback, future));
        this.recordCount++;
        return future;
    }
}

[성능테스트 자료]

https://www.confluent.io/blog/apache-kafka-producer-improvements-sticky-partitioner/

 

Apache Kafka Producer Improvements: Sticky Partitioner

Apache Kafka 2.4 introduces sticky partitioning, allowing Kafka producers to assign keyless messages to partitions for data processing at lower latency.

www.confluent.io

 

Round-Robin과 성능 비교. 

Duration of test 12 minutes
Number of brokers 3
Number of producers 1–3
Replication factor 3
Topics 4
linger.ms 0
acks all
keyGenerator {"type":"null"}
useConfiguredPartitioner true
No flushing on throttle (skipFlush) true
  • useConfiguredPartitioner: DefaultPartitioner (Sticky)로 지정됨
  • No flushing on throttle: Flush가 아닌 batch.size, linger.ms 옵션이 동작하게 함.

 

테스트1. P16, TPS 1000, 초당 1000개의 메시지. 

출처: Confluent

 

테스트2. 파티션 개수를 늘릴 때 대기시간

출처: Confluent

 

테스트3. linger.ms 옵션에 따른 성능 차이 

출처: Confluent

linger.ms 옵션을 통해 배치가 채워지기까지 기다렸을 때 큰 차이를 보였음.

 

 

테스트4. Key값이 있는 메시지와 없는 메시지가 혼용될 때

출처: Confluent

 

[결론]

  • Apache Kafka 2.4 이후 도입된 Sticky Partitoner는 Key가 없는 메시지를 전송할 때 기존 RR에 비해 지연시간 개선폭이 크다.
  • linger.ms 옵션과 섞어서 사용할 경우 더 큰 성능 증가를 기대할 수 있다.
  • Key가 있는 메시지를 처리할 때는 기존 Partitioner와 큰 성능차이를 기대하기는 힘들다. 

 

[참조]

https://www.confluent.io/blog/5-things-every-kafka-developer-should-know/#tip-2-new-sticky-partitioner

 

Top 5 Things Every Apache Kafka Developer Should Know

Here’s everything software engineers should know about Kafka - from architectural concepts and performance improvements, to command line tools and other expert tips.

www.confluent.io

https://www.confluent.io/blog/apache-kafka-producer-improvements-sticky-partitioner/

 

Apache Kafka Producer Improvements: Sticky Partitioner

Apache Kafka 2.4 introduces sticky partitioning, allowing Kafka producers to assign keyless messages to partitions for data processing at lower latency.

www.confluent.io

https://docs.confluent.io/platform/current/clients/producer.html

 

Kafka Producer | Confluent Documentation

Home Platform Build Applications for Kafka Kafka Clients Kafka Producer Confluent Platform includes the Java producer shipped with Apache Kafka®. This section gives a high-level overview of how the producer works and an introduction to the configuration s

docs.confluent.io

https://www.confluent.io/blog/apache-kafka-2-4-latest-version-updates/