목차
[개요]
MSSQL의 데이터를 Redis로 Sync하고 싶은데, 이 때 logstash를 활용해보려고 한다.
update일자를 기준으로 최신데이터가 INSERT되거나 UPDATE되면 이를 logstash가 감지해서 Redis에 계속해서 업데이트해주는 파이프라인을 구축해보는 것이 목표.
[로컬 세팅: Docker container로 MSSQL, Redis 구동]
Kafka도 활용할 수 있을 것 같아 Kafka도 추가해놓았다.
M1 맥북에서는 azure sql을 사용해야 한다고 한다.
services:
sqlserver:
image: mcr.microsoft.com/azure-sql-edge:latest
user: root
container_name: mssql
hostname: mssql
ports:
- 1433:1433
volumes:
- ./data:/var/opt/mssql/data
environment:
ACCEPT_EULA: Y
SA_PASSWORD: password1!
[Logstash 다운로드]
https://www.elastic.co/kr/downloads/logstash
docker로 돌릴거면 docker-compose에 구성해서 돌려도 됨.
추가플러그인 설치가 필요한 경우 docker build를 통해 커스텀 이미지를 만들어야 한다.
[Logstash 구성 및 실행 테스트]
로컬에서 logstash 구동이 되는지 우선 확인해보자.
MSSQL Jdbc Driver 다운로드
jdbc plugin에서 MSSQL에 접근하기 위해 MSSQL Jdbc driver 다운로드가 필요하다.
다운받고 적절한 경로에 jar파일을 옮겨놓도록 한다.
logstash-output-redis-set 플러그인 다운로드
기본 logstash-output-redis 플러그인은 data type을 list와 channel (pub-sub 에서 사용) 만 지원한다. HashMap이 필요한 경우 logstash-output-redis-set 이라는 플러그인 설치가 필요하다.
logstash-output-redis-set: https://github.com/feix/logstash-output-redis-set
$ ./logstash-plugin install logstash-output-redis-set
logstash Config 파일 생성
nput {
stdin{
}
}
filter {
}
output {
stdout{
codec => rubydebug
}
}
실행
#이 명령어가 수행되려면 logstash.conf가 한 단계 상위 디렉토리에 있어야 한다.
logstash/logstash-8.5.3/bin$ ./logstash -f logstash.conf
# 123입력
123
# 아래처럼 나오면 성공
{
"message" => "123",
"@version" => "1",
"event" => {
"original" => "123"
},
"@timestamp" => 2022-12-17T04:56:58.226660200Z,
"host" => {
"hostname" => "D-045041-00"
}
}
[예제]
예제1: MSSQL 쿼리 결과의 단순 JSON으로 Redis에 INSERT/UPDATE 하기
logstash jdbc input plugin의 경우 쿼리 결과를 행별로 처리하며, 이를 단순 Json으로 처리할 수 있다.
MSSQL의 쿼리결과를 아래처럼 JSON으로 만들 예정.
{
"branch_id": 1,
"use_yn": "Y",
"update_date": "2022-01-31 23:59:59"
}
MSSQL 테이블 생성
CREATE TABLE TEST
(
BRANCH_ID BIGINT NOT NULL PRIMARY KEY,
USE_YN CHAR(1) NOT NULL,
UPDATE_DATE DATETIME2
)
Logstash Config 작성
update일자 기준으로 데이터를 계속해서 싱크해 올 것이기 때문에 추적할 컬럼, 마지막으로 읽어온 날짜 등의 값을 계속해서 추적해야 한다. 그래서 아래와 같은 config들이 필요하다.
- tracking_column
- tracking_column_type
- last_run_metadata_path
- record_last_run
input {
jdbc {
jdbc_driver_library => "/home/jaeshim/docker/sqljdbc_11.2/kor/mssql-jdbc-11.2.1.jre11.jar" //mssql jdbc 드라이버 파일 위치
jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
jdbc_connection_string => "jdbc:sqlserver://127.0.0.1:1433;databaseName=JAESHIM;integratedSecurity=false;encrypt=true;trustServerCertificate=true;"
jdbc_user => "sa"
jdbc_password => "password1!"
tracking_column => update_date //추적할 컬럼명
tracking_column_type => "timestamp" //추적할 컬럼 타입
last_run_metadata_path => "/home/jaeshim/docker/logstash/last-run.txt" //마지막으로 체크한 컬럼값
record_last_run => true //마지막으로 가져온 값을 사용하겠다고 선언. true로 선언할 경우 :sql_last_value를 통해 쿼리에서 사용할 수 있다.
statement => "select BRANCH_ID BRANCH_ID, USE_YN AS USE_YN,UPDATE_DATE AS UPDATE_DATE from dbo.TEST with(nolock) where update_date > :sql_last_value"
schedule => "*/10 * * * * *" //실행 주기 (10초)
}
}
filter {
mutate{
remove_field => ["@timestamp","@version"]
}
}
output {
redis_set{
host => "127.0.0.1"
port => 6379
action => HSET //Redis Data Type
key => "test:%{branch_id}" //Key
field => "%{branch_id}" //HashMap의 Key
value => "{\"branch_id\": \"%{branch_id}\",\"use_yn\": \"%{use_yn}\"}" //HashMap의 Value
}
stdout { codec => rubydebug }
}
MSSQL 데이터 생성
#데이터 생성
insert into dbo.TEST values (1,'Y',CURRENT_TIMESTAMP)
#SELECT
select top 100 * from dbo.TEST with(nolock)
Logstash 구동
$ ./logstash -f logstash.conf
###############
[2022-12-17T14:10:11,290][INFO ][logstash.inputs.jdbc ][main][528582c301a9971fecd96a6ce6c0de2cb6e52507efd957f3915ba256d08c2396] (0.029230s) select BRANCH_ID BRANCH_ID, USE_YN AS USE_YN, UPDATE_DATE AS UPDATE_DATE from dbo.TEST with(nolock) where update_date > '1970-01-01T00:00:00.000'
{
"update_dt" => 2022-12-15T20:24:18.990Z,
"use_yn" => "Y",
"branch_id" => 1
}
last-run 파일 확인
이 파일에서 마지막으로 체크한 update_date 시간을 얻어온다.
이 파일을 삭제할 경우 현재시간 기준으로 새로 체크하는 방식으로 동작한다.
--- 2022-12-17 14:10:10.784286000 Z
Redis 확인
Redis를 docker에서 구동했으므로 docker exec -it 커맨드를 이용해서 컨테이너 이미지 내부 접속 후 redis cli를 통해 확인해야 한다.
Redis 주요 커맨드
- 기본커맨드: https://freeblogger.tistory.com/10
- 해시 관련 커맨드: https://m.blog.naver.com/PostView.naver?isHttpsRedirect=true&blogId=wideeyed&logNo=221428664697
$ redis-cli
# 전체 키 조회 시 1번 branch_id로 key가 생긴 것 확인
keys *
1) "test:1"
# HGET으로 HashMap 내부 확인
127.0.0.1:6379> HGET test:1 1
"{\\\"branch_id\\\": \\\"1\\\", \\\"use_yn\\\": \\\"Y\\\"}"
이슈
플러그인이 쿼리결과를 행별로 처리하기 때문에 아래와 같이 PK를 단위로 묶어서 json array로 넣는것은 위 예제로 불가능하다.
아래 예제2에서 MSSQL의 FOR JSON기능을 이용해서 풀어보려함.
[ {
"branch_id": 1005,
"holiday_date": "2022-12-25 00:00:00",
"use_yn": "Y",
"update_date": "2022-01-01 00:00:00"
},
{
"branch_id": 1005,
"holiday_date": "2022-08-15 00:00:00",
"use_yn": "Y",
"update_date": "2022-01-31 23:59:59"
},
]
예제2. MSSQL의 쿼리 결과를 PK단위로 묶어 JSON ARRAY로 생성하기
아래처럼 키 단위 (branch_id)로 JSON ARRAY로 묶어 Redis에 키 단위로 발급하려고 한다.
//테이블 생성
CREATE TABLE TEST2
(
BRANCH_ID BIGINT NOT NULL ,
HOLIDAY_DATE SMALLDATETIME NOT NULL,
USE_YN CHAR(1) NOT NULL,
UPDATE_DATE DATETIME
)
//데이터 생성
INSERT INTO TEST2 VALUES(62,'2018-03-01','N',CURRENT_TIMESTAMP)
INSERT INTO TEST2 VALUES(62,'2018-12-25','N',CURRENT_TIMESTAMP)
INSERT INTO TEST2 VALUES(89,'2018-06-06','Y',CURRENT_TIMESTAMP)
아래와 같은 포맷을 만들어볼 예정..!
//branch_id 62의 메시지
{
"save_date":"24-12-2022 12:58:41",
"holidays":[
{
"BRANCH_ID":62,
"HOLIDAY_DATE":"2018-03-01T00:00:00",
"USE_YN":"N",
"UPDATE_DATE": "2022-12-23T00:43:32.260"
},
{
"BRANCH_ID":62,
"HOLIDAY_DATE":"2018-12-25T00:00:00",
"USE_YN":"N",
"UPDATE_DATE": "2022-12-23T00:43:32.260"
}
]
}
//branch_id 89의 메시지
{
"save_date":"24-12-2022 12:58:41",
"holidays":[
{
"BRANCH_ID":89,
"HOLIDAY_DATE":"2018-08-15T00:00:00",
"USE_YN":"N",
"UPDATE_DATE": "2022-12-23T00:43:32.260"
}
]
}
단순히 jdbc plugin을 사용하면 쿼리 결과를 행별로 처리하므로 위와 같은 메시지 생성이 불가능하다.
그러므로 mssql 자체에서 FOR JSON을 사용해서 PK별로 json array형태로 메시지를 생성하는 쿼리를 작성해야한다.
참조: https://dbrang.tistory.com/1175
SELECT BRANCH_ID,CONCAT(SAVE_DATE,STUFF(js_root_code,1,1,'')) AS final_js_code
FROM (
select BRANCH_ID AS BRANCH_ID,CONCAT('{"SAVE_DATE": "', FORMAT(CURRENT_TIMESTAMP,'dd-MM-yyyy hh:mm:ss'),'",') AS SAVE_DATE,js_root_code=(
select BRANCH_ID AS BRANCH_ID,HOLIDAY_DATE AS HOLIDAY_DATE, USE_YN AS USE_YN, UPDATE_DATE AS UPD_DATE
from dbo.TEST2 with(nolock)
WHERE A.BRANCH_ID = BRANCH_ID FOR JSON PATH,ROOT('holidays')
)
FROM dbo.TEST2 A WITH(NOLOCK)
GROUP BY A.BRANCH_ID
) AA
쿼리 결과
//메시지1
{
"SAVE_DATE":"24-12-2022 02:04:25",
"holidays":[
{
"BRANCH_ID":62,
"HOLIDAY_DATE":"2018-03-01T00:00:00",
"USE_YN":"N",
"UPD_DATE":"2022-12-24T02:03:32.667"
},
{
"BRANCH_ID":62,
"HOLIDAY_DATE":"2018-12-25T00:00:00",
"USE_YN":"N",
"UPD_DATE":"2022-12-24T02:03:32.673"
}
]
}
//메시지2
{
"SAVE_DATE":"24-12-2022 02:04:25",
"holidays":[
{
"BRANCH_ID":89,
"HOLIDAY_DATE":"2018-06-06T00:00:00",
"USE_YN":"Y",
"UPD_DATE":"2022-12-24T02:03:32.677"
}
]
}
Logstash Config 작성
input {
jdbc {
jdbc_driver_library => "/home/jaeshim/docker/sqljdbc_11.2/kor/mssql-jdbc-11.2.1.jre11.jar"
jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
jdbc_connection_string => "jdbc:sqlserver://127.0.0.1:1433;databaseName=JAESHIM;integratedSecurity=false;encrypt=true;trustServerCertificate=true;"
jdbc_user => "sa"
jdbc_password => "password1!"
tracking_column => update_date
tracking_column_type => "timestamp"
last_run_metadata_path => "/home/jaeshim/docker/logstash/last-run-20221223.txt"
record_last_run => true
statement => "위 쿼리 줄바꿈 없이 붙여넣기"
schedule => "*/10 * * * * *"
codec => json{}
}
}
filter {
}
output {
redis_set{
host => "127.0.0.1"
port => 6379
action => HSET
key => "test:%{branch_id}"
field => "%{branch_id}"
value => "%{final_js_code}"
}
stdout { codec => rubydebug }
}
Logstash 구동
./logstash -f logstash_mssql-redis-json.conf
Redis 확인
$ hget exshop:branch:mapping:branch-id:89 89
"{\\\"SAVE_DATE\\\": \\\"24-12-2022 01:07:10\\\",\"holidays\":[{\"BRANCH_ID\":89,\"HOLIDAY_DATE\":\"2018-06-06T00:00:00\",\"USE_YN\":\"Y\",\"UPDATE_DATE\":\"2022-12-23T06:14:13.797\"}]}"
'Data Pipeline > Logstash' 카테고리의 다른 글
Logstash로 Kafka 토픽 복제해보기 (0) | 2023.06.20 |
---|---|
Logstash를 이용한 Mongdb - Kafka 파이프라인 구성 (0) | 2022.10.29 |
Logstash (0) | 2022.10.29 |