파티션이 3개가 있다고 가정하자. 이 때 파티션 배치는 아래와 같고, linger.ms=0, batch.size=16384를 사용한다고 가정한다.
메시지 "A"가 파티션0의 배치에 추가되었다.
linger.ms=0이라서 즉시 전송하려고 하였으나 브로커0의 네트워크 상황이 좋지않아 바로 보내지 못하고 좀 더 대기하게 되었다.
메시지 "B"가 유입되었다.
OnNewBatch가 호출된 상황이 아니므로 파티션0의 배치에 추가되었다.
메시지 "C"가 유입되었다.
마찬가지로 OnNewBatch가 호출된 상황이 아니므로 파티션0의 배치에 추가되었다.
1초가 지난 후 브로커0이 해당 배치를 받아주었다.
파티션0에는 3개의 메시지가 전송되었다.
메시지 "D"가 유입되었다.
이번에는 OnNewBatch가 호출된 상황 (배치가 방금 전송된 경우) 이므로 파티션1의 배치에 메시지 "D"가 추가되었다.
linger.ms=0이므로 즉시 전송시도를 하였고, 브로커1은 상태가 좋아서 바로 받아주었다.
파티션1에는 1개의 메시지가 유입되었다.
메시지 "E"가 유입되었다.
마찬가지로 OnNewBatch가 호출되었고, 파티션2의 배치에 쌓이며, 브로커2의 상태도 좋아서 바로 전송되었다.
파티션2에도 1개의 메시지가 유입되었다.
메시지 "F"가 유입되었다.
OnNewBatch가 호출되었고, 이번에는 파티션0의 배치에 쌓였다.
linger.ms=0 이지만 브로커0이 여전히 느려서 바로 전송하지 못하였다.
메시지 "G"가 유입되었다.
OnNewBatch 호출상황이 아니므로 파티션0의 배치에 쌓였다.
브로커0이 배치를 수신하였다.
=>결과적으로 가장 느린 브로커0 에는 메시지가 5개있고, 빠른 브로커인 1,2는 메시지가 각각 1개씩 밖에 없는 상황이 연출되었다.
[Version Compatibility]
Custom Partitioner
기존의 Partitioner 인터페이스를 구현하는 방식으로는 3.3+ 에서 개선된 sticky + adaptive partitioning 로직을 반영하는 것은 불가능
BuiltInPartitioner를 customizing 하는 것도 거의 불가능하다고 판단(RecordAccumulator 를 수정하더라도 배포하여 사용할 수 없다고 판단)
Confluent Version
Apache Kafka Client 3.3에 대응되는 Confluent Kafka 7.3 버전을 사용하더라도 3.3 이하의 Apache Kafka client를 사용하는 것에 문제가 없음
추후 deprecated 인 Partitioner는 삭제될 것 같다는 내용 전달 받음
[Configuration 변경사항 (기본값, custom partitioner를 지정한 경우엔 아래 설정은 무시됨)]
UniformStickyPartitioner를 대체하는 기본 구성
partitioner.class = null
paritioner.ignore.keys = true
partitioner.adaptive.partitioning.enable = true
→ producer가 broker 성능에 adaptive하게 동작. 더 빠른 broker에서 호스팅되는 paritition에 더 많은 메시지 전송. false인 경우, partition을 임의로 지정
partitioner.availability.timeout.ms= 0
→ 파티션이 가용한지 판단하는 시간.
→partitioner.adaptive.partitioning.enable=true &&partitioner.availability.timeout.ms> 0 인 경우,partitioner.availability.timeout.ms동안 어떤 partition이 unavailable하면 사용할 수 없는 partition이 됨(리더 broker의 latency로 계산)
→ partitioner.adaptive.partitioning.enable = false이면 이 설정은 무시됨
[변경사항]
key가 없는 레코드에 대한 기본적인 partitioning임.
partitioner.ignore.keys = true이면, key가 있는 레코드도 균등하게 분배됨.
UniformSticky 방식 리뉴얼
배치를 생성할 때 마다 파티션을 전환하는 대신(기존 방식:onNewBatch()), batch.size 만큼 파티션에 생성될 때마다 파티션을 전환한다.
→linger.ms가 작을 때는 batch.size에 못미치는 batch도 즉시 전송하고 partition switch를 했는데, batch.size만큼 생성되고나서 파티션 전환
→ sticky하면서도 레코드 불균형을 방지
strictly uniform partitioning의 잠재적 단점
어떤 broker가 throughput 감당을 못하면 records가 Accumulator에 쌓이고
결국 buffer pool memory가 고갈됨
production 속도가 가장 느린 broker의 capacity로 조정됨
Adaptive Partitioning
RecordAccumulator에서 전송 대기중인 배치의 대기열 크기(broker load)를 알 수 있으므로 partitioner가 지정되지 않으면 RecordAccumulator에서 partitioning이 수행됨
대기열 크기로 파티션 선정
다음 partition 선정에 대기열 크기를 고려함(파티션 선정 확률은 대기열 크기의 역수에 비례함 : 대기열이 작은 파티션이 선정될 확률이 큼)
대기열 이란 - ready 상태의 partition에 적재된 batch의 개수가 적을 수 록 대기열이 짧음
partitioner.availability.timeout.ms 로 파티션 선정
전송할 배치가 있는 파티션이 위 값내에 준비가 안되면 unavailable 파티션으로 표시되고
전송 가능할 때까지 선정이 안됨
Partition 인터페이스를 구현한 기존 partitioner는 queue size나 broker readiness를 모름
DefaultPartitioner, UniformStickyPartitioner를 명시한 경우 사용 중단 경고가 표시되지만 동작에는 이상 없음
업데이트된 동작을 가져오려면 그에 따라 구성을 업데이트해야 한다(UniformStickyPartitioner를 대체하려는 경우,partitioner.class 설정을 제거하고 선택적으로 partitioner.ignore.keys 를 'true'로 설정 )
Partitioner.onNewBatch()는 더 이상 사용되지 않음
[Partitioner 수준에서 구현 시도]
Partitioner 수준에서 BuiltInPartitioner 구현을 시도하였으나 아래 사유로 Accumulator 수준에서 구현으로 변경하였다고 한다.
partitioner 단에서는 partition에 대기중인 batch size를 알지 못함
해당 파티션에 produce된 recordSize를 get하는 getRecordSize(), partition load에 따라서 next partition을 지정하는 nextPartition() callback을 구상(producer에서 다시 호출하도록)
producer와 partitioner를 분리하려는 시도였으나, 인터페이스의 복잡성 증가 + 이해하기 어려움
Partition 인터페이스의 캡슐화가 깨짐을 알았고 partitioning 논리의 일부를 producer에게도 위임
public interface Partitioner extends Configurable, Closeable {
/**
* Callbacks from the producer
*/
interface Callbacks {
/**
* Get record size in bytes. The keyBytes and valueBytes may present skewed view of the number
* of bytes produced to the partition. In addition, the callback takes into account the following:
* 1. Headers
* 2. Record overhead
* 3. Batch overhead
* 4. Compression
*
* @param partition The partition we need the record size for
* @return The record size in bytes
*/
int getRecordSize(int partition);
/**
* Calculate the partition number. The producer keeps stats on partition load
* and can use it as a signal for picking up the next partition.
*
* @return The partition number, or -1 if not implemented or not known
*/
default int nextPartition() {
return -1;
}
}
// ... <skip> ...
/**
* Compute the partition for the given record.
*
* @param topic The topic name
* @param key The key to partition on (or null if no key)
* @param keyBytes The serialized key to partition on( or null if no key)
* @param value The value to partition on or null
* @param valueBytes The serialized value to partition on or null
* @param callbacks The record size and partition callbacks (see {@link Partitioner#Callbacks})
* @param cluster The current cluster metadata
*/
default int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes,
Callbacks callbacks, Cluster cluster) {
return partition(topic, key, keyBytes, value, valueBytes, cluster);
}
// ... <skip> ...
}
[코드 분석]
Partition 지정하지 않으면 partitioner=null → RecordAccumulator내부에서 생성하는 BuiltInPartitioner 인스턴스가 사용됨
sticky partition이 있다면(topicInfo.builtInPartitioner.peekCurrentPartitionInfo(cluster)) 해당 partition에 대기중인 batch에 append 시도
public RecordAppendResult append(String topic, int partition, long timestamp, byte[] key, byte[] value, Header[] headers, AppendCallbacks callbacks, long maxTimeToBlock, boolean abortOnNewBatch, long nowMs, Cluster cluster) throws InterruptedException {
TopicInfo topicInfo = topicInfoMap.computeIfAbsent(topic, k - > new TopicInfo(logContext, k, batchSize)); // We keep track of the number of appending thread to make sure we do not miss batches in
// abortIncompleteBatches().
appendsInProgress.incrementAndGet();
ByteBuffer buffer = null;
if (headers == null) headers = Record.EMPTY_HEADERS;
try { // Loop to retry in case we encounter partitioner's race conditions.
while (true) { // If the message doesn't have any partition affinity, so we pick a partition based on the broker
// availability and performance. Note, that here we peek current partition before we hold the
// deque lock, so we'll need to make sure that it's not changed while we were waiting for the
// deque lock.
final BuiltInPartitioner.StickyPartitionInfo partitionInfo;
final int effectivePartition;
if (partition == RecordMetadata.UNKNOWN_PARTITION) {
partitionInfo = topicInfo.builtInPartitioner.peekCurrentPartitionInfo(cluster);
effectivePartition = partitionInfo.partition();
} else {
partitionInfo = null;
effectivePartition = partition;
} // Now that we know the effective partition, let the caller know.
setPartition(callbacks, effectivePartition); // check if we have an in-progress batch
Deque < ProducerBatch > dq = topicInfo.batches.computeIfAbsent(effectivePartition, k - > new ArrayDeque < > ());
synchronized(dq) { // After taking the lock, validate that the partition hasn't changed and retry.
if (partitionChanged(topic, topicInfo, partitionInfo, dq, nowMs, cluster)) continue;
RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callbacks, dq, nowMs);
if (appendResult != null) { // If queue has incomplete batches we disable switch (see comments in updatePartitionInfo).
boolean enableSwitch = allBatchesFull(dq);
topicInfo.builtInPartitioner.updatePartitionInfo(partitionInfo, appendResult.appendedBytes, cluster, enableSwitch);
return appendResult;
}
} // we don't have an in-progress record batch try to allocate a new batch
if (abortOnNewBatch) { // Return a result that will cause another call to append.
return new RecordAppendResult(null, false, false, true, 0);
}
if (buffer == null) {
byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
log.trace("Allocating a new {} byte message buffer for topic {} partition {} with remaining timeout {}ms", size, topic, partition, maxTimeToBlock); // This call may block if we exhausted buffer space.
buffer = free.allocate(size, maxTimeToBlock); // Update the current time in case the buffer allocation blocked above.
// NOTE: getting time may be expensive, so calling it under a lock
// should be avoided.
nowMs = time.milliseconds();
}
synchronized(dq) { // After taking the lock, validate that the partition hasn't changed and retry.
if (partitionChanged(topic, topicInfo, partitionInfo, dq, nowMs, cluster)) continue;
RecordAppendResult appendResult = appendNewBatch(topic, effectivePartition, dq, timestamp, key, value, headers, callbacks, buffer, nowMs); // Set buffer to null, so that deallocate doesn't return it back to free pool, since it's used in the batch.
if (appendResult.newBatchCreated) buffer = null; // If queue has incomplete batches we disable switch (see comments in updatePartitionInfo).
boolean enableSwitch = allBatchesFull(dq);
topicInfo.builtInPartitioner.updatePartitionInfo(partitionInfo, appendResult.appendedBytes, cluster, enableSwitch);
return appendResult;
}
}
} finally {
free.deallocate(buffer);
appendsInProgress.decrementAndGet();
}
}
→ batch가 꽉 찼다면 builtInPartitioner.updatePartitionInfo() 에서 partition을 바꿀지 결정하는데
BuiltInPartitioner에서 StickyPartitionInfo라는 인스턴스에서 현재 sticky partition 정보를 가지고 있음
partition 번호와 partition에 produce된 messageByte
void updatePartitionInfo(StickyPartitionInfo partitionInfo, int appendedBytes, Cluster cluster, boolean enableSwitch) { // partitionInfo may be null if the caller didn't use built-in partitioner.
if (partitionInfo == null) return;
assert partitionInfo == stickyPartitionInfo.get();
int producedBytes = partitionInfo.producedBytes.addAndGet(appendedBytes); // We're trying to switch partition once we produce stickyBatchSize bytes to a partition
if (producedBytes >= stickyBatchSize * 2) {
log.trace("Produced {} bytes, exceeding twice the batch size of {} bytes, with switching set to {}", producedBytes, stickyBatchSize, enableSwitch);
}
if (producedBytes >= stickyBatchSize && enableSwitch || producedBytes >= stickyBatchSize * 2) { // We've produced enough to this partition, switch to next.
StickyPartitionInfo newPartitionInfo = new StickyPartitionInfo(nextPartition(cluster));
stickyPartitionInfo.set(newPartitionInfo);
}
}
PartitionLoadStats
해당 토픽내의 partition에 대기중인 batch 정보를 가지고 있음(대기열)
대기중인 batch가 적을 수 록 partition이 nextPartition이 될 확률이 가장 크게하는 로직
private int nextPartition(Cluster cluster) {
int random = mockRandom != null ? mockRandom.get() : Utils.toPositive(ThreadLocalRandom.current().nextInt()); // Cache volatile variable in local variable.
PartitionLoadStats partitionLoadStats = this.partitionLoadStats;
int partition;
if (partitionLoadStats == null) { // We don't have stats to do adaptive partitioning (or it's disabled), just switch to the next
// partition based on uniform distribution.
List < PartitionInfo > availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
partition = availablePartitions.get(random % availablePartitions.size()).partition();
} else { // We don't have available partitions, just pick one among all partitions.
List < PartitionInfo > partitions = cluster.partitionsForTopic(topic);
partition = random % partitions.size();
}
} else { // Calculate next partition based on load distribution.
// Note that partitions without leader are excluded from the partitionLoadStats.
assert partitionLoadStats.length > 0;
int[] cumulativeFrequencyTable = partitionLoadStats.cumulativeFrequencyTable;
int weightedRandom = random % cumulativeFrequencyTable[partitionLoadStats.length - 1]; // By construction, the cumulative frequency table is sorted, so we can use binary
// search to find the desired index.
int searchResult = Arrays.binarySearch(cumulativeFrequencyTable, 0, partitionLoadStats.length, weightedRandom); // binarySearch results the index of the found element, or -(insertion_point) - 1
// (where insertion_point is the index of the first element greater than the key).
// We need to get the index of the first value that is strictly greater, which
// would be the insertion point, except if we found the element that's equal to
// the searched value (in this case we need to get next). For example, if we have
// 4 5 8
// and we're looking for 3, then we'd get the insertion_point = 0, and the function
// would return -0 - 1 = -1, by adding 1 we'd get 0. If we're looking for 4, we'd
// get 0, and we need the next one, so adding 1 works here as well.
int partitionIndex = Math.abs(searchResult + 1);
assert partitionIndex < partitionLoadStats.length;
partition = partitionLoadStats.partitionIds[partitionIndex];
}
log.trace("Switching to partition {} in topic {}", partition, topic);
return partition;
}
batch를 전송할 다음 partition 번호를 계산하는 logic
현재 batch size :각 파티션에 에 담겨있는 batch size(파티션 위치는 랜덤이고, 이 배열에 대응되는 partition 번호를 관리하는 배열이 따로 있음)
0, 3, 1, 6, 2
찾아야 하는 것 : 다음 데이터가 어느 partition의 batch 에 들어가야할지 찾아야함 .
현재 batch size 중 가장 큰 값은 6 이고, 파티션들의 batch size를 고르게 하려면 모두 최소 7 의 batch size를 가지면 됨 (Max value (6) + 들어갈 위치 (1) )
가용가능한 batch size: (파티션들의 batch size를 고르게 하는 최소 batch size - 각 파티션의 batch size )
7, 4, 6, 1, 5 ( 데이터가 들어갈 수 있는 batch 크기 )
위 배열을 왼쪽부터 누적하여 더해 하나의 배열로 만들면(batch 크기 개념은 유지하되, 정렬된 상태를 만들기 위함) idx는 아래와 같이 됨 ( 0 + 7 / 7 + 4 / 11 + 6 ... 이전까지의 Idx + 현재 batch size )
7, 11, 17, 18, 23
Random % 23 하였을때, 이번 batch가 cumulativeFrequencyTables의 첫번째 파티션 에 들어갈 확률 = 7/23 ( 0 ~ 6 idx) 두번째 파티션에 들어갈 확률 = 4/23 ( 7 ~ 10 Idx)
따라서 위 연산을 이용하여 batch를 가장 적게 가진 파티션에 다음 batch가 들어갈 확률이 높다.
→ 더 많은 레코드를 생산함에 따라서 uniform한 batch가 생성된다
private int nextPartition(Cluster cluster) {
int random = mockRandom != null ? mockRandom.get() : Utils.toPositive(ThreadLocalRandom.current().nextInt()); // Cache volatile variable in local variable.
PartitionLoadStats partitionLoadStats = this.partitionLoadStats;
int partition;
if (partitionLoadStats == null) { // We don't have stats to do adaptive partitioning (or it's disabled), just switch to the next
// partition based on uniform distribution.
List < PartitionInfo > availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
partition = availablePartitions.get(random % availablePartitions.size()).partition();
} else { // We don't have available partitions, just pick one among all partitions.
List < PartitionInfo > partitions = cluster.partitionsForTopic(topic);
partition = random % partitions.size();
}
} else { // Calculate next partition based on load distribution.
// Note that partitions without leader are excluded from the partitionLoadStats.
assert partitionLoadStats.length > 0;
int[] cumulativeFrequencyTable = partitionLoadStats.cumulativeFrequencyTable;
int weightedRandom = random % cumulativeFrequencyTable[partitionLoadStats.length - 1]; // By construction, the cumulative frequency table is sorted, so we can use binary
// search to find the desired index.
int searchResult = Arrays.binarySearch(cumulativeFrequencyTable, 0, partitionLoadStats.length, weightedRandom); // binarySearch results the index of the found element, or -(insertion_point) - 1
// (where insertion_point is the index of the first element greater than the key).
// We need to get the index of the first value that is strictly greater, which
// would be the insertion point, except if we found the element that's equal to
// the searched value (in this case we need to get next). For example, if we have
// 4 5 8
// and we're looking for 3, then we'd get the insertion_point = 0, and the function
// would return -0 - 1 = -1, by adding 1 we'd get 0. If we're looking for 4, we'd
// get 0, and we need the next one, so adding 1 works here as well.
int partitionIndex = Math.abs(searchResult + 1);
assert partitionIndex < partitionLoadStats.length;
partition = partitionLoadStats.partitionIds[partitionIndex];
}
log.trace("Switching to partition {} in topic {}", partition, topic);
return partition;
}
위의 과정까지가 메시지를 전송할 partition 선정, batch 생성 과정이고
위 과정이 끝나고 실제로 전송하는 Sender 스레드의 run()이 호출하는 메서드에서 ready 상태인 partition을 구하는 단계가 있음
: NodeLatency(Broker latency) > partitioner.availability.timeout.ms 인 node에 속한 partition에 대기중인 batch는 전송에서 제외하는 로직이 있음