목차
MirrorMaker2 설정 정리 참조: https://godekdls.github.io/Apache%20Kafka/geo-replication/ (MirrorMaker2 부분)
[MirrorSourceConnector]
기본 조건
active-passive 상황으로 가정한다.
- source: source-dev (active)
- target: target-dev (passive)
- 토픽명: jaeshim-mirror-20230724 (파티션5)
Configurations
설정값 | 설명 |
Default |
refresh.topics.enabled | 주기적으로 새 토픽을 확인할 지 여부 | true |
refresh.topics.interval.seconds | 새 토픽 확인 주기 | 6000 (10분) |
source.cluster.alias | source 클러스터의 별칭 | |
source.cluster.bootstrap.servers | source 클러스터 | |
replication.policy.separator | source 클러스터와 현재 클러스터간 구분자 | |
target.cluster.alias | target 클러스터의 별칭 | |
target.cluster.bootstrap.servers | target 클러스터 | |
topics | 대상 토픽. ex) jaeshim_test.* |
|
key.converter | Key Converter | |
value.converter | Value Converter | |
header.converter | Header Converter | |
tasks.max | connector의 task수 | 1 |
테스트 정리
상황 | 행위 | 확인사항 |
메시지 정상 복제 여부 | 토픽 복제 | key, value, header 모두 정상 복제됨 |
토픽 설정 정보 복제 여부 | source 토픽은 파티션5 target 토픽은 브로커 기본설정 (파티션2) |
target에서도 파티션5로 생성되고 복제됨. 단 아래 값들이 true여야함.
|
동일한 파티션에 복제되는지 여부 |
|
동일한 파티션 번호에 동일하게 복제함. |
target에서 토픽 미리 생성할 경우 잘 복제되는지 확인 |
|
잘 복제됨 |
target에서 토픽을 삭제하고 다시 만들었을 때 메시지 복제는 어떻게 되는지 보기 |
|
추가된 메시지만 복제됨. connector내부적으로 offset을 관리하므로 처음부터 복제하고 싶을 경우 신규 connector이름으로 가동하여야 함. |
target과 source의 오프셋 번호가 다른 경우 어떻게 되는지 보기 |
|
target에서는 기존 오프셋에 추가로 쌓도록 함. |
target 클러스터와 source 클러스터의 파티션 수가 다를 때 어떻게 되는지 보기 |
|
target에는 source와 동일한 파티션번호로 복제된다. |
connector를 삭제하고 "동일한 이름"으로 생성할 때 전체 메시지 복제 여부 |
|
추가된 메시지만 복제됨. connector이름으로 복제된 offset을 확인하므로 전체복제가 안됨. |
connector를 삭제하고 "다른 이름"으로 생성할 때 전체 메시지 복제 여부 |
|
전체 메시지 복제됨. 전체 싱크를 위해서는 기존 connector이름과 달라야 하는듯하니 connector 이름을 XXXXConnector-v1-YYYYMMDDD 과 같이 버전과 구동일자 이런식으로 구분하는 것이 좋을 듯하다. |
상세 내용
MirrorSourceConnector 생성
http://{target-connect}:8083/connectors
http method: POST
{
"name": "MirrorSourceConnectorTest",
"config": {
"name": "MirrorSourceConnectorTest",
"connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
"topics": "jaeshim-mirror-20230724",
"source.cluster.alias": "source-dev",
"replication.policy.separator":"_",
"source.cluster.bootstrap.servers": "{source-broker}:9092",
"sync.topic.configs.enabled": "true",
"target.cluster.alias": "target-dev",
"target.cluster.bootstrap.servers": "{target-broker}:9092",
"key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"tasks.max": "1"
}
}
# 상태 체크
GET http://{target-connect}:8083/connectors/MirrorSourceConnectorTest/status
# describe
GET http://{target-connect}:8083/connectors/MirrorSourceConnectorTest
메시지 생성
source 클러스터에서 메시지를 생성한다.
$ kafka-console-producer --bootstrap-server {source-broker}:9092 --topic jaeshim-mirror-20230724 --property parse.key=true --property key.separator=":"
>a:1
>a:2
source에서 몇번 파티션에 생성되었는지 확인.
아래는 5번 파티션에 2개의 메시지가 유입되었음.
$ watch 'sudo kafka-run-class kafka.tools.GetOffsetShell --broker-list {source-broker}:9092 --topic jaeshim-mirror-20230724'
jaeshim-mirror-20230724:0:0
jaeshim-mirror-20230724:1:0
jaeshim-mirror-20230724:2:0
jaeshim-mirror-20230724:3:0
jaeshim-mirror-20230724:4:2
메시지 복제 여부 확인
토픽은 생성되었으나 메시지는 복제되지 않음.
이유는 파티션이 2개밖에 존재하지 않아 동일한 파티션이 없어 복제되지 않는것.
$ sudo kafka-topics --describe --bootstrap-server {target-dev}:9092 --topic source-dev_jaeshim-mirror-20230724
Topic: source-dev_jaeshim-mirror-20230724 TopicId: IhH6TgeXTvuCjhgrO1VpZw PartitionCount: 2 ReplicationFactor: 2 Configs: min.insync.replicas=1,flush.ms=10000,segment.bytes=1073741824,flush.messages=5000,unclean.leader.election.enable=false,retention.bytes=34359738368
Topic: source-dev_jaeshim-mirror-20230724 Partition: 0 Leader: 104 Replicas: 104,105 Isr: 104,105
Topic: source-dev_jaeshim-mirror-20230724 Partition: 1 Leader: 105 Replicas: 105,106 Isr: 105,106
파티션개수를 늘리고 확인
$ sudo kafka-topics \
--bootstrap-server {target-broker}:9092 \
--alter \
--topic source-dev_jaeshim-mirror-20230724 \
--partitions 5
# 확인
$ sudo kafka-top--describe --bootstrap-server {target-broker}:9092 --topic source-dev_jaeshim-mirror-20230724 Topic: source-dev_jaeshim-mirror-20230724 TopicId: IhH6TgeXTvuCjhgrO1VpZw PartitionCount: 5 ReplicationFactor: 2 Configs: min.insync.replicas=1,flush.ms=10000,segment.bytes=1073741824,flush.messages=5000,unclean.leader.election.enable=false,retention.bytes=34359738368
Topic: source-dev_jaeshim-mirror-20230724 Partition: 0 Leader: 104 Replicas: 104,105 Isr: 104,105
Topic: source-dev_jaeshim-mirror-20230724 Partition: 1 Leader: 105 Replicas: 105,106 Isr: 105,106
Topic: source-dev_jaeshim-mirror-20230724 Partition: 2 Leader: 106 Replicas: 106,104 Isr: 106,104
Topic: source-dev_jaeshim-mirror-20230724 Partition: 3 Leader: 101 Replicas: 101,105 Isr: 101,105
Topic: source-dev_jaeshim-mirror-20230724 Partition: 4 Leader: 102 Replicas: 102,106 Isr: 102,106
메시지 복제 여부 확인
$ sudo kafka-run-class kafka.tools.GetOffsetShell --broker-list {target-broker}:9092 --topic source-dev_jaeshim-mirror-20230724
source-dev_jaeshim-mirror-20230724:0:0
source-dev_jaeshim-mirror-20230724:1:0
source-dev_jaeshim-mirror-20230724:2:0
source-dev_jaeshim-mirror-20230724:3:0
source-dev_jaeshim-mirror-20230724:4:2
key와 value도 정상적으로 복제되었음.
사용한 커맨드
## SOURCE
sudo kafka-topics --create --bootstrap-server {source-broker}:9092 --topic jaeshim-mirror-20230724 --partitions 5
sudo kafka-console-producer --bootstrap-server {source-broker}:9092 --topic jaeshim-mirror-20230724 --property parse.key=true --property key.separator=":"
sudo kafka-topics --bootstrap-server {source-broker}:9092 --delete --topic jaeshim-mirror-20230724
## TARGET
#토픽 생성
sudo kafka-topics --create --bootstrap-server {target-broker}:9092 --topic source-dev_jaeshim-mirror-20230724 --partitions 5
#토픽 확인
sudo kafka-topics --describe --bootstrap-server {target-broker}:9092 --topic source-dev_jaeshim-mirror-20230724
#토픽 삭제
sudo kafka-topics --bootstrap-server {target-broker}:9092 --delete --topic source-dev_jaeshim-mirror-20230724
# offset 확인
sudo kafka-run-class kafka.tools.GetOffsetShell --broker-list {target-broker}:9092 --topic source-dev_jaeshim-mirror-20230724
[MirrorCheckpointConnector]
Configurations
설정값 |
설명 | Default |
emit.checkpoints.enabled | 체크포인트 활성화 여부 | true |
emit.checkpoints.interval.seconds | 체크포인트 활성화 주기 | 60 (1분) |
checkpoints.topic.replication.factor | 체크포인트 관련 토픽의 Replication Factor | 3 |
sync.group.offsets.enabled | Consumer-Group 오프셋 동기화 여부 | true |
sync.group.offsets.interval.seconds | Consumer-Group 오프셋 동기화 주기 | 60 (1분) |
offset-syncs.topic.replication.factor | Consumer-Group 오프셋 토픽의 Replication Factor | 3 |
groups | 복제 대상 consumer-group ex) jaeshim-mirror-group-.*,yubin-.* |
|
groups.exclude | 복제 제외할 consumer-group ex) console-consumer-.*, connect-.*, __.*,_confluent-.* |
테스트 정리
상황 | 행위 | 확인사항 |
consumer-group 정상 복제 여부 | consumer-group 복제 | 정상 복제됨. 이 때 내부 토픽을 통해 offset upstream, downstream 정보를 활용함. |
target 클러스터에 동일한 이름의 consumer-group이 있는 경우 어떻게 되는지 |
해당되는 파티션 정보만 overwrite된다.
ex) 기존에 5개 파티션으로 되어있다가 1개 파티션에 대해 복제해올경우 0번 파티션만 정보가 바뀐다. 아래 보면 파티션0만 overwrite됨. SOURCE
Consumer group 'jaeshim-mirror-group-20230724' has no active members. GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST C LIENT-ID jaeshim-mirror-group-20230724 jaeshim-mirror-20230724 0 6 11 5 - - - TARGET GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID jaeshim-mirror-group-20230724 source-dev_jaeshim-mirror-20230724 1 2 2 0 - - - jaeshim-mirror-group-20230724 source-dev_jaeshim-mirror-20230724 2 1 1 0 - - - jaeshim-mirror-group-20230724 source-dev_jaeshim-mirror-20230724 3 1 1 0 - - - jaeshim-mirror-group-20230724 source-dev_jaeshim-mirror-20230724 4 4 4 0 - - - jaeshim-mirror-group-20230724 source-dev_jaeshim-mirror-20230724 0 6 11 5 - - - |
|
sync.group.offsets.enabled를 false로 두면 consumer group은 복제하지 않는 것인지 확인하기 |
|
이미 존재하던 Consumer-Group은 계속 복제함.
신규 Consumer-group은 false일 때는 복제하지 않음 true로 변경하면 Consumer-group도 복제함. |
토픽A와 컨슈머그룹 C1을 복제 및 컨슈머 렉 테스트 | 1. source에서 토픽A 생성 2. soruce에서 컨슈머그룹 생성 3. 토픽, 컨슈머 그룹 복제 시작 4. soruce에서 토픽A에 메시지 10000개 생성 (렉 10000개 쌓임) 5. target에 메시지 복제 확인, 컨슈머 그룹 복제 확인 (렉 10000개 쌓이는 채로 복제되는지 확인) 6. target에서 컨슈머 그룹 구동하여 렉 소모 확인 7. target에서 컨슈머 그룹 중지 8. source에서 컨슈머 그룹 구동하여 렉 소모. 9. target에서 컨슈머 그룹 렉 소모되는지 확인 (overwrite) |
source에서 생성되는 내용이 계속해서 복제됨. (overwrite). 이 때 target에서 컨슈머를 구동할 경우 복제하는대로 메시지를 소모할 수 있음. (END-OFFSET이 계속해서 싱크되서 이슈가 없는것으로 보임)
|
토픽A, 토픽B를 복제하고, 컨슈머그룹 C1만 복제할 때 어떻게 되는지 (O:복제, X: 복제안함) 대상토픽 A: O B: O 대상그룹 C1-A: O C2-B: X |
토픽만 복제됨
|
|
토픽A, 토픽B 중 토픽A만 복제하고 컨슈머그룹 C1,C2를 둘 다 복제할때 어떻게 되는지 (O:복제, X: 복제안함) A: O B: X C1-A: O C2-B: O |
토픽A의 내용과 C1-A 그룹만 복제됨
|
|
토픽A, 토픽B를 컨슈머그룹C1이 둘다 구독하고 모두 복제하는 경우 (O:복제, X: 복제안함) A:O B:O C1-A,B: O |
모든 정보가 복제됨
|
|
토픽A, 토픽B를 컨슈머그룹C1이 둘다 구독하는데 토픽A만 복제하는 경우 (O:복제, X: 복제안함) A:O B:X C1-A,B: O |
토픽A에 대한 컨슈밍 정보만 복제됨
|
|
토픽A, 토픽B를 컨슈머 그룹C1이 둘다 구독하는데 C1을 복제하지 않는 경우 (O:복제, X: 복제안함) A:O B:O C1-A,B: X |
consumer-group정보는 복제되지 않고, 토픽만 복제됨.
|
상세 내용
토픽 생성 (Source)
sudo kafka-topics --create --bootstrap-server {source-broker}:9092 --topic source-dev_jaeshim-mirror-20230724 --partitions 5
Consumer 구동 (Source)
sudo kafka-console-consumer --bootstrap-server {source-broker}:9092 --topic jaeshim-mirror-20230724 --group jaeshim-mirror-group-20230724
sudo kafka-console-consumer --bootstrap-server {source-broker}:9092 --topic jaeshim-mirror-20230724 --group yubin-mirror-group-20230724
# consumer group 확인
kafka-consumer-groups --describe --group jaeshim-mirror-group-20230724 --bootstrap-server {source-broker}:9092
kafka-consumer-groups --describe --group yubin-mirror-group-20230724 --bootstrap-server {source-broker}:9092
MirrorSourceConnector 생성 (Target)
http://{target-connect}:8083/connectors
http method: POST
# Connector 생성
{
"name": "MirrorCheckpointConnectTest",
"config": {
"name": "MirrorCheckpointConnectTest",
"connector.class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
"source.cluster.alias": "source-dev",
"replication.policy.separator":"_",
"source.cluster.bootstrap.servers": "{source-broker}:9092",
"sync.topic.configs.enabled": "true",
"target.cluster.alias": "target-dev",
"target.cluster.bootstrap.servers": "{target-broker}:9092",
"sync.group.offsets.enabled": "true",
"sync.group.offsets.interval.seconds": "10",
"emit.checkpoints.interval.seconds": "20",
"emit.checkpoints.enabled": "true",
"key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"groups": "jaeshim-mirror-group-20230724,yubin-.*",
"groups.exclude": "console-consumer-.*, connect-.*, __.*,_confluent-.*"
}
}
# 상태 체크
GET http://{target-connect}:8083/connectors/MirrorSourceConnectorTest/status
# describe
GET http://{target-connect}:8083/connectors/MirrorSourceConnectorTest
consumer checkpoint 복제 여부 확인
kafka-consumer-groups --describe --group jaeshim-mirror-group-20230724 --bootstrap-server {target-broker}:9092
kafka-consumer-groups --describe --group yubin-mirror-group-20230724 --bootstrap-server {target-broker}:9092
# group1
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST
CLIENT-ID
yubin-mirror-group-20230724 source-dev_jaeshim-mirror-20230724 0 11 11 0 - -
-
# group2
jaeshim-mirror-group-20230724 source-dev_jaeshim-mirror-20230724 0 6 11 5 - -
-
checkpoint 관리 방법
토픽 2개를 봐야한다.
- "source".checkpoints.internal (TARGET 클러스터에 있음)
- ex: mm2-offset-syncs_target-dev_internal
- source consumer group 정보, partition, upstream/downstream offset 정보를 관리
- mm2-offset-syncs."target".internal (SOURCE 클러스터에 있음)
- ex: source-dev_checkpoints_internal
- source와 target 카프카의 오프셋 매칭 정보를 관리
# 체크포인트 조회 (Consumer group 정보)
# consumerGroupId=consumer group명
# topicPartition=해당 consumer group과 매칭되는 토픽-파티션 정보
# upstream=source 클러스터의 오프셋
# downstream=target 클러스터의 오프셋
$ kafka-console-consumer --bootstrap-server {target-broker}:9092 --topic source-dev_checkpoints_internal --formatter org.apache.kafka.connect.mirror.formatters.CheckpointFormatter
Checkpoint{consumerGroupId=yubin-mirror-group-20230724, topicPartition=source-dev_jaeshim-mirror-20230724-0, upstreamOffset=12, downstreamOffset=12, metatadata=}
Checkpoint{consumerGroupId=yubin--mirror-group-20230724, topicPartition=source-dev_jaeshim-mirror-20230724-0, upstreamOffset=7, downstreamOffset=7, metatadata=}
Checkpoint{consumerGroupId=jaeshim-mirror-group-20230724, topicPartition=source-dev_jaeshim-mirror-20230724-0, upstreamOffset=6, downstreamOffset=6, metatadata=}
Checkpoint{consumerGroupId=yubin-mirror-group-20230724, topicPartition=source-dev_jaeshim-mirror-20230724-0, upstreamOffset=12, downstreamOffset=12, metatadata=}
Checkpoint{consumerGroupId=yubin--mirror-group-20230724, topicPartition=source-dev_jaeshim-mirror-20230724-0, upstreamOffset=7, downstreamOffset=7, metatadata=}
Checkpoint{consumerGroupId=jaeshim-mirror-group-20230724, topicPartition=source-dev_jaeshim-mirror-20230724-0, upstreamOffset=6, downstreamOffset=6, metatadata=}
..계속 갱신
# offset-sync 조회
# upstream= source 클러스터의 오프셋
# downstream= target 클러스터의 오프셋
$ kafka-console-consumer --bootstrap-server {source-broker}:9092 --topic mm2-offset-syncs_target-dev_internal --formatter org.apache.kafka.connect.mirror.formatters.OffsetSyncFormatter --from-beginning
OffsetSync{topicPartition=jaeshim-mirror-20230724-4, upstreamOffset=0, downstreamOffset=0}
OffsetSync{topicPartition=jaeshim-mirror-20230724-1, upstreamOffset=0, downstreamOffset=0}
OffsetSync{topicPartition=jaeshim-mirror-20230724-2, upstreamOffset=0, downstreamOffset=0}
OffsetSync{topicPartition=jaeshim-mirror-20230724-3, upstreamOffset=0, downstreamOffset=0}
OffsetSync{topicPartition=jaeshim-mirror-20230724-0, upstreamOffset=0, downstreamOffset=0}
OffsetSync{topicPartition=jaeshim-mirror-20230724-0, upstreamOffset=6, downstreamOffset=6}
..메시지 변화가 있을 경우 갱신
사용한 커맨드
##SOURCE
#토픽 생성
sudo kafka-topics --create --bootstrap-server {source-broker}:9092 --topic source-dev_jaeshim-mirror-20230724 --partitions 5
#consumer 구동
sudo kafka-console-consumer --bootstrap-server {source-broker}:9092 --topic jaeshim-mirror-20230724 --group jaeshim-mirror-group-20230724
sudo kafka-console-consumer --bootstrap-server {source-broker}:9092 --topic jaeshim-mirror-20230724 --group yubin-mirror-group-20230724
#consumer-group describe
kafka-consumer-groups --describe --group jaeshim-mirror-group-20230724 --bootstrap-server {source-broker}:9092
kafka-consumer-groups --describe --group yubin-mirror-group-20230724 --bootstrap-server {source-broker}:9092
#OffsetSync
kafka-console-consumer --bootstrap-server {source-broker}:9092 --topic mm2-offset-syncs_target-dev_internal --formatter org.apache.kafka.connect.mirror.formatters.OffsetSyncFormatter --from-beginning
##TARGET
#consumer-group describe
kafka-consumer-groups --describe --group jaeshim-mirror-group-20230724 --bootstrap-server {target-broker}:9092
kafka-consumer-groups --describe --group yubin-mirror-group-20230724 --bootstrap-server {target-broker}:9092
#Checkpoint
kafka-console-consumer --bootstrap-server {target-broker}:9092 --topic source-dev_checkpoints_internal --formatter org.apache.kafka.connect.mirror.formatters.CheckpointFormatter
[MirrorHeartbeatConnector]
MirrorHeartbeatConnector는 heartbeat 데이터를 기록하며 target(remote) 카프카가 정상 동작하고 있는지 주기적으로 체크한다.
관련토픽:
- heartbeats (target 클러스터에 있음)
- emit.heartbeats.interval.seconds 설정으로 얼마마다 heartbeat를 체크할 지 선언할 수 있다.
Configurations
설정값 |
설명 | Default |
emit.heartbeats.enabled | 주기적으로 하트 비트를 방출할지 여부 | true |
emit.heartbeats.interval.seconds | 하트비트를 방출할 주기 (디폴트: 5, 5초 간격) | 5초 |
heartbeats.topic.replication.factor | heartbeats 토픽의 Replication Factor | 3 |
상세 내용
MirrorHeartbeatConnector 생성
http://{target-connect}:8083/connectors
http method: POST
Connector 생성
{
"name": "MirrorHeartbeatConnectorTest-20230724-7",
"config": {
"name": "MirrorHeartbeatConnectorTest-20230724-7",
"connector.class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
"source.cluster.alias": "source-dev",
"replication.policy.separator":"_",
"source.cluster.bootstrap.servers": "{source-broker}:9092",
"sync.topic.configs.enabled": "false",
"target.cluster.alias": "target-dev",
"target.cluster.bootstrap.servers": "{target-broker}:9092",
"key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
}
}
사용한 커맨드
##SOURCE
kafka-console-consumer --bootstrap-server {terget-broker}:9092 --topic heartbeats --formatter org.apache.kafka.connect.mirror.formatters.HeartbeatFormatter
Heartbeat{sourceClusterAlias=source-dev, targetClusterAlias=target-dev, timestamp=1690777081004}
Heartbeat{sourceClusterAlias=source-dev, targetClusterAlias=target-dev, timestamp=1690777082004}
Heartbeat{sourceClusterAlias=source-dev, targetClusterAlias=target-dev, timestamp=1690777083005}
Heartbeat{sourceClusterAlias=source-dev, targetClusterAlias=target-dev, timestamp=1690777084005}
Heartbeat{sourceClusterAlias=source-dev, targetClusterAlias=target-dev, timestamp=1690777085005}
Heartbeat{sourceClusterAlias=source-dev, targetClusterAlias=target-dev, timestamp=1690777086005}
Heartbeat{sourceClusterAlias=source-dev, targetClusterAlias=target-dev, timestamp=1690777087005}
Heartbeat{sourceClusterAlias=source-dev, targetClusterAlias=target-dev, timestamp=1690777088005}
Heartbeat{sourceClusterAlias=source-dev, targetClusterAlias=target-dev, timestamp=1690777089005}
Heartbeat{sourceClusterAlias=source-dev, targetClusterAlias=target-dev, timestamp=1690777090006}
'Kafka' 카테고리의 다른 글
Kafka Cruise-Control 사용해보기 (0) | 2023.08.31 |
---|---|
MirrorMaker2 Basic (0) | 2023.07.24 |
클러스터간 메시지 복제 (0) | 2023.07.24 |
Kafka KRaft Protocol 정리 (0) | 2023.05.28 |
Performance Management (0) | 2023.04.16 |