メインコンテンツまでスキップ

Amazon MSK からデータをロードする

このトピックでは、Amazon Managed Streaming for Apache Kafka (Amazon MSK) から CelerData へメッセージ(イベント)をストリームするための Routine Load ジョブの作成方法を紹介します。

Amazon MSK クラスターから CelerData へストリームのメッセージを継続的にロードするには、Amazon MSK クラスターの Kafka トピックにメッセージストリームを保存し、Routine Load ジョブを作成してメッセージを消費します。Routine Load ジョブは CelerData に持続し、トピック内のすべてまたはいくつかのパーティションのメッセージを消費するための一連のロードタスクを生成し、メッセージを CelerData にロードします。

Routine Load ジョブは、CelerData にロードされたデータが失われたり重複したりしないように、正確に一度だけの配信セマンティクスをサポートします。

サポートされているデータファイル形式

Routine Load は現在、MSK クラスターからの CSV および JSON 形式のデータの消費をサポートしています。

注意

CSV 形式のデータについては、CelerData は 50 バイト以内の UTF-8 エンコードされた文字列をカラムセパレータとしてサポートしています。一般的に使用されるカラムセパレータには、カンマ(,)、タブ、パイプ(|)があります。

動作の仕組み

MSK クラスターおよび Confluent Cloud クラスターからのデータのロードは、どちらも Routine Load を使用します。Routine Load の基本的な概念と原則については、Load data from Confluent Cloud through Routine Load を参照してください。

始める前に

  1. MSK クラスターを準備します。MSK getting started に従って MSK クラスターをすばやく作成できます。また、既存の MSK クラスターを使用することもできます。

    注意

    MSK の VPC とサブネットは、クラスター作成後に変更できません。

  2. MSK クラスターと CelerData クラスターが同じ VPC にある場合:

    1. CelerData クラスターリソースが属するセキュリティグループが、MSK クラスターリソースが属するセキュリティグループからのトラフィックを受け入れられることを確認します。

      1. Amazon VPC コンソール にサインインします。
      2. 左側のナビゲーションペインで、Security Groups を選択します。CelerData クラスターリソースが属するセキュリティグループを見つけます。
      3. Inbound Rules タブで、Edit inbound rules を選択します。
      4. Add rule を選択します。
        1. TypeAll traffic に設定します。
        2. SourceCustom を選択し、MSK クラスターリソースが属するセキュリティグループを選択します。 add inbound rule
      5. Save rules をクリックします。
    2. CelerData クラスターが MSK クラスターにアクセスするための認証と暗号化を設定します:

      1. MSK クラスターのセキュリティ設定を構成します。

        1. MSK クラスターを選択します。

        2. Actions > Edit security settings を選択します。

        3. Edit security settings ダイアログボックスで、Access control methodSASL/SCRAM を選択します。

          注意

          • Access control methodSASL/SCRAM を選択すると、Amazon MSK はクライアントとブローカー間のすべてのトラフィックに対して TLS 暗号化を有効にします。
          • Access control methodUnauthenticated access を選択し、Encryption メソッドに PLAINTEXT を選択して簡単なテストを行うこともできます。これらのセキュリティ設定は認証と暗号化を使用しないため、本番環境には安全ではありません

          security settings*

      2. MSK クラスターの認証と暗号化のためにカスタマーマネージドキーとシークレットを作成します。

        1. Key Management Service (KMS)Creating keys に従ってカスタマーマネージドキーを作成します。

        2. AWS secrets ManagerSet up SASL/SCRAM authentication に従ってシークレットを作成します。以下の設定に注意してください:

          1. Choose secret type セクションで、次のようにパラメータを設定します:

            secret

            • Secret type: Other type of secrets を選択します。

            • Key/value pairs: シークレットを入力します。このシークレットは、Routine Load ジョブを作成する際の property.sasl.username および property.sasl.password として使用されます。

            • Encryption Key: 暗号化のために上記で作成したカスタマーマネージドキーとシークレットを関連付けます。

            注意

            Encryption Key として aws/secretsmanager を選択しないでください。Secrets Manager はデフォルトで AWS KMS キーをシークレットに使用し、デフォルトの AWS KMS キーで作成されたシークレットは Amazon MSK クラスターと関連付けることができません。

            Encryption Key

          2. Configure secret セクションで、Secret name がプレフィックス AmazonMSK_ で始まることを確認します。

          3. シークレットを作成した後、シークレットの Secret ARN をコピーします。MSK クラスターとシークレットを関連付ける際に Secret ARN を使用する必要があります。

      3. シークレットを MSK クラスターと関連付けます。

        1. MSK クラスターを選択します。

        2. Actions > Edit security settings を選択します。

        3. Associate secrets をクリックし、上記でコピーした Secret ARN を貼り付けます。 associate secret

  3. MSK クラスターと CelerData が異なる VPC にある場合、CelerData クラスターが MSK クラスターにアクセスするためのネットワーキング、認証、および暗号化を次のように設定します:

    • ネットワーキング

      • オプション 1: CelerData クラスターが AWS 内で MSK クラスターに接続する AWS VPC peering を使用できます。AWS 内での他の接続オプションについては、other methods を参照してください。
      • オプション 2: CelerData クラスターがインターネット経由で MSK クラスターに接続する
        • MSK ネットワーキングを設定する public access を有効にします。

          注意

          MSK クラスターが public access に記載されているすべての条件を満たしていることを確認してください。

        • CelerData ネットワーキングを設定する

          • CelerData クラスターのサブネットがインターネットに接続できることを確認します。

            1. CelerData クラスターがプライベートサブネットにある場合、CelerData クラスターがインターネットに接続できるようにプライベートサブネットに NAT ゲートウェイが設定されていることを確認します。
            2. CelerData クラスターがパブリックサブネットにある場合、CelerData クラスターはデフォルトでインターネットに接続できます。
          • セキュリティ目的で、CelerData クラスターが MSK クラスターにアクセスできるように、CelerData クラスターのセキュリティグループに次のアウトバウンドルールを追加します。 add outbound rule

            1. Port range を設定します。ポート範囲は、MSK クラスターの View client information ページの Bootstrap servers セクションの Public endpoint で確認できます。ポート範囲は、MSK クラスターの認証に応じて 9092 または 9096 です。
            2. DestinationCustom を選択し、MSK クラスターの Public endpoint で見つけられるパブリック IP アドレスを設定します。0.0.0.0/0 を使用することもできます。
    • 認証と暗号化

      MSK クラスターと CelerData クラスターが異なる VPC にある場合の認証と暗号化の設定は、2 つのクラスターが同じ VPC にある場合の設定と同じです。

