
Schema-Registry + Validation

    Confluent 5.4 버전부터 지원하는 기능으로 Kafka에 등록된 토픽이 유효한 Schema Registry ID를 갖고 있는지 확인 할 수 있다. 

    이를 통해 중앙 집중식 토픽 정책 적용을 하기위한 기반을 마련할 수 있게 되었다. 

    아래처럼 validation = true로 지정하여 검증할 수 있다.


    Schema Validation


    중앙 집중화된 데이터 거버넌스가 중요한 이유 

    데이터 거버넌스는 한 기업의 데이터를 공식적이고 적절하게 관리하여 책임성과 전송성을 확보할 수 있도록 한다. 즉, 조직끼리 데이터의 생성, 전송, 해석 등이 동일하게 관리될 수 있다.

    이벤트 스트리밍에서는 아래와 같은 뉘앙스를 가질 수 있다.


    • 모든 사람이 이해할 수 있는 하나의 구조를 가질 수 있게 된다.
    • P2 메시지 (개인정보) 등이 조직의 정책을 지키도록 할 수 있다. 
    • 클러스터에 연결하는 모든 클라이언트가 적절한 프로토콜을 사용할 수 있도록 한다. 


    데이터 거버넌스 정책관리는 플랫폼 자체에서 하는 것이 가장좋다. 즉, 클라이언트를 일일이 검사할 필요없이 플랫폼에서 이를 검사하면 된다.

    카프카에서는 클라이언트들이 Avro, JSON, Protobuf와 같은 표준 데이터 형식을 사용하고, 클라이언트들은 이 메시지들을 직렬화/역직렬화 하기위해 이 같은 스키마에 동의하고 사용하게 된다.

    이 같은 스키마를 Schema-Registry가 관리하도록하여 데이터의 호환성을 보장하여 클라이언트 간 주고받는 메시지를 좀 더 수월하게 관리할 수 있다. 


    하지만 이 같은 데이터 호환성은 사실 클라이언트끼리만 협의하는 것이라 Broker에 그냥 쓰게되면 이걸 막을 수 있는 방법은 따로 없다. 

    → Schema Validation이 필요

    Schema Validation

    Schema Validation을 하려면 Confluent Server에서 이를 알아야 한다. 

    (confluent.value.schema.validation = true로 설정)


    Confluent Server는 엔터프라이즈 기능을 포함하는 Kafka 서버를 말하며, 검증의 경우 메시지별로 스키마 검증을 하기 때문에 잠재적으로 상당한 오버헤드가 발생할 수 있다.


    메시지 검사 단계

    • Broker 수준에서 conflent.schema.registry.url을 추가하여 Schema Registry에 접근하여 스키마 정보를 읽어 올 수 있도록 설정.
    • Topic에서 Schema Validation을 설정 confluent.key.schema.validation = true, confluent.value.schema.validation = true



    Broker에 설정


    토픽 생성 시

    kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 \
    --partitions 1 --topic movies \
    --config confluent.value.schema.validation=true


    [Schema Registry 사용으로 인한 이점]

    Safe decoupling

    • 누군가가 "date" 필드를 "unix timestamp"로 변경하면 우리 대시보드에 보고된다? 

    Safe evolution

    • 새로운 필드를 추가하고 싶은데, 모든 consumer에 잠재적인 영향을 주는 것을 원치않는다.

    P2 data protection

    • 우리 감사팀이 우리에게 P2 데이터가 없는지 검사하려고 하는데, 이 모든 메시지의 메타데이터를 얻을 수 있다. 

    Data discovery

    • 개발팀이 개발하려고 하는데, 도메인지식 등이 없으면 Kafka에 어떤 데이터가 있는지 쉽게 파악하기 어렵다


    [Terminology (용어정리)]


    Schema: 메시지의 구조를 정의 

    Subject: 스키마가 진화(evolve) 할 수 있는 범위를 정의


    [Schema Registry 동작방식]

    Schema-Registry Architecture



    • 로컬 캐시에 스키마가 없으면 스키마 등록 요청을 한다.
    • 등록이 완료되면 아이디를 받게되며 데이터를 id와 함께 직렬화하여 전송한다. (id는 헤더에 담는다)


    • 로컬 캐시에 스키마가 없으면 id기반으로 스키마를 받아온다.
    • 스키마 id별로 역직렬화된 데이터를 받아온다.





    [스키마 정의]

    스키마 id는 auto-incremented 되는 integer값으로 늘어난다. 

     "namespace": "io.confluent.examples.clients.basicavro",
     "type": "record",
     "name": "Payment",
     "fields": [
         {"name": "id", "type": "string"},
         {"name": "amount", "type": "double"}


    [Subject Naming Strategies]


    serializer는 스키마를 Schema Registry에 subject 이름으로 등록하며, 이 이름은 registry에 있는 namespace에 정의된다. 


    • Compatibility (호환성) 검사는 subject단위로 수행된다.
    • Version은 subject와 연결되어 있다. 
    • Schema가 진화해도 동일한 subject에 연결되어 있지만 새로운 스키마 ID와 버전을 가져오게 된다. 



    subject 이름은 아래 3가지 전략에 달려있다. 



    • TopicNameStrategy (Default): Topic 이름에서 subject 명을 정한다
    • RecordNameStrategy: avro의 record type이름으로 부터 subject가 정해지며 이는 subject아래에서 다른 데이터 구조를 가지는 이벤트와 연관된 논리적 그룹을 구성하는 방법을 제공한다.
    • TopicRecordNameStrategy: 



    동일 토픽 내의 모든 메시지가 동일한 스키마를 따라야 한다. 

    토픽 이름 단위로 메시지를 그룹핑 할 때 좋다.

    논리적인 방식으로 토픽이름을 지정하면 스키마이름이 그것을 따르게 된다.


    RecordNameStrategy + TopicRecordNameStrategy

    토픽 단위의 그룹핑이 적절하지 않은 경우에 대한 스키마 관리를 지원한다.

    (ex: 하나의 토픽 내에서 여러개의 스키마를 가지는 레코드가 있을 수 있는데, 그 경우 사용하는 예시인듯)

    만약 데이터가 시간 순의 이벤트 시퀀스를 나타내고, 이 메시지들의 서로 다른 구조를 가질 때 유용하다. 

    ex: 어떤 사건이 있다면 그것이 시간순으로 일어날 수 있는 케이스가 있다.

    → Payment 서비스에서 "고객 생성, 고객주소변경, 고객계정에 신용카드추가"  이런 과정은 시간순으로 발생하게 되는데, 이것은 시간순으로 반드시 처리되어야하기 때문에 동일토픽, 동일파티션에 데이터가 들어가야 한다. 





    [Schema Compatibility]

    스키마 Evolution시 호환성을 어떻게 보장할 것인가?

    Schema-Registry Compatibility





    • Forward
    • Backward
    • Full
    • Transitive forward, backward, full

    Forward compatibility

    새로운 스키마를 사용하여 생성된 데이터는 새 스키마의 모든 기능을 사용하지 못하더라도 consumer에서 이전 스키마를 사용하여 읽을 수 있다. 

    많이쓰는 방법이며 반드시 Producer가 먼저 배포되어야 한다.

    특정 필드가 삭제될 때 해당 필드는 이전 스키마에서 default value를 가지고 있었어야 한다.  


    새로운 필드를 추가할 때 

    새로운 필드 없이 이벤트를 처리하는 consumer는 계속 그 동작을 할 수 있음.

    옵션 필드를 삭제할 때

    consumer는 새 데이터를 계속 읽을 수 있고, 삭제된 필드에서는 기본값을 사용한다. 


    Backward compatibility (Default)

    덜 대중적인 방법

    consumer를 먼저 배포되어야 한다. 

    새로운 스키마에 필드가 추가되었는데, 해당 필드에 default value가 없다면 오류가 발생한다. 



    필드를 삭제할 때

    해당 필드 없이 이벤트를 처리하도록 개발된 consumer는 해당 필드를 무시한다. 


    옵션 필드를 추가할 때

    기본 값을 지정해야 한다

    역직렬화 시 누락된 필드에는 기본값이 사용된다.


    Full compability

    Forward, Backward 둘 다에 해당한다.


    이전 데이터는 새 스키마로 읽을 수 있음

    새로운 데이터는 구버전 스키마로 읽을 수 있음

    완벽한 호환성을 위한 가장 쉬운방법

    모든 필드에 기본값을 사용해야 한다. 

    Producer, Consumer가 순서에 상관없이 배포될 수 있다. 


    JSON Schema 에서는 사용할 수 없다고 한다. 


    Tips for writing schemas

    • 삭제될 가능성이 있는 필드라면 default value를 지정하라. 해당 필드에 데이터가 들어오지 않아도 에러가 발생하지 않는다.
    • 추가되는 필드라면 default value가 지정되어야 한다
    • enum은 변경가능성이 없을 때만 사용한다.
    • 필드의 이름은 가능한 변경하지 않는다.



    [Serialization Formats]


    카프카에서는 표준.

    모든 Schema compatibility type과 Schema Registry 기능들을 지원한다. 

    Schema Registry 튜토리얼의 예제로 활용된다.  


    Confluent Platform 5.5 이상부터 지원

    Record 및 TopicRecord Naming Strategies에서는 스키마 이름에 메시지 이름이 사용된다 (무슨말인지 모르겠음)


    Confluent Platform 5.5 및 Confluent Clould에서 사용가능

    Record 및 TopicRecord Naming Strategies에서는 스키마 이름에 "schema"가 사용된다.

    Generic Type은 JsonNode






