Kafka/Producer

Kafka Producer Basic Architecture

재심 2023. 3. 3. 23:27

목차

    [기본 동작 방식]

    Apache Kafka wiki

    [Producer 컴포넌트]

    PDF파일 (출처: 본인)

    Producer Architecture.pdf
    0.07MB

    Overview

    전체 구성요소

    Record Accumulator

    Sender Thread

     

     

    [컴포넌트별 상세설명]

    Serializer

    레코드로부터 전달된 Key, Value가 지정된 serializer에 의해 변환된다.

     

    Partitoner

    partitioner에 의해 어떤 파티션으로 보내질 지 결정된다. 

    Key값이 있는 경우 Key의 Hash 값을 이용해서 파티션을 할당한다.

    Key값이 없는 경우 Stickey 방식으로 할당된다.

     

    Compression

    전송하려는 레코드를 압축할 수 있다. 레코드는 배치에 채워진 후 그 배치가 압축되어 RecordAccumulator에 저장된다고 한다. 

    Compression.type

    • gzip
    • snappy
    • lz4
    • zstd

     

    RecrodAccumulator append()

    전송하려는 Record는 전송전에 RecordAccumulator에 저장된다. RecrodAccumulator 는 batchs라는 Map을 가지고 있는데, 이 Map의 key는 topic partition이고, Value는 Deque<RecordBatch> 이다.

    출처: naver d2 블로그

    max.request.size, buffer.memory

    레코드가 RecordAccumulator에 저장되기전에 사이즈 체크를 먼저하는데 이때 max.request.size 혹은 buffer.memory 값보다 크면 RecordTooLargeException이 발생하면서 실패하게 된다.

     

    Deque

    출처: naver d2 블로그

    RecordAccumulator의 append() 가 호출되면 추가될 레코드의 해당하는 토픽-파티션의 Deque를 찾게된다

    이 Deque의 Last에서 RecordBatch 하나를 꺼내서 여유공간을 확인한 후 여유공간이 있으면 해당 RecordBatch에 레코드를 추가하고 없으면 새로운 RecordBatch를 생성하여 추가한다.

     

    새로운 RecordBatch를 생성할 때는 BufferPool에서 받아오게 되며 이 BufferPool의 크기는 buffer.memory 설정으로 결정한다.

    BufferSize가 부족하면 용량이 확보될 때 까지 max.block.ms 만큼 기다리고, 그 시간도 초과하면 TimeoutException이 발생한다.

     

    만약 레코드의 크기가 batch.size보다 크면 하나의 RecordBatch 크기는 레코드 크기로 할당된 후 한 개의 레코드만 저장된다.

    레코드의 크기가 batch.size보다 작다면 여러개의 레코드가 RecordBatch에 저장된다.

     

    Sender Thread

    Sender Thread는 RecordAccumulator에 저장된 RecordBatch를 꺼낸다.

    RecordAccumulator.drain()을 통해 얻어오게 된다. 

    출처: naver d2 블로그

     

    drain()과정에서는 각 Broker Node에 속하는 TopicPartition 목록을 불러온다.

    각 Node에 속한 TopicPartition을 보면서 Deque First쪽의 RecordBatch 하나를 꺼내서 RecordBatch List에 추가한다.

    max.request.size를 넘지 않을 정도로 RecordBatch를 꺼내와서 RecordBatchList를 완성한다. 

    출처: naver d2 블로그

    이 리스트가 하나의 전송단위인 In.flight.reqeusts가 된다.

    이 때 max.in.flight.per.connection 값을 보게 되는데, 이 값은 브로커에 전송할 수 있는 배치 수가 되는 것이며 이 값이 1이상일 경우 실패했을 때 시도하게되면서 배치 순서가 뒤바뀌어 결론적으로 메시지 순서가 바뀔 수 있다.

     

    Broker는 하나의 Connection에 대해서 요청이 들어온 순서대로 처리해서 응답한다.

    응답의 순서가 보장되기 때문에, KafkaProducer Client는 Broker로부터 응답이 오면 항상 InFlightRequests Deque의 가장 오래된 요청을 완료 처리한다.

    ProduceRequest가 완료되면 요청에 포함되었던 모든 RecordBatch의 콜백을 실행하고 Broker로부터의 응답을 Future를 통해서 사용자에게 전달한다.

    그리고 RecordBatch에서 사용한 ByteBuffer를 BufferPool로 반환하면서 Record 전송 처리가 모두 마무리된다.

     

    [참조]

    참고자료

    https://www.linkedin.com/pulse/kafka-producer-overview-sylvester-daniel

    https://d2.naver.com/helloworld/6560422

     

    상세그림

    https://blog.clairvoyantsoft.com/unleash-kafka-producers-architecture-and-internal-working-f33cba6c43aa

    https://www.confluent.io/blog/configure-kafka-to-minimize-latency/

    https://cwiki.apache.org/confluence/display/KAFKA/KIP-91+Provide+Intuitive+User+Timeouts+in+The+Producer