メインコンテンツまでスキップ

Amazon Kinesis からデータをロードする

このトピックでは、Amazon Kinesis から CelerData にデータをロードする方法について説明します。

仕組み

全体的なソリューションは次のとおりです。

kinesis1

  1. ビジネスアプリケーションがストリーミングデータを Kinesis データストリームに書き込みます。
  2. Kinesis データストリームから入力データを読み取るために、Kinesis Data Analytics アプリケーションを作成します。
  3. Kinesis Data Analytics アプリケーションは、flink-connector-starrocks SDK を使用して CelerData クラスターにデータを書き込みます。

注意事項

  • 最初に get started 演習を行い、Kinesis データストリームをソースおよびシンクとして使用する Kinesis Data Analytics アプリケーションを作成することを強くお勧めします。これにより、Kinesis に迅速に慣れ、CelerData クラスターにデータを書き込む前に発生する可能性のある問題を解決できます。
  • スムーズなロード操作を確保するために、AWS Management Console を使用して Kinesis データストリームと Kinesis Data Analytics アプリケーションを作成し、get started 演習で指定された名前を使用することをお勧めします。
  • Kinesis Data Analytics アプリケーションを構成する際には、次の構成に注意してください:
    • IAM ポリシーを編集する際には、Kinesis データストリームにアクセスするための権限を追加することを確認してください。
    • データ分析アプリケーションのネットワーキングを構成する際には、Kinesis Data Analytics アプリケーションがインターネットにアクセスできることをConfigure the application networking の指示に厳密に従って確認してください。
  • CelerData クラスターを作成する際には、SSL 接続を有効にする必要があります。
  • アプリケーションを正常に実行し、データストリームにデータをロードし始めたら、Data Viewer を使用してデータストリーム内のデータを表示し、ターゲットの CelerData テーブルにデータがロードされているかどうかを確認できます。

始める前に

Kinesis Data Analytics アプリケーションがインターネット経由で CelerData クラスターに接続できることを確認してください。

CelerData クラスターのネットワーキングを構成する

  1. 特に CelerData クラスターがプライベートサブネットにある場合、インターネットから CelerData クラスターに接続できることを確認してください。CelerData クラスターがパブリックサブネットにある場合、デフォルトでインターネット経由で接続できます。このトピックでは、パブリックサブネットにある CelerData クラスターを例として使用します。

  2. CelerData クラスターリソースに関連付けられたセキュリティグループが、Kinesis Analytics アプリケーションからのトラフィックを許可していることを確認してください。

    1. CelerData Cloud BYOC コンソールにサインインし、CelerData クラスターに関連付けられたセキュリティグループ IDを見つけます: kinesis2

    2. Amazon VPC コンソールにサインインします。

    3. 左側のナビゲーションペインで、Security Groups を選択します。CelerData クラスターリソースに関連付けられたセキュリティグループを見つけます。

    4. Inbound Rules タブで、Edit inbound rules を選択します。

    5. 赤枠で強調表示された次のルールを追加します:

      ポート 4439030 を追加する必要があります。 kinesis3

      Note

      CelerData クラスターを作成する際には、SSL 接続を有効にする必要があります。

Kinesis Data Analytics アプリケーションのネットワーキングを構成する

Kinesis Data Analytics アプリケーションを構成する際に、Kinesis Data Analytics アプリケーションがインターネットにアクセスできることを確認してください。

  • アプリケーションの VPC をカスタマイズする場合、アプリケーションのネットワーキングは次の要件を満たす必要があります:
    • Kinesis Data Analytics アプリケーションはプライベートサブネットで実行される必要があります。
    • VPC にはパブリックサブネットに NAT ゲートウェイまたはインスタンスが含まれている必要があります。
    • プライベートサブネットから NAT ゲートウェイまたはインスタンスをホストするパブリックサブネットへのアウトバウンドトラフィックのルートが構成されている必要があります。 詳細については、Internet and Service Access for a VPC-connected Kinesis Data Analytics application を参照してください。
  • または、アプリケーションのネットワーキングを No VPC として構成することもできます。

基本操作

ストリーミングデータを Kinesis Data Stream に書き込む

Kinesis Data Stream を使用して、大量のデータレコードをリアルタイムで保存できます。

  1. Kinesis データストリームを作成する。Kinesis を初めて使用する場合は、get started 演習と同じ名前 ExampleInputStream を使用することをお勧めします。
  2. Kinesis Data Analytics アプリケーションを正常に実行した後、Kinesis データストリームにデータを書き込む

CelerData クラスターにデータベースとテーブルを作成する