基本操作

MSK クラスターおよび Confluent Cloud クラスターからのデータのロードは、どちらも Routine Load を使用するため、以下では MSK クラスターからデータをロードする際に特に注意が必要な情報のみを紹介します。Routine Load の詳細については、Load data from Confluent Cloud を参照してください。

Routine Load ジョブを作成する

CSV 形式のデータをロードする

このセクションでは、MSK クラスターから CSV 形式のデータを消費し、CelerData にデータをロードするための Routine Load ジョブの作成方法を説明します。

注意

Routine Load ジョブを作成する前に、ソースデータとしてのデータセットを準備し、CelerData クラスターにテーブルを作成する必要があります。

次のステートメントを実行して、トピック ordertest1 からメッセージを消費し、テーブル example_tbl1 にデータをロードする Routine Load ジョブ example_tbl1_ordertest1 を作成します。ロードジョブは、トピックの指定されたパーティションの初期オフセットからメッセージを消費します。

太字のプロパティに注意してください。

CREATE ROUTINE LOAD example_db.example_tbl1_ordertest1 ON example_tbl1
COLUMNS TERMINATED BY ",",
COLUMNS (order_id, pay_dt, customer_name, nationality, temp_gender, price)
FROM KAFKA
(
"kafka_broker_list" ="<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>",
"kafka_topic" = "ordertest1",
"kafka_partitions" ="0,1,2,3,4",
"property.kafka_default_offsets" = "OFFSET_BEGINNING",
"property.security.protocol"="SASL_SSL",
"property.sasl.mechanism"="SCRAM-SHA-512",
"property.sasl.username"="<username>",
"property.sasl.password"="<password>"
);
  • Kafka broker list

    "kafka_broker_list" ="<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>",

    プロパティ kafka_broker_list を使用して、MSK クラスターへの接続を確立するためのホストとポートのペアのリストを指定する必要があります。ホストとポートのペアのリストは、MSK クラスターの View client information ページの Bootstrap servers セクションの Private endpoint で確認できます。

    注意

    CelerData クラスターがインターネット経由で MSK クラスターにアクセスする場合、MSK クラスターの View client information ページの Bootstrap servers セクションの Public endpoint を使用する必要があります。

  • 認証と暗号化

    "property.security.protocol"="SASL_SSL",
    "property.sasl.mechanism"="SCRAM-SHA-512",
    "property.sasl.username"="***",
    "property.sasl.password"="***",
    • property.security.protocol: SASL_SSL を使用できます。

      注意

      本番環境では SASL_PLAINTEXT の使用は推奨されません。

    • property.sasl.mechanism: SCRAM-SHA-512 を使用できます。

    • property.sasl.username および property.sasl.password: 上記で作成した MSK クラスターと関連付けられたシークレットです。

JSON 形式のデータをロードする

このセクションでは、MSK クラスター内の JSON 形式のデータを消費し、CelerData にデータをロードするための Routine Load ジョブの作成方法を説明します。

注意

Routine Load ジョブを作成する前に、ソースデータとしてのデータセットを準備し、CelerData クラスターにテーブルを作成する必要があります。

次のステートメントを実行して、トピック ordertest2 からメッセージを消費し、テーブル example_tbl2 にデータをロードする Routine Load ジョブ example_tbl2_ordertest2 を作成します。ロードタスクは、トピックの指定されたパーティションの初期オフセットからメッセージを消費します。

