メインコンテンツまでスキップ

Kafka コネクタを使用して Confluent Cloud からデータをロードする

このトピックでは、Kafka コネクタ、starrocks-kafka-connector を使用して、Confluent から CelerData にメッセージ(イベント)をストリームする方法を紹介します。Kafka コネクタは、少なくとも一度のセマンティクスを保証します。

Kafka コネクタは Kafka Connect とシームレスに統合でき、これにより CelerData は Kafka エコシステムとより良く統合されます。リアルタイムデータを CelerData にロードしたい場合、賢明な選択です。Routine Load と比較して、以下のシナリオで Kafka コネクタが推奨されます:

  • Routine Load がサポートする CSV、JSON、Avro フォーマット以外の Protobuf などのさまざまなフォーマットでデータをロードします。データが Kafka Connect のコンバータを使用して JSON および CSV フォーマットに変換できる限り、Kafka コネクタを介して CelerData にデータをロードできます。
  • Debezium 形式の CDC データなど、データ変換をカスタマイズします。
  • 複数の Kafka トピックからデータをロードします。
  • Confluent Cloud からデータをロードします。
  • ロードバッチサイズ、並行性、およびその他のパラメータを細かく制御して、ロード速度とリソース利用のバランスを取る必要があります。

始める前に

Confluent クラスターがインターネット経由で CelerData クラスターに接続できることを確認してください。

CelerData クラスターのネットワーク設定を構成する

  1. CelerData クラスターがインターネットから接続できることを確認します。特に、CelerData クラスターがプライベートサブネットにある場合です。CelerData クラスターがパブリックサブネットにある場合、デフォルトでインターネット経由で接続できます。このトピックでは、パブリックサブネットにある CelerData クラスターを例として使用します。

  2. CelerData クラスターのリソースが関連付けられているセキュリティグループが、Confluent クラスターからのトラフィックを許可していることを確認します。

    1. CelerData Cloud BYOC コンソール にサインインし、CelerData クラスターが関連付けられている Security Group ID を見つけます。

      img1

    2. Amazon VPC コンソール にサインインします。

    3. 左側のナビゲーションペインで Security Groups を選択します。表示されるページで、CelerData クラスターのリソースが関連付けられているセキュリティグループを見つけます。

    4. Inbound Rules タブで、Edit inbound rules を選択します。

    5. 赤枠で強調表示された次のルールを追加します:

      ポート 4439030 を追加する必要があります。

      img2

基本的な手順

次の例は、Confluent Cloud から CelerData に Avro 形式のレコードをロードする方法を示しています。

  1. ソースコネクタを使用して、Confluent クラスターのトピックにデータを生成します。次の例では、ソースコネクタは Datagen Source で、データ形式は Avro です。
  2. CelerData テーブルを作成します。
  3. シンクコネクタ(カスタムコネクタ、starrocks-kafka-connector を使用する必要があります)を使用して、Confluent クラスターのトピックから CelerData テーブルにデータをロードします。

Confluent クラスターのトピックにデータを生成する

  1. Confluent クラスターを選択し、その Connectors ページに入り、+ Add Connector をクリックします。その後、ソースコネクタとして Datagen Source を選択します。

    img3

  2. Datagen Source コネクタを構成します。

    1. Topic 選択セクションで:

      + Add new topic をクリックし、トピックの名前とパーティション番号を指定します。この例では、トピックの名前を datagen_topic と指定します。

      img4

    2. Kafka credentials セクションで:

      この例はロードプロセスに素早く慣れるための簡単なガイドなので、Global access を選択し、Generate API key & download をクリックします。

      img5

    3. Configuration セクションで:

      出力レコードの値の形式とテンプレートを選択します。この例では、形式を AVRO、テンプレートを orders と指定します。

      img6

    4. Sizing セクションで:

      デフォルトの設定を使用します。

    5. Review and launch セクションで:

      Datagen ソースコネクタの設定を確認し、すべての設定を検証したら Continue をクリックします。

      img7

  3. Connectors ページで、追加した Datagen ソースコネクタを確認します。

    img8

    Datagen ソースコネクタが稼働すると、トピック datagen_topic にメッセージが入力されていることを確認できます。

CelerData クラスターにテーブルを作成する

トピック datagen_topic の Avro 形式のレコードのスキーマに従って、CelerData クラスターにテーブルを作成します。

CREATE TABLE test123 (
ordertime LARGEINT,
orderid int,
itemid string,
orderunits string,
city string,
state string,
zipcode bigint
)
DISTRIBUTED BY HASH(orderid);

CelerData テーブルにデータをロードする

この例では、シンクコネクタ(Kafka コネクタである starrocks-kafka-connector)を使用して、Confluent トピックから CelerData テーブルに Avro 形式のレコードをロードします。

NOTE