Kinesis データストリーム内のデータレコードに従って、CelerData クラスターにデータベースとテーブルを作成します。

CREATE DATABASE IF NOT EXISTS example_db;

USE example_db;
CREATE TABLE stock (
`EVENT_TIME` datetime NOT NULL,
`TICKER` varchar(65533) NOT NULL,
`PRICE` float NULL
) ENGINE=OLAP
DUPLICATE KEY(`EVENT_TIME`)
DISTRIBUTED BY HASH(`EVENT_TIME`) BUCKETS 2
PROPERTIES (
"replication_num" = "1"
);

Kinesis Data Analytics アプリケーションを使用して Kinesis Data Stream からデータを読み取り、CelerData にデータを書き込む

Kinesis Data Analytics アプリケーションを使用して、データストリームから継続的にデータを読み取り、ストリーミングデータを処理および分析し、CelerData クラスターに書き込むことができます。

デモコードを編集して Kinesis Data Analytics アプリケーションを構築する

Kinesis Data Analytics アプリケーションは、Kinesis Data Stream からデータを読み取り、flink-connector-starrocks SDK を使用してデータを複数の並列 Stream Load ジョブにカプセル化し、CelerData クラスターにデータをロードします。

アプリケーションのデモコードは、主にソースデータストリームとターゲット CelerData クラスターの Flink ジョブプロパティを構成します。元の AWS コードと比較して、デモコードは pom.xml ファイルで flink-connector-starrocks 依存関係を使用し、シンク関数でターゲット CelerData クラスターのジョブプロパティを構成します。

デモコードを編集して Kinesis Data Analytics アプリケーションを構築する手順は次のとおりです:

  1. Kinesis Data Analytics アプリケーションを開発するための前提条件を満たしていることを確認してください。

  2. リモートリポジトリをクローンします。

    git clone git@github.com:StarRocks/demo.git
  3. BasicStreamingJob.java ファイルを含むディレクトリに移動します。このトピックでは Flink バージョン 1.15 を例として使用するため、GettingStarted ディレクトリに移動する必要があります。

    cd ./demo/AwsDemo/amazon-kinesis-data-analytics-java-examples/GettingStarted/src/main/java/com/amazonaws/services/kinesisanalytics
  4. デモコードを編集し、BasicStreamingJob.java ファイルでソースとシンクのプロパティを構成します。

    Note

    以下は BasicStreamingJob.java ファイルの一部のコードのみを示しています。全体のコードとコードの差分については、この pull request を参照してください。

    public class BasicStreamingJob {
    private static final Log log = LogFactory.getLog(BasicStreamingJob.class);

    private static final String region = "us-west-2"; // The region of the source data stream
    private static final String inputStreamName = "ExampleInputStream"; // The name of the source data stream
    private static final String outputStreamName = "ExampleOutputStream";

    private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) {
    Properties inputProperties = new Properties();
    inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
    inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

    return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));
    }

    ......

    public static void main(String[] args) throws Exception {
    // Set up the streaming execution environment
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    /* If you want to use runtime configuration properties, uncomment the lines below
    * DataStream<String> input = createSourceFromapplicationProperties(env);
    */
    log.info("Create an input");
    DataStream<String> input = createSourceFromStaticConfig(env);

    /* If you want to use runtime configuration properties, uncomment the lines below
    * input.sinkTo(createSinkFromapplicationProperties())
    */
    log.info("Start to create an sink");
    // input.sinkTo(createSinkFromStaticConfig());
    input.addSink(createCelerDataSinkFromapplicationProperties());
    log.info("Success to add a CelerData sink");

    env.execute("Flink Streaming Java API Skeleton");
    }

    private static SinkFunction<String> createCelerDataSinkFromapplicationProperties() throws IOException {
    Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getapplicationProperties();
    Properties outputProperties = applicationProperties.get("ProducerConfigProperties");
    if (outputProperties == null) {
    outputProperties = new Properties();
    log.info("ProducerConfigProperties is not set. It will use default config");
    }
    else {
    log.info("ProducerConfigProperties: " + outputProperties.toString());
    }

    StarRocksSinkOptions.Builder builder = StarRocksSinkOptions.builder()
    .withProperty("jdbc-url", outputProperties.getProperty("jdbc-url", "jdbc:mysql://xxxxxxxx.cloud-app.celerdata.com:9030"))
    .withProperty("load-url", outputProperties.getProperty("load-url", "https://xxxxxxxx.cloud-app.celerdata.com"))
    .withProperty("username", outputProperties.getProperty("username", "admin"))
    .withProperty("password", outputProperties.getProperty("password", "123456"))
    .withProperty("database-name", outputProperties.getProperty("database-name", "example_db"))
    .withProperty("table-name", outputProperties.getProperty("table-name", "stock"))
    /**
    * The connector can encapsulate the data into two formats: CSV and JSON
    * For CSV data:
    * .withProperty("sink.properties.format", "csv")
    * For JSON data:
    */
    .withProperty("sink.properties.format", "json")
    .withProperty("sink.properties.jsonpaths", "[\"event_time\", \"ticker\", \"price\"]")
    // .withProperty("sink.properties.columns", "event_time, ticker, price")
    .withProperty("sink.properties.strip_outer_array", "true");
    for (Map.Entry<Object, Object> property : outputProperties.entrySet()) {
    if (StringUtils.startsWith(property.getKey().toString(), "sink.")) {
    builder.withProperty(property.getKey().toString(), property.getValue().toString());
    }
    }
    return StarRocksSink.sink(builder.build());
    }

