목차
[개요]
로컬환경에서 Logstash를 활용하여 Mongodb 데이터를 Kafka로 이관하는 작업을 검증해본다.
- Source: Mongodb
- Sink: Kafka
[준비물]
- Docker (Docker-compose 필요)
- Mongodb, Kafka 등을 컨테이너로 구동할 Docker-Compose.yml 파일
- Datagrip: Mongodb에 데이터를 넣고 확인할 도구
- Kafka-ui: 카프카 어드민
- Logstash
[작업순서]
- Docker-Compose.yml 파일 작성
- mongodb, zookeeper, kafka, kafka-ui 구동
- Datagrip으로 mongodb 접속
- database 생성
- collecction (table) 생성
- document (row) 생성
- logstash 세팅
- mongodb input, output 플러그인 설치
- input: mongodb, output: kafka 설정파일 작성
- 구동 및 확인
- kafka 데이터 확인
Docker-compose.yml 파일 작성
docker-compose.yml
version: '2.2'
networks:
somenetwork:
services:
mongo:
container_name: mongo
image: mongo
networks:
- somenetwork
ports:
- 27017:27017
tty: true
stdin_open: true
zoo1:
image: confluentinc/cp-zookeeper:7.1.2
container_name: zoo1
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_SERVER_ID: 1
ZOOKEEPER_SERVERS: zoo1:2888:3888
kafka1:
image: confluentinc/cp-kafka:7.1.2
hostname: kafka1
container_name: kafka1
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
KAFKA_BROKER_ID: 1
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
depends_on:
- zoo1
kafka2:
image: confluentinc/cp-kafka:7.1.2
hostname: kafka2
container_name: kafka2
ports:
- "9093:9093"
environment:
KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka2:19093,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9093
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
KAFKA_BROKER_ID: 2
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
depends_on:
- zoo1
kafka3:
image: confluentinc/cp-kafka:7.1.2
hostname: kafka3
container_name: kafka3
ports:
- "9094:9094"
environment:
KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka3:19094,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9094
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
KAFKA_BROKER_ID: 3
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
depends_on:
- zoo1
kafka-ui:
container_name: kafka-ui
image: provectuslabs/kafka-ui:latest
ports:
- 8080:8080
depends_on:
- zoo1
- kafka1
- kafka2
- kafka3
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka1:19092,kafka2:19093,kafka3:19094
KAFKA_CLUSTERS_0_ZOOKEEPER: zoo1:2181
KAFKA_CLUSTERS_0_JMXPORT: 9997
구동
$ docker-compose up -d
Creating network "mongodb_somenetwork" with the default driver
Creating network "mongodb_default" with the default driver
...
Datagrip으로 mongodb 접속
접속
+ 버튼 - Data Source - MongoDB
로컬에서 띄웠고, 포트도 27017이 맞으므로 그대로 연결한다.
DB생성
#mongodb에서는 DB생성을 하려면 use 키워드를 써야한다고 한다.
use Test
Collection 생성
Collection은 테이블같은 개념이다.
db.createCollection('test_collection')
Document 생성
실제 데이터를 생성해보자.
mongodb에서는 데이터를 json형태로 저장한다고 한다.
//데이터 생성
db.test_collection.insertOne({
'number': 123,
'hash': '0x1234567',
'array': [1,2,3,4]
})
//확인
db.test_collection.find()
Logstash mongodb 플러그인 설치
기본 Logstash 이미지에는 mongodb-input, output 플러그인이 없다.
그러므로 Docker Container로 구동하기 위해서는 Dockerfile을 활용하여 기본이미지에 플러그인을 설치한 이미지를 사용하여야 한다.
본 PoC에서는 로컬에서 logstash bin파일을 기반으로 로컬에 플러그인을 설치하고 진행한다.
logstash 다운로드: https://www.elastic.co/kr/downloads/logstash
다운받은 후 환경변수로 등록해도되고, 아래처럼 bin 경로로 이동해도된다.
플러그인 리스트 확인 및 mongodb 플러그인 설치
$ ./logstash-plugin list
Using bundled JDK: /Users/jaeshim/JAEMIN/docker/logstash-8.3.3/jdk.app/Contents/Home
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
.. 목록에 mongodb 플러그인은 없음
# 플러그인 설치
$ ./logstash-plugin logstash-input-mongodb
$ ./logstash-plugin logstash-output-mongodb
# 플러그인 설치 확인
$ ./logstash-plugin list | grep mongo
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
logstash-input-mongodb
logstash-output-mongodb
logstash 파이프라인 생성 (conf 파일 만들기)
input {
mongodb {
uri => 'mongodb://localhost:27017/Test'
placeholder_db_dir => '../'
placeholder_db_name => 'logstash_sqlite.Test'
collection => 'test_collection'
batch_size => 5000
generateId => false
parse_method => "simple"
}
}
output{
kafka{
bootstrap_servers => "localhost:9092"
codec => json
acks => "1"
retries => "5"
linger_ms => "100"
compression_type => "snappy"
topic_id => "test_20220731"
}
stdout { codec => rubydebug }
}
logstash 파이프라인 구동
$ ./logstash -r -f /Users/jaeshim/JAEMIN/docker/mongodb/logstash_in_mongo_out_kafka.conf
Using bundled JDK: /Users/jaeshim/JAEMIN/docker/logstash-8.3.3/jdk.app/Contents/Home
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
...
[2022-07-31T15:16:44,109][INFO ][org.apache.kafka.clients.producer.ProducerConfig][main] ProducerConfig values:
acks = 1
batch.size = 16384
bootstrap.servers = [localhost:9092]
buffer.memory = 33554432
client.dns.lookup = default
client.id = producer-1
compression.type = snappy
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = false
interceptor.classes = []
internal.auto.downgrade.txn.commit = false
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 100
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
...
Datagrip에서 document 생성 및 logstash 로그 확인
Kafka에서 메시지확인
http://localhost:8080 접속 - kafka ui - topics - test_20220730 - messages - 생성한 메시지 그대로 들어갔는지 확인
[좀 더 진행해볼 것]
- 대량데이터 테스트
- payload 데이터만 들어갈 수 있도록 logstash filter 추가
- fusion에서 logstash mongodb plugin 설치된 custom image로 배포 및 구동 테스트
'Data Pipeline > Logstash' 카테고리의 다른 글
Logstash로 Kafka 토픽 복제해보기 (0) | 2023.06.20 |
---|---|
Logstash로 MSSQL - Redis 파이프라인 구성해보기 (0) | 2022.12.24 |
Logstash (0) | 2022.10.29 |