starrocks-kafka-connector という名前の Kafka コネクタはカスタムコネクタであり、カスタムコネクタは Confluent Cloud がサポートする AWS リージョン でのみ作成できます。カスタムコネクタの機能と制限についての詳細は、Confluent Documentation を参照してください。

  1. Confluent クラスターの Connectors ページに入り、+ Add Connector をクリックし、Add plugin をクリックします。

  2. Kafka コネクタのアーカイブをアップロードします。

    • Connector plugin detail:

      • Connector plugin name: Kafka コネクタの名前を入力します。例えば StarRocks-kafka-connector
      • Custom plugin description: Kafka コネクタの説明を入力します。
      • Connector class: Kafka コネクタの Java クラスを入力します。これは com.starrocks.connector.kafka.StarRocksSinkConnector です。
    • Connector type: コネクタタイプを Sink として選択します。

    • Connector archive: Select connector archive をクリックし、Kafka コネクタの ZIP ファイルをアップロードします。

      Kafka コネクタの TAR ファイルを Github からダウンロードし、TAR ファイルを解凍します。すべてのファイルを ZIP ファイルに圧縮し、その ZIP ファイルをアップロードする必要があります。

      img9

  3. Kafka コネクタを構成して起動します。

    1. Confluent クラスターの Connectors ページに入り、+ Add Connector をクリックし、StarRocks-kafka-connector を選択します。

      img10

    2. Kafka credentials セクションで:

      この例はロードプロセスに素早く慣れるための簡単なガイドなので、Global access を選択し、Generate API key & download をクリックします。

    3. Configuration セクションで:

      キーと値のペアまたは JSON 形式で設定を追加します。この例では、JSON 形式で設定を追加します。

      img11

      完全な JSON 形式の設定は以下の通りです:

      {
      "topics": "datagen_topic",
      "confluent.custom.schema.registry.auto": "true",
      "value.converter": "io.confluent.connect.avro.AvroConverter",
      "connect.timeoutms": "6000",
      "starrocks.http.url": "https://81xxxxxxx.cloud-app.celerdata.org:443",
      "starrocks.username": "xxxxxx",
      "starrocks.password": "xxxxxx",
      "starrocks.database.name": "test",
      "starrocks.topic2table.map": "datagen_topic:test123",
      "sink.properties.strip_outer_array": "true",
      "sink.properties.jsonpaths": "[\"$.ordertime\",\"$.orderid\",\"$.itemid\",\"$.orderunits\",\"$.address.city\",\"$.address.state\",\"$.address.zipcode\"]"
      }

      設定:

      サポートされている設定と説明については、Parameters を参照してください。以下の設定に特に注意してください:

      • starrocks.http.url: CelerData クラスターの HTTP URL を https://<endpoint>:443 の形式で入力します。エンドポイントは CelerData クラスターの Overview ページの接続セクションで見つけることができます。

        img12

      • また、ソースデータ形式に応じて追加する必要がある設定もあります。

        この例では、ソースデータは AVRO レコードであり、Confluent Cloud Schema Registry が使用されているため、必要な設定には confluent.custom.schema.registry.autovalue.converter も含める必要があります。詳細については、Confluent Documentation を参照してください。

        {
        "topics": "datagen_topic",
        "confluent.custom.schema.registry.auto": "true",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "connect.timeoutms": "6000",
        "starrocks.http.url": "https://81xxxxxxx.cloud-app.celerdata.org:443",
        "starrocks.username": "xxxxxx",
        "starrocks.password": "xxxxxx",
        "starrocks.database.name": "test",
        "starrocks.topic2table.map": "datagen_topic:test123",
        "sink.properties.strip_outer_array": "true",
        "sink.properties.jsonpaths": "[\"$.ordertime\",\"$.orderid\",\"$.itemid\",\"$.orderunits\",\"$.address.city\",\"$.address.state\",\"$.address.zipcode\"]"
        }

        ソースデータが Schema Registry を使用しない JSON レコードの場合、必要な設定には以下も含める必要があります:

          "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "false",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",

        完全な設定は以下のようになります:

        {
        "topics": "datagen_topic",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "false",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",
        "connect.timeoutms": "6000",
        "starrocks.http.url": "https://81xxxxxxx.cloud-app.celerdata.org:443",
        "starrocks.username": "xxxxxx",
        "starrocks.password": "xxxxxx",
        "starrocks.database.name": "test",
        "starrocks.topic2table.map": "datagen_topic:test123",
        "sink.properties.strip_outer_array": "true",
        "sink.properties.jsonpaths": "[\"$.ordertime\",\"$.orderid\",\"$.itemid\",\"$.orderunits\",\"$.address.city\",\"$.address.state\",\"$.address.zipcode\"]"
        }
    4. Networking セクションで:

      接続エンドポイントの形式を <endpoint>:443:TCP として入力します。エンドポイントは starrocks.http.url のエンドポイントと同じで、形式は https://<endpoint>:443 です。

      img13

    5. Sizing セクションで:

      デフォルトの設定を使用します。

    6. Review and launch セクションで:

      Kafka コネクタの設定を確認し、すべての設定を検証したら Continue をクリックします。

  4. Connectors ページで、起動した Kafka コネクタを確認します。

    img14

    Kafka コネクタがプロビジョニングを完了すると、ステータスが稼働中に変わります。

  5. 両方のコネクタが稼働した後、CelerData テーブルのデータを確認します。

    img15

パラメータ

