Kafka/Producer

Log Compaction

재심 2023. 3. 5. 11:56

목차

    [Topic - cleanup.policy]

    Kafka는 데이터 리텐션 정책이 2가지 있다.

    • Delete: retention.ms를 지나거나 retention.bytes를 초과하는 경우 오래된 세그먼트를 삭제한다. 
    • Compact: Key별로 가장 최근의 value값만 남겨두고 저장한다. 

    [Log Compaction?]

    Confluent

     

    Log compaction ensures that Kafka will always retain at least the last known value for each message key within the log of data for a single topic partition.It addresses use cases and scenarios such as restoring state after application crashes or system failure, or reloading caches after application restarts during operational maintenance.Let's dive into these use cases in more detail and then describe how compaction works.

    #번역기로그
    압축은 Kafka가 단일 토픽 파티션에 대한 데이터 로그 내에서 각 메시지 키에 대해 최소한 마지막으로 알려진 값을 항상 유지하도록 합니다.응용 프로그램 충돌 또는 시스템 오류 후 상태 복원 또는 운영 유지 관리 중 응용 프로그램이 다시 시작된 후 캐시 다시 로드와 같은 사용 사례 및 시나리오를 다룹니다.이러한 사용 사례를 더 자세히 살펴보고 압축이 어떻게 작동하는지 설명하겠습니다.

     

    사용예시1: 사용자 ID를 Key값으로하여 메일계정이 업데이트 되는 상황 

    123 => bill@microsoft.com
            .
            .
            .
    123 => bill@gatesfoundation.org
            .
            .
            .
    123 => bill@gmail.com

    Log Compaction은 키 값별로 최소한 마지막 업데이트 값을 유지하도록 보장한다. 이를 통해서 시스템은 변경사항에 대한 모든 로그를 보존하지 않고도 이전에 사용했던 가장 최신값을 얻어올 수 있게 된다.

     

    사용예시2: 제품의 가장 최신 가격 

    Confluent

     

    p3 Key에 대한 값이 Compaction 되면서 가장 최신 value인 $11만 유지.

    Confluent

     

    Kafka Broker 내부의 스레드가 p5, p6을 사용하여 이전 데이터 값을 제거하여 최신 형상만 남기고 제거한다.

     

    [Log Compaction 구조 파악해보기]

    Log Compaction은 최소한 마지막 업데이트를 보장한다고 했다. 이 말이 무슨말일까? 

    Compaction 관련 옵션

    • segment.ms: 데이터를 삭제하거나 압축할 수 있도록, 파일이 가득차지 않은 경우에도 segment 파일을 롤링하도록 하는 기간
    • min.cleanable.dirty.ratio (default: 0.5): 로그를 정리대상으로 올릴 수 있는 전체로그 대비 dirty로그의 최소비율. 50%이면 압축되지 않은 로그를 50% 유지한다는 뜻이다. 
    • log.cleaner.max.compaction.lag.ms (default: infinite): 메시지가 compaction 대상에 오르지않고 남아있는 최대시간.
    • log.cleaner.min.compaction.lag.ms (default: 0): 메시지를 압축하지 않고 로그에 남겨둘 최소시간. 

     

    min.cleanable.dirty.ratio는 아래 2가지 조건으로 동작한다.

    • log.cleaner.min.compaction.lag.ms를 지났고 dirty(압축하지 않은) 레코드가 있으면서 dirty 비율 임계치에 충족하는 경우 
    • log.cleaner.max.compaction.lag.ms가 지났는데 로그에 dirty(압축하지 않은) 레코드가 있는 경우

    확인

    간단히 아래 메시지를 생성하고, 소비해본다.

    #토픽 생성
    $ kafka-topics --create --zookeeper localhost:2181 --topic jaeshim-test-compact --replication-factor 2 --partitions 1 --config "cleanup.policy=compact" \
    --config "delete.retention.ms=100"  --config "segment.ms=100" --config "min.cleanable.dirty.ratio=0.01"
     
    # produce
    $ kafka-console-producer --broker-list localhost:9092 --topic jaeshim-test-compact --property parse.key=true --property key.separator=:
    >p3:10$
    >p5:7$
    >p3:11$
    >p6:25$
    >p6:12$
    >p5:14$
    >p5:17$
     
    #기대
    p3:11$
    p6:12$
    p5:17$
     
    # consume
    $ kafka-console-consumer --bootstrap-server localhost:9092 --topic jaeshim-test-compact --property print.key=true --property key.separator=: --from-beginning
     
    p3:11$
    p6:12$
    p5:14$
    p5:17$

    p5에서 17$만 남는걸 기대했는데, 14$도 살아있다. 기대한 거랑 다른데..?

     

    Segments

    카프카 토픽에서 파티션은 실제로 세그먼트 단위로 저장이 된다. 

    세그먼트 파일은 .log 라는 확장자 명을 갖는다. 

    Confluent

     

    메시지들이 위와 같이 저장될 수 있다. 

    그리고 마지막 세그먼트 파일을 active segment라고 한다. 

     

    새로운 세그먼트가 생성되는 기준은 2가지 옵션이 있다.

    • segment.bytes (default 1GB)
    • segment.ms

    위의 예제에서 segment.ms를 100으로 줘서 100ms마다 세그먼트가 생성되도록 하였다. 

    0, 6번 파일이 세그먼트이고, 6번 세그먼트 파일이 지금 active 이다.

     

    Cleaning Process

    카프카는 로그 압축을 위해 cleaner 스레드를 사용한다. cleaner 스레드는 dirty.ratio를 보고 정리하려고 시작하게 된다. 

    Confluent

     

    dirty ratio = the number of bytes in the head / total number of bytes in the log(tail + head)

     

    스레드는 dirty ratio를 계산해서 이 비율보다 높은 로그를 선택해서 삭제하려고 한다. 이 때 아래 조건들은 cleaning 조건에 포함되지 않는다.

     

    • Active 세그먼트 내의 로그 → p5:14$, p5:17$는 지금 active segment안에 같이 있어서 정리대상이 되지 않았다. 
    • min.compaction.lag.ms가 0보다 크고 이 시간만큼 지나지 않았으면 정리대상이 되지 않는다.

     

    cleaner 스레드는 offset Map 이라는 것을 사용해서 가장 최신 레코드가 어떤 것인지 판별한다. 또한 값이 null인 것들도 제거한다. 

     

     

    [Log Compaction Example : Schema-Registry]

    schema-registry에서도 Log Compaction을 사용해서 schema를 관리하고 있다.

     

    사용토픽: _schemas

     

    어떤 옵션을 가지고 사용하는지 살펴본다.

    Topic: _schemas TopicId: Pj1gKd_CR3Kic_34e8sdfg PartitionCount: 1       ReplicationFactor: 3    Configs: min.insync.replicas=2,cleanup.policy=compact,segment.bytes=1073741824
            Topic: _schemas Partition: 0    Leader: 1       Replicas: 1,2,3 Isr: 3,1,2      Offline:

    CLI로 확인해보니 대부분 디폴트값으로 쓰고 있어서 별다른 정보가 없는듯하다. C3에서 상세히 확인해본다.

     

    configurations

    • cleanup.policy: compact
    • retention.ms: 1 week 
    • retention.bytes: -1 

     

    • min.cleanable.dirty.ratio: 0.5
    • segment.bytes: 1GB
    • segment.ms: 1week

     

    • max.compaction.lag,ms: 9223372036854775807 (사실상 무한)
    • min.compaction.lag.ms: 0

     

    => 그냥 기본값으로 쓰고 있다. 그럼 segment로 인한 중복값은 어떻게 처리하는 걸까.. 

     

     

    # key
    { "keytype": "SCHEMA", "subject": "jaeshim-member-backward-value", "version": 4, "magic": 1 }
     
    # header
    {}
    # value
    {
      "subject": "jaeshim-member-backward-value",
      "version": 4,
      "id": 12,
      "schema": "{\"type\":\"record\",\"name\":\"MemberBackward\",\"namespace\":\"com.ebaykorea.schema.registry.tester.producer.avro\",\"fields\":[{\"name\":\"name\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}},{\"name\":\"age\",\"type\":\"int\"}],\"Compatibility\":\"FORWARD\"}",
      "deleted": false
    }

    버전을 명시해놓았다.

    이걸로 버전관리를 하면서 사용하는 것 같은 느낌. 

     

     

    공식 문서의 스키마 관리 방법에 대한 설명 

    Kafka is used as Schema Registry storage backend. The special Kafka topic <kafkastore.topic> (default _schemas), with a single partition, is used as a highly available write ahead log. All schemas, subject/version and ID metadata, and compatibility settings are appended as messages to this log. A Schema Registry instance therefore both produces and consumes messages under the _schemas topic. It produces messages to the log when, for example, new schemas are registered under a subject, or when updates to compatibility settings are registered. Schema Registry consumes from the _schemas log in a background thread, and updates its local caches on consumption of each new _schemas message to reflect the newly added schema or compatibility setting. Updating local state from the Kafka log in this manner ensures durability, ordering, and easy recoverability.

    #번역기
    Kafka는 스키마 레지스트리 스토리지 백엔드로 사용됩니다. 단일 파티션이 있는 특별한 Kafka 주제 <kafkastore.topic>(기본값 _schemas)는 고가용성 미리 쓰기 로그로 사용됩니다.
    모든 스키마, 제목/버전 및 ID 메타데이터, 호환성 설정이 이 로그에 메시지로 추가됩니다. 따라서 스키마 레지스트리 인스턴스는 _schemas 주제 아래에서 메시지를 생성하고 사용합니다.
    예를 들어 새 스키마가 제목 아래에 등록되거나 호환성 설정에 대한 업데이트가 등록될 때 로그에 메시지를 생성합니다.
    스키마 레지스트리는 백그라운드 스레드의 _schemas 로그를 사용하고 새로 추가된 스키마 또는 호환성 설정을 반영하기 위해 각각의 새로운 _schemas 메시지를 사용할 때 로컬 캐시를 업데이트합니다.
    이러한 방식으로 Kafka 로그에서 로컬 상태를 업데이트하면 내구성, 순서 지정 및 손쉬운 복구가 보장됩니다.

     

    소스 코드

    https://github.com/confluentinc/schema-registry

     

    GitHub - confluentinc/schema-registry: Confluent Schema Registry for Kafka

    Confluent Schema Registry for Kafka. Contribute to confluentinc/schema-registry development by creating an account on GitHub.

    github.com

     

    [참조]

    Log Compaction 설명

    - https://kafka.apache.org/documentation/#compaction

    - https://towardsdatascience.com/log-compacted-topics-in-apache-kafka-b1aa1e4665a7

     

    사용예제

    https://hevodata.com/learn/kafka-log-compaction/#meth2