Kafka/Connect

Transforms - Trying Field-Level Encryption/Decryption with kryptonite-for-kafka

재심 2023. 6. 9. 10:28


    Intro: https://developers.redhat.com/articles/2022/09/27/end-end-field-level-encryption-apache-kafka-connect

    github: https://github.com/hpgrahsl/kryptonite-for-kafka

     

     


     

    [Overview]

    When using Kafka Connect, the values of certain fields may be sensitive information (e.g. address, phone number, etc.).

    This is a custom transformation worth trying when only specific fields of a message need to be encrypted.

     

    The introductory article demonstrates per-field encryption when using an RDBMS.

     

    How does Kryptonite reduce the risk of data leaks?

    Kafka provides several encryption options, but in practice the data flowing in and out of the brokers may also need to be encrypted.

    However, the built-in Connect transforms do not offer this capability.

    Kryptonite is said to provide this level of encryption.

     

    Source: Red Hat article

    In the figure above, the contents of ssn are exposed as-is, and this part needs to be encrypted.

    Source: Red Hat article

    And the sink connector that processes the message must decrypt this value before writing it to the destination.

     

    [How Kryptonite Encrypts Data]

    Kryptonite provides field-level encryption during Kafka Connect's SMT stage.

    It applies authenticated encryption, using AES in GCM or SIV mode.

     

    GCM (Galois/Counter Mode)

    A mode for symmetric-key ciphers that provides encryption and authentication at the same time. GCM is classified as an AEAD mode; AEAD stands for Authenticated Encryption with Associated Data.

     

    SIV (Synthetic Initialization Vector)

    An authenticated encryption mode in which the initialization vector is derived (synthesized) from the message itself rather than supplied randomly, which makes it resistant to nonce/IV misuse.

     

    Source: Red Hat article

     

    After the SMT runs, ssn is encrypted.

     

    Advantages of this approach?

    Messages are guaranteed to always be in an encrypted state. Even if no security is configured at the Kafka level, the message itself is encrypted, so the sensitive parts of the data are not exposed unless the underlying encryption is broken.

     

    [Example: MySQL to MongoDB]

    Source: Red Hat article

     

    Among the fields, street is the sensitive one.

     

    The configuration before encryption is applied looks like this.

     

    {
        "name": "mysql-source-001",
        "config": {
            "connector.class": "io.debezium.connector.mysql.MySqlConnector",
            "value.converter": "org.apache.kafka.connect.json.JsonConverter",
            "value.converter.schemas.enable": false,
            "key.converter": "org.apache.kafka.connect.json.JsonConverter",
            "key.converter.schemas.enable": false,
            "tasks.max": "1",
            "database.hostname": "mysql",
            "database.port": "3306",
            "database.user": "root",
            "database.password": "debezium",
            "database.server.id": "1234567",
            "database.server.name": "mysqlhost",
            "database.whitelist": "inventory",
            "table.whitelist": "inventory.addresses",
            "database.history.kafka.bootstrap.servers": "kafka:9092",
            "database.history.kafka.topic": "mysqlhost-schema",
            "transforms": "unwrap",
            "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
            "transforms.unwrap.drop.tombstones": false,
            "transforms.unwrap.delete.handling.mode": "drop"
        }
    }

     

    Sample data looks like this.

    {
      "id": 13,
      "customer_id": 1003,
      "street": "3787 Brownton Road",
      "city": "Columbus",
      "state": "Mississippi",
      "zip": "39701",
      "type": "SHIPPING"
    }

    Now let's encrypt the value of the street field at the field level. The article configures it as follows.

    {
        "name": "mysql-source-enc-001",
        "config": {
            /* ... */
            "transforms": "unwrap,cipher",
            /* ... */
            "transforms.cipher.type": "com.github.hpgrahsl.kafka.connect.transforms.kryptonite.CipherField$Value",
            "transforms.cipher.cipher_mode": "ENCRYPT",
            "transforms.cipher.cipher_data_keys": "${file:/secrets/classified.properties:cipher_data_keys}",
            "transforms.cipher.cipher_data_key_identifier": "my-demo-secret-key-123",
            "transforms.cipher.field_config": "[{\"name\":\"street\"}]",
            "transforms.cipher.predicate":"isTombstone",
            "transforms.cipher.negate":true,
            "predicates": "isTombstone",
            "predicates.isTombstone.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone"
        }
    }

    The result looks like this.

    {
      "id": 13,
      "customer_id": 1003,
      "street": "NLWw4AshpLIIBjLoqM0EgDiUGooYH3jwDnW71wdInMGomFVLHo9AQ6QPEh6fmLRJKVwE3gwwsWux",
      "city": "Columbus",
      "state": "Mississippi",
      "zip": "39701",
      "type": "SHIPPING"
    }

    The consumer side is set up similarly.

    A decipher step is defined in the transforms list, and the CipherField settings are specified under decipher.

     

    {
        "name": "mongodb-sink-dec-001",
        "config": {
            "topics": "mysqlhost.inventory.addresses",
            "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
            "key.converter": "org.apache.kafka.connect.json.JsonConverter",
            "key.converter.schemas.enable":false,
            "value.converter": "org.apache.kafka.connect.json.JsonConverter",
            "value.converter.schemas.enable":false,
            "tasks.max": "1",
            "connection.uri":"mongodb://mongodb:27017",
            "database":"kryptonite",
            "document.id.strategy":"com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInKeyStrategy",
            "delete.on.null.values": true,
            "transforms": "createid,removefield,decipher",
            "transforms.createid.type": "org.apache.kafka.connect.transforms.ReplaceField$Key",
            "transforms.createid.renames": "id:_id",
            "transforms.removefield.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
            "transforms.removefield.blacklist": "id",
            "transforms.decipher.type": "com.github.hpgrahsl.kafka.connect.transforms.kryptonite.CipherField$Value",
            "transforms.decipher.cipher_mode": "DECRYPT",
            "transforms.decipher.cipher_data_keys": "${file:/secrets/classified.properties:cipher_data_keys}",
            "transforms.decipher.field_config": "[{\"name\":\"street\"}]",
            "transforms.decipher.predicate":"isTombstone",
            "transforms.decipher.negate":true,
            "predicates": "isTombstone",
            "predicates.isTombstone.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone"
        }
    }

    As a result, the value is decrypted as shown below.

     

    { "_id": 13,
      "zip": "39701",
      "city": "Columbus",
      "street": "3787 Brownton Road",
      "state": "Mississippi",
      "customer_id": 1003,
      "type": "SHIPPING" }

     

    [Hands-on 1: JDBC Source Connector]

    Read data from a specific MSSQL table and publish it to Kafka as messages.

    While doing so, encrypt a specific field.

     

    Prerequisite

    The library must be downloaded first and made available so that it is loaded when Connect starts.

     

    Library download: https://github.com/hpgrahsl/kryptonite-for-kafka/releases/tag/v0.4.1

    Download the connect-transform-kryptonite-0.4.1.zip file and unzip it.

    It contains a set of jar files.

    Give the directory a suitable name and move it into the Connect server's plugin.path.

    (The directory was named connect-transform-kryptonite-0.4.1.)
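    For reference, the download and unpack step could look roughly like this; the release asset URL is an assumption based on the release page above.

    #download and unpack the SMT release archive
    $ wget https://github.com/hpgrahsl/kryptonite-for-kafka/releases/download/v0.4.1/connect-transform-kryptonite-0.4.1.zip
    $ unzip connect-transform-kryptonite-0.4.1.zip -d connect-transform-kryptonite-0.4.1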

    #copy the library into the plugin path
    $ sudo cp -r connect-transform-kryptonite-0.4.1 /usr/share/java
     
    #restart connect
    $ sudo systemctl restart confluent-kafka-connect.service
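    After the restart it is worth confirming that the worker actually picked up the SMT. A rough check, assuming the worker's REST API listens on localhost:8083 (the connectorsOnly=false parameter lists SMTs/converters/predicates on newer Connect versions; on older ones, grepping the worker log for CipherField works instead):

    #check that the jars landed where plugin.path expects them
    $ ls /usr/share/java/connect-transform-kryptonite-0.4.1

    #on newer Connect versions this also lists SMTs, converters and predicates
    $ curl -s "http://localhost:8083/connector-plugins?connectorsOnly=false" | grep -i cipherfield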

    Creating an MSSQL example table

    A table named dbo.jaeshim_connector_table was created for testing.

    The table schema is as follows.

    Column Name    Type           Comment
    SEQ            BIGINT         PK
    ITEM_NO        BIGINT
    ITEM_NAME      VARCHAR(100)   field to be encrypted/decrypted
    CHANGE_DATE    DATETIME2      default: current time
    REGI_DATE      DATETIME2      default: current time

     

    INSERT query

    INSERT INTO dbo.jaeshim_connector_table
    (
        ITEM_NO,
        ITEM_NAME,
        CHANGE_DATE,
        REGI_DATE
    )
    VALUES
    (
    307,
    'ITEM_NAME2',
    GETDATE(),
    GETDATE()
    )

     

    SELECT query

    SELECT ITEM_NO AS ITME_NO
    , ITEM_NAME AS ITEM_NAME
    , CHANGE_DATE AS CHANGE_DATE
    , REGI_DATE AS REGI_DATE
    , SEQ AS SEQ  
    FROM dbo.jaeshim_connector_table WITH(NOLOCK) 
    WHERE ITEM_NO=307

    Query result

     

    Writing the JDBC Source Connector

    Read data from the table and publish messages to a topic named jaeshim-jdbc-query-cipher.

    Kryptonite is not applied yet.

    {
      "name": "JdbcSourceConnectorConnector",
      "config": {
        "name": "JdbcSourceConnectorConnector",
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "connection.url": "mssql,1433",
        "connection.user": "user",
        "connection.password": "pass",
        "mode": "incrementing",
        "incrementing.column.name": "SEQ",
        "validate.non.null": "false",
        "query": "SELECT ITEM_NO AS ITME_NO, ITEM_NAME AS ITEM_NAME, CHANGE_DATE AS CHANGE_DATE, REGI_DATE AS REGI_DATE, SEQ AS SEQ  FROM dbo.jaeshim_connector_table WITH(NOLOCK) WHERE ITEM_NO=307",
        "query.suffix": "",
        "poll.interval.ms": "5000",
        "topic.prefix": "jaeshim-jdbc-query-cipher"
      }
    }
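    To register the connector, the JSON above can be POSTed to the Connect REST API. A minimal sketch, assuming the worker listens on localhost:8083 and the config was saved as jdbc-source.json:

    #create the connector from the config file
    $ curl -X POST -H "Content-Type: application/json" \
        --data @jdbc-source.json \
        http://localhost:8083/connectors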

    After starting it, check that messages are being produced to the topic.
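    One quick way to peek at the topic is a console consumer, e.g. (broker address is a placeholder for this environment):

    $ kafka-console-consumer --bootstrap-server localhost:9092 \
        --topic jaeshim-jdbc-query-cipher --from-beginning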

    {
      "schema": {
        "type": "struct",
        "fields": [
          
          {
            "type": "int64",
            "optional": false,
            "field": "ITME_NO"
          },
          {
            "type": "string",
            "optional": false,
            "field": "ITEM_NAME"
          },
          {
            "type": "int64",
            "optional": true,
            "name": "org.apache.kafka.connect.data.Timestamp",
            "version": 1,
            "field": "CHANGE_DATE"
          },
          {
            "type": "int64",
            "optional": true,
            "name": "org.apache.kafka.connect.data.Timestamp",
            "version": 1,
            "field": "REGI_DATE"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "SEQ"
          }
        ],
        "optional": false
      },
      "payload": {
        "ITME_NO": 307,
        "ITEM_NAME": "ITEM_NAME2",
        "CHANGE_DATE": 1685973533440,
        "REGI_DATE": 1685973533440,
        "SEQ": 199
      }
    }

    The JDBC source connector itself runs fine. Now let's apply field encryption.

     

    Applying the Kryptonite SMT

    The settings below are what need to be applied.

     

    Writing the key file

    In the example the key is read from a file, so we write a key file.

    It was written to /etc/kafka/connect-secrets.properties.

    cipher_data_keys=[ { "identifier": "my-demo-secret-key-123", "material": { "primaryKeyId": 1000000001, "key": [ { "keyData": { "typeUrl": "type.googleapis.com/google.crypto.tink.AesGcmKey", "value": "GhDRulECKAC8/19NMXDjeCjK", "keyMaterialType": "SYMMETRIC" }, "status": "ENABLED", "keyId": 1000000001, "outputPrefixType": "TINK" } ] } } ]
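    Two side notes, both assumptions about this setup rather than something verified here: the "material" value above follows Tink's JSON keyset format, so fresh key material could be generated with Tink's tinkey CLI, and the `${file:...}` placeholder only resolves if the worker has a file config provider enabled (the worker properties path below is an assumption).

    #generate a fresh AES-GCM Tink keyset to paste in as "material"
    $ tinkey create-keyset --key-template AES128_GCM --out my-keyset.json

    #the worker needs a config provider for ${file:...} references
    $ grep config.providers /etc/kafka/connect-distributed.properties
    config.providers=file
    config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider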

    Adding the transform to the connector config

    Add cipher to transforms and fill in the required settings.

    Here, the ITEM_NAME field will be encrypted.

       "transforms": "cipher",
       "transforms.cipher.predicate": "isTombstone",
       "transforms.cipher.negate": "true",
       "transforms.cipher.cipher_data_keys": "
       "transforms.cipher.cipher_data_keys": "${file:/etc/kafka/connect-classified.properties:cipher_data_keys}",
       "transforms.cipher.type": "com.github.hpgrahsl.kafka.connect.transforms.kryptonite.CipherField$Value",
       "transforms.cipher.cipher_mode": "ENCRYPT",
       "transforms.cipher.field_config": "[{\"name\":\"ITEM_NAME\"}]",
       "transforms.cipher.cipher_data_key_identifier": "my-demo-secret-key-123",

    The full configuration is now as follows.

    {
      "name": "JdbcSourceConnectorConnector",
      "config": {
        "predicates.isTombstone.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone",
        "transforms.cipher.predicate": "isTombstone",
        "transforms.cipher.negate": "true",
        "transforms.cipher.cipher_data_keys": "${file:/etc/kafka/connect-classified.properties:cipher_data_keys}",
        "transforms.cipher.type": "com.github.hpgrahsl.kafka.connect.transforms.kryptonite.CipherField$Value",
        "transforms.cipher.cipher_mode": "ENCRYPT",
        "transforms.cipher.field_config": "[{\"name\":\"ITEM_NAME\"}]",
        "transforms.cipher.cipher_data_key_identifier": "my-demo-secret-key-123",
        "name": "JdbcSourceConnectorConnector",
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "transforms": "cipher",
        "predicates": "isTombstone",
        "connection.url": "mssql,1433",
        "connection.user": "user",
        "connection.password": "pass",
        "catalog.pattern": "",
        "schema.pattern": "",
        "mode": "incrementing",
        "incrementing.column.name": "SEQ",
        "validate.non.null": "false",
        "query": "SELECT ITEM_NO AS ITME_NO, ITEM_NAME AS ITEM_NAME, CHANGE_DATE AS CHANGE_DATE, REGI_DATE AS REGI_DATE, SEQ AS SEQ  FROM dbo.jaeshim_connector_table WITH(NOLOCK) WHERE ITEM_NO=307",
        "query.suffix": "",
        "poll.interval.ms": "5000",
        "topic.prefix": "jaeshim-jdbc-query-cipher"
      }
    }
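    Since a connector with this name already exists, the changed settings can be applied with a PUT to its config endpoint instead of recreating it; note that PUT takes only the contents of the "config" object (worker address and file name are, again, assumptions):

    #update the running connector with the cipher SMT added
    $ curl -X PUT -H "Content-Type: application/json" \
        --data @jdbc-source-config.json \
        http://localhost:8083/connectors/JdbcSourceConnectorConnector/config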

     

    Checking the result

    The ITEM_NAME field has been encrypted.

    {
      "schema": {
        "type": "struct",
        "fields": [
          
          {
            "type": "int64",
            "optional": false,
            "field": "ITME_NO"
          },
          {
            "type": "string",
            "optional": false,
            "field": "ITEM_NAME"
          },
          {
            "type": "int64",
            "optional": true,
            "name": "org.apache.kafka.connect.data.Timestamp",
            "version": 1,
            "field": "CHAGE_DATE"
          },
          {
            "type": "int64",
            "optional": true,
            "name": "org.apache.kafka.connect.data.Timestamp",
            "version": 1,
            "field": "REGI_DATE"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "SEQ"
          }
        ],
        "optional": false
      },
      "payload": {
        "ITME_NO": 307,
        "ITEM_NAME": "LQE7msoBaMlbxPkxvWoeOju7W0Iw2W/RTg0YbdprI8Q1MSxJ35VGGVaxamR3DDCybXktZGVtby1zZWNyZXQta2V5LTEys2ux",
        "CHANGE_DATE": 1685973533440,
        "REGI_DATE": 1685973533440,
        "SEQ": 199
      }
    }

     

    [Hands-on 2: JDBC Sink Connector]

    Goal: take the messages stored in Kafka, decrypt them, and INSERT them into an MSSQL table.

    Source                     Kafka Topic                        Sink
    jaeshim_connector_table    jaeshim-jdbc-cipher-20230608-1     jaeshim_connector_table_sink_cipher

     

    Table schema. Same structure as before; the PK is SEQ.

    Column Name    Type           Comment
    SEQ            BIGINT         PK
    ITEM_NO        BIGINT
    ITEM_NAME      VARCHAR(100)   field to be encrypted/decrypted
    CHANGE_DATE    DATETIME2      default: current time
    REGI_DATE      DATETIME2      default: current time

    INSERT query

    INSERT INTO dbo.jaeshim_connector_table
    (
        ITEM_NO,
        ITEM_NAME,
        CHANGE_DATE,
        REGI_DATE
    )
    VALUES
    (
    307,
    'ITEM_NAME2',
    GETDATE(),
    GETDATE()
    )

    Modifying the JDBC Source Connector

    The JDBC sink connector needs schema information in order to insert data into a table. Enable schemas as shown below, specify the schema-registry URL, and use Schema Registry for this run.

    AvroConverter was used as well.

    "key.converter.schemas.enable": "true",
    "value.converter.schemas.enable": "true",
    "key.converter.schema.registry.url": "http://{schema}:8081",
    "value.converter.schema.registry.url": "http://{schema}:8081",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter"

    JDBC Source Connector

    {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://{schema}:8081",
        "value.converter.schema.registry.url": "http://{schema}:8081",
        "key.converter.schemas.enable": "true",
        "value.converter.schemas.enable": "true",   
        "transforms": "cipher",
        "transforms.cipher.type": "com.github.hpgrahsl.kafka.connect.transforms.kryptonite.CipherField$Value",
        "transforms.cipher.cipher_mode": "ENCRYPT",
        "transforms.cipher.field_config": "[{\"name\":\"ITEM_NAME\"}]",
        "transforms.cipher.cipher_data_key_identifier": "my-demo-secret-key-123",
        "transforms.cipher.cipher_data_keys": "${file:/etc/kafka/connect-classified.properties:cipher_data_keys}",
        "transforms.cipher.predicate":"isTombstone",
        "transforms.cipher.negate":true,
        "predicates": "isTombstone",
        "predicates.isTombstone.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone",
        "connection.url": "mssql,1433",
        "connection.user": "user",
        "connection.password": "pass",
        "mode": "incrementing",
        "incrementing.column.name": "SEQ",
        "validate.non.null": "true",
        "query": "SELECT ITEM_NO AS ITME_NO, ITEM_NAME AS ITEM_NAME, CHANGE_DATE AS CHANGE_DATE, REGI_DATE AS REGI_DATE, SEQ AS SEQ  FROM dbo.jaeshim_connector_table WITH(NOLOCK) WHERE ITEM_NO=307",  
        "poll.interval.ms": "5000",
        "topic.prefix": "jaeshim-jdbc-cipher-20230608-1"
      }
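    To inspect the Avro records after this change, the schema-aware console consumer can be used instead of the plain one (broker and Schema Registry addresses are placeholders):

    $ kafka-avro-console-consumer --bootstrap-server localhost:9092 \
        --topic jaeshim-jdbc-cipher-20230608-1 \
        --property schema.registry.url=http://{schema}:8081 --from-beginning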

    Writing the JDBC Sink Connector

    First, write the query that creates the target table.

    CREATE TABLE dbo.jaeshim_connector_table_sink_cipher
    (
        SEQ BIGINT PRIMARY KEY NOT NULL,
        ITEM_NO BIGINT NOT NULL,
        ITEM_NAME VARCHAR(100) NOT NULL,
        REGI_DATE DATETIME2 DEFAULT GETDATE(),
        CHANGE_DATE DATETIME2 DEFAULT GETDATE()
    )

    The JDBC sink connector needs the schema of the target field, so that information has to be included as well (see the field_config line below).

    "transforms.decipher.field_config": "[{\"name\":\"ITEM_NAME\",\"schema\": {\"type\": \"STRING\"}}]",
    {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://{schema}:8081",
        "value.converter.schema.registry.url": "http://{schema}:8081",
        "key.converter.schemas.enable": "true",
        "value.converter.schemas.enable": "true",
        "topics": "jaeshim-jdbc-cipher-20230608-1",
        "transforms": "decipher",
        "transforms.decipher.type": "com.github.hpgrahsl.kafka.connect.transforms.kryptonite.CipherField$Value",
        "transforms.decipher.cipher_mode": "DECRYPT",
        "transforms.decipher.field_config": "[{\"name\":\"ITEM_NAME\",\"schema\": {\"type\": \"STRING\"}}]",
        "transforms.decipher.cipher_data_keys": "${file:/etc/kafka/connect-classified.properties:cipher_data_keys}",
        "transforms.decipher.cipher_data_key_identifier": "my-demo-secret-key-123",
        "transforms.decipher.predicate":"isTombstone",
        "transforms.decipher.negate":true,
        "predicates": "isTombstone",
        "predicates.isTombstone.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone",
        "connection.url": "mssql,1433",
        "connection.user": "user",
        "connection.password": "pass",
        "table.name.format": "jaeshim_connector_table_sink_cipher",
        "delete.on.null.values": true
      }

    Generate some data.

    INSERT INTO dbo.jaeshim_connector_table
    (
        ITEM_NO,
        ITEM_NAME,
        CHANGE_DATE,
        REGI_DATE
    )
    VALUES
    (
    307,
    'ITEM_NAME33',
    GETDATE(),
    GETDATE()
    )

    Confirm that the message arrives in Kafka encrypted.

    {
      "ITEM_NO": 307,
      "ITEM_NAME": "LgE7msoBCLAgDTCYsMMI8crcozqeFZDkuedZHN5xqWUY3wJBx+ZUJrzTnexIdwwwsm15LWRlbW8tc2VjcmV0LWtleS0xMrNrsQ==",
      "CHANGE_DATE": {
        "long": 1686304126946
      },
      "SEQ": 63
    }

    Query the target table.

    SELECT TOP 100 *
    FROM dbo.jaeshim_connector_table_sink_cipher with(nolock)
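    For reference, the same check from a shell could look like this with sqlcmd (host and credentials are placeholders matching the connector config above):

    $ sqlcmd -S mssql,1433 -U user -P pass \
        -Q "SELECT TOP 10 * FROM dbo.jaeshim_connector_table_sink_cipher"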

    The data was decrypted correctly and loaded into the table.

    Failed attempts

    Attempt 1: JsonConverter, without schemas

    SourceConnector

    {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "false",
        "value.converter.schemas.enable": "false",   
        "transforms": "cipher",
        "transforms.cipher.type": "com.github.hpgrahsl.kafka.connect.transforms.kryptonite.CipherField$Value",
        "transforms.cipher.cipher_mode": "ENCRYPT",
        "transforms.cipher.field_config": "[{\"name\":\"ITEM_NAME\"}]",
        "transforms.cipher.cipher_data_key_identifier": "my-demo-secret-key-123",
        "transforms.cipher.cipher_data_keys": "${file:/etc/kafka/connect-classified.properties:cipher_data_keys}",
        "transforms.cipher.predicate":"isTombstone",
        "transforms.cipher.negate":true,
        "predicates": "isTombstone",
        "predicates.isTombstone.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone",
        "connection.url": "mssql,1433",
        "connection.user": "user",
        "connection.password": "pass",
        "mode": "incrementing",
        "incrementing.column.name": "SEQ",
        "validate.non.null": "true",
        "query": "SELECT ITEM_NO AS ITME_NO, ITEM_NAME AS ITEM_NAME, CHANGE_DATE AS CHANGE_DATE, REGI_DATE AS REGI_DATE, SEQ AS SEQ  FROM dbo.jaeshim_connector_table WITH(NOLOCK) WHERE ITEM_NO=307",      
        "poll.interval.ms": "5000",
        "topic.prefix": "jaeshim-jdbc-cipher-20230608-1"
      }

    After inserting data, check the topic contents.

    value 
    
    {  
      "ITEM_NO": 307,
      "ITEM_NAME": "LQE7msoB0V/v9u/fQV92G4GS+quvrglpqigstooPjfGf6Mvpk3L8Sv0QWV8ADDCybXktZGVtby1zZWNyZXQta2V5LTEys2ux",
      "CHANGE_DATE": 1686217629063,
      "REGI_DATE": 1686217629063,
      "SEQ": 40
    }
    
    key
    
    null

    Sink Connector

    {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "false",
        "value.converter.schemas.enable": "false",
        "topics": "jaeshim-jdbc-cipher-20230608-1",
        "transforms": "decipher",   
        "transforms.decipher.type": "com.github.hpgrahsl.kafka.connect.transforms.kryptonite.CipherField$Value",
        "transforms.decipher.cipher_mode": "DECRYPT",
        "transforms.decipher.field_config": "[{\"name\":\"ITEM_NAME\"}]",
        "transforms.decipher.cipher_data_keys": "${file:/etc/kafka/connect-classified.properties:cipher_data_keys}",
        "transforms.decipher.cipher_data_key_identifier": "my-demo-secret-key-123",
        "transforms.decipher.predicate":"isTombstone",
        "transforms.decipher.negate":true,
        "predicates": "isTombstone",
        "predicates.isTombstone.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone",
        "connection.url": "mssql,1433",
        "connection.user": "user",
        "connection.password": "pass",
        "table.name.format": "jaeshim_connector_table_sink_cipher",
        "delete.on.null.values": true
      }

    An error occurred. It turns out the JdbcSinkConnector needs a schema in order to map data onto table columns, and the error is caused by the schema being absent.

    https://forum.confluent.io/t/the-jdbc-sink-connector-and-schemas/2274

     

    The JDBC Sink connector streams data from Kafka to a relational database, and relational databases have schemas. The JDBC Sink connector therefore requires a schema to be present for the data.

     

    [2023-06-08 09:55:49,182] DEBUG WorkerSinkTask{id=jaeshim_jdbc_sink_connector_cipher-0} Finished offset commit successfully in 0 ms for sequence number 1: null (org.apache.kafka.connect.runtime.WorkerSinkTask:275)
    [2023-06-08 09:55:49,182] ERROR WorkerSinkTask{id=jaeshim_jdbc_sink_connector_cipher-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:208)
    org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
            at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:618)
            at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:334)
            at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:235)
            at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:204)
            at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:201)
            at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:256)
            at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
            at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
            at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
            at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
            at java.base/java.lang.Thread.run(Thread.java:834)
    Caused by: org.apache.kafka.connect.errors.ConnectException: Sink connector 'jaeshim_jdbc_sink_connector_cipher' is configured with 'delete.enabled=false' and 'pk.mode=none' and therefore requires records with a non-null Struct value and non-null Struct schema, but found record at (topic='jaeshim-jdbc-cipher-20230608-1',partition=0,offset=0,timestamp=1686185682312) with a LinkedHashMap value and null value schema.
            at io.confluent.connect.jdbc.sink.RecordValidator.lambda$requiresValue$2(RecordValidator.java:86)
            at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:81)
            at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:74)
            at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:84)
            at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:584)

    Attempt 2: JsonConverter, with schemas

    Delete and recreate the topic.

    SourceConnector

    {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "true",
        "value.converter.schemas.enable": "true",   
        "transforms": "cipher",
        "transforms.cipher.type": "com.github.hpgrahsl.kafka.connect.transforms.kryptonite.CipherField$Value",
        "transforms.cipher.cipher_mode": "ENCRYPT",
        "transforms.cipher.field_config": "[{\"name\":\"ITEM_NAME\"}]",
        "transforms.cipher.cipher_data_key_identifier": "my-demo-secret-key-123",
        "transforms.cipher.cipher_data_keys": "${file:/etc/kafka/connect-classified.properties:cipher_data_keys}",
        "transforms.cipher.predicate":"isTombstone",
        "transforms.cipher.negate":true,
        "predicates": "isTombstone",
        "predicates.isTombstone.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone",
        "connection.url": "mssql,1433",
        "connection.user": "user",
        "connection.password": "pass",
        "mode": "incrementing",
        "incrementing.column.name": "SEQ",
        "validate.non.null": "true",
        "query": "SELECT ITEM_NO AS ITME_NO, ITEM_NAME AS ITEM_NAME, CHANGE_DATE AS CHANGE_DATE, REGI_DATE AS REGI_DATE, SEQ AS SEQ  FROM dbo.jaeshim_connector_table WITH(NOLOCK) WHERE ITEM_NO=307", 
        "poll.interval.ms": "5000",
        "topic.prefix": "jaeshim-jdbc-cipher-20230608-1"
      }

    After inserting data, check the topic contents.

    For now, the schema of ITEM_NAME is plain "string".

    value 
    
    {
      "schema": {
        "type": "struct",
        "fields": [
          {
            "type": "int64",
            "optional": false,
            "field": "ITEM_NO"
          },
          {
            "type": "string",
            "optional": false,
            "field": "ITEM_NAME"
          },
          {
            "type": "int64",
            "optional": true,
            "name": "org.apache.kafka.connect.data.Timestamp",
            "version": 1,
            "field": "CHANGE_DATE"
          },
          {
            "type": "int64",
            "optional": true,
            "name": "org.apache.kafka.connect.data.Timestamp",
            "version": 1,
            "field": "REGI_DATE"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "SEQ"
          }
        ],
        "optional": false
      },
      "payload": {
        "ITEM_NO": 307,
        "ITEM_NAME": "LQE7msoBo195MnQ8MHbAnCm4lJtleXhUgyMhj45T7B/o7aon9xgD9wtyPdCpDDCybXktZGVtby1zZWNyZXQta2V5LTEys2ux",
        "CHANGE_DATE": 1686218445656,
        "REGI_DATE": 1686218445656,
        "SEQ": 42
      }
    }
    
    key
    
    null

    Sink Connector

    {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "true",
        "value.converter.schemas.enable": "true",
        "topics": "jaeshim-jdbc-cipher-20230608-1,
        "transforms": "decipher",   
        "transforms.decipher.type": "com.github.hpgrahsl.kafka.connect.transforms.kryptonite.CipherField$Value",
        "transforms.decipher.cipher_mode": "DECRYPT",
        "transforms.decipher.field_config": "[{\"name\":\"ITEM_NAME\"}]",
        "transforms.decipher.cipher_data_keys": "${file:/etc/kafka/connect-classified.properties:cipher_data_keys}",
        "transforms.decipher.cipher_data_key_identifier": "my-demo-secret-key-123",
        "transforms.decipher.predicate":"isTombstone",
        "transforms.decipher.negate":true,
        "predicates": "isTombstone",
        "predicates.isTombstone.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone",
        "connection.url": "mssql,1433",
        "connection.user": "user",
        "connection.password": "pass",
        "table.name.format": "jaeshim_connector_table_sink_cipher",
        "delete.on.null.values": true
      }

    An error occurs. The message says a schema spec is required for DECRYPT but none was given.

    [2023-06-08 10:07:16,577] ERROR WorkerSinkTask{id=jaeshim_jdbc_sink_connector_cipher-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:208)
    org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
            at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:237)
            at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:159)
            at org.apache.kafka.connect.runtime.TransformationChain.transformRecord(TransformationChain.java:70)
            at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
            at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:543)
            at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:494)
            at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
            at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:235)
            at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:204)
            at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:201)
            at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:256)
            at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
            at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
            at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
            at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
            at java.base/java.lang.Thread.run(Thread.java:834)
    Caused by: org.apache.kafka.connect.errors.DataException: schema-aware data needs schema spec for DECRYPT but none was given for field path 'ITEM_NAME'
            at com.github.hpgrahsl.kafka.connect.transforms.kryptonite.SchemaRewriter.lambda$derivePrimaryType$0(SchemaRewriter.java:170)
            at java.base/java.util.Optional.orElseThrow(Optional.java:408)
            at com.github.hpgrahsl.kafka.connect.transforms.kryptonite.SchemaRewri

     

    Attempt 3: AvroConverter, with schemas

    Delete and recreate the topic. When using AvroConverter, an error occurs if the Schema Registry URL is not specified.

    org.apache.kafka.common.config.ConfigException: Missing required configuration "schema.registry.url" which has no default value.

    SourceConnector

    {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://{schema}:8081",
        "value.converter.schema.registry.url": "http://{schema}:8081",
        "key.converter.schemas.enable": "true",
        "value.converter.schemas.enable": "true",   
        "transforms": "cipher",
        "transforms.cipher.type": "com.github.hpgrahsl.kafka.connect.transforms.kryptonite.CipherField$Value",
        "transforms.cipher.cipher_mode": "ENCRYPT",
        "transforms.cipher.field_config": "[{\"name\":\"ITEM_NAME\"}]",
        "transforms.cipher.cipher_data_key_identifier": "my-demo-secret-key-123",
        "transforms.cipher.cipher_data_keys": "${file:/etc/kafka/connect-classified.properties:cipher_data_keys}",
        "transforms.cipher.predicate":"isTombstone",
        "transforms.cipher.negate":true,
        "predicates": "isTombstone",
        "predicates.isTombstone.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone",
        "connection.url": "mssql,1433",
        "connection.user": "user",
        "connection.password": "pass",
        "mode": "incrementing",
        "incrementing.column.name": "SEQ",
        "validate.non.null": "true",
        "query": "SELECT ITEM_NO AS ITME_NO, ITEM_NAME AS ITEM_NAME, CHANGE_DATE AS CHANGE_DATE, REGI_DATE AS REGI_DATE, SEQ AS SEQ  FROM dbo.jaeshim_connector_table WITH(NOLOCK) WHERE ITEM_NO=307",    "poll.interval.ms": "5000",
        "topic.prefix": "jaeshim-jdbc-cipher-20230608-1"
      }

    After inserting data, check the topic contents.

    The schema of ITEM_NAME is string, same as before.

    value 
    
    {
      "ITEM_NO": 307,
      "ITEM_NAME": "LQE7msoBMszuPICzYwhPzeKbzDYPzMkAS8gG2wyeZZFDoLeAnRPGfRrTImHfDDCybXktZGVtby1zZWNyZXQta2V5LTEys2ux",
      "CHANGE_DATE": {
        "long": 1686219238466
      },
      "REGI_DATE": {
        "long": 1686219238466
      },
      "SEQ": 44
    }
    
    value-schema
    
    {
      "fields": [
       
        {
          "name": "ITEM_NO",
          "type": "long"
        },
        {
          "name": "ITEM_NAME",
          "type": "string"
        },
        {
          "default": null,
          "name": "CHANGE_DATE",
          "type": [
            "null",
            {
              "connect.name": "org.apache.kafka.connect.data.Timestamp",
              "connect.version": 1,
              "logicalType": "timestamp-millis",
              "type": "long"
            }
          ]
        },
        {
          "default": null,
          "name": "REGI_DATE",
          "type": [
            "null",
            {
              "connect.name": "org.apache.kafka.connect.data.Timestamp",
              "connect.version": 1,
              "logicalType": "timestamp-millis",
              "type": "long"
            }
          ]
        },
        {
          "name": "SEQ",
          "type": "long"
        }
      ],
      "name": "ConnectDefault",
      "namespace": "io.confluent.connect.avro",
      "type": "record"
    }

    Sink Connector

    {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://{schema}:8081",
        "value.converter.schema.registry.url": "http://{schema}:8081",
        "key.converter.schemas.enable": "true",
        "value.converter.schemas.enable": "true",
        "topics": "jaeshim-jdbc-cipher-20230608-1,
        "transforms": "decipher",   
        "transforms.decipher.type": "com.github.hpgrahsl.kafka.connect.transforms.kryptonite.CipherField$Value",
        "transforms.decipher.cipher_mode": "DECRYPT",
        "transforms.decipher.field_config": "[{\"name\":\"ITEM_NAME\"}]",
        "transforms.decipher.cipher_data_keys": "${file:/etc/kafka/connect-classified.properties:cipher_data_keys}",
        "transforms.decipher.cipher_data_key_identifier": "my-demo-secret-key-123",
        "transforms.decipher.predicate":"isTombstone",
        "transforms.decipher.negate":true,
        "predicates": "isTombstone",
        "predicates.isTombstone.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone",
        "connection.url": "mssql,1433",
        "connection.user": "user",
        "connection.password": "pass",
        "table.name.format": "jaeshim_connector_table_sink_cipher",
        "delete.on.null.values": true
      }
     

    The same error occurs with Avro. It seems to mean the schema needed for DECRYPT is missing.


    [2023-06-08 10:23:29,287] ERROR WorkerSinkTask{id=jaeshim_jdbc_sink_connector_cipher-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:208)
    org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
            at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:237)
            at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:159)
            at org.apache.kafka.connect.runtime.TransformationChain.transformRecord(TransformationChain.java:70)
            at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
            at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:543)
            at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:494)
            at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
            at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:235)
            at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:204)
            at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:201)
            at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:256)
            at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
            at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
            at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
            at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
            at java.base/java.lang.Thread.run(Thread.java:834)
    Caused by: org.apache.kafka.connect.errors.DataException: schema-aware data needs schema spec for DECRYPT but none was given for field path 'ITEM_NAME'
            at com.github.hpgrahsl.kafka.connect.transforms.kryptonite.SchemaRewriter.lambda$derivePrimaryType$0(SchemaRewriter.java:170)
            at java.base/java.util.Optional.orElseThrow(Optional.java:408)
            at com.github.hpgrahsl.kafka.connect.transforms.kryptonite.SchemaRewriter.derivePrimaryType(SchemaRewriter.java:169)

     

    Attempt 4: AvroConverter, with schemas, but without attempting decryption

    Only the sink connector is modified.

    The transforms and predicates sections are removed.

    {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://{schema}:8081",
        "value.converter.schema.registry.url": "http://{schema}:8081",
        "key.converter.schemas.enable": "true",
        "value.converter.schemas.enable": "true",
        "topics": "jaeshim-jdbc-cipher-20230608-1,   
        "connection.url": "mssql,1433",
        "connection.user": "user",
        "connection.password": "pass",
        "table.name.format": "jaeshim_connector_table_sink_cipher",
        "delete.on.null.values": true
      }
     
    The encrypted content was loaded into the table as-is.

     

    Other attempts

    - Using cipher instead of decipher for decryption: failed.
      (ref: https://github.com/hpgrahsl/kryptonite-for-kafka/blob/master/connect-transform-kryptonite/README.md)
      The same error occurred:
      Caused by: org.apache.kafka.connect.errors.DataException: schema-aware data needs schema spec for DECRYPT but none was given for field path 'ITEM_NAME'
    - Setting transforms.cipher.field_mode to OBJECT or ELEMENT: failed.
      OBJECT is said to process a complex field as a whole, while a single field can be left as ELEMENT; both OBJECT and ELEMENT failed.
    - Using JsonSchemaConverter: failed.
    - Excluding the predicates settings: failed.

     

    Source code analysis

    github: https://github.com/hpgrahsl/kryptonite-for-kafka

     

    Error log analysis

    Lower the Kafka Connect log level to DEBUG and check.
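    One way to do this without editing log4j files and restarting is the worker's admin/loggers endpoint, available on reasonably recent Connect versions (worker address is assumed):

    #raise the kryptonite SMT logger to DEBUG at runtime
    $ curl -X PUT -H "Content-Type: application/json" \
        --data '{"level":"DEBUG"}' \
        http://localhost:8083/admin/loggers/com.github.hpgrahsl.kafka.connect.transforms.kryptonite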

    #Schema information being updated? Not sure why this happens.
    [2023-06-09 08:56:42,018] DEBUG Sending POST with input {"schema":"{\"type\":\"record\",\"name\":\"ConnectDefault\",\"namespace\":\"io.confluent.connect.avro\",\"fields\":[{\"name\":\"ITEM_NO\",\"type\":\"long\"},{\"name\":\"ITEM_NAME\",\"type\":\"string\"},{\"name\":\"CHANGE_DATE\",\"type\":[\"null\",{\"type\":\"long\",\"connect.version\":1,\"connect.name\":\"org.apache.kafka.connect.data.Timestamp\",\"logicalType\":\"timestamp-millis\"}],\"default\":null},{\"name\":\"SEQ\",\"type\":\"long\"}]}"} to http://172.30.1.4:8081/subjects/jaeshim-jdbc-cipher-20230608-1-value?normalize=false&deleted=true (io.confluent.kafka.schemaregistry.client.rest.RestService:273)

    #The record is received and processing begins.
    [2023-06-09 08:56:42,036] DEBUG SMT received record SinkRecord{kafkaOffset=3, timestampType=CreateTime} ConnectRecord{topic='jaeshim-jdbc-cipher-20230608-1, kafkaPartition=0, key=null, keySchema=null, value=Struct{ITEM_NO=307,ITEM_NAME=LQE7msoB93uEbNDad6XejX6mB6606WyhY0AiOvXGk56HRd4dN9lKX/p/4SKLDDCybXktZGVtby1zZWNyZXQta2V5LTEys2ux,CHANGE_DATE=Fri Jun 09 17:52:55 KST 2023,SEQ=62}, valueSchema=Schema{STRUCT}, timestamp=1686268378215, headers=ConnectHeaders(headers=)} (com.github.hpgrahsl.kafka.connect.transforms.kryptonite.CipherField:161)
    [2023-06-09 08:56:42,036] DEBUG processing schema-aware data (com.github.hpgrahsl.kafka.connect.transforms.kryptonite.CipherField:179)

    #Here it says the record's schema is not in the cache, so it needs to be adapted.
    [2023-06-09 08:56:42,036] DEBUG adapting schema because record's schema not present in cache (com.github.hpgrahsl.kafka.connect.transforms.kryptonite.CipherField:183)
    [2023-06-09 08:56:42,037] DEBUG adapting original schema for DECRYPT mode (com.github.hpgrahsl.kafka.connect.transforms.kryptonite.SchemaRewriter:68)
    [2023-06-09 08:56:42,037] DEBUG copying schema for non-matched field 'ITEM_NO' (com.github.hpgrahsl.kafka.connect.transforms.kryptonite.SchemaRewriter:76)

    #The field is "matched" here because field_config was set to [{\"name\":\"ITEM_NAME\"}].
    [2023-06-09 08:56:42,037] DEBUG adapting schema for matched field 'ITEM_NAME' (com.github.hpgrahsl.kafka.connect.transforms.kryptonite.SchemaRewriter:73)
    [2023-06-09 08:56:42,037] DEBUG WorkerSinkTask{id=jaeshim_jdbc_sink_connector_cipher-0} Skipping offset commit, no change since last commit (org.apache.kafka.connect.runtime.WorkerSinkTask:456)
    [2023-06-09 08:56:42,037] DEBUG WorkerSinkTask{id=jaeshim_jdbc_sink_connector_cipher-0} Finished offset commit successfully in 0 ms for sequence number 1: null (org.apache.kafka.connect.runtime.WorkerSinkTask:275)
    [2023-06-09 08:56:42,037] ERROR WorkerSinkTask{id=jaeshim_jdbc_sink_connector_cipher-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:208)
    org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
            at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:237)
            at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:159)
            at org.apache.kafka.connect.runtime.TransformationChain.transformRecord(TransformationChain.java:70)
            at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
            at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:543)
            at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:494)
            at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
            at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:235)
            at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:204)
            at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:201)
            at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:256)
            at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
            at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
            at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
            at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
            at java.base/java.lang.Thread.run(Thread.java:834)
    Caused by: org.apache.kafka.connect.errors.DataException: schema-aware data needs schema spec for DECRYPT but none was given for field path 'ITEM_NAME'
            at com.github.hpgrahsl.kafka.connect.transforms.kryptonite.SchemaRewriter.lambda$derivePrimaryType$0(SchemaRewriter.java:170)
            at java.base/java.util.Optional.orElseThrow(Optional.java:408)
            at com.github.hpgrahsl.kafka.connect.transforms.kryptonite.SchemaRewriter.derivePrimaryType(SchemaRewriter.java:169)
            at com.github.hpgrahsl.kafka.connect.transforms.kryptonite.SchemaRewriter.adaptSchema(SchemaRewriter.java:74)
            at com.github.hpgrahsl.kafka.connect.transforms.kryptonite.CipherField.processWithSchema(CipherField.java:184)
            at com.github.hpgrahsl.kafka.connect.transforms.kryptonite.CipherField.apply(CipherField.java:165)
            at org.apache.kafka.connect.runtime.PredicatedTransformation.apply(PredicatedTransformation.java:56)
            at org.apache.kafka.connect.runtime.TransformationChain.lambda$transformRecord$0(TransformationChain.java:70)
            at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:183)
            at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:217)
            ... 15 more
    [2023-06-09 08:56:42,038] INFO Stopping task (io.confluent.connect.jdbc.sink.JdbcSinkTask:161)

    //Initialization part of SchemaRewriter
    //Reads the values specified in the JSON config and maps them.
    //fieldPathMap -> fieldConfig
    //CIPHER_MODE -> ENCRYPT, DECRYPT
    @Override
      public void configure(Map<String, ?> props) {
        try {
          var config = new SimpleConfig(CONFIG_DEF, props);
          var fieldPathMap =
              OBJECT_MAPPER
                  .readValue(config.getString(FIELD_CONFIG), new TypeReference<Set<FieldConfig>>() {})
                  .stream().collect(Collectors.toMap(FieldConfig::getName, Function.identity()));
          var kryptonite = configureKryptonite(config);
          var serdeProcessor = new KryoSerdeProcessor();
          recordHandlerWithSchema = new SchemaawareRecordHandler(config, serdeProcessor, kryptonite, CipherMode
              .valueOf(
              config.getString(CIPHER_MODE)),fieldPathMap);
          recordHandlerWithoutSchema = new SchemalessRecordHandler(config, serdeProcessor, kryptonite, CipherMode.valueOf(
              config.getString(CIPHER_MODE)),fieldPathMap);
          schemaRewriter = new SchemaRewriter(fieldPathMap, FieldMode.valueOf(config.getString(
              FIELD_MODE)),CipherMode.valueOf(config.getString(CIPHER_MODE)), config.getString(PATH_DELIMITER));
          schemaCache = new SynchronizedCache<>(new LRUCache<>(16));
        } catch (JsonProcessingException e) {
          throw new ConfigException(e.getMessage());
        }
     
    //In call-stack order
    @Override
      public R apply(R record) {
        LOGGER.debug("SMT received record {}",record);
        //If the record has no schema, the processWithoutSchema path is taken.
        if (operatingSchema(record) == null) {
          return processWithoutSchema(record);
        } else {
          return processWithSchema(record);
        }
      }
    //CipherField.java
    public R processWithSchema(R record) {
        LOGGER.debug("processing schema-aware data");
        var valueStruct = requireStruct(operatingValue(record), PURPOSE);
        //Look up the cached schema; if it is not there, adapt and cache it.
        var updatedSchema = schemaCache.get(valueStruct.schema());
        if(updatedSchema == null) {
          LOGGER.debug("adapting schema because record's schema not present in cache");
          updatedSchema = schemaRewriter.adaptSchema(valueStruct.schema(),"");
          schemaCache.put(valueStruct.schema(),updatedSchema);
        }
        var updatedValueStruct = new Struct(updatedSchema);
        recordHandlerWithSchema.matchFields(valueStruct.schema(),valueStruct,updatedSchema,updatedValueStruct,"");
        LOGGER.debug("resulting record data {}",updatedValueStruct);
        return newRecord(record, updatedSchema, updatedValueStruct);
      }
      public Schema adaptSchema(Schema original, String matchedPath) {
        LOGGER.debug("adapting original schema for {} mode",cipherMode);
        var builder  = SchemaUtil.copySchemaBasics(original);
        for (var field : original.fields()) {
        //matchedPath is "", so updatedPath is just a value like ITEM_NO or ITEM_NAME
          var updatedPath = matchedPath.isEmpty() ? field.name() : matchedPath + pathDelimiter + field.name();
          if (fieldConfig.containsKey(updatedPath)) { //check whether the field is in the configured fieldConfig
            LOGGER.debug("adapting schema for matched field '{}'",updatedPath); //updatedPath: ITEM_NAME
            adaptField(derivePrimaryType(field,updatedPath),builder,field,updatedPath); //field: ITEM_NAME, updatedPath: ITEM_NAME
          } else {
            LOGGER.debug("copying schema for non-matched field '{}'",updatedPath);
            builder.field(field.name(), field.schema());
          }
        }
        return original.isOptional() ? builder.optional().build() : builder.build();
      }
     
      private Type derivePrimaryType(Field field, String fieldPath) { //field: ITEM_NAME, fieldPath: ITEM_NAME
        try {
          if(CipherMode.ENCRYPT == cipherMode)
            return field.schema().type(); //string
          var fc = fieldConfig.get(fieldPath); //fetch ITEM_NAME from the fieldConfig map
          var fs = fc.getSchema().orElseThrow( //the schema spec is taken from the field config here
              () -> new DataException( //this is where the error is thrown: the ITEM_NAME field was looked up, but no schema spec exists for it
                  "schema-aware data needs schema spec for "+cipherMode+" but none was given"
                      + " for field path '"+fieldPath+"'")
          );
          return extractTypeFromConfig(fs,fieldPath);
        } catch(IllegalArgumentException exc) {
          throw new DataException("hit invalid type spec for field path "+fieldPath,exc);
        }
      }
    private void adaptField(Type decisiveType, SchemaBuilder builder, Field field, String updatedPath) {
        LOGGER.trace("adapting to {} field type {}",cipherMode,decisiveType);
        switch (decisiveType) {
          case ARRAY:
            adaptArraySchema(field, builder, updatedPath);
            break;
          case MAP:
            adaptMapSchema(field, builder, updatedPath);
            break;
          case STRUCT:
            adaptStructSchema(field, builder, updatedPath);
            break;
          default:
            builder.field(field.name(),
                typeSchemaMapper.getSchemaForPrimitiveType(
                    decisiveType,field.schema().isOptional(),cipherMode
                )
            );
        }
      }

    Observations from debugging

    The code below complains that there is no schema for ITEM_NAME. In other words, the schema has to be defined somewhere, and the SMT apparently derives it from the value given in field_config.

     

    var fs = fc.getSchema().orElseThrow( //the schema spec is taken from the field config here
    () -> new DataException( //this is where the error is thrown: the ITEM_NAME field was looked up, but no schema spec exists for it
    "schema-aware data needs schema spec for "+cipherMode+" but none was given"
    + " for field path '"+fieldPath+"'")
    );

    Reading the manual again, the schema is in fact specified explicitly there. Update field_config accordingly.

    https://github.com/hpgrahsl/kryptonite-for-kafka/blob/master/connect-transform-kryptonite/README.md

     

    "transforms.cipher.field_config": "[{\"name\":\"myString\",\"schema\": {\"type\": \"STRING\"}}
    AS-IS TO-BE
    "[{"name":"ITEM_NAME"}]" "[{"name":"ITEM_NAME","schema": {"type": "STRING"}}]"

    The result: success.

     

    [References]

    https://developers.redhat.com/articles/2022/09/27/end-end-field-level-encryption-apache-kafka-connect

     


    https://github.com/hpgrahsl/kryptonite-for-kafka

     


     
