Data Pipeline/Logstash

Logstash로 Kafka 토픽 복제해보기

재심 2023. 6. 20. 09:43
# Source side: consume events from the source Kafka cluster.
input {
  kafka {
    id => "input_kafka"

    # Connection and consumer-group identity.
    bootstrap_servers => "source1:9092,source2:9092"
    group_id => "logstash-consumer"

    # Subscribe by regular expression rather than a fixed topic list;
    # this pattern matches every topic whose name begins with "test_topic".
    topics_pattern => "^test_topic"

    # Consumer behaviour: cooperative rebalancing, and start from the newest
    # offset when the group has no committed offset yet.
    partition_assignment_strategy => "cooperative_sticky"
    auto_offset_reset => "latest"

    # Parse each record value as JSON and attach Kafka metadata to the event.
    codec => json
    decorate_events => true
  }
}
 
# Strip the bookkeeping fields Logstash adds to every event so the replicated
# message carries only the original JSON payload.
filter {
        mutate {
          # Remove @version and @timestamp.
          # (Logstash config comments must start with '#'; '//' is a syntax error.)
          remove_field => ["@version","@timestamp"]
        }
}
# Sink side: publish each event as JSON to the target Kafka cluster.
output {
        kafka {
                bootstrap_servers => "sink1:9092,sink2:9092"
                codec => json
                # Producer delivery/batching settings.
                acks => "1"
                retries => 5
                linger_ms => 100
                compression_type => "snappy"
                topic_id => "sink_topic"
                # Use the value of the "mykey" field inside the JSON message as the
                # Kafka record key.
                # NOTE(review): if an event has no "mykey" field the sprintf reference
                # is presumably left unresolved (literal "%{mykey}") - confirm upstream
                # messages always carry it.
                # (Logstash config comments must start with '#'; '//' is a syntax error.)
                message_key => "%{mykey}"
        }
}