
Issue: messages compressed by a newer Kafka library version cannot be consumed with an older library version

재심 2023. 6. 20. 14:26

    [Overview]

    Subscribing to the topic fails with an error like the one below (the KafkaException shown in the [Debugging] section).

    The error occurs repeatedly and no messages get processed.

     

    The topic in question is produced by a producer running Spring Kafka 2.6 with compression.type = zstd.
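    For reference, a minimal sketch of what such a producer-side setup might look like (assumed configuration, not the actual service code; the class name, broker address, and serializers are placeholders):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.springframework.kafka.core.DefaultKafkaProducerFactory;
    import org.springframework.kafka.core.KafkaTemplate;

    // Hypothetical producer configuration on Spring Kafka 2.6: batches are compressed with zstd
    // before being sent to the broker, which is what the old consumer later fails to decode.
    public class ZstdProducerConfig {

        public KafkaTemplate<String, String> kafkaTemplate() {
            Map<String, Object> props = new HashMap<>();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "zstd"); // the setting at the center of this issue
            return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props));
        }
    }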

    The consumer runs in the following environment (a minimal listener sketch for this stack follows the list).

    • Spring Boot Version: 2.1.6
    • Spring Kafka Version: 2.2.7
    • Apache Kafka Client Version: 2.0.1
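
    A minimal listener sketch for the stack above; the topic name, group id, and class name are hypothetical. On this client version the listener is never invoked for the zstd-compressed batches, because the fetch fails before any record reaches deserialization:

    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;

    @Component
    public class ZstdTopicListener {

        // Hypothetical topic/group names; on the 2.0.1 client the poll loop throws before this runs.
        @KafkaListener(topics = "zstd-compressed-topic", groupId = "legacy-consumer-group")
        public void onMessage(String message) {
            System.out.println("received: " + message);
        }
    }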

     

    [Symptoms]

    The consumer joins the consumer group normally, but the error keeps occurring and no messages are ever processed.

    Upgrading the library versions makes it work correctly.

     

    [Debugging]

    Looking at the fetchRecords method in Fetcher.java, it contains the statement below, and this appears to be where the error is thrown.

    if (corruptLastRecord)
        throw new KafkaException("Received exception when fetching the next record from " + partition
                + ". If needed, please seek past the record to "
                + "continue consumption.", cachedRecordException);

    Tracing further down into the methods it calls:

    ... (omitted)
     
    if (isFetched)
        return Collections.emptyList();
     
    List<ConsumerRecord<K, V>> records = new ArrayList<>();
    try {
        for (int i = 0; i < maxRecords; i++) {
            // Only move to next record if there was no exception in the last fetch. Otherwise we should
            // use the last record to do deserialization again.
            if (cachedRecordException == null) {
                corruptLastRecord = true;
                lastRecord = nextFetchedRecord();
                corruptLastRecord = false;
            }
            if (lastRecord == null)
                break;
            records.add(parseRecord(partition, currentBatch, lastRecord));
            recordsRead++;
            bytesRead += lastRecord.sizeInBytes();
            nextFetchOffset = lastRecord.offset() + 1;
            // In some cases, the deserialization may have thrown an exception and the retry may succeed,
            // we allow user to move forward in this case.
            cachedRecordException = null;
        }
    ... (omitted)

    Of these, the issue occurred inside the nextFetchedRecord method.

    Stepping through it, the error ultimately occurs inside streamingIterator.

    private Record nextFetchedRecord() {
        while (true) {
            if (records == null || !records.hasNext()) {
                maybeCloseRecordStream();

                if (!batches.hasNext()) {
                    // Message format v2 preserves the last offset in a batch even if the last record is removed
                    // through compaction. By using the next offset computed from the last offset in the batch,
                    // we ensure that the offset of the next fetch will point to the next batch, which avoids
                    // unnecessary re-fetching of the same batch (in the worst case, the consumer could get stuck
                    // fetching the same batch repeatedly).
                    if (currentBatch != null)
                        nextFetchOffset = currentBatch.nextOffset();
                    drain();
                    return null;
                }

                currentBatch = batches.next();
                maybeEnsureValid(currentBatch);

                if (isolationLevel == IsolationLevel.READ_COMMITTED && currentBatch.hasProducerId()) {
                    // remove from the aborted transaction queue all aborted transactions which have begun
                    // before the current batch's last offset and add the associated producerIds to the
                    // aborted producer set
                    consumeAbortedTransactionsUpTo(currentBatch.lastOffset());

                    long producerId = currentBatch.producerId();
                    if (containsAbortMarker(currentBatch)) {
                        abortedProducerIds.remove(producerId);
                    } else if (isBatchAborted(currentBatch)) {
                        log.debug("Skipping aborted record batch from partition {} with producerId {} and " +
                                      "offsets {} to {}",
                                  partition, producerId, currentBatch.baseOffset(), currentBatch.lastOffset());
                        nextFetchOffset = currentBatch.nextOffset();
                        continue;
                    }
                }

                records = currentBatch.streamingIterator(decompressionBufferSupplier);
            } else {
                Record record = records.next();
                // skip any records out of range
                // ... (omitted)
            }
        }
    }

    And it is here, in isCompressed(), that the exception occurred.

    @Override
    public CloseableIterator<Record> streamingIterator(BufferSupplier bufferSupplier) {
        if (isCompressed())
            return compressedIterator(bufferSupplier);
        else
            return uncompressedIterator();
    }
    @Override
    public boolean isCompressed() {
        return compressionType() != CompressionType.NONE;
    }
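
    For context, compressionType() gets its value from the record-batch header: in message format v2 the lowest three bits of the batch attributes field carry the compression id. A simplified illustration of that mapping (a sketch, not the actual Kafka source):

    // Simplified illustration of how a v2 record batch encodes its compression type:
    // the lowest 3 bits of the batch "attributes" field are the compression id.
    public class BatchAttributesSketch {

        static final int COMPRESSION_CODEC_MASK = 0x07;

        static int compressionId(short attributes) {
            return attributes & COMPRESSION_CODEC_MASK; // 0=none, 1=gzip, 2=snappy, 3=lz4, 4=zstd
        }

        public static void main(String[] args) {
            // A zstd batch carries id 4; the 2.0.x client has no enum constant for that id,
            // which is what fails when compressionType()/isCompressed() is evaluated.
            System.out.println(compressionId((short) 0b100)); // -> 4
        }
    }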

    CompressionType here is an enum; opening it shows the compression types this client version supports.

    NONE(0, "none", 1.0f) {
            @Override
            public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) {
                return buffer;
            }
     
            @Override
            public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
                return new ByteBufferInputStream(buffer);
            }
        },
     
        GZIP(1, "gzip", 1.0f) {
            @Override
            public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) {
                try {
                    // Set input buffer (uncompressed) to 16 KB (none by default) and output buffer (compressed) to
                    // 8 KB (0.5 KB by default) to ensure reasonable performance in cases where the caller passes a small
                    // number of bytes to write (potentially a single byte)
                    return new BufferedOutputStream(new GZIPOutputStream(buffer, 8 * 1024), 16 * 1024);
                } catch (Exception e) {
                    throw new KafkaException(e);
                }
            }
     
            @Override
            public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
                try {
                    // Set output buffer (uncompressed) to 16 KB (none by default) and input buffer (compressed) to
                    // 8 KB (0.5 KB by default) to ensure reasonable performance in cases where the caller reads a small
                    // number of bytes (potentially a single byte)
                    return new BufferedInputStream(new GZIPInputStream(new ByteBufferInputStream(buffer), 8 * 1024),
                            16 * 1024);
                } catch (Exception e) {
                    throw new KafkaException(e);
                }
            }
        },
     
        SNAPPY(2, "snappy", 1.0f) {
            @Override
            public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) {
                try {
                    return (OutputStream) SnappyConstructors.OUTPUT.invoke(buffer);
                } catch (Throwable e) {
                    throw new KafkaException(e);
                }
            }
     
            @Override
            public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
                try {
                    return (InputStream) SnappyConstructors.INPUT.invoke(new ByteBufferInputStream(buffer));
                } catch (Throwable e) {
                    throw new KafkaException(e);
                }
            }
        },
     
        LZ4(3, "lz4", 1.0f) {
            @Override
            public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) {
                try {
                    return new KafkaLZ4BlockOutputStream(buffer, messageVersion == RecordBatch.MAGIC_VALUE_V0);
                } catch (Throwable e) {
                    throw new KafkaException(e);
                }
            }
     
            @Override
            public InputStream wrapForInput(ByteBuffer inputBuffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
                try {
                    return new KafkaLZ4BlockInputStream(inputBuffer, decompressionBufferSupplier,
                                                        messageVersion == RecordBatch.MAGIC_VALUE_V0);
                } catch (Throwable e) {
                    throw new KafkaException(e);
                }
            }
        };

    [Root Cause]

    Looking at the enum above, the supported compression types only go up to lz4.

    The topic in question, however, was produced with zstd compression. The old client library cannot decode those messages, so the attempt fails with the exception and the error we saw.
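
    To make the failure concrete, here is a small self-contained sketch that mirrors (but does not copy) the old client's id-to-enum lookup; a batch carrying compression id 4 simply cannot be resolved:

    // Sketch mirroring the 2.0.x client's compression lookup: it only knows ids 0..3,
    // so a zstd batch (id 4, added in Kafka 2.1 via KIP-110) triggers an exception,
    // which surfaces as the KafkaException seen in the Fetcher above.
    public class OldClientCompressionLookup {

        enum OldCompressionType { NONE, GZIP, SNAPPY, LZ4 } // everything the old enum offers

        static OldCompressionType forId(int id) {
            OldCompressionType[] known = OldCompressionType.values();
            if (id < 0 || id >= known.length)
                throw new IllegalArgumentException("Unknown compression type id: " + id);
            return known[id];
        }

        public static void main(String[] args) {
            System.out.println(forId(3)); // lz4 -> resolved fine
            System.out.println(forId(4)); // zstd -> exception on the old client
        }
    }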

     

    [Solution]

    zstd has been supported since Apache Kafka 2.1, so upgrade to Spring Boot and Spring Kafka versions that bring in a matching kafka-clients version.

     

    KIP-110: https://cwiki.apache.org/confluence/display/KAFKA/KIP-110%3A+Add+Codec+for+ZStandard+Compression

    Spring Kafka Version Compatibility: https://spring.io/projects/spring-kafka (Spring Boot 2.2 or later appears to be the minimum)
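
    After upgrading, it is worth confirming which kafka-clients version actually ends up on the classpath, since Spring Boot dependency management may still pin an older one. A quick check (AppInfoParser is the class kafka-clients itself uses to log "Kafka version: x.y.z" at startup):

    import org.apache.kafka.common.utils.AppInfoParser;

    // Prints the kafka-clients version found on the classpath; it should be 2.1.0 or later for zstd.
    public class KafkaClientVersionCheck {
        public static void main(String[] args) {
            System.out.println("kafka-clients version: " + AppInfoParser.getVersion());
        }
    }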

     

    Note: the kafka-clients library works fine with brokers 0.10 and above, so client-broker compatibility is not the problem; this case stems from the message (compression) format and is unrelated to broker compatibility.
