input {
kafka {
id => "input_kafka"
bootstrap_servers => "source1:9092,source2:9092"
group_id => "logstash-consumer"
topics_pattern => "^test_topic"
partition_assignment_strategy => "cooperative_sticky"
auto_offset_reset => "latest"
codec => json
decorate_events => true
}
}
filter{
mutate{
remove_field => ["@version","@timestamp"] //@version, timestamp 제거
}
}
output {
kafka {
bootstrap_servers => "sink1:9092,sink2:9092"
codec => json
acks => "1"
retries => "5"
linger_ms => "100"
compression_type => "snappy"
topic_id => "sink_topic"
message_key => "%{mykey}" //json 메시지 내부의 값을 key로 지정하여 생성
}
}