Issue: messages compressed by a newer Kafka client library cannot be consumed by an older client library
[Overview]
Subscribing to the topic fails with an error; the exception is shown in the [Debugging] section below.
The error occurs repeatedly and no messages get processed.
The topic is produced by a producer running Spring Kafka 2.6 with compression.type = zstd (a minimal producer-side sketch follows the consumer environment list below).
The consumer runs in the following environment:
- Spring Boot Version: 2.1.6
- Spring Kafka Version: 2.2.7
- Apache Kafka Client Version: 2.0.1
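For context, here is a minimal sketch of a producer publishing zstd-compressed messages with Spring Kafka 2.6. The class name, topic name, and bootstrap servers are placeholders, not the actual producer code of the service.

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;

public class ZstdProducerSketch {
    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // Batches produced with this setting can only be read by kafka-clients 2.1+ (KIP-110).
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "zstd");

        KafkaTemplate<String, String> template =
                new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props));
        template.send("example-topic", "hello");
        template.flush();
    }
}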
[Symptoms]
The consumer joins the consumer-group normally, but the error keeps recurring and messages are never processed.
Upgrading the library versions makes consumption work normally.
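For reference, a minimal reproduction sketch on the old client stack (class name, topic name, bootstrap servers, and group id are placeholders, not the actual service values): a plain kafka-clients 2.0.x consumer subscribing to the zstd-compressed topic joins the group, but poll() keeps throwing the exception shown in the [Debugging] section below.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class OldClientRepro {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "repro-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("example-topic"));
            while (true) {
                // Group membership succeeds, but every poll that hits a zstd batch fails
                // with the KafkaException thrown from Fetcher.fetchRecords.
                consumer.poll(Duration.ofMillis(500))
                        .forEach(r -> System.out.println(r.offset() + ": " + r.value()));
            }
        }
    }
}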
[Debugging]
Looking at the fetchRecords method in Fetcher.java, the block below is where the error appears to be thrown:
if (corruptLastRecord)
    throw new KafkaException("Received exception when fetching the next record from " + partition
            + ". If needed, please seek past the record to "
            + "continue consumption.", cachedRecordException);
Tracing further down into the called methods:
... (omitted)
if (isFetched)
    return Collections.emptyList();

List<ConsumerRecord<K, V>> records = new ArrayList<>();
try {
    for (int i = 0; i < maxRecords; i++) {
        // Only move to next record if there was no exception in the last fetch. Otherwise we should
        // use the last record to do deserialization again.
        if (cachedRecordException == null) {
            corruptLastRecord = true;
            lastRecord = nextFetchedRecord();
            corruptLastRecord = false;
        }

        if (lastRecord == null)
            break;

        records.add(parseRecord(partition, currentBatch, lastRecord));
        recordsRead++;
        bytesRead += lastRecord.sizeInBytes();
        nextFetchOffset = lastRecord.offset() + 1;

        // In some cases, the deserialization may have thrown an exception and the retry may succeed,
        // we allow user to move forward in this case.
        cachedRecordException = null;
    }
... (omitted)
Among these, the issue is raised inside the nextFetchedRecord method.
Stepping through it, execution proceeds until the error occurs inside streamingIterator:
private Record nextFetchedRecord() {
    while (true) {
        if (records == null || !records.hasNext()) {
            maybeCloseRecordStream();

            if (!batches.hasNext()) {
                // Message format v2 preserves the last offset in a batch even if the last record is removed
                // through compaction. By using the next offset computed from the last offset in the batch,
                // we ensure that the offset of the next fetch will point to the next batch, which avoids
                // unnecessary re-fetching of the same batch (in the worst case, the consumer could get stuck
                // fetching the same batch repeatedly).
                if (currentBatch != null)
                    nextFetchOffset = currentBatch.nextOffset();
                drain();
                return null;
            }

            currentBatch = batches.next();
            maybeEnsureValid(currentBatch);

            if (isolationLevel == IsolationLevel.READ_COMMITTED && currentBatch.hasProducerId()) {
                // remove from the aborted transaction queue all aborted transactions which have begun
                // before the current batch's last offset and add the associated producerIds to the
                // aborted producer set
                consumeAbortedTransactionsUpTo(currentBatch.lastOffset());

                long producerId = currentBatch.producerId();
                if (containsAbortMarker(currentBatch)) {
                    abortedProducerIds.remove(producerId);
                } else if (isBatchAborted(currentBatch)) {
                    log.debug("Skipping aborted record batch from partition {} with producerId {} and " +
                                  "offsets {} to {}",
                              partition, producerId, currentBatch.baseOffset(), currentBatch.lastOffset());
                    nextFetchOffset = currentBatch.nextOffset();
                    continue;
                }
            }

            records = currentBatch.streamingIterator(decompressionBufferSupplier);
        } else {
            Record record = records.next();
            // skip any records out of range if the fetch offset has been modified (typically due to a seek)
            if (record.offset() >= nextFetchOffset) {
                maybeEnsureValid(record);
                return record;
            }
        }
    }
}
And the exception is thrown here, in isCompressed():
@Override
public CloseableIterator<Record> streamingIterator(BufferSupplier bufferSupplier) {
    if (isCompressed())
        return compressedIterator(bufferSupplier);
    else
        return uncompressedIterator();
}

@Override
public boolean isCompressed() {
    return compressionType() != CompressionType.NONE;
}
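isCompressed() itself only calls compressionType(), which resolves the codec id stored in the record batch header via CompressionType.forId(), and that lookup is what blows up for a codec this client version does not know. A self-contained sketch of that resolution step (the attribute value and mask here are illustrative, not read from a real batch):

import org.apache.kafka.common.record.CompressionType;

public class CompressionIdDemo {
    public static void main(String[] args) {
        // A zstd batch carries codec id 4 in the low bits of the batch attributes field
        // (illustrative value for this demo).
        int attributes = 0x04;
        int codecId = attributes & 0x07;

        // On kafka-clients 2.0.x this call throws IllegalArgumentException because the
        // enum below has no constant for id 4; on 2.1+ it returns the ZSTD constant.
        CompressionType type = CompressionType.forId(codecId);
        System.out.println(type);
    }
}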
CompressionType here is an enum; opening it shows the compression types this client version supports.
NONE(0, "none", 1.0f) {
    @Override
    public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) {
        return buffer;
    }

    @Override
    public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
        return new ByteBufferInputStream(buffer);
    }
},

GZIP(1, "gzip", 1.0f) {
    @Override
    public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) {
        try {
            // Set input buffer (uncompressed) to 16 KB (none by default) and output buffer (compressed) to
            // 8 KB (0.5 KB by default) to ensure reasonable performance in cases where the caller passes a small
            // number of bytes to write (potentially a single byte)
            return new BufferedOutputStream(new GZIPOutputStream(buffer, 8 * 1024), 16 * 1024);
        } catch (Exception e) {
            throw new KafkaException(e);
        }
    }

    @Override
    public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
        try {
            // Set output buffer (uncompressed) to 16 KB (none by default) and input buffer (compressed) to
            // 8 KB (0.5 KB by default) to ensure reasonable performance in cases where the caller reads a small
            // number of bytes (potentially a single byte)
            return new BufferedInputStream(new GZIPInputStream(new ByteBufferInputStream(buffer), 8 * 1024),
                    16 * 1024);
        } catch (Exception e) {
            throw new KafkaException(e);
        }
    }
},

SNAPPY(2, "snappy", 1.0f) {
    @Override
    public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) {
        try {
            return (OutputStream) SnappyConstructors.OUTPUT.invoke(buffer);
        } catch (Throwable e) {
            throw new KafkaException(e);
        }
    }

    @Override
    public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
        try {
            return (InputStream) SnappyConstructors.INPUT.invoke(new ByteBufferInputStream(buffer));
        } catch (Throwable e) {
            throw new KafkaException(e);
        }
    }
},

LZ4(3, "lz4", 1.0f) {
    @Override
    public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) {
        try {
            return new KafkaLZ4BlockOutputStream(buffer, messageVersion == RecordBatch.MAGIC_VALUE_V0);
        } catch (Throwable e) {
            throw new KafkaException(e);
        }
    }

    @Override
    public InputStream wrapForInput(ByteBuffer inputBuffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
        try {
            return new KafkaLZ4BlockInputStream(inputBuffer, decompressionBufferSupplier,
                    messageVersion == RecordBatch.MAGIC_VALUE_V0);
        } catch (Throwable e) {
            throw new KafkaException(e);
        }
    }
};
[Root Cause]
As the enum above shows, the supported compression types only go up to lz4.
The problematic topic, however, was produced with zstd compression; when the older client library tried to decode those messages it could not recognize the codec, so the exception was thrown and consumption failed.
[Solution]
zstd has been supported since Apache Kafka 2.1 (KIP-110), so upgrade to Spring Boot and spring-kafka versions that pull in a kafka-clients 2.1+ dependency (a quick runtime check sketch is included at the end of this section).
KIP-110: https://cwiki.apache.org/confluence/display/KAFKA/KIP-110%3A+Add+Codec+for+ZStandard+Compression
Spring Kafka Version Compatibility: https://spring.io/projects/spring-kafka (Spring Boot 2.2 or later appears to be required)
Note: kafka-clients is compatible with brokers 0.10 and later, so client-broker compatibility itself is not the issue here; this case stems from the message's compression codec (message format), which is a separate matter.
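After the upgrade, one way to confirm which kafka-clients version the application actually resolved, and that it recognizes the zstd codec, is a quick runtime check like the sketch below (the class name is arbitrary; this is a verification idea, not part of the original troubleshooting):

import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.utils.AppInfoParser;

public class KafkaClientVersionCheck {
    public static void main(String[] args) {
        // Prints the kafka-clients version that ended up on the classpath.
        System.out.println("kafka-clients version: " + AppInfoParser.getVersion());

        // Resolves the zstd codec by name; on clients older than 2.1.0 this throws
        // IllegalArgumentException because the codec is unknown.
        System.out.println("zstd codec: " + CompressionType.forName("zstd"));
    }
}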