Kafka 50

Schema-Registry Compatibility

목차 [Compatibility] 호환성은 Schema Evolution이 발생할 때 어떻게 변화할 수 있는가에 대한 것이다. 기본적으로 3가지 전략이 있다. Backward Forward Full Backward Compatibility Consumer V2가 V1을 문제없이 처리가능한 호환성. Consumer 업데이트 → Producer 업데이트 순서로 배포해야한다. 필드추가 (기본값이 있을 때 가능) Consumer가 V2일 때 V1을 처리할 수 있다. #V1 { "Name": "String", "Age": "Int" } #V2 { "Name": "String", "Age": "Int", "Sex": "Char" (Default: M) } 필드 삭제 가능. Consumer가 V2일 때 V1을 문제없..

Schema-Registry + Validation

목차 [Schema-Validation] Confluent 5.4 버전부터 지원하는 기능으로 Kafka에 등록된 토픽이 유효한 Schema Registry ID를 갖고 있는지 확인 할 수 있다. 이를 통해 중앙 집중식 토픽 정책 적용을 하기위한 기반을 마련할 수 있게 되었다. 아래처럼 validation = true로 지정하여 검증할 수 있다. 중앙 집중화된 데이터 거버넌스가 중요한 이유 데이터 거버넌스는 한 기업의 데이터를 공식적이고 적절하게 관리하여 책임성과 전송성을 확보할 수 있도록 한다. 즉, 조직끼리 데이터의 생성, 전송, 해석 등이 동일하게 관리될 수 있다. 이벤트 스트리밍에서는 아래와 같은 뉘앙스를 가질 수 있다. 모든 사람이 이해할 수 있는 하나의 구조를 가질 수 있게 된다. P2 메시지 ..

Schema-Registry : Producer

목차 [주요 Configuration] #schema-registry url spring.kafka.producer.properties.schema.registry.url: http;://localhost:8081 # schema 자동등록 여부 (default: true. 운영에서는 false 권고) spring.kafka.producer.properties.auto.register.schemas: false # 최신버전의 schema만 사용할지 여부 (default: false) spring.kafka.producer.properties.use.latest.version: false # key-serializer spring.kafka.producer.properties.key-serializer: o..

Schema-Registry?

목차 [Overview] Schema Registry는 메타데이터에 대한 서비스를 제공한다. Schema Registry는 Avro, JSON, Protobuf 스키마를 저장하고 검색할 수 있는 RESTful 인터페이스를 제공한다. 특화된 subject name strategy를 기반으로 모든 스키마의 버전을 기록하고, 다양한 compatibility 세팅과 정의된 compatibility를 바탕으로 schema evolution을 허용한다. Schema Registry는 Kafka Broker와는 분리되어 있다. Producer와 Conumer는 메시지를 쓰기위해 여전히 Kafka와 통신하게 된다. 이와 동시에 Schema Registry와 통신하여 메시지의 데이터 모델을 describe하는 스키마를 ..

Fluentd - Kafka 로컬 파이프라인 구축해보기

fluentd를 통한 데이터 파이프라인 구축을 위해서 로컬환경에서 fluentd - kafka로 이어지는 파이프라인을 구축해본다. [Docker Container로 간단히 구동하기] Docker-Image Pull docker pull fluent/fluentd:edge-debian Fluentd 설정 파일 생성 http input을 stdout으로 출력하는 형식으로 테스트한다. $ sudo vim fluentd.conf @type http port 9880 bind 0.0.0.0 @type stdout Docker Run # RUN $ sudo docker run -d --name fluentd -p 9880:9880 -v $(pwd):/fluentd/etc fluent/fluentd:edge-debi..

Apache Flink란?

[Apache Flink?] 분산 스트림 처리를 하는 엔진으로 "unbounded", "bounded" 데이터에 대해 상태가 있는 (stateful) 스트림 처리를 하는데 적합한 프레임 워크 비슷한 프레임워크들 중에 Spark, Kafka Streams, Storm 등이 있음 Spark: 마이크로 배치 Kafka Streams: Data Source로 Kafka만 지원함. Storm: Exactly Once 처리를 지원하지 않음. [Unbounded(무한한)와 Bounded(경계있는) 데이터의 처리 ] 모든 종류의 데이터는 이벤트 스트림으로 생성될 수 있다. ex) 카드거래, 센서측정, 로그, 웹 사이트의 사용자 행동 이러한 데이터는 Unbounded 혹은 Bounded 스트림으로 처리될 수 있다. Un..

Logstash를 이용한 Mongdb - Kafka 파이프라인 구성

목차 [개요] 로컬환경에서 Logstash를 활용하여 Mongodb 데이터를 Kafka로 이관하는 작업을 검증해본다. Source: Mongodb Sink: Kafka [준비물] Docker (Docker-compose 필요) Mongodb, Kafka 등을 컨테이너로 구동할 Docker-Compose.yml 파일 Datagrip: Mongodb에 데이터를 넣고 확인할 도구 Kafka-ui: 카프카 어드민 Logstash [작업순서] Docker-Compose.yml 파일 작성 mongodb, zookeeper, kafka, kafka-ui 구동 Datagrip으로 mongodb 접속 database 생성 collecction (table) 생성 document (row) 생성 logstash 세팅 mo..

Logstash

목차 [Logstash란?] 데이터 파이프라인 구조에서 데이터의 집계, 변환, 저장 역할을 수행함. Logstash는 형식이나 복잡성과 관계 없이 데이터를 동적으로 수집, 전환, 전송할 수 있다고 한다. grok을 이용해 비구조적 데이터에서 구조를 도출하여 IP 주소에서 위치 정보 좌표를 해독하고, 민감한 필드를 익명화하거나 제외시키며, 전반적인 처리를 손쉽게 해주는 역할을 한다. Components Source : 모든 형태, 크기, 소스의 데이터 수집 데이터의 매우 다양한 시스템에 매우 다양한 형태로 보관되어 있다. Logstash는 이같이 다양한 시스템에서 데이터를 가져오는 다양한 소스를 지원한다. Filter : 데이터 이동 과정에서의 구문 분석 및 변환 Logstash의 필터는 데이터가 소스에서..

Kafka 토픽명에 날짜가 포함된 경우 오래된 날짜의 토픽 주기적으로 삭제하기

일배치 작업을 위해 날짜가 포함되어 생성된 Kafka 토픽이 있을 수 있다. ex) topic-20221026 이 경우 시간이 흐를수록 토픽은 쌓이기만하므로 정기적으로 삭제해줄 필요가 있을 수 있다. 아래와 같은 스크립트 작성 후 Jenkins에서 스케줄링하는 방식으로 시도 #30일전 이름을 가진 토픽들을 배열에 담음 OLD_DAY=`date +%Y%m%d -d -30day` TOPICS=(`sudo kafka-topics --bootstrap-server localhost:9092 --list | grep $OLD_DAY`) #배열을 순회하면서 삭제 for topic in ${TOPICS[@]} do sudo kafka-topics --bootstrap-server localhost:9092 --del..

Kafka 2022.10.27