ソース およびシンクプロパティ

  • ソースプロパティ

    プロパティ必須説明
    regionYes入力 Kinesis Data Stream のリージョン。
    inputStreamNameYes入力 Kinesis Data Stream の名前。
  • シンクプロパティ

    プロパティ必須説明
    jdbc-urlYesMySQL サーバーを通じて CelerData クラスターにアクセスするために使用される URL。この形式は jdbc:mysql://xxxxxxxxx.cloud-app.celerdata.com:9030 です。
    load-urlYesHTTP サーバーを通じて CelerData クラスターにアクセスするために使用される URL。この URL は、CelerData クラスターに割り当てられたドメイン名を持つ HTTPS URL です。例:https://xxxxxxxxx.cloud-app.celerdata.com
    database-nameYesCelerData データベースの名前。
    table-nameYesCelerData テーブルの名前。
    usernameYesターゲットデータベースとテーブルに書き込み権限を持つアカウントのユーザー名。
    passwordYes上記のアカウントのユーザー名のパスワード。
    sink.properties.*NoStream Load プロパティ。詳細については、data_desc および opt_properties を参照してください。
    sink.properties.formatNoデータ形式。有効な値:CSV および JSON。デフォルト値:CSV
    sink.properties.jsonpathsNoJSON データからロードしたいフィールドの名前。このパラメータの値は JSON 形式です。
    sink.properties.strip_outer_arrayNoflink-connector-starrocks がデータを JSON 形式にカプセル化し、その後 CelerData クラスターにデータをロードする場合、このパラメータを true に設定する必要があります。flink-connector-starrocks はメッセージをグループ化して Stream Load ジョブを実行します。有効な値:true および false。デフォルト値:false

    後でプロパティを更新したり、さらにプロパティを構成したりする場合は、Configuration タブの Runtime properties セクションで構成を変更できます。

    kinesis4

  1. pom.xml ファイルに flink-connector-starrocks 依存関係を追加します。

    <dependency>
    <groupId>com.starrocks</groupId>
    <artifactId>flink-connector-starrocks</artifactId>
    <version>1.2.6_flink-1.15</version>
    </dependency>

    flink-connector-starrocks バージョンは ${flink-connector-starrocks.version}_flink-${fink.version}_${scala.version} の形式です。また、Flink バージョンが 1.15.x の場合、Scala バージョンを指定する必要はありません。サポートされているすべてのバージョンについては、flink-connector-starrocks の github リポジトリ を参照してください。

  2. デモコードをコンパイルして JAR ファイルにパッケージ化します。詳細については、Compile the application Code を参照してください。

    mvn package -Dflink.version=1.15.2
  1. aws-kinesis-analytics-java-apps-1.0.jar ファイルを Amazon S3 バケットにアップロードします。詳細については、Upload the Apache Flink Streaming Java Code を参照してください。

Kinesis Data Analytics アプリケーションの作成と構成

Create and Run the Kinesis Data Analytics application に従って、Kinesis Data Analytics アプリケーションを作成および構成します。次の構成に注意してください。

IAM ポリシーの構成

Kinesis Data Analytics アプリケーションがデータストリームを読み取るために必要な権限を含む IAM ロールを構成します。

  1. Kinesis Data Analytics アプリケーションを作成する際に、デフォルトで必要なポリシーを持つ IAM ロールを作成します。 kinesis5

  2. アプリケーションを作成した後、IAM ポリシーを編集し、アプリケーションに添付されたポリシーが次のように編集されていることを確認します。サンプルアカウント ID 012345678901 を自分のアカウント ID に置き換える必要があります。

    {
    "Sid": "ReadInputStream",
    "Effect": "Allow",
    "Action": "kinesis:*",
    "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream"
    }

