목차
[Schema-Validation]
Confluent 5.4 버전부터 지원하는 기능으로 Kafka에 등록된 토픽이 유효한 Schema Registry ID를 갖고 있는지 확인 할 수 있다.
이를 통해 중앙 집중식 토픽 정책 적용을 하기위한 기반을 마련할 수 있게 되었다.
아래처럼 validation = true로 지정하여 검증할 수 있다.
중앙 집중화된 데이터 거버넌스가 중요한 이유
데이터 거버넌스는 한 기업의 데이터를 공식적이고 적절하게 관리하여 책임성과 전송성을 확보할 수 있도록 한다. 즉, 조직끼리 데이터의 생성, 전송, 해석 등이 동일하게 관리될 수 있다.
이벤트 스트리밍에서는 아래와 같은 뉘앙스를 가질 수 있다.
- 모든 사람이 이해할 수 있는 하나의 구조를 가질 수 있게 된다.
- P2 메시지 (개인정보) 등이 조직의 정책을 지키도록 할 수 있다.
- 클러스터에 연결하는 모든 클라이언트가 적절한 프로토콜을 사용할 수 있도록 한다.
데이터 거버넌스 정책관리는 플랫폼 자체에서 하는 것이 가장좋다. 즉, 클라이언트를 일일이 검사할 필요없이 플랫폼에서 이를 검사하면 된다.
카프카에서는 클라이언트들이 Avro, JSON, Protobuf와 같은 표준 데이터 형식을 사용하고, 클라이언트들은 이 메시지들을 직렬화/역직렬화 하기위해 이 같은 스키마에 동의하고 사용하게 된다.
이 같은 스키마를 Schema-Registry가 관리하도록하여 데이터의 호환성을 보장하여 클라이언트 간 주고받는 메시지를 좀 더 수월하게 관리할 수 있다.
하지만 이 같은 데이터 호환성은 사실 클라이언트끼리만 협의하는 것이라 Broker에 그냥 쓰게되면 이걸 막을 수 있는 방법은 따로 없다.
→ Schema Validation이 필요
Schema Validation
Schema Validation을 하려면 Confluent Server에서 이를 알아야 한다.
(confluent.value.schema.validation = true로 설정)
Confluent Server는 엔터프라이즈 기능을 포함하는 Kafka 서버를 말하며, 검증의 경우 메시지별로 스키마 검증을 하기 때문에 잠재적으로 상당한 오버헤드가 발생할 수 있다.
메시지 검사 단계
- Broker 수준에서 conflent.schema.registry.url을 추가하여 Schema Registry에 접근하여 스키마 정보를 읽어 올 수 있도록 설정.
- Topic에서 Schema Validation을 설정 confluent.key.schema.validation = true, confluent.value.schema.validation = true
예시
Broker에 설정
confluent.schema.registry.url=http://schema-registry:8081
|
토픽 생성 시
kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 \
--partitions 1 --topic movies \
--config confluent.value.schema.validation=true
|
[Schema Registry 사용으로 인한 이점]
Safe decoupling
- 누군가가 "date" 필드를 "unix timestamp"로 변경하면 우리 대시보드에 보고된다?
Safe evolution
- 새로운 필드를 추가하고 싶은데, 모든 consumer에 잠재적인 영향을 주는 것을 원치않는다.
P2 data protection
- 우리 감사팀이 우리에게 P2 데이터가 없는지 검사하려고 하는데, 이 모든 메시지의 메타데이터를 얻을 수 있다.
Data discovery
- 개발팀이 개발하려고 하는데, 도메인지식 등이 없으면 Kafka에 어떤 데이터가 있는지 쉽게 파악하기 어렵다
[Terminology (용어정리)]
Topic
Schema: 메시지의 구조를 정의
Subject: 스키마가 진화(evolve) 할 수 있는 범위를 정의
Serialization
[Schema Registry 동작방식]
Producer
- 로컬 캐시에 스키마가 없으면 스키마 등록 요청을 한다.
- 등록이 완료되면 아이디를 받게되며 데이터를 id와 함께 직렬화하여 전송한다. (id는 헤더에 담는다)
Consumer
- 로컬 캐시에 스키마가 없으면 id기반으로 스키마를 받아온다.
- 스키마 id별로 역직렬화된 데이터를 받아온다.
[Tutorial]
https://docs.confluent.io/platform/current/schema-registry/schema_registry_onprem_tutorial.html
[스키마 정의]
스키마 id는 auto-incremented 되는 integer값으로 늘어난다.
{
"namespace": "io.confluent.examples.clients.basicavro",
"type": "record",
"name": "Payment",
"fields": [
{"name": "id", "type": "string"},
{"name": "amount", "type": "double"}
]
}
[Subject Naming Strategies]
serializer는 스키마를 Schema Registry에 subject 이름으로 등록하며, 이 이름은 registry에 있는 namespace에 정의된다.
- Compatibility (호환성) 검사는 subject단위로 수행된다.
- Version은 subject와 연결되어 있다.
- Schema가 진화해도 동일한 subject에 연결되어 있지만 새로운 스키마 ID와 버전을 가져오게 된다.
subject 이름은 아래 3가지 전략에 달려있다.
- TopicNameStrategy (Default): Topic 이름에서 subject 명을 정한다
- RecordNameStrategy: avro의 record type이름으로 부터 subject가 정해지며 이는 subject아래에서 다른 데이터 구조를 가지는 이벤트와 연관된 논리적 그룹을 구성하는 방법을 제공한다.
- TopicRecordNameStrategy:
TopicNameStrategy
동일 토픽 내의 모든 메시지가 동일한 스키마를 따라야 한다.
토픽 이름 단위로 메시지를 그룹핑 할 때 좋다.
논리적인 방식으로 토픽이름을 지정하면 스키마이름이 그것을 따르게 된다.
RecordNameStrategy + TopicRecordNameStrategy
토픽 단위의 그룹핑이 적절하지 않은 경우에 대한 스키마 관리를 지원한다.
(ex: 하나의 토픽 내에서 여러개의 스키마를 가지는 레코드가 있을 수 있는데, 그 경우 사용하는 예시인듯)
만약 데이터가 시간 순의 이벤트 시퀀스를 나타내고, 이 메시지들의 서로 다른 구조를 가질 때 유용하다.
ex: 어떤 사건이 있다면 그것이 시간순으로 일어날 수 있는 케이스가 있다.
→ Payment 서비스에서 "고객 생성, 고객주소변경, 고객계정에 신용카드추가" 이런 과정은 시간순으로 발생하게 되는데, 이것은 시간순으로 반드시 처리되어야하기 때문에 동일토픽, 동일파티션에 데이터가 들어가야 한다.
https://www.confluent.io/blog/put-several-event-types-kafka-topic/
[Schema Compatibility]
스키마 Evolution시 호환성을 어떻게 보장할 것인가?
https://docs.confluent.io/platform/current/schema-registry/avro.html#compatibility-types
Types
- Forward
- Backward
- Full
- Transitive forward, backward, full
Forward compatibility
새로운 스키마를 사용하여 생성된 데이터는 새 스키마의 모든 기능을 사용하지 못하더라도 consumer에서 이전 스키마를 사용하여 읽을 수 있다.
많이쓰는 방법이며 반드시 Producer가 먼저 배포되어야 한다.
특정 필드가 삭제될 때 해당 필드는 이전 스키마에서 default value를 가지고 있었어야 한다.
새로운 필드를 추가할 때
새로운 필드 없이 이벤트를 처리하는 consumer는 계속 그 동작을 할 수 있음.
옵션 필드를 삭제할 때
consumer는 새 데이터를 계속 읽을 수 있고, 삭제된 필드에서는 기본값을 사용한다.
Backward compatibility (Default)
덜 대중적인 방법
consumer를 먼저 배포되어야 한다.
새로운 스키마에 필드가 추가되었는데, 해당 필드에 default value가 없다면 오류가 발생한다.
필드를 삭제할 때
해당 필드 없이 이벤트를 처리하도록 개발된 consumer는 해당 필드를 무시한다.
옵션 필드를 추가할 때
기본 값을 지정해야 한다
역직렬화 시 누락된 필드에는 기본값이 사용된다.
Full compability
Forward, Backward 둘 다에 해당한다.
이전 데이터는 새 스키마로 읽을 수 있음
새로운 데이터는 구버전 스키마로 읽을 수 있음
완벽한 호환성을 위한 가장 쉬운방법
모든 필드에 기본값을 사용해야 한다.
Producer, Consumer가 순서에 상관없이 배포될 수 있다.
JSON Schema 에서는 사용할 수 없다고 한다.
Tips for writing schemas
- 삭제될 가능성이 있는 필드라면 default value를 지정하라. 해당 필드에 데이터가 들어오지 않아도 에러가 발생하지 않는다.
- 추가되는 필드라면 default value가 지정되어야 한다
- enum은 변경가능성이 없을 때만 사용한다.
- 필드의 이름은 가능한 변경하지 않는다.
[Serialization Formats]
Avro
카프카에서는 표준.
모든 Schema compatibility type과 Schema Registry 기능들을 지원한다.
Schema Registry 튜토리얼의 예제로 활용된다.
Protobuf
Confluent Platform 5.5 이상부터 지원
Record 및 TopicRecord Naming Strategies에서는 스키마 이름에 메시지 이름이 사용된다 (무슨말인지 모르겠음)
JSON
Confluent Platform 5.5 및 Confluent Clould에서 사용가능
Record 및 TopicRecord Naming Strategies에서는 스키마 이름에 "schema"가 사용된다.
Generic Type은 JsonNode
[참조]
https://docs.confluent.io/platform/current/schema-registry/index.html
'Kafka > Schema-Registry' 카테고리의 다른 글
Schema-Registry Naming Strategy (0) | 2023.05.05 |
---|---|
Schema-Registry Compatibility (0) | 2022.10.31 |
Schema-Registry : Consumer (0) | 2022.10.31 |
Schema-Registry : Producer (0) | 2022.10.31 |
Schema-Registry? (0) | 2022.10.31 |