Kafka コネクタを使用して Confluent Cloud からデータをロードする
このトピックでは、Kafka コネクタ、starrocks-kafka-connector を使用して、Confluent から CelerData にメッセージ(イベント)をストリームする方法を紹介します。Kafka コネクタは、少なくとも一度のセマンティクスを保証します。
Kafka コネクタは Kafka Connect とシームレスに統合でき、これにより CelerData は Kafka エコシステムとより良く統合されます。リアルタイムデータを CelerData にロードしたい場合、賢明な選択です。Routine Load と比較して、以下のシナリオで Kafka コネクタが推奨されます:
- Routine Load がサポートする CSV、JSON、Avro フォーマット以外の Protobuf などのさまざまなフォーマットでデータをロードします。データが Kafka Connect のコンバータを使用して JSON および CSV フォーマットに変換できる限り、Kafka コネクタを介して CelerData にデータをロードできます。
- Debezium 形式の CDC データなど、データ変換をカスタマイズします。
- 複数の Kafka トピックからデータをロードします。
- Confluent Cloud からデータをロードします。
- ロードバッチサイズ、並行性、およびその他のパラメータを細かく制御して、ロード速度とリソース利用のバランスを取る必要があります。
始める前に
Confluent クラスターがインターネット経由で CelerData クラスターに接続できることを確認してください。
CelerData クラスターのネットワーク設定を構成する
-
CelerData クラスターがインターネットから接続できることを確認します。特に、CelerData クラスターがプライベートサブネットにある場合です。CelerData クラスターがパブリックサブネットにある場合、デフォルトでインターネット経由で接続できます。このトピックでは、パブリックサブネットにある CelerData クラスターを例として使用します。
-
CelerData クラスターのリソースが関連付けられているセキュリティグループが、Confluent クラスターからのトラフィックを許可していることを確認します。
-
CelerData Cloud BYOC コンソール にサインインし、CelerData クラスターが関連付けられている Security Group ID を見つけます。
-
Amazon VPC コンソール にサインインします。
-
左側のナビゲーションペインで Security Groups を選択します。表示されるページで、CelerData クラスターのリソースが関連付けられているセキュリティグループを見つけます。
-
Inbound Rules タブで、Edit inbound rules を選択します。
-
赤枠で強調表示された次のルールを追加します:
ポート
443
と9030
を追加する必要があります。
-
基本的な手順
次の例は、Confluent Cloud から CelerData に Avro 形式のレコードをロードする方法を示しています。
- ソースコネクタを使用して、Confluent クラスターのトピックにデータを生成します。次の例では、ソースコネクタは Datagen Source で、データ形式は Avro です。
- CelerData テーブルを作成します。
- シンクコネクタ(カスタムコネクタ、starrocks-kafka-connector を使用する必要があります)を使用して、Confluent クラスターのトピックから CelerData テーブルにデータをロードします。
Confluent クラスターのトピックにデータを生成する
-
Confluent クラスターを選択し、その Connectors ページに入り、+ Add Connector をクリックします。その後、ソースコネクタとして Datagen Source を選択します。
-
Datagen Source コネクタを構成します。
-
Topic 選択セクションで:
+ Add new topic をクリックし、トピックの名前とパーティション番号を指定します。この例では、トピックの名前を
datagen_topic
と指定します。 -
Kafka credentials セクションで:
この例はロードプロセスに素早く慣れるための簡単なガイドなので、Global access を選択し、Generate API key & download をクリックします。
-
Configuration セクションで:
出力レコードの値の形式とテンプレートを選択します。この例では、形式を
AVRO
、テンプレートをorders
と指定します。 -
Sizing セクションで:
デフォルトの設定を使用します。
-
Review and launch セクションで:
Datagen ソースコネクタの設定を確認し、すべての設定を検証したら Continue をクリックします。
-
-
Connectors ページで、追加した Datagen ソースコネクタを確認します。
Datagen ソースコネクタが稼働すると、トピック
datagen_topic
にメッセージが入力されていることを確認できます。
CelerData クラスターにテーブルを作成する
トピック datagen_topic
の Avro 形式のレコードのスキーマに従って、CelerData クラスターにテーブルを作成します。
CREATE TABLE test123 (
ordertime LARGEINT,
orderid int,
itemid string,
orderunits string,
city string,
state string,
zipcode bigint
)
DISTRIBUTED BY HASH(orderid);