Kafka/Connect

Kafka Connect 플러그인 설치 방법

재심 2023. 4. 22. 21:47

목차

    [Kafka Connector 확인하기]

    Confluent Hub: https://www.confluent.io/hub/ 

     

    Confluent Hub에는 사용가능한 Connector들을 확인할 수 있다.

    사용하려는 connector를 검색한다.

    라이센스 확인

    Connector 별로 라이센스가 달라서 확인이 필요하다.

    • Apache License, Confluent Community License: 무료이므로 사용상의 제약이 없다.
    • Confluent Support: 유료 라이센스이므로 사용 전 확인이 필요하다.

     

    [설치하기]

    플러그인을 설치하는 방법은 2가지가 있다.

    • Confluent CLI를 통한 설치
    • zip파일을 내려받아 connector plugin Path에 직접 옮겨주기 

     

    Confluent CLI를 통해 설치하려면 Confluent CLI 환경이 구축되어 있어야하고, 외부와 접근가능한 상태여야 한다.

    두번째 방법으로 진행해보았다.

     

    ZIP파일을 내려 받아 Connector Plugin Path에 직접 옮겨주기 

    ZIP파일을 다운받고 압축을 풀면 아래와 같은 파일들이 있다.

    이 중 필요한 것은 lib 하위 jar파일들이다.

    압축 푼 후 모습

    lib 하위 jar파일들만 따로 취합한다. 

    bin 폴더의 jar파일만 취합

     

    이제 이 파일들을 Connector Plugin Path에 옮겨주어야 한다.

    Connector 설정정보에서 Plugin Path를 확인한다.

     

    $ cat /etc/kafka/connect-distributed.properties

     

    Plugin Path가 "/usr/share/java" 이다.

    이 경로 하위에 위에서 만든 파일들을 포함한 디렉토리를 옮겨주면 된다. 

     

    파일을 옮겨주고 난뒤 connect 서비스 restart

    $ sudo systemctl restart confluent-kafka-connect.service

     

    REST API를 통해 사용가능한 플러그인들을 조회할 수 있다. 

    $curl GET http://localhost:8083/connectors/

     

    [Connector 등록하기]

    REST API를 통해 커넥터 등록도 할 수 있다.

    https://docs.confluent.io/platform/current/connect/references/restapi.html#tasks

     

    PUT /connectors/hdfs-sink-connector/config HTTP/1.1
    Host: connect.example.com
    Accept: application/json
     
    {
        "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
        "tasks.max": "10",
        "topics": "test-topic",
        "hdfs.url": "hdfs://fakehost:9000",
        "hadoop.conf.dir": "/opt/hadoop/conf",
        "hadoop.home": "/opt/hadoop",
        "flush.size": "100",
        "rotate.interval.ms": "1000"
    }

    예시. (Jdbc SourceConnector)

    {
      "name": "JdbcSourceConnector",
      "config": {
        ...
        "name": "JdbcSourceConnector",
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "connection.url": "jdbc:sqlserver://localhost:3950",
        "connection.user": "admin",
        "connection.password": "***********************************",
        ...
        "mode": "timestamp+incrementing",
        "incrementing.column.name": "SEQ",
        "timestamp.column.name": "CHANGE_DATE",
        "validate.non.null": "false",
        "query": "SELECT A FROM TABLE",
        "query.suffix": "",
        "poll.interval.ms": "5000",
        "topic.prefix": "jaeshim-test-topic"
        ...
      }
    }

    이렇게 REST API만으로 관리가 힘든 경우가 많기에 kafka-ui 같은 오픈소스 어드민 툴을 사용하는 것도 고려한다.

    https://github.com/provectus/kafka-ui

     

    GitHub - provectus/kafka-ui: Open-Source Web UI for Apache Kafka Management

    Open-Source Web UI for Apache Kafka Management. Contribute to provectus/kafka-ui development by creating an account on GitHub.

    github.com

     

     

    플러그인 업그레이드 방법

    1. 해당 커넥터를 중지한다.
    2. jar파일을 덮어씌운다.
    3. 재시작한다.

    이 때 기존에 어디까지 처리했는지 기록된 정보를 다시 활용할 수 있는지는 커넥터마다 다를 수 있어서 확인할 필요가 있다고 한다. 

     

    [구성 시 유의사항]

    Kafka Connector를 사용하면 기본적으로 다른 서비스에 접근하게된다.

    일반적으로 서비스 접근을 위해서는 id/password 정보가 필요한데, Connector 특성상 이 정보들이 Plain Text로 노출된다.

     

    해결방법으로는 Hashicorp Vault 같은 서비스들이 있다.

    그러므로 Connector를 구성할 때 단순히 Connector를 구성하는 것이 아니라 Vault 적용을 함께 하여야 한다.

    (불가능한 경우 SecurePassConfigProvider 등의 대체 수단을 적용하여야 한다)