CREATE ROUTINE LOAD example_db.example_tbl2_ordertest2 ON example_tbl2
COLUMNS(commodity_id, customer_name, country, pay_time, price, pay_dt=from_unixtime(pay_time, '%Y%m%d'))
PROPERTIES
(
"format" ="json",
"jsonpaths" ="[\"$.commodity_id\",\"$.customer_name\",\"$.country\",\"$.pay_time\",\"$.price\"]"
)
FROM KAFKA
(
"kafka_broker_list" ="<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>",
"kafka_topic" = "ordertest2",
"kafka_partitions" ="0,1,2,3,4",
"property.kafka_default_offsets" = "OFFSET_BEGINNING",
"property.security.protocol"="SASL_SSL",
"property.sasl.mechanism"="SCRAM-SHA-512",
"property.sasl.username"="<username>",
"property.sasl.password"="<password>"
);

ロードジョブとタスクを確認する

ロードジョブを確認する

SHOW ROUTINE LOAD ステートメントを実行して、ロードジョブのステータスを確認します。

ロードタスクを確認する

SHOW ROUTINE LOAD TASK ステートメントを実行して、ロードジョブのロードタスクを確認します。

ロードジョブを一時停止する

PAUSE ROUTINE LOAD ステートメントを実行して、ロードジョブを一時停止できます。

PAUSE ROUTINE LOAD FOR example_tbl2_ordertest2;

ロードジョブを再開する

RESUME ROUTINE LOAD ステートメントを実行して、一時停止したロードジョブを再開できます。

RESUME ROUTINE LOAD FOR example_tbl2_ordertest2;

ロードジョブを変更する

ロードジョブを変更する前に、PAUSE ROUTINE LOAD ステートメントを使用して一時停止する必要があります。その後、ALTER ROUTINE LOAD を実行してロードジョブを変更できます。ロードジョブを変更した後、RESUME ROUTINE LOAD ステートメントを実行して再開し、SHOW ROUTINE LOAD ステートメントを使用してロードジョブのステータスを確認できます。

ALTER ROUTINE LOAD FOR example_tbl2_ordertest2
PROPERTIES
(
"desired_concurrent_number" = "6"
)
FROM kafka
(
"kafka_partitions" = "0,1,2,3,4,5,6,7",
"kafka_offsets" = "OFFSET_BEGINNING,OFFSET_BEGINNING,OFFSET_BEGINNING,OFFSET_BEGINNING,OFFSET_END,OFFSET_END,OFFSET_END,OFFSET_END"
);

ロードジョブを停止する

STOP ROUTINE LOAD ステートメントを実行して、ロードジョブを停止できます。

STOP ROUTINE LOAD FOR example_tbl2_ordertest2;

FAQ

  • 問題の説明: Routine Load ジョブで SHOW ROUTINE LOAD ステートメントを実行すると、ReasonOfStateChanged フィールドに failed to create kafka consumer: sasl.username and sasl.password must be set というエラーが報告されます。 解決策: 認証メカニズムとして SASL/SCRAM を使用する場合、property.sasl.username および property.sasl.password プロパティを設定する必要があります。

  • 問題の説明: Routine Load ジョブで SHOW ROUTINE LOAD ステートメントを実行すると、ReasonOfStateChanged フィールドに failed to get partition meta: Local: Broker transport failure というエラーが報告されます。

    Error 1064: failed to send proxy request to TNetworkAddress(hostname:10.0.1.2, port:8060) err failed to send proxy request to TNetworkAddress(hostname:10.0.1.2, port:8060) err [failed to get partition meta: Local: Broker transport failure]

    考えられる原因: TLS チャネルが確立できません。

    解決策: CelerData クラスターから MSK クラスターへの SSL 証明書検証を無効にしてトラブルシューティングを行うことができます。

    1. Kafka ブローカーリストが正しいことを確認します。

    2. CelerData から MSK クラスターへの SSL 証明書検証を無効にして TLS チャネルを一時的にシャットダウンします。

      "property.enable.ssl.certificate.verification"="false"

    注意

    • SSL 証明書検証を無効にすることは安全ではありません。本番環境ではオフにする必要があります。
    • セキュリティ目的で、MSK クラスターの property.sasl.username および property.sasl.password プロパティとして 一時的なシークレット を生成する必要があります。CelerData クラスターが SSL 証明書検証を使用せずに MSK クラスターに接続する場合、property.sasl.usernameproperty.sasl.password のような情報は安全ではありません。

    Routine Load ジョブがその後 CelerData にデータを正常にロードする場合、SSL 証明書に問題がある可能性があります。たとえば、CelerData が CA ルート証明書を自動的に見つけられない場合があります。このような場合、以下のプロパティを追加して CelerData に手動で CA ルート証明書を指定できます。CA ルート証明書の詳細については、librdkafka - INTRODUCTION.md # SSL を参照してください。

    "property.ssl.ca.location"="/etc/ssl/certs/ca-bundle.crt"