name

  • 必須: YES
  • デフォルト値:
  • 説明: この Kafka コネクタの名前。Kafka Connect クラスター内のすべての Kafka コネクタ間でグローバルに一意である必要があります。例えば、starrocks-kafka-connector

connector.class

  • 必須: YES
  • デフォルト値: com.starrocks.connector.kafka.SinkConnector
  • 説明: この Kafka コネクタのシンクで使用されるクラス。

topics

  • 必須: YES
  • デフォルト値:
  • 説明: 購読する 1 つ以上のトピックで、各トピックは CelerData テーブルに対応します。デフォルトでは、CelerData はトピック名が CelerData テーブルの名前と一致すると仮定します。そのため、CelerData はトピック名を使用してターゲット CelerData テーブルを決定します。topics または topics.regex(下記)のいずれかを選択して入力してください。ただし、CelerData テーブル名がトピック名と異なる場合は、オプションの starrocks.topic2table.map パラメータ(下記)を使用してトピック名からテーブル名へのマッピングを指定してください。

topics.regex

  • 必須:
  • デフォルト値:
  • 説明: 購読する 1 つ以上のトピックに一致する正規表現。詳細については、topics を参照してください。topics.regex または topics(上記)のいずれかを選択して入力してください。

starrocks.topic2table.map

  • 必須: NO
  • デフォルト値:
  • 説明: トピック名が CelerData テーブル名と異なる場合の CelerData テーブル名とトピック名のマッピング。形式は <topic-1>:<table-1>,<topic-2>:<table-2>,... です。

starrocks.http.url

  • 必須: YES
  • デフォルト値:
  • 説明: CelerData クラスターの HTTP URL。形式は https://<endpoint>:443 です。エンドポイントは CelerData クラスターの Overview ページの接続セクションで見つけることができます。

starrocks.database.name

  • 必須: YES
  • デフォルト値:
  • 説明: CelerData データベースの名前。

starrocks.username

  • 必須: YES
  • デフォルト値:
  • 説明: CelerData クラスターアカウントのユーザー名。ユーザーは CelerData テーブルに対する INSERT 権限を持っている必要があります。

starrocks.password

  • 必須: YES
  • デフォルト値:
  • 説明: CelerData クラスターアカウントのパスワード。

key.converter

  • 必須: NO
  • デフォルト値: Kafka Connect クラスターで使用されるキーコンバータ
  • 説明: シンクコネクタ(Kafka-connector-starrocks)用のキーコンバータで、Kafka データのキーをデシリアライズするために使用されます。デフォルトのキーコンバータは Kafka Connect クラスターで使用されるものです。

value.converter

  • 必須: NO
  • デフォルト値: Kafka Connect クラスターで使用される値コンバータ
  • 説明: シンクコネクタ(kafka-connector-starrocks)用の値コンバータで、Kafka データの値をデシリアライズするために使用されます。デフォルトの値コンバータは Kafka Connect クラスターで使用されるものです。

key.converter.schema.registry.url

  • 必須: NO
  • デフォルト値:
  • 説明: キーコンバータのスキーマレジストリ URL。

value.converter.schema.registry.url

  • 必須: NO
  • デフォルト値:
  • 説明: 値コンバータのスキーマレジストリ URL。

tasks.max

  • 必須: NO
  • デフォルト値: 1
  • 説明: Kafka コネクタが作成できるタスクスレッドの最大数で、通常は Kafka Connect クラスターのワーカーノードの CPU コア数と同じです。このパラメータを調整してロードパフォーマンスを制御できます。

bufferflush.maxbytes

  • 必須: NO
  • デフォルト値: 94371840(90M)
  • 説明: 一度に CelerData に送信される前にメモリに蓄積できるデータの最大サイズ。最大値は 64 MB から 10 GB の範囲です。Stream Load SDK バッファはデータをバッファするために複数の Stream Load ジョブを作成する場合があることに注意してください。したがって、ここで言及されているしきい値は、総データサイズを指します。

bufferflush.intervalms

  • 必須: NO
  • デフォルト値: 300000
  • 説明: データのバッチを送信する間隔で、ロードの遅延を制御します。範囲:[1000, 3600000]。

connect.timeoutms

  • 必須: NO
  • デフォルト値: 1000
  • 説明: HTTP URL への接続のタイムアウト。範囲:[100, 60000]。

sink.properties.*

  • 必須:
  • デフォルト値:
  • 説明: ロード動作を制御するための Stream Load パラメータ。例えば、パラメータ sink.properties.format は Stream Load に使用される形式を指定し、CSV や JSON などがあります。サポートされているパラメータとその説明のリストについては、STREAM LOAD を参照してください。

sink.properties.format

  • 必須: NO
  • デフォルト値: json
  • 説明: Stream Load に使用される形式。Kafka コネクタは、データの各バッチを CelerData に送信する前にその形式に変換します。有効な値:csv および json。詳細については、CSV パラメータと JSON パラメータを参照してください。

制限事項

  • Kafka トピックからの単一メッセージを複数のデータ行にフラット化して CelerData にロードすることはサポートされていません。
  • Kafka コネクタのシンクは、少なくとも一度のセマンティクスを保証します。