Data Pipeline/Logstash

Logstash로 MSSQL - Redis 파이프라인 구성해보기

재심 2022. 12. 24. 10:36

목차

    [개요]

    MSSQL의 데이터를 Redis로 Sync하고 싶은데, 이 때 logstash를 활용해보려고 한다.

    update일자를 기준으로 최신데이터가 INSERT되거나 UPDATE되면 이를 logstash가 감지해서 Redis에 계속해서 업데이트해주는 파이프라인을 구축해보는 것이 목표.

     

    [로컬 세팅: Docker container로 MSSQL, Redis 구동]

    Kafka도 활용할 수 있을 것 같아 Kafka도 추가해놓았다.

    https://github.com/sjm767/docker-compose-storage/blob/main/docker/mssql-redis-sync/docker-compose.yml

     

    GitHub - sjm767/docker-compose-storage: 상황별로 필요한 docker-compose 파일 모음

    상황별로 필요한 docker-compose 파일 모음 . Contribute to sjm767/docker-compose-storage development by creating an account on GitHub.

    github.com

     

    M1 맥북에서는 azure sql을 사용해야 한다고 한다.
    services:
      sqlserver:
        image: mcr.microsoft.com/azure-sql-edge:latest
        user: root
        container_name: mssql
        hostname: mssql
        ports:
          - 1433:1433
        volumes:
          - ./data:/var/opt/mssql/data
        environment:
          ACCEPT_EULA: Y
          SA_PASSWORD: password1!

    [Logstash 다운로드]

    https://www.elastic.co/kr/downloads/logstash 

     

    Download Logstash Free | Get Started Now

    Download Logstash or the complete Elastic Stack (formerly ELK stack) for free and start collecting, searching, and analyzing your data with Elastic in minutes.

    www.elastic.co

    docker로 돌릴거면 docker-compose에 구성해서 돌려도 됨. 

     

    추가플러그인 설치가 필요한 경우 docker build를 통해 커스텀 이미지를 만들어야 한다.  

    [Logstash 구성 및 실행 테스트]

    로컬에서 logstash 구동이 되는지 우선 확인해보자.

    MSSQL Jdbc Driver 다운로드

    jdbc plugin에서 MSSQL에 접근하기 위해 MSSQL Jdbc driver 다운로드가 필요하다. 

    https://learn.microsoft.com/ko-kr/sql/connect/jdbc/download-microsoft-jdbc-driver-for-sql-server?view=sql-server-2017

     

    다운로드 - JDBC Driver for SQL Server

    Microsoft JDBC Driver for SQL Server를 다운로드하여 SQL Server 및 Azure SQL Database에 연결하는 Java 애플리케이션을 개발합니다.

    learn.microsoft.com

     

    다운받고 적절한 경로에 jar파일을 옮겨놓도록 한다. 

    logstash-output-redis-set 플러그인 다운로드

    기본 logstash-output-redis 플러그인은 data type을 list와 channel (pub-sub 에서 사용) 만 지원한다. HashMap이 필요한 경우 logstash-output-redis-set 이라는 플러그인 설치가 필요하다. 

    https://www.elastic.co/guide/en/logstash/current/plugins-outputs-redis.html#plugins-outputs-redis-data_type

     

    Redis output plugin | Logstash Reference [8.5] | Elastic

    Variable substitution in the id field only supports environment variables and does not support the use of values from the secret store.

    www.elastic.co

     

    logstash-output-redis-set: https://github.com/feix/logstash-output-redis-set

     

    GitHub - feix/logstash-output-redis-set: logstash plugin, set output to redis

    logstash plugin, set output to redis. Contribute to feix/logstash-output-redis-set development by creating an account on GitHub.

    github.com

     

    $ ./logstash-plugin install logstash-output-redis-set

     

    logstash Config 파일 생성

    nput {
       stdin{
     
       }
     }
     
    filter {
     
    }
    output {
     
        stdout{
           codec => rubydebug
        }
    }

    실행

    #이 명령어가 수행되려면 logstash.conf가 한 단계 상위 디렉토리에 있어야 한다.
    logstash/logstash-8.5.3/bin$ ./logstash -f logstash.conf
    # 123입력
    123
    # 아래처럼 나오면 성공
    {
           "message" => "123",
          "@version" => "1",
             "event" => {
            "original" => "123"
        },
        "@timestamp" => 2022-12-17T04:56:58.226660200Z,
              "host" => {
            "hostname" => "D-045041-00"
        }
    }

     

    [예제]

    예제1: MSSQL 쿼리 결과의 단순 JSON으로 Redis에 INSERT/UPDATE 하기

    logstash jdbc input plugin의 경우 쿼리 결과를 행별로 처리하며, 이를 단순 Json으로 처리할 수 있다.

    MSSQL의 쿼리결과를 아래처럼 JSON으로 만들 예정.

    {  
      "branch_id": 1, 
      "use_yn": "Y",
      "update_date": "2022-01-31 23:59:59"
    }

    MSSQL 테이블 생성

    CREATE TABLE TEST
    (
        BRANCH_ID BIGINT NOT NULL PRIMARY KEY,
        USE_YN CHAR(1) NOT NULL,
        UPDATE_DATE DATETIME2
    )

    Logstash Config 작성

    update일자 기준으로 데이터를 계속해서 싱크해 올 것이기 때문에 추적할 컬럼, 마지막으로 읽어온 날짜 등의 값을 계속해서 추적해야 한다. 그래서 아래와 같은 config들이 필요하다.

    • tracking_column
    • tracking_column_type
    • last_run_metadata_path
    • record_last_run
    input {
       jdbc {
         jdbc_driver_library => "/home/jaeshim/docker/sqljdbc_11.2/kor/mssql-jdbc-11.2.1.jre11.jar" //mssql jdbc 드라이버 파일 위치
         jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
         jdbc_connection_string => "jdbc:sqlserver://127.0.0.1:1433;databaseName=JAESHIM;integratedSecurity=false;encrypt=true;trustServerCertificate=true;"
         jdbc_user => "sa"
         jdbc_password => "password1!"
         tracking_column => update_date //추적할 컬럼명
         tracking_column_type => "timestamp" //추적할 컬럼 타입
         last_run_metadata_path => "/home/jaeshim/docker/logstash/last-run.txt" //마지막으로 체크한 컬럼값
         record_last_run => true //마지막으로 가져온 값을 사용하겠다고 선언. true로 선언할 경우 :sql_last_value를 통해 쿼리에서 사용할 수 있다.
         statement => "select BRANCH_ID BRANCH_ID, USE_YN AS USE_YN,UPDATE_DATE AS UPDATE_DATE from dbo.TEST with(nolock) where update_date > :sql_last_value"
         schedule => "*/10 * * * * *" //실행 주기 (10초)
       }
     }
     
    filter {
     mutate{
     
       remove_field => ["@timestamp","@version"]
     }
     
    }
    output {
     
       redis_set{
         host => "127.0.0.1"
         port => 6379
         action => HSET //Redis Data Type
         key => "test:%{branch_id}" //Key
         field => "%{branch_id}" //HashMap의 Key
         value => "{\"branch_id\": \"%{branch_id}\",\"use_yn\": \"%{use_yn}\"}" //HashMap의 Value
       }
       stdout { codec => rubydebug }
    }

    MSSQL 데이터 생성

    #데이터 생성
    insert into dbo.TEST values (1,'Y',CURRENT_TIMESTAMP)
     
    #SELECT
    select top 100 *  from dbo.TEST with(nolock)

    Logstash 구동

    $  ./logstash -f logstash.conf
    ###############
    [2022-12-17T14:10:11,290][INFO ][logstash.inputs.jdbc     ][main][528582c301a9971fecd96a6ce6c0de2cb6e52507efd957f3915ba256d08c2396] (0.029230s) select BRANCH_ID BRANCH_ID, USE_YN AS USE_YN, UPDATE_DATE AS UPDATE_DATE from dbo.TEST with(nolock) where update_date > '1970-01-01T00:00:00.000'
    {
             
                "update_dt" => 2022-12-15T20:24:18.990Z,
                "use_yn" => "Y", 
             "branch_id" => 1
    }

    last-run 파일 확인

    이 파일에서 마지막으로 체크한 update_date 시간을 얻어온다.

    이 파일을 삭제할 경우 현재시간 기준으로 새로 체크하는 방식으로 동작한다. 

    --- 2022-12-17 14:10:10.784286000 Z

    Redis 확인 

    Redis를 docker에서 구동했으므로 docker exec -it 커맨드를 이용해서 컨테이너 이미지 내부 접속 후 redis cli를 통해 확인해야 한다.

    Redis 주요 커맨드

     

    redis-cli 명령어 정리

    redis-cli 명령어 정리 redis-cli 접속 호스트명과 포트번호를 생략하면 localhost의 6379로 접속됩니다. -n db번호 -a 비밀번호 -s 소켓 -u 서버 url 등 접속 시 다양한 옵션 설정이 사용 가능합니다. # localhost

    freeblogger.tistory.com

     

    [Redis] Hashes 명령(HSET, HGET, HDEL)

    레디스의 Hashes 자료구조로 저장/읽기/삭제 관련된 명령어를 알아보고 실습을 해보자. Hashes 자료구조...

    blog.naver.com

    $ redis-cli
     
    # 전체 키 조회 시 1번 branch_id로 key가 생긴 것 확인
    keys *
    1) "test:1"
     
    # HGET으로 HashMap 내부 확인
     
    127.0.0.1:6379> HGET test:1 1
    "{\\\"branch_id\\\": \\\"1\\\", \\\"use_yn\\\": \\\"Y\\\"}"

    이슈

    플러그인이 쿼리결과를 행별로 처리하기 때문에 아래와 같이 PK를 단위로 묶어서 json array로 넣는것은 위 예제로 불가능하다.

    아래 예제2에서 MSSQL의 FOR JSON기능을 이용해서 풀어보려함.

     [ {
        "branch_id": 1005,
        "holiday_date": "2022-12-25 00:00:00",
        "use_yn": "Y",
        "update_date": "2022-01-01 00:00:00"
      },
    
      {
        "branch_id": 1005,
        "holiday_date": "2022-08-15 00:00:00",
        "use_yn": "Y",
        "update_date": "2022-01-31 23:59:59"
      },
    ]

     

    예제2. MSSQL의 쿼리 결과를 PK단위로 묶어 JSON ARRAY로 생성하기

    아래처럼 키 단위 (branch_id)로 JSON ARRAY로 묶어 Redis에 키 단위로 발급하려고 한다.

    //테이블 생성
    CREATE TABLE TEST2
    (    
        BRANCH_ID BIGINT NOT NULL ,
        HOLIDAY_DATE SMALLDATETIME NOT NULL,
        USE_YN CHAR(1) NOT NULL,
        UPDATE_DATE DATETIME
    )
    //데이터 생성
     INSERT INTO TEST2 VALUES(62,'2018-03-01','N',CURRENT_TIMESTAMP)
     INSERT INTO TEST2 VALUES(62,'2018-12-25','N',CURRENT_TIMESTAMP)
    
     INSERT INTO TEST2 VALUES(89,'2018-06-06','Y',CURRENT_TIMESTAMP)

    아래와 같은 포맷을 만들어볼 예정..!

    //branch_id 62의 메시지
    {
       "save_date":"24-12-2022 12:58:41",
       "holidays":[
          {
             "BRANCH_ID":62,
             "HOLIDAY_DATE":"2018-03-01T00:00:00",
             "USE_YN":"N",
             "UPDATE_DATE": "2022-12-23T00:43:32.260"
          },
          {
             "BRANCH_ID":62,
             "HOLIDAY_DATE":"2018-12-25T00:00:00",
             "USE_YN":"N",
             "UPDATE_DATE": "2022-12-23T00:43:32.260"
          }
       ]
    }
    //branch_id 89의 메시지
    {
       "save_date":"24-12-2022 12:58:41",
       "holidays":[
          {
             "BRANCH_ID":89,
             "HOLIDAY_DATE":"2018-08-15T00:00:00",
             "USE_YN":"N",
             "UPDATE_DATE": "2022-12-23T00:43:32.260"
          }
       ]
    }

    단순히 jdbc plugin을 사용하면 쿼리 결과를 행별로 처리하므로 위와 같은 메시지 생성이 불가능하다.

    그러므로 mssql 자체에서 FOR JSON을 사용해서 PK별로 json array형태로 메시지를 생성하는 쿼리를 작성해야한다.

     

    참조: https://dbrang.tistory.com/1175

     

    [SQL2016] JSON을 활용한 집계처리 및 JSON의 NULL 확인 함수 - dBRang

    /******************************************************************************************************************* -- Title : [SQL2016] JSON을 활용한 집계처리 및 JSON의 NULL 확인 함수 - dBRang -- Reference : dBRang -- Key word : for json au

    dbrang.tistory.com

     

    SELECT BRANCH_ID,CONCAT(SAVE_DATE,STUFF(js_root_code,1,1,'')) AS final_js_code 
    FROM (
    	select BRANCH_ID AS BRANCH_ID,CONCAT('{"SAVE_DATE": "', FORMAT(CURRENT_TIMESTAMP,'dd-MM-yyyy hh:mm:ss'),'",') AS SAVE_DATE,js_root_code=(
    		
    				select BRANCH_ID AS BRANCH_ID,HOLIDAY_DATE AS HOLIDAY_DATE, USE_YN AS USE_YN,  UPDATE_DATE AS UPD_DATE
    				from dbo.TEST2 with(nolock) 
    				WHERE A.BRANCH_ID = BRANCH_ID FOR JSON PATH,ROOT('holidays')		
    				)
    	FROM dbo.TEST2 A WITH(NOLOCK)
    	GROUP BY A.BRANCH_ID
    ) AA

    쿼리 결과

    PK 단위로 구분되어 JSON ARRAY가 생성되었다.

    //메시지1
    {
       "SAVE_DATE":"24-12-2022 02:04:25",
       "holidays":[
          {
             "BRANCH_ID":62,
             "HOLIDAY_DATE":"2018-03-01T00:00:00",
             "USE_YN":"N",
             "UPD_DATE":"2022-12-24T02:03:32.667"
          },
          {
             "BRANCH_ID":62,
             "HOLIDAY_DATE":"2018-12-25T00:00:00",
             "USE_YN":"N",
             "UPD_DATE":"2022-12-24T02:03:32.673"
          }
       ]
    }
    //메시지2
    {
       "SAVE_DATE":"24-12-2022 02:04:25",
       "holidays":[
          {
             "BRANCH_ID":89,
             "HOLIDAY_DATE":"2018-06-06T00:00:00",
             "USE_YN":"Y",
             "UPD_DATE":"2022-12-24T02:03:32.677"
          }
       ]
    }

     

    Logstash Config 작성

    input {
       jdbc {
         jdbc_driver_library => "/home/jaeshim/docker/sqljdbc_11.2/kor/mssql-jdbc-11.2.1.jre11.jar"
         jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
         jdbc_connection_string => "jdbc:sqlserver://127.0.0.1:1433;databaseName=JAESHIM;integratedSecurity=false;encrypt=true;trustServerCertificate=true;"
         jdbc_user => "sa"
         jdbc_password => "password1!"
         tracking_column => update_date
         tracking_column_type => "timestamp"
         last_run_metadata_path => "/home/jaeshim/docker/logstash/last-run-20221223.txt"
         record_last_run => true
         statement => "위 쿼리 줄바꿈 없이 붙여넣기"
         schedule => "*/10 * * * * *"
         codec => json{}
       }
     }
     
    filter {
     
    }
    output {
       redis_set{
         host => "127.0.0.1"
         port => 6379
         action => HSET
         key => "test:%{branch_id}"
         field => "%{branch_id}"
         value => "%{final_js_code}"
       }
       stdout { codec => rubydebug }
    }

    Logstash 구동

    ./logstash -f logstash_mssql-redis-json.conf

    Redis 확인

    $ hget exshop:branch:mapping:branch-id:89 89
    "{\\\"SAVE_DATE\\\": \\\"24-12-2022 01:07:10\\\",\"holidays\":[{\"BRANCH_ID\":89,\"HOLIDAY_DATE\":\"2018-06-06T00:00:00\",\"USE_YN\":\"Y\",\"UPDATE_DATE\":\"2022-12-23T06:14:13.797\"}]}"

    'Data Pipeline > Logstash' 카테고리의 다른 글

    Logstash로 Kafka 토픽 복제해보기  (0) 2023.06.20
    Logstash를 이용한 Mongdb - Kafka 파이프라인 구성  (0) 2022.10.29
    Logstash  (0) 2022.10.29