アプリケーションコードの場所を構成する

アプリケーションを構成する際に、アプリケーションコードの場所を指定する必要があります:

  • Amazon S3 バケットの場合、アプリケーション JAR ファイルを含む S3 バケットを指定します。

  • Path to Amazon S3 objectの場合、アプリケーション JAR ファイルの名前を指定します。デフォルトの名前は aws-kinesis-analytics-java-apps-1.0.jar です。

    kinesis6

アプリケーションネットワーキングの構成

  • アプリケーションを構成する際に、アプリケーションがインターネットにアクセスできるようにネットワーキングを構成する必要があります:

    • オプション 1: VPC connectivity に対して Custom VPC configuration を選択し、VPC ドロップダウンリストから No VPC を選択します。 kinesis7

    • オプション 2: Internet and Service Access for a VPC-Connected Kinesis Data Analytics application に提供されている指示に厳密に従ってネットワーキングを指定します。要するに、アプリケーションネットワーキングは次の要件を満たす必要があります:

      • Kinesis Data Analytics アプリケーションはプライベートサブネットで実行される必要があります。
      • VPC にはパブリックサブネットに NAT ゲートウェイまたはインスタンスが含まれている必要があります。
      • プライベートサブネットから NAT ゲートウェイまたはインスタンスをホストするパブリックサブネットへのアウトバウンドトラフィックのルートが構成されている必要があります。

Kinesis Data Analytics アプリケーションを実行する

  1. MyApplication ページで、Run を選択します。表示されるメッセージで、Confirm をクリックします。 kinesis8

  2. Monitoring タブの Logs セクションで、アプリケーションの実行情報を表示します。例えば、アプリケーションのErrorsExceptionsJob status exceptions などを分析してエラーや例外を確認します。 kinesis9

    Configuration タブの Logging and monitoring セクションで、アプリケーションの INFO または WARN ログを表示することもできます。ログストリーム kinesis-analytics-log-stream を開いて、BasicStreamingJobstarrocks などのイベントフィルターを使用して詳細なログを検索します。 kinesis10

FAQ

  1. 問題の説明: Monitoring タブの Logs セクションの Exceptions 部分で、フィールド throwableInformationorg.apache.flink.kinesis.shaded.com.amazonaws.SdkClientException: Unable to execute HTTP request : Connect to kinesis.us-west-2.amazonaws.com:443 [kinesis.us-west-2.amazonaws.com/34.223.45.1] failed: connect timed out と表示された場合、どうすればよいですか。
    考えられる原因: Kinesis Analytics アプリケーションがプライベートサブネットではなく、パブリックサブネットに起動されています。
    解決策:
    • アプリケーションの VPC をカスタマイズする場合、アプリケーションのネットワーキングが次の要件を満たしていることを確認してください:
      • Kinesis Data Analytics アプリケーションはプライベートサブネットで実行される必要があります。
      • VPC にはパブリックサブネットに NAT ゲートウェイまたはインスタンスが含まれている必要があります。
      • プライベートサブネットから NAT ゲートウェイまたはインスタンスをホストするパブリックサブネットへのアウトバウンドトラフィックのルートが構成されている必要があります。 詳細については、Internet and Service Access for a VPC-connected Kinesis Data Analytics application を参照してください。
    • または、アプリケーションのネットワーキングを No VPC として構成することもできます。
  2. 問題の説明: Monitoring タブの Logs セクションの Error 部分で、フィールド messageError occurred when trying to start the job と表示され、フィールド throwableInformationFailed to get StarRocks version と表示された場合、どうすればよいですか。
    考えられる原因: CelerData クラスターの jdbc-url が正しくありません。
    解決策: mysql -u <celerdata_domain_name> -P 9030 -uadmin を使用して、CelerData クラスターへの接続をテストします。
  3. 問題の説明: Monitoring タブの Logs セクションの Exceptions 部分で、フィールド message"message": "Failed to connect to address:``xxxxxxxxx.cloud-app.celerdata.com``” と表示された場合、どうすればよいですか。
    考えられる原因: アプリケーションが HTTPS プロトコルを介して CelerData クラスターのドメインにアクセスできません。
    解決策:
    次の CelerData 構成を確認してください:
    • CelerData クラスターが SSL 接続を有効にしています。
    • celerdata クラスターに関連付けられたセキュリティグループが、ポート 443 を開くためのインバウンドルールを追加しています。