Kafka

MirrorMaker2 - 테스트

재심 2023. 7. 24. 10:51

목차

     

    MirrorMaker2 설정 정리 참조: https://godekdls.github.io/Apache%20Kafka/geo-replication/  (MirrorMaker2 부분)

    [MirrorSourceConnector]

    기본 조건

    active-passive 상황으로 가정한다.

    • source: source-dev (active)
    • target: target-dev (passive)
    • 토픽명: jaeshim-mirror-20230724 (파티션5)

    Configurations

    설정값 설명
    Default
    refresh.topics.enabled 주기적으로 새 토픽을 확인할 지 여부  true
    refresh.topics.interval.seconds 새 토픽 확인 주기 6000 (10분)
    source.cluster.alias source 클러스터의 별칭  
    source.cluster.bootstrap.servers source 클러스터  
    replication.policy.separator source 클러스터와 현재 클러스터간 구분자  
    target.cluster.alias target 클러스터의 별칭  
    target.cluster.bootstrap.servers target 클러스터  
    topics 대상 토픽. 
    ex)
    jaeshim_test.*
     
    key.converter Key Converter  
    value.converter Value Converter  
    header.converter Header Converter  
    tasks.max connector의 task수 1

     

    테스트 정리

    상황 행위 확인사항
    메시지 정상 복제 여부 토픽 복제 key, value, header 모두 정상 복제됨
    토픽 설정 정보 복제 여부 source 토픽은 파티션5
    target 토픽은 브로커 기본설정 (파티션2)
    target에서도 파티션5로 생성되고 복제됨.
    단 아래 값들이 true여야함.
    • sync.topic.configs.enabled: 토픽 설정 복제 여부 (Default: true)
    • refresh.topics.enabled: 새 토픽 주기적으로 확인여부 (Default: true)
    • refresh.topics.interval.seconds: 새 토픽 확인 주기 (Default: 6000, 10분)
    동일한 파티션에 복제되는지 여부
    1. source에서 파티션5개로 토픽생성
    2. target에서 파티션2개로 토픽생성
    3. source에서 파티션4 (target에는 없는) 파티션에 메시지 생성
    4. 메시지가 복제되지 않는지 확인
    5. target 토픽 파티션을 5로 늘림 
    6. 메시지 정상 복제 여부 확인
    동일한 파티션 번호에 동일하게 복제함.
    target에서 토픽 미리 생성할 경우 잘 복제되는지 확인
    1. source에서 파티션5로 토픽생성
    2. target에서 파티션5로 토픽생성
    3. source에서 메시지 1개 생성
    4. connector 가동 
    5. target에 메시지 복제 정상적으로 되는지 확인
    잘 복제됨
    target에서 토픽을 삭제하고 다시 만들었을 때 메시지 복제는 어떻게 되는지 보기 
    1. source에서 메시지 2개 생성
    2. connector 가동 
    3. target에서 메시지 2개 복제 확인
    4. target에서 토픽 삭제
    5. source에서 메시지 1개 생성
    6. source에서 메시지 복제 어떻게 되는지 확인
    추가된 메시지만 복제됨.
    connector내부적으로 offset을 관리하므로 처음부터 복제하고 싶을 경우 신규 connector이름으로 가동하여야 함.
    target과 source의 오프셋 번호가 다른 경우 어떻게 되는지 보기
    1. source에서 메시지2개 생성
    2. target에 메시지 2개 복제 여부 확인
    3. source에서 토픽 삭제 후 재생성
    4. target에서는 어떻게 되는지 확인
    target에서는 기존 오프셋에 추가로 쌓도록 함. 

    target 클러스터와 source 클러스터의 파티션 수가 다를 때 어떻게 되는지 보기
    1. source에서 파티션1개로 토픽 생성
    2. target에서 파티션5개로 토픽 생성
    3. source에서 메시지1개 생성
    4. target에서 파티션0에 메시지 유입되는지 확인
    target에는 source와 동일한 파티션번호로 복제된다.
    이 때 오프셋 번호가 다른 경우 OffsetSync 토픽으로 싱크를 맞추려고 한다. 그러므로 CheckpointConnector도 함께 구동해주는 것이 좋아보인다. 


    메시지 예시) source는 파티션0에 2번메시지가, target에서는 3번 메시지라는 것을 의미함. 
    OffsetSync{topicPartition=jaeshim-mirror-20230731-0, upstreamOffset=2, downstreamOffset=3}
    connector를 삭제하고 "동일한 이름"으로 생성할 때 전체 메시지 복제 여부
    1. target에서 토픽 삭제
    2. connector 삭제 
    3. source에서 메시지 1개 추가 생성
    4. connector 재생성 (동일한 이름으로)
    5. target에서 토픽 재생성 확인 및 전체 복제 여부 확인  
    추가된 메시지만 복제됨.
    connector이름으로 복제된 offset을 확인하므로 전체복제가 안됨. 
    connector를 삭제하고 "다른 이름"으로 생성할 때 전체 메시지 복제 여부 
    1. connector 삭제
    2. 새로운 이름으로 connector 생성 (설정은 동일)
    3. target에서 메시지 전체 복제 어떻게 되는지 여부
    전체 메시지 복제됨. 
    전체 싱크를 위해서는 기존 connector이름과 달라야 하는듯하니 connector 이름을 


    XXXXConnector-v1-YYYYMMDDD 과 같이 버전과 구동일자 이런식으로 구분하는 것이 좋을 듯하다. 

     

    상세 내용

    MirrorSourceConnector 생성

    http://{target-connect}:8083/connectors 

    http method: POST

     

    {
      "name": "MirrorSourceConnectorTest",
      "config": {   
        "name": "MirrorSourceConnectorTest",
        "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
        "topics": "jaeshim-mirror-20230724",
        "source.cluster.alias": "source-dev",
        "replication.policy.separator":"_",
        "source.cluster.bootstrap.servers": "{source-broker}:9092",
        "sync.topic.configs.enabled": "true",
        "target.cluster.alias": "target-dev",
        "target.cluster.bootstrap.servers": "{target-broker}:9092",   
        "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "tasks.max": "1"
      }
    }
     
    # 상태 체크
    GET http://{target-connect}:8083/connectors/MirrorSourceConnectorTest/status
     
    # describe
    GET http://{target-connect}:8083/connectors/MirrorSourceConnectorTest

    메시지 생성

    source 클러스터에서 메시지를 생성한다. 

    $ kafka-console-producer --bootstrap-server {source-broker}:9092 --topic jaeshim-mirror-20230724 --property parse.key=true --property key.separator=":"
    >a:1
    >a:2

    source에서 몇번 파티션에 생성되었는지 확인.

    아래는 5번 파티션에 2개의 메시지가 유입되었음. 

    $ watch 'sudo kafka-run-class kafka.tools.GetOffsetShell   --broker-list {source-broker}:9092 --topic jaeshim-mirror-20230724'
    jaeshim-mirror-20230724:0:0
    jaeshim-mirror-20230724:1:0
    jaeshim-mirror-20230724:2:0
    jaeshim-mirror-20230724:3:0
    jaeshim-mirror-20230724:4:2

    메시지 복제 여부 확인

    토픽은 생성되었으나 메시지는 복제되지 않음.  

    이유는 파티션이 2개밖에 존재하지 않아 동일한 파티션이 없어 복제되지 않는것.

    $ sudo kafka-topics --describe --bootstrap-server {target-dev}:9092 --topic source-dev_jaeshim-mirror-20230724
    Topic: source-dev_jaeshim-mirror-20230724       TopicId: IhH6TgeXTvuCjhgrO1VpZw PartitionCount: 2       ReplicationFactor: 2    Configs: min.insync.replicas=1,flush.ms=10000,segment.bytes=1073741824,flush.messages=5000,unclean.leader.election.enable=false,retention.bytes=34359738368
            Topic: source-dev_jaeshim-mirror-20230724       Partition: 0    Leader: 104     Replicas: 104,105       Isr: 104,105
            Topic: source-dev_jaeshim-mirror-20230724       Partition: 1    Leader: 105     Replicas: 105,106       Isr: 105,106

    파티션개수를 늘리고 확인

    $ sudo kafka-topics \
        --bootstrap-server {target-broker}:9092 \
        --alter \
        --topic source-dev_jaeshim-mirror-20230724 \
        --partitions 5
     
    # 확인
    $ sudo kafka-top--describe --bootstrap-server {target-broker}:9092 --topic source-dev_jaeshim-mirror-20230724        Topic: source-dev_jaeshim-mirror-20230724       TopicId: IhH6TgeXTvuCjhgrO1VpZw PartitionCount: 5       ReplicationFactor: 2    Configs: min.insync.replicas=1,flush.ms=10000,segment.bytes=1073741824,flush.messages=5000,unclean.leader.election.enable=false,retention.bytes=34359738368
            Topic: source-dev_jaeshim-mirror-20230724       Partition: 0    Leader: 104     Replicas: 104,105       Isr: 104,105
            Topic: source-dev_jaeshim-mirror-20230724       Partition: 1    Leader: 105     Replicas: 105,106       Isr: 105,106
            Topic: source-dev_jaeshim-mirror-20230724       Partition: 2    Leader: 106     Replicas: 106,104       Isr: 106,104
            Topic: source-dev_jaeshim-mirror-20230724       Partition: 3    Leader: 101     Replicas: 101,105       Isr: 101,105
            Topic: source-dev_jaeshim-mirror-20230724       Partition: 4    Leader: 102     Replicas: 102,106       Isr: 102,106

    메시지 복제 여부 확인

    $ sudo kafka-run-class kafka.tools.GetOffsetShell   --broker-list {target-broker}:9092 --topic source-dev_jaeshim-mirror-20230724
    source-dev_jaeshim-mirror-20230724:0:0
    source-dev_jaeshim-mirror-20230724:1:0
    source-dev_jaeshim-mirror-20230724:2:0
    source-dev_jaeshim-mirror-20230724:3:0
    source-dev_jaeshim-mirror-20230724:4:2

    key와 value도 정상적으로 복제되었음.

    사용한 커맨드

    ## SOURCE
    sudo kafka-topics --create --bootstrap-server {source-broker}:9092 --topic jaeshim-mirror-20230724 --partitions 5 
    sudo kafka-console-producer --bootstrap-server {source-broker}:9092 --topic jaeshim-mirror-20230724 --property parse.key=true --property key.separator=":"
    sudo kafka-topics --bootstrap-server {source-broker}:9092 --delete --topic jaeshim-mirror-20230724
    
    
    ## TARGET
    #토픽 생성
    sudo kafka-topics --create --bootstrap-server {target-broker}:9092 --topic source-dev_jaeshim-mirror-20230724 --partitions 5
     
    #토픽 확인
    sudo kafka-topics --describe --bootstrap-server  {target-broker}:9092 --topic source-dev_jaeshim-mirror-20230724
     
    #토픽 삭제
    sudo kafka-topics --bootstrap-server {target-broker}:9092 --delete --topic source-dev_jaeshim-mirror-20230724
      
    # offset 확인
    sudo kafka-run-class kafka.tools.GetOffsetShell   --broker-list {target-broker}:9092 --topic source-dev_jaeshim-mirror-20230724

     

    [MirrorCheckpointConnector]

    Configurations

    설정값
    설명 Default
    emit.checkpoints.enabled 체크포인트 활성화 여부 true
    emit.checkpoints.interval.seconds 체크포인트 활성화 주기 60 (1분)
    checkpoints.topic.replication.factor 체크포인트 관련 토픽의 Replication Factor 3
    sync.group.offsets.enabled Consumer-Group 오프셋 동기화 여부 true
    sync.group.offsets.interval.seconds Consumer-Group 오프셋 동기화 주기 60 (1분)
    offset-syncs.topic.replication.factor Consumer-Group 오프셋 토픽의 Replication Factor 3
    groups 복제 대상 consumer-group
    ex) jaeshim-mirror-group-.*,yubin-.*
     
    groups.exclude 복제 제외할 consumer-group
    ex) console-consumer-.*, connect-.*, __.*,_confluent-.*
     

    테스트 정리

    상황 행위 확인사항
    consumer-group 정상 복제 여부 consumer-group 복제 정상 복제됨. 이 때 내부 토픽을 통해 offset upstream, downstream 정보를 활용함. 
    target 클러스터에 동일한 이름의 consumer-group이 있는 경우 어떻게 되는지   
    해당되는 파티션 정보만 overwrite된다. 
    ex) 
    기존에 5개 파티션으로 되어있다가 1개 파티션에 대해 복제해올경우 0번 파티션만 정보가 바뀐다. 
    아래 보면 파티션0만 overwrite됨. 


    SOURCE
    Consumer group 'jaeshim-mirror-group-20230724' has no active members.
    GROUP                         TOPIC                   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            C
    LIENT-ID
    jaeshim-mirror-group-20230724 jaeshim-mirror-20230724 0          6               11              5               -               -               -



    TARGET 
    GROUP                         TOPIC                              PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST
              CLIENT-ID
    jaeshim-mirror-group-20230724 source-dev_jaeshim-mirror-20230724 1          2               2               0               -               -
              -
    jaeshim-mirror-group-20230724 source-dev_jaeshim-mirror-20230724 2          1               1               0               -               -
              -
    jaeshim-mirror-group-20230724 source-dev_jaeshim-mirror-20230724 3          1               1               0               -               -
              -
    jaeshim-mirror-group-20230724 source-dev_jaeshim-mirror-20230724 4          4               4               0               -               -
              -
    jaeshim-mirror-group-20230724 source-dev_jaeshim-mirror-20230724 0          6               11              5               -               -
              -



    sync.group.offsets.enabled를 false로 두면 consumer group은 복제하지 않는 것인지 확인하기
    1. source에서 메시지 생성
    2. source에서 consumer-group 구동 
    3. target에 복제안되는지 확인하기
    이미 존재하던 Consumer-Group은 계속 복제함. 
    신규 Consumer-group은 false일 때는 복제하지 않음
    true로 변경하면 Consumer-group도 복제함. 
     토픽A와 컨슈머그룹 C1을 복제 및 컨슈머 렉 테스트 1. source에서 토픽A 생성
    2. soruce에서 컨슈머그룹 생성
    3. 토픽, 컨슈머 그룹 복제 시작
    4. soruce에서 토픽A에 메시지 10000개 생성 (렉 10000개 쌓임)
    5. target에 메시지 복제 확인, 컨슈머 그룹 복제 확인 (렉 10000개 쌓이는 채로 복제되는지 확인)
    6. target에서 컨슈머 그룹 구동하여 렉 소모 확인
    7. target에서 컨슈머 그룹 중지
    8. source에서 컨슈머 그룹 구동하여 렉 소모.
    9. target에서 컨슈머 그룹 렉 소모되는지 확인 (overwrite)
    source에서 생성되는 내용이 계속해서 복제됨. (overwrite). 이 때 target에서 컨슈머를 구동할 경우 복제하는대로 메시지를 소모할 수 있음. (END-OFFSET이 계속해서 싱크되서 이슈가 없는것으로 보임)
    토픽A, 토픽B를 복제하고, 컨슈머그룹 C1만 복제할 때 어떻게 되는지  (O:복제, X: 복제안함)
    대상토픽
    A: O
    B: O

    대상그룹
    C1-A: O
    C2-B: X
     
    토픽만 복제됨 
    토픽A, 토픽B 중 토픽A만 복제하고 컨슈머그룹 C1,C2를 둘 다 복제할때 어떻게 되는지 (O:복제, X: 복제안함)
    A: O
    B: X

    C1-A: O
    C2-B: O
     
    토픽A의 내용과 C1-A 그룹만 복제됨
    토픽A, 토픽B를 컨슈머그룹C1이 둘다 구독하고 모두 복제하는 경우 (O:복제, X: 복제안함)
    A:O
    B:O

    C1-A,B: O 
     
    모든 정보가 복제됨
    토픽A, 토픽B를 컨슈머그룹C1이 둘다 구독하는데 토픽A만 복제하는 경우 (O:복제, X: 복제안함)
    A:O
    B:X

    C1-A,B: O
     
    토픽A에 대한 컨슈밍 정보만 복제됨
    토픽A, 토픽B를 컨슈머 그룹C1이 둘다 구독하는데 C1을 복제하지 않는 경우 (O:복제, X: 복제안함)
    A:O
    B:O

    C1-A,B: X
     
    consumer-group정보는 복제되지 않고, 토픽만 복제됨.

    상세 내용

    토픽 생성 (Source)

    sudo kafka-topics --create --bootstrap-server {source-broker}:9092 --topic source-dev_jaeshim-mirror-20230724 --partitions 5

    Consumer 구동 (Source)

    sudo kafka-console-consumer --bootstrap-server {source-broker}:9092 --topic jaeshim-mirror-20230724 --group jaeshim-mirror-group-20230724
    sudo kafka-console-consumer --bootstrap-server {source-broker}:9092 --topic jaeshim-mirror-20230724 --group yubin-mirror-group-20230724
     
    # consumer group 확인
    kafka-consumer-groups --describe --group jaeshim-mirror-group-20230724 --bootstrap-server {source-broker}:9092
    kafka-consumer-groups --describe --group yubin-mirror-group-20230724 --bootstrap-server {source-broker}:9092

    MirrorSourceConnector 생성 (Target)

    http://{target-connect}:8083/connectors 

    http method: POST

    # Connector 생성
    {
      "name": "MirrorCheckpointConnectTest",
      "config": {   
        "name": "MirrorCheckpointConnectTest",
        "connector.class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",  
        "source.cluster.alias": "source-dev",
        "replication.policy.separator":"_",
        "source.cluster.bootstrap.servers": "{source-broker}:9092",
        "sync.topic.configs.enabled": "true",
        "target.cluster.alias": "target-dev",
        "target.cluster.bootstrap.servers": "{target-broker}:9092",
        "sync.group.offsets.enabled": "true",
        "sync.group.offsets.interval.seconds": "10",
        "emit.checkpoints.interval.seconds": "20",
        "emit.checkpoints.enabled": "true",
        "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "groups": "jaeshim-mirror-group-20230724,yubin-.*",
        "groups.exclude": "console-consumer-.*, connect-.*, __.*,_confluent-.*"
      }
    }
    # 상태 체크
    GET http://{target-connect}:8083/connectors/MirrorSourceConnectorTest/status
     
    # describe
    GET http://{target-connect}:8083/connectors/MirrorSourceConnectorTest

    consumer checkpoint 복제 여부 확인

    kafka-consumer-groups --describe --group jaeshim-mirror-group-20230724 --bootstrap-server {target-broker}:9092
    kafka-consumer-groups --describe --group yubin-mirror-group-20230724 --bootstrap-server {target-broker}:9092
     
     
     
    # group1
    GROUP                       TOPIC                              PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST
            CLIENT-ID
    yubin-mirror-group-20230724 source-dev_jaeshim-mirror-20230724 0          11              11              0               -               -
            -
    # group2
    jaeshim-mirror-group-20230724 source-dev_jaeshim-mirror-20230724 0          6               11              5               -               -
              -

    checkpoint 관리 방법

    토픽 2개를 봐야한다.

    • "source".checkpoints.internal (TARGET 클러스터에 있음)
      • ex: mm2-offset-syncs_target-dev_internal 
      • source consumer group 정보, partition, upstream/downstream offset 정보를 관리 
    • mm2-offset-syncs."target".internal (SOURCE 클러스터에 있음)
      • ex: source-dev_checkpoints_internal
      • source와 target 카프카의 오프셋 매칭 정보를 관리 
    # 체크포인트 조회 (Consumer group 정보)
    # consumerGroupId=consumer group명
    # topicPartition=해당 consumer group과 매칭되는 토픽-파티션 정보
    # upstream=source 클러스터의 오프셋
    # downstream=target 클러스터의 오프셋
    $ kafka-console-consumer --bootstrap-server {target-broker}:9092 --topic source-dev_checkpoints_internal --formatter org.apache.kafka.connect.mirror.formatters.CheckpointFormatter
    Checkpoint{consumerGroupId=yubin-mirror-group-20230724, topicPartition=source-dev_jaeshim-mirror-20230724-0, upstreamOffset=12, downstreamOffset=12, metatadata=}
    Checkpoint{consumerGroupId=yubin--mirror-group-20230724, topicPartition=source-dev_jaeshim-mirror-20230724-0, upstreamOffset=7, downstreamOffset=7, metatadata=}
    Checkpoint{consumerGroupId=jaeshim-mirror-group-20230724, topicPartition=source-dev_jaeshim-mirror-20230724-0, upstreamOffset=6, downstreamOffset=6, metatadata=}
    Checkpoint{consumerGroupId=yubin-mirror-group-20230724, topicPartition=source-dev_jaeshim-mirror-20230724-0, upstreamOffset=12, downstreamOffset=12, metatadata=}
    Checkpoint{consumerGroupId=yubin--mirror-group-20230724, topicPartition=source-dev_jaeshim-mirror-20230724-0, upstreamOffset=7, downstreamOffset=7, metatadata=}
    Checkpoint{consumerGroupId=jaeshim-mirror-group-20230724, topicPartition=source-dev_jaeshim-mirror-20230724-0, upstreamOffset=6, downstreamOffset=6, metatadata=}
    ..계속 갱신
     
    # offset-sync 조회
    # upstream= source 클러스터의 오프셋
    # downstream= target 클러스터의 오프셋
    $ kafka-console-consumer --bootstrap-server {source-broker}:9092 --topic  mm2-offset-syncs_target-dev_internal --formatter org.apache.kafka.connect.mirror.formatters.OffsetSyncFormatter --from-beginning
    OffsetSync{topicPartition=jaeshim-mirror-20230724-4, upstreamOffset=0, downstreamOffset=0}
    OffsetSync{topicPartition=jaeshim-mirror-20230724-1, upstreamOffset=0, downstreamOffset=0}
    OffsetSync{topicPartition=jaeshim-mirror-20230724-2, upstreamOffset=0, downstreamOffset=0}
    OffsetSync{topicPartition=jaeshim-mirror-20230724-3, upstreamOffset=0, downstreamOffset=0}
    OffsetSync{topicPartition=jaeshim-mirror-20230724-0, upstreamOffset=0, downstreamOffset=0}
    OffsetSync{topicPartition=jaeshim-mirror-20230724-0, upstreamOffset=6, downstreamOffset=6}
    ..메시지 변화가 있을 경우 갱신

    사용한 커맨드

    ##SOURCE
    #토픽 생성
    sudo kafka-topics --create --bootstrap-server {source-broker}:9092 --topic source-dev_jaeshim-mirror-20230724 --partitions 5 
    
    #consumer 구동
    sudo kafka-console-consumer --bootstrap-server {source-broker}:9092 --topic jaeshim-mirror-20230724 --group jaeshim-mirror-group-20230724
    sudo kafka-console-consumer --bootstrap-server {source-broker}:9092 --topic jaeshim-mirror-20230724 --group yubin-mirror-group-20230724
    
    #consumer-group describe
    kafka-consumer-groups --describe --group jaeshim-mirror-group-20230724 --bootstrap-server {source-broker}:9092
    kafka-consumer-groups --describe --group yubin-mirror-group-20230724 --bootstrap-server {source-broker}:9092
    
    #OffsetSync
    kafka-console-consumer --bootstrap-server {source-broker}:9092 --topic  mm2-offset-syncs_target-dev_internal --formatter org.apache.kafka.connect.mirror.formatters.OffsetSyncFormatter --from-beginning
    
    ##TARGET
    #consumer-group describe
    kafka-consumer-groups --describe --group jaeshim-mirror-group-20230724 --bootstrap-server {target-broker}:9092
    kafka-consumer-groups --describe --group yubin-mirror-group-20230724 --bootstrap-server {target-broker}:9092
    
    #Checkpoint
    kafka-console-consumer --bootstrap-server {target-broker}:9092 --topic source-dev_checkpoints_internal --formatter org.apache.kafka.connect.mirror.formatters.CheckpointFormatter

     

    [MirrorHeartbeatConnector]

    MirrorHeartbeatConnector는 heartbeat 데이터를 기록하며 target(remote) 카프카가 정상 동작하고 있는지 주기적으로 체크한다.

     

    관련토픽:

    • heartbeats (target 클러스터에 있음)
    • emit.heartbeats.interval.seconds 설정으로 얼마마다 heartbeat를 체크할 지 선언할 수 있다. 

     

    Configurations

    설정값
    설명 Default
    emit.heartbeats.enabled 주기적으로 하트 비트를 방출할지 여부  true
    emit.heartbeats.interval.seconds 하트비트를 방출할 주기 (디폴트: 5, 5초 간격) 5초
    heartbeats.topic.replication.factor heartbeats 토픽의 Replication Factor  3

     

    상세 내용

    MirrorHeartbeatConnector 생성

    http://{target-connect}:8083/connectors 

    http method: POST

     

    Connector 생성

    {
      "name": "MirrorHeartbeatConnectorTest-20230724-7",
      "config": {   
            "name": "MirrorHeartbeatConnectorTest-20230724-7",
            "connector.class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
            "source.cluster.alias": "source-dev",
            "replication.policy.separator":"_",
            "source.cluster.bootstrap.servers": "{source-broker}:9092",
            "sync.topic.configs.enabled": "false",
            "target.cluster.alias": "target-dev",
            "target.cluster.bootstrap.servers": "{target-broker}:9092",
            "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
            "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
          	"header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
      }
    }

    사용한 커맨드

    ##SOURCE
    kafka-console-consumer --bootstrap-server {terget-broker}:9092 --topic heartbeats --formatter org.apache.kafka.connect.mirror.formatters.HeartbeatFormatter
    Heartbeat{sourceClusterAlias=source-dev, targetClusterAlias=target-dev, timestamp=1690777081004}
    Heartbeat{sourceClusterAlias=source-dev, targetClusterAlias=target-dev, timestamp=1690777082004}
    Heartbeat{sourceClusterAlias=source-dev, targetClusterAlias=target-dev, timestamp=1690777083005}
    Heartbeat{sourceClusterAlias=source-dev, targetClusterAlias=target-dev, timestamp=1690777084005}
    Heartbeat{sourceClusterAlias=source-dev, targetClusterAlias=target-dev, timestamp=1690777085005}
    Heartbeat{sourceClusterAlias=source-dev, targetClusterAlias=target-dev, timestamp=1690777086005}
    Heartbeat{sourceClusterAlias=source-dev, targetClusterAlias=target-dev, timestamp=1690777087005}
    Heartbeat{sourceClusterAlias=source-dev, targetClusterAlias=target-dev, timestamp=1690777088005}
    Heartbeat{sourceClusterAlias=source-dev, targetClusterAlias=target-dev, timestamp=1690777089005}
    Heartbeat{sourceClusterAlias=source-dev, targetClusterAlias=target-dev, timestamp=1690777090006}

     

     

    'Kafka' 카테고리의 다른 글

    Kafka Cruise-Control 사용해보기  (0) 2023.08.31
    MirrorMaker2 Basic  (0) 2023.07.24
    클러스터간 메시지 복제  (0) 2023.07.24
    Kafka KRaft Protocol 정리  (0) 2023.05.28
    Performance Management  (0) 2023.04.16