Kafka/Spring Kafka

높은 버전의 kafka 라이브러리에서 압축된 메시지를 낮은 버전의 라이브러리에서 consuming할 수 없는 이슈

재심 2023. 6. 20. 14:26



    토픽을 구독하려고 했는데 아래같은 오류가 발생한다.

    해당 오류가 반복적으로 발생하면서 메시지 처리가 되지 않는다.


    해당 토픽은 Spring Kafka 2.6 버전을 사용하는 producer에서 발급되었고, compression.type = zstd을 사용하고 있다. 

    Consumer는 아래 환경에서 동작한다.

    • SpinrgBoot Version: 2.1.6
    • Spring Kafka Version: 2.2.7
    • Apache Kafka Client Version: 2.0.1


    [문제 현황]

    consumer-group에는 정상적으로 추가되지만 오류가 발생하면서 메시지를 계속해서 처리하지 못한다. 

    라이브러리 버전을 올릴 경우 정상적으로 동작한다. 



    Fetcher.java의 fetchRecords 메서드에보면 아래 구문이 있고, 여기서 걸려서 발생한 오류로 보인다. 

    if (corruptLastRecord)
                    throw new KafkaException("Received exception when fetching the next record from " + partition
                                                 + ". If needed, please seek past the record to "
                                                 + "continue consumption.", cachedRecordException);

    하위 메서드들을 계속 추적

    if (isFetched)
        return Collections.emptyList();
    List<ConsumerRecord<K, V>> records = new ArrayList<>();
    try {
        for (int i = 0; i < maxRecords; i++) {
            // Only move to next record if there was no exception in the last fetch. Otherwise we should
            // use the last record to do deserialization again.
            if (cachedRecordException == null) {
                corruptLastRecord = true;
                lastRecord = nextFetchedRecord();
                corruptLastRecord = false;
            if (lastRecord == null)
            records.add(parseRecord(partition, currentBatch, lastRecord));
            bytesRead += lastRecord.sizeInBytes();
            nextFetchOffset = lastRecord.offset() + 1;
            // In some cases, the deserialization may have thrown an exception and the retry may succeed,
            // we allow user to move forward in this case.
            cachedRecordException = null;

    이 중 nextFetchedRecord 메서드 내부에서 아슈가 발생하였다.

    열어보면 쭉쭉 넘어가다가  streamingIterator 내부에서 오류가 발생한다.

    그리고 이곳의 isCompression에서 익셉션이 발생했다. 

    public CloseableIterator<Record> streamingIterator(BufferSupplier bufferSupplier) {
        if (isCompressed())
            return compressedIterator(bufferSupplier);
            return uncompressedIterator();
    public boolean isCompressed() {

        return compressionType() != CompressionType.NONE;


    여기서 CompressionType은 enum으로 되어있고, 이걸 열어보면 지원하는 압축타입이 나오게 된다. 

    NONE(0, "none", 1.0f) {
            public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) {
                return buffer;
            public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
                return new ByteBufferInputStream(buffer);
        GZIP(1, "gzip", 1.0f) {
            public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) {
                try {
                    // Set input buffer (uncompressed) to 16 KB (none by default) and output buffer (compressed) to
                    // 8 KB (0.5 KB by default) to ensure reasonable performance in cases where the caller passes a small
                    // number of bytes to write (potentially a single byte)
                    return new BufferedOutputStream(new GZIPOutputStream(buffer, 8 * 1024), 16 * 1024);
                } catch (Exception e) {
                    throw new KafkaException(e);
            public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
                try {
                    // Set output buffer (uncompressed) to 16 KB (none by default) and input buffer (compressed) to
                    // 8 KB (0.5 KB by default) to ensure reasonable performance in cases where the caller reads a small
                    // number of bytes (potentially a single byte)
                    return new BufferedInputStream(new GZIPInputStream(new ByteBufferInputStream(buffer), 8 * 1024),
                            16 * 1024);
                } catch (Exception e) {
                    throw new KafkaException(e);
        SNAPPY(2, "snappy", 1.0f) {
            public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) {
                try {
                    return (OutputStream) SnappyConstructors.OUTPUT.invoke(buffer);
                } catch (Throwable e) {
                    throw new KafkaException(e);
            public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
                try {
                    return (InputStream) SnappyConstructors.INPUT.invoke(new ByteBufferInputStream(buffer));
                } catch (Throwable e) {
                    throw new KafkaException(e);
        LZ4(3, "lz4", 1.0f) {
            public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) {
                try {
                    return new KafkaLZ4BlockOutputStream(buffer, messageVersion == RecordBatch.MAGIC_VALUE_V0);
                } catch (Throwable e) {
                    throw new KafkaException(e);
            public InputStream wrapForInput(ByteBuffer inputBuffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
                try {
                    return new KafkaLZ4BlockInputStream(inputBuffer, decompressionBufferSupplier,
                                                        messageVersion == RecordBatch.MAGIC_VALUE_V0);
                } catch (Throwable e) {
                    throw new KafkaException(e);


    위의 enum에 보면 지원하는 압축 타입이 lz4까지밖에 없다. 

    하지만 문제가 되었던 토픽은 zstd로 압축되어 발급되었는데, 이 메시지를 해독하려다보니 구버전 라이브러리에서는 불가능하여 예외가 발생하여 오류가 난 것이다.



    zstd는 apache kafka 2.1부터 지원하기 시작하였고, 이에 맞는 springboot 버전과 spring-kafka 버전으로 버전업 한다. 


    KIP-110: https://cwiki.apache.org/confluence/display/KAFKA/KIP-110%3A+Add+Codec+for+ZStandard+Compression

    Spring Kafka Version Compatibility: https://spring.io/projects/spring-kafka (springboot 2.2 이상 버전이어야 될 듯 하다)


    참고. kafka-client가 브로커 0.10버전 이상에서는 정상적으로 동작하여 호환성에는 이슈가 없지만 해당 케이스는 메시지 포맷에서 발생한 것으로 해당 내용과는 관련이 없다.