Kafka/Schema-Registry

Schema-Registry Compatibility

재심 2022. 10. 31. 21:12

목차

 

[Compatibility]

     

    Schema-Registry Compatibility

    호환성은 Schema Evolution이 발생할 때 어떻게 변화할 수 있는가에 대한 것이다.

    기본적으로 3가지 전략이 있다.

     

    • Backward
    • Forward
    • Full

    Backward Compatibility

    Consumer V2가 V1을 문제없이 처리가능한 호환성. 

    Consumer 업데이트 → Producer 업데이트 순서로 배포해야한다.

     

    필드추가 (기본값이 있을 때 가능)

    Consumer가 V2일 때 V1을 처리할 수 있다. 

    #V1
    {
        "Name": "String",
        "Age": "Int"
    }
     
    #V2
    {
        "Name": "String",
        "Age": "Int",
        "Sex": "Char" (Default: M)
    }

    필드 삭제 가능.

    Consumer가 V2일 때 V1을 문제없이 처리할 수 있다. 

    #V1
    {
        "Name": "String",
        "Age": "Int"
    }
     
    #V2
    {
        "Name": "String"
    }
     
     

    Forward Compatibility

    Consumer V1을 처리하고 있지만 V2도 문제없이 처리가능한 호환성. 

    가장 일반적이라고 한다.

    Producer 업데이트 → Consumer 업데이트

     

    필드추가

    #V1
    {
        "Name": "String",
        "Age": "Int"
    }
     
    #V2
    {
        "Name": "String",
        "Age": "Int",
        "Sex": "Char"
    }
     

    필드삭제 (기본값이 있는 경우만 가능)

    #V1
    {
        "Name": "String",
        "Age": "Int" (Default: 10)
    }
     
    #V2
    {
        "Name": "String"   
    }

     

    Full Compatibility

    만약 메시지가 V3까지 발전했을 때 Consumer가 V1, V2, V3 모두 처리 가능한 상태. 

    가장 권고되는 호환성. 

    Producer, Consumer 순서 상관없이 업데이트 가능.

     

    필드추가 (기본값이 있는 경우 가능)

    #V1
    {
        "Name": "String",
        "Age": "Int"
     
    }
     
    #V2
    {
        "Name": "String",
        "Age": "Int",
        "Sex": "Char" (Default: M)
    }

    필드삭제 (기본값이 있는 경우 가능)

    #V1
    {
        "Name": "String",
        "Age": "Int" (Default: 10),
        "Sex": "Char" (Default: M)
    }
     
    #V2
    {
        "Name": "String",
        "Age": "Int" (Default: 10)
    }

     

    Compatibility Settings

    추가로 각 전략마다 TRANSITIVE를 붙일 수 있는데, TRANSITIVE가 없는 경우에는 바로 앞의 버전만 체크하는데 TRANSITIVE가 있는 경우 V1,V2,V3,V4가 있는 경우 V4를 등록할 때 V1까지 모두 다 체크하는 전략이다.

     

    => 호환성은 총 7가지로 정리할 수 있다.

     

    • Forward Compatibility
    • Backward Compatibility
    • Full Compatibility
    • Forward Transitive Compatibility
    • Backward Transitive Compatibility
    • Full Transitive Compatibility
    • None

    [권장사항]

    어쩔 수 없이 호환성 보장이 힘든 경우는 새로운 토픽을 만들어 사용하는 것을 권장

    기본값을 왠만하면 부여하자.

    Enum 형식이 있는 경우 변경했을 때 호환성이 무조건 깨진다고 한다. 주의해서 사용이 필요하다.

    필드 이름을 변경하는 경우에도 호환성이 깨지니 alias를 사용하는 것을 권고한다. 

    삭제시에는 기본값이 있는 필드만, 생성할 때도 왠만하면 기본값을 부여하자. 

    가급적이면 Full Compatibility를 사용하자..!

    key에는 스키마 적용을 하지 않는 것을 권고한다고 한다.

     

    [주의사항]

    기본적으로 카프카 라이브러리가 JAVA, C기반 크게 2가지가 있다.

    2가지를 섞어서 사용할 때 호환성 이슈가 있을 수 있으니 주의가 필요하다. (Long, BIGINT)

     

     

     

    [Compatibility Test ]

    참조문서: https://docs.confluent.io/platform/current/schema-registry/avro.html

    TRANSITIVE, NON-TRANSITIVE 설명

    BACKWARD compatibility means that consumers using the new schema can read data produced with the last schema. For example, if there are three schemas for a subject that change in order X-2, X-1, and X then BACKWARD compatibility ensures that consumers using the new schema X can process data written by producers using schema X or X-1, but not necessarily X-2. If the consumer using the new schema needs to be able to process data written by all registered schemas, not just the last two schemas, then use BACKWARD_TRANSITIVE instead of BACKWARD. For example, if there are three schemas for a subject that change in order X-2, X-1, and X then BACKWARD_TRANSITIVE compatibility ensures that consumers using the new schema X can process data written by producers using schema X, X-1, or X-2.
    To understand the BACKWARD and BACKWARD_TRANSTIVE, please check the below:
     
    Suppose for some schema,
    • 
    Version 3 is backward compatible with Version 2
    • 
    Version 2 is backward compatible with Version 1
    This does 
    not 
    mean Version 3 is backward compatible with Version 1
    Want to enforce backward compatibility with 
    all 
    prior versions → set 
    BACKWARD_TRANSITIVE 
    mode.
     
    Here is an example of a set of schemas that are backward compatible, but not backward transitive:
    • 
    Schema V1: latitude, longitude
    • 
    Schema V2: latitude, longitude, altitude (default altitude of 0) 
    • 
    Schema V3: latitude, longitude, altitude (no default)
     
    Consumers with V3 can read messages produced with V2 because all three of latitude, longitude, and altitude would be provided. 
    Consumers with schema V2 could read messages produced with V1 because they would infer a default value of 0 for altitude. 
    However, consumers with version V3 could not read messages produced with V1 because the consumer is expected a value for altitude to be provided and is not providing a default.
     
    This is the same as FORWARD_TRANSITIVE and FULL_TRANSITIVE.
     
    And your scenario is good at the BACKWARD_TRAINSITIVE because schema X can read Schema X-2.
     
    Please let me know if you have any further concerns about this case.

    일단 기본적으로 Compatibility Check는 Schema를 변경할 때 한다. (제일 헷갈렸던 것이 Consumer, Producer에서 Check를 하는거라고 생각했었다)

    그래서 위의 설명에 따르면 Backward 정책에서 Schema Evolution이 V1 → V2->V3순서로 발생했다고 가정하면, V1→V2, V2->V3로 evolution 될 때 compatibility에 문제가 없었을 것이다.

    그런데, 이 때 V1과 V3를 비교하면 V1→ V3간에는 Backward Compatibility를 만족하지 않는다 (V3는 default 값이 없기 때문)

     

    그래서 만약에 Consumer중에 V1 Schema를 사용하는 Consumer가 있다면 V3 Schema로 쓰여진 데이터를 제대로 읽지 못할 것이다. 

    그러므로 이러한 현상을 방지하기 위해서는 Backward-Transitive를 사용해서 V2->V3로 갈때 V1-V3간에도 Compatibility를 체크하라고 하는 것이다.

    그렇게 되면 V1 Schema를 사용하는 Consumer도 잘 동작하는 것을 보장할 수 있게 된다.

     

    만약 모든 Consumer들이 이미 V2를 쓰고 있고, V3로 evolution 됐을 때도 V3 Schema로 변경될 예정이라면 굳이 Transitive 정책을 사용하지 않아도 된다.  

     

     

     

    BACKWARD, BACKWARD_TRANSITIVE 동작 테스트

    아래 상황을 만들어 놓고 테스트한다.

     

    Subject Version Schema 정의 필드구성
    비고
    1 {
      "fields": [
        {
          "name": "name",
          "type": {
            "avro.java.string": "String",
            "type": "string"
          }
        },
        {
          "name": "age",
          "type": "int"
        }
      ],
      "name": "MemberBackward",
      "namespace": "com.ebaykorea.schema.registry.tester.producer.avro",
      "type": "record"
    }
    name 
    age
     
    2 {
      "fields": [
        {
          "name": "name",
          "type": {
            "avro.java.string": "String",
            "type": "string"
          }
        },
        {
          "name": "age",
          "type": "int"
        },

       { 
          "name": "address",
          "type": {
            "avro.java.string": "String",
            "type": "string"
          },

          "default": "busan"
        }
      ],
      "name": "MemberBackward",
      "namespace": "com.ebaykorea.schema.registry.tester.producer.avro",
      "type": "record"
    }
    name
    age
    address (default: busan)
    address 필드 추가
    3 {
      "fields": [
        {
          "name": "name",
          "type": {
            "avro.java.string": "String",
            "type": "string"
          }
        },
        {
          "name": "age",
          "type": "int"
        },

       { 
          "name": "address",
          "type": {
            "avro.java.string": "String",
            "type": "string"
          }      
        }

      ],
      "name": "MemberBackward",
      "namespace": "com.ebaykorea.schema.registry.tester.producer.avro",
      "type": "record"
    }
    name
    age
    address (default: none)

    address의 default value 제거 

    V2 Consumer 가 Schema를 제대로 활용하는지 체크 

    Schema Version Schema 정의
    필드구성
    2 {
      "fields": [
        {
          "name": "name",
          "type": {
            "avro.java.string": "String",
            "type": "string"
          }
        },
        {
          "name": "age",
          "type": "int"
        },

       { 
          "name": "address",
          "type": {
            "avro.java.string": "String",
            "type": "string"
          },

          "default": "busan"
        }
      ],
      "name": "MemberBackward",
      "namespace": "com.ebaykorea.schema.registry.tester.producer.avro",
      "type": "record"
    }
    name
    age
    address (default: busan)
    spring:
      kafka:
        bootstrap-servers: localhost:9092
        consumer:
          group-id: 20220101-group-2
          auto-offset-reset: latest
          client-id: jaeshim-schema-registry-teser
          keyDeserializer: org.apache.kafka.common.serialization.StringDeserializer
          #KafkaAvroDeseiralizer를 정의하면 반드시 Schema-Registry URL을 선언해주어야 한다.
          valueDeserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
          properties:
            schema.registry.url: http://localhost:8081 #schema-registry url
            auto.register.schemas: false
            use.latest.version: false
            specific.avro.reader: true

    시나리오

    1. Producer가 V3 Schema를 활용해서 Data 생성
    2. Consumer가 V2 Schema를 활용해서 데이터 읽어오는지 확인

    확인결과

    Schema-Registry 설정값도 적용되었고.. subject가 없는데 그냥 불러온다..

    뭐지? 

     

    [참조]

    https://rmcodestar.github.io/kafka/2020/05/24/confluent-schema-registry-%ED%98%B8%ED%99%98%EC%84%B1/

     

    confluent schema registry의 schema 호환성 · rmcodestar.github.io

    confluent schema registry의 schema 호환성 24 May 2020 | kafka confluent schema registry 궁금증 Schema version이 달라지면 어떻게 호환성이 관리될까? 시간이 지나면 초기 개발과 달리 스키마에 필드를 추가하거나 삭

    rmcodestar.github.io

     

    'Kafka > Schema-Registry' 카테고리의 다른 글

    Schema-Registry Naming Strategy  (0) 2023.05.05
    Schema-Registry + Validation  (0) 2022.10.31
    Schema-Registry : Consumer  (0) 2022.10.31
    Schema-Registry : Producer  (0) 2022.10.31
    Schema-Registry?  (0) 2022.10.31