Data Pipeline/Logstash

Logstash를 이용한 Mongdb - Kafka 파이프라인 구성

재심 2022. 10. 29. 20:41

목차

    [개요]

    로컬환경에서 Logstash를 활용하여 Mongodb 데이터를 Kafka로 이관하는 작업을 검증해본다.

     

    • Source: Mongodb
    • Sink: Kafka

    [준비물]

    • Docker (Docker-compose 필요)
    • Mongodb, Kafka 등을 컨테이너로 구동할 Docker-Compose.yml 파일
    • Datagrip: Mongodb에 데이터를 넣고 확인할 도구 
    • Kafka-ui: 카프카 어드민 
    • Logstash

    [작업순서]

    1. Docker-Compose.yml 파일 작성
      1. mongodb, zookeeper, kafka, kafka-ui 구동 
    2. Datagrip으로 mongodb 접속
      1. database 생성
      2. collecction (table) 생성
      3. document (row) 생성
    3. logstash 세팅
      1. mongodb input, output 플러그인 설치 
      2. input: mongodb, output: kafka 설정파일 작성
      3. 구동 및 확인
    4. kafka 데이터 확인

    Docker-compose.yml 파일 작성

    docker-compose.yml

    version: '2.2'
     
    networks:
      somenetwork:
     
    services:
      mongo:
        container_name: mongo
        image: mongo
        networks:
          - somenetwork
        ports:
          - 27017:27017
        tty: true
        stdin_open: true
      zoo1:
        image: confluentinc/cp-zookeeper:7.1.2
        container_name: zoo1
        ports:
          - "2181:2181"
        environment:
          ZOOKEEPER_CLIENT_PORT: 2181
          ZOOKEEPER_SERVER_ID: 1
          ZOOKEEPER_SERVERS: zoo1:2888:3888
      kafka1:
        image: confluentinc/cp-kafka:7.1.2
        hostname: kafka1
        container_name: kafka1
        ports:
          - "9092:9092"
        environment:
          KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
          KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
          KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
          KAFKA_BROKER_ID: 1
          KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
          KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
          KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
        depends_on:
          - zoo1
     
      kafka2:
        image: confluentinc/cp-kafka:7.1.2
        hostname: kafka2
        container_name: kafka2
        ports:
          - "9093:9093"
        environment:
          KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka2:19093,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9093
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
          KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
          KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
          KAFKA_BROKER_ID: 2
          KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
          KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
          KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
        depends_on:
          - zoo1
      kafka3:
        image: confluentinc/cp-kafka:7.1.2
        hostname: kafka3
        container_name: kafka3
        ports:
          - "9094:9094"
        environment:
          KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka3:19094,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9094
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
          KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
          KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
          KAFKA_BROKER_ID: 3
          KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
          KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
          KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
        depends_on:
          - zoo1
     
      kafka-ui:
        container_name: kafka-ui
        image: provectuslabs/kafka-ui:latest
        ports:
          - 8080:8080
        depends_on:
          - zoo1
          - kafka1
          - kafka2
          - kafka3
        environment:
          KAFKA_CLUSTERS_0_NAME: local
          KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka1:19092,kafka2:19093,kafka3:19094
          KAFKA_CLUSTERS_0_ZOOKEEPER: zoo1:2181
          KAFKA_CLUSTERS_0_JMXPORT: 9997

    구동

    $ docker-compose up -d
     
    Creating network "mongodb_somenetwork" with the default driver
    Creating network "mongodb_default" with the default driver
    ...

    Datagrip으로 mongodb 접속

    접속

    + 버튼 - Data Source - MongoDB 

    Datagrip으로 mongodb 접속하기

    로컬에서 띄웠고, 포트도 27017이 맞으므로 그대로 연결한다. 

    localhost로 접속되었다.

    DB생성

    #mongodb에서는 DB생성을 하려면 use 키워드를 써야한다고 한다. 
    use Test

    Test라는 DB를 생성

    Collection 생성

    Collection은 테이블같은 개념이다.

    db.createCollection('test_collection')

    Explorer에서 test_collection이 생성되었다.

    Document 생성

    실제 데이터를 생성해보자.

    mongodb에서는 데이터를 json형태로 저장한다고 한다.

    //데이터 생성
    db.test_collection.insertOne({
      'number': 123,
      'hash': '0x1234567',
      'array': [1,2,3,4]
         
    })
     
    //확인
    db.test_collection.find()

    생성확인

    Logstash mongodb 플러그인 설치

    기본 Logstash 이미지에는 mongodb-input, output 플러그인이 없다.

    그러므로 Docker Container로 구동하기 위해서는 Dockerfile을 활용하여 기본이미지에 플러그인을 설치한 이미지를 사용하여야 한다.

     

    본 PoC에서는 로컬에서 logstash bin파일을 기반으로 로컬에 플러그인을 설치하고 진행한다. 

     

    logstash 다운로드: https://www.elastic.co/kr/downloads/logstash

    다운받은 후 환경변수로 등록해도되고, 아래처럼 bin 경로로 이동해도된다.

     

    플러그인 리스트 확인 및 mongodb 플러그인 설치

    $ ./logstash-plugin list
    Using bundled JDK: /Users/jaeshim/JAEMIN/docker/logstash-8.3.3/jdk.app/Contents/Home
    OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
     
    .. 목록에 mongodb 플러그인은 없음
     
    # 플러그인 설치
    $ ./logstash-plugin logstash-input-mongodb
    $ ./logstash-plugin logstash-output-mongodb
     
    # 플러그인 설치 확인
    $ ./logstash-plugin list | grep mongo
     
    OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
    logstash-input-mongodb
    logstash-output-mongodb

    logstash 파이프라인 생성 (conf 파일 만들기)

    input {
      mongodb {
        uri => 'mongodb://localhost:27017/Test'
        placeholder_db_dir => '../'
        placeholder_db_name => 'logstash_sqlite.Test'
        collection => 'test_collection'
        batch_size => 5000
        generateId => false
        parse_method => "simple"
      }
    }
    output{
      kafka{
        bootstrap_servers => "localhost:9092"
        codec => json
        acks => "1"
        retries => "5"
        linger_ms => "100"
        compression_type => "snappy"
        topic_id => "test_20220731"
      }
      stdout { codec => rubydebug }
    }

    logstash 파이프라인 구동

    $ ./logstash -r -f /Users/jaeshim/JAEMIN/docker/mongodb/logstash_in_mongo_out_kafka.conf
     
    Using bundled JDK: /Users/jaeshim/JAEMIN/docker/logstash-8.3.3/jdk.app/Contents/Home
    OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
    ...
    [2022-07-31T15:16:44,109][INFO ][org.apache.kafka.clients.producer.ProducerConfig][main] ProducerConfig values:
        acks = 1
        batch.size = 16384
        bootstrap.servers = [localhost:9092]
        buffer.memory = 33554432
        client.dns.lookup = default
        client.id = producer-1
        compression.type = snappy
        connections.max.idle.ms = 540000
        delivery.timeout.ms = 120000
        enable.idempotence = false
        interceptor.classes = []
        internal.auto.downgrade.txn.commit = false
        key.serializer = class org.apache.kafka.common.serialization.StringSerializer
        linger.ms = 100
        max.block.ms = 60000
        max.in.flight.requests.per.connection = 5
        max.request.size = 1048576
    ...

    Datagrip에서 document 생성 및 logstash 로그 확인

    Kafka에서 메시지확인

    http://localhost:8080 접속 - kafka ui - topics - test_20220730 - messages - 생성한 메시지 그대로 들어갔는지 확인 

    Mongodb - Logstash를 거쳐 Kafka로 정상 유입되었다.

     

    [좀 더 진행해볼 것]

    • 대량데이터 테스트 
    • payload 데이터만 들어갈 수 있도록 logstash filter 추가 
    • fusion에서 logstash mongodb plugin 설치된 custom image로 배포 및 구동 테스트

    'Data Pipeline > Logstash' 카테고리의 다른 글

    Logstash로 Kafka 토픽 복제해보기  (0) 2023.06.20
    Logstash로 MSSQL - Redis 파이프라인 구성해보기  (0) 2022.12.24
    Logstash  (0) 2022.10.29