メインコンテンツまでスキップ

Amazon S3 からのバッチデータロード

CelerData は、AWS S3 からデータをロードするために以下のオプションを提供しています。

これらのオプションにはそれぞれ利点があり、詳細は以下のセクションで説明します。ほとんどの場合、使用が簡単な INSERT+FILES() メソッドをお勧めします。ただし、INSERT+FILES() メソッドは現在、Parquet と ORC ファイル形式のみをサポートしています。そのため、CSV などの他のファイル形式のデータをロードする必要がある場合や、データロード中に DELETE などのデータ変更を行う必要がある場合は、Broker Load を利用できます。

始める前に

ソースデータの準備

CelerData にロードしたいソースデータが S3 バケットに適切に保存されていることを確認してください。また、データとデータベースの場所を考慮することもお勧めします。バケットと CelerData が同じリージョンにある場合、データ転送コストは大幅に低くなります。

このトピックでは、S3 バケット内のサンプルデータセット s3://starrocks-examples/user_behavior_ten_million_rows.parquet を提供します。このデータセットには、AWS 認証済みユーザーであれば有効な資格情報でアクセスできます。

権限の確認

CelerData クラスター内のテーブルにデータをロードするには、そのテーブルに対する INSERT 権限を持つユーザーとしてのみ可能です。INSERT 権限がない場合は、GRANT に従って、CelerData クラスターに接続するために使用するユーザーに INSERT 権限を付与してください。

認証情報の収集

このトピックの例では、IAM ユーザー認証を使用しています。AWS S3 からデータを読み取る権限を持っていることを確認するために、IAM ユーザー認証の準備 を読み、適切な IAM ポリシー を設定した IAM ユーザーを作成する手順に従うことをお勧めします。

要するに、IAM ユーザー認証を実践する場合、以下の AWS リソースに関する情報を収集する必要があります。

  • データを保存する S3 バケット
  • バケット内の特定のオブジェクトにアクセスする場合の S3 オブジェクトキー(オブジェクト名)。S3 オブジェクトがサブフォルダに保存されている場合、オブジェクトキーにはプレフィックスを含めることができます。
  • S3 バケットが属する AWS リージョン
  • アクセス資格情報として使用されるアクセスキーとシークレットキー

利用可能なすべての認証方法については、AWS リソースへの認証 を参照してください。

CelerData への接続

ロードジョブを開始する前に、CelerData クラスターに接続し、使用する catalog を指定する必要があります。クラスターがエラスティッククラスターの場合、使用するウェアハウスも指定する必要があります。指定したウェアハウスが稼働中であることを確認してください。

その後、S3 バケットからデータをロードするためのロードジョブを開始できます。

INSERT+FILES() の使用

このメソッドは現在、Parquet と ORC ファイル形式のみをサポートしています。

INSERT+FILES() の利点

FILES() は、指定したパス関連のプロパティに基づいてクラウドストレージに保存されたファイルを読み取り、ファイル内のデータのテーブルスキーマを推測し、ファイルからデータをデータ行として返すことができます。FILES() を使用すると、以下のことが可能です。

  • SELECT を使用して S3 から直接データをクエリする
  • INSERT を使用してテーブルを作成し、ロードする
  • CREATE TABLE AS SELECT (CTAS) を使用して既存のテーブルにデータをロードする

典型的な例

FILES() テーブル関数を使用した 3 つの例があります。

  • SELECT を使用して S3 から直接データをクエリする
  • CTAS を使用してテーブルを作成し、ロードする
  • 手動でテーブルを作成し、INSERT を使用してデータをロードする

SELECT を使用して S3 から直接クエリする

SELECT+FILES() を使用して S3 から直接クエリすることで、テーブルを作成する前にデータセットの内容をプレビューできます。例えば:

  • データを保存せずにデータセットをプレビューする
  • 最小値と最大値をクエリし、使用するデータ型を決定する
  • NULL 値を確認する

以下の例は、サンプルデータセット s3://starrocks-examples/user_behavior_ten_million_rows.parquet をクエリします。

SELECT * FROM FILES
(
"path" = "s3://starrocks-examples/user_behavior_ten_million_rows.parquet",
"format" = "parquet",
"aws.s3.region" = "us-east-1",
"aws.s3.access_key" = "AAAAAAAAAAAAAAAAAAAA",
"aws.s3.secret_key" = "BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB"
)
LIMIT 3;

NOTE

上記のコマンドで AAABBB の部分をあなたの資格情報に置き換えてください。オブジェクトは AWS 認証済みユーザーであれば読み取り可能ですので、有効な aws.s3.access_keyaws.s3.secret_key を使用できます。

システムは以下のクエリ結果を返します。

+--------+---------+------------+--------------+---------------------+
| UserID | ItemID | CategoryID | BehaviorType | Timestamp |
+--------+---------+------------+--------------+---------------------+
| 543711 | 829192 | 2355072 | pv | 2017-11-27 08:22:37 |
| 543711 | 2056618 | 3645362 | pv | 2017-11-27 10:16:46 |
| 543711 | 1165492 | 3645362 | pv | 2017-11-27 10:17:00 |
+--------+---------+------------+--------------+---------------------+

NOTE

上記で返された列名は Parquet ファイルによって提供されています。

CTAS を使用してテーブルを作成し、ロードする

これは前の例の続きです。前のクエリは CREATE TABLE AS SELECT (CTAS) にラップされ、スキーマ推論を使用してテーブル作成を自動化します。これは、CelerData がテーブルスキーマを推測し、希望するテーブルを作成し、その後データをテーブルにロードすることを意味します。Parquet ファイルを使用する場合、Parquet 形式には列名が含まれているため、FILES() テーブル関数を使用する際にテーブルを作成するための列名と型は必要ありません。

データベースを作成し、切り替えます。

CREATE DATABASE IF NOT EXISTS mydatabase;
USE mydatabase;

CTAS を使用してテーブルを作成し、サンプルデータセット s3://starrocks-examples/user_behavior_ten_million_rows.parquet のデータをテーブルにロードします。

CREATE TABLE user_behavior_inferred AS
SELECT * FROM FILES
(
"path" = "s3://starrocks-examples/user_behavior_ten_million_rows.parquet",
"format" = "parquet",
"aws.s3.region" = "us-east-1",
"aws.s3.access_key" = "AAAAAAAAAAAAAAAAAAAA",
"aws.s3.secret_key" = "BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB"
);

NOTE

上記のコマンドで AAABBB の部分をあなたの資格情報に置き換えてください。オブジェクトは AWS 認証済みユーザーであれば読み取り可能ですので、有効な aws.s3.access_keyaws.s3.secret_key を使用できます。

テーブルを作成した後、DESCRIBE を使用してそのスキーマを表示できます。

DESCRIBE user_behavior_inferred;

システムは以下のクエリ結果を返します。

+--------------+------------------+------+-------+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+--------------+------------------+------+-------+---------+-------+
| UserID | bigint | YES | true | NULL | |
| ItemID | bigint | YES | true | NULL | |
| CategoryID | bigint | YES | true | NULL | |
| BehaviorType | varchar(1048576) | YES | false | NULL | |
| Timestamp | varchar(1048576) | YES | false | NULL | |
+--------------+------------------+------+-------+---------+-------+

推測されたスキーマと手動で作成されたスキーマを比較します。

  • データ型
  • NULL 許可
  • キーフィールド

宛先テーブルのスキーマをより良く制御し、クエリパフォーマンスを向上させるために、本番環境では手動でテーブルスキーマを指定することをお勧めします。

テーブルをクエリして、データがロードされたことを確認します。例:

SELECT * from user_behavior_inferred LIMIT 3;

以下のクエリ結果が返され、データが正常にロードされたことを示します。

+--------+--------+------------+--------------+---------------------+
| UserID | ItemID | CategoryID | BehaviorType | Timestamp |
+--------+--------+------------+--------------+---------------------+
| 58 | 158350 | 2355072 | pv | 2017-11-27 13:06:51 |
| 58 | 158590 | 3194735 | pv | 2017-11-27 02:21:04 |
| 58 | 215073 | 3002561 | pv | 2017-11-30 10:55:42 |
+--------+--------+------------+--------------+---------------------+

INSERT を使用して既存のテーブルにロードする

挿入するテーブルをカスタマイズしたい場合があります。例えば:

  • 列のデータ型、NULL 許可設定、またはデフォルト値
  • キーの種類と列
  • データのパーティショニングとバケッティング

NOTE

最も効率的なテーブル構造を作成するには、データの使用方法と列の内容に関する知識が必要です。このトピックではテーブル設計をカバーしていません。テーブル設計については、テーブルタイプ を参照してください。

この例では、テーブルがどのようにクエリされるか、Parquet ファイル内のデータに関する知識に基づいてテーブルを作成しています。Parquet ファイル内のデータに関する知識は、S3 でファイルを直接クエリすることで得られます。

  • S3 のデータセットをクエリすると、Timestamp 列が datetime データ型に一致するデータを含んでいることが示されるため、以下の DDL で列型が指定されています。
  • S3 のデータをクエリすることで、データセットに NULL 値がないことがわかるため、DDL ではどの列も NULL 許可として設定されていません。
  • 予想されるクエリタイプに基づいて、ソートキーとバケッティング列は UserID 列に設定されています。このデータに対するユースケースが異なる場合は、ソートキーとして ItemID を使用することも考えられます。

データベースを作成し、切り替えます。

CREATE DATABASE IF NOT EXISTS mydatabase;
USE mydatabase;

手動でテーブルを作成します(AWS S3 からロードする Parquet ファイルと同じスキーマを持つことをお勧めします)。

CREATE TABLE user_behavior_declared
(
UserID int(11),
ItemID int(11),
CategoryID int(11),
BehaviorType varchar(65533),
Timestamp datetime
);

テーブルを作成した後、INSERT INTOSELECT FROM FILES() を使用してロードできます。

INSERT INTO user_behavior_declared
SELECT * FROM FILES
(
"path" = "s3://starrocks-examples/user_behavior_ten_million_rows.parquet",
"format" = "parquet",
"aws.s3.region" = "us-east-1",
"aws.s3.access_key" = "AAAAAAAAAAAAAAAAAAAA",
"aws.s3.secret_key" = "BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB"
);

NOTE

上記のコマンドで AAABBB の部分をあなたの資格情報に置き換えてください。オブジェクトは AWS 認証済みユーザーであれば読み取り可能ですので、有効な aws.s3.access_keyaws.s3.secret_key を使用できます。

ロードが完了した後、テーブルをクエリしてデータがロードされたことを確認します。例:

SELECT * from user_behavior_declared LIMIT 3;

以下のクエリ結果が返され、データが正常にロードされたことを示します。

+--------+---------+------------+--------------+---------------------+
| UserID | ItemID | CategoryID | BehaviorType | Timestamp |
+--------+---------+------------+--------------+---------------------+
| 58 | 4309692 | 1165503 | pv | 2017-11-25 14:06:52 |
| 58 | 181489 | 1165503 | pv | 2017-11-25 14:07:22 |
| 58 | 3722956 | 1165503 | pv | 2017-11-25 14:09:28 |
+--------+---------+------------+--------------+---------------------+

ロード進捗の確認

information_schema.loads ビューをクエリしてロード進捗を追跡します。

SELECT * FROM information_schema.loads ORDER BY JOB_ID DESC;

複数のロードジョブを送信した場合は、ジョブに関連付けられた LABEL でフィルタリングできます。例:

SELECT * FROM information_schema.loads WHERE LABEL = 'insert_e3b882f5-7eb3-11ee-ae77-00163e267b60' \G
*************************** 1. row ***************************
JOB_ID: 10243
LABEL: insert_e3b882f5-7eb3-11ee-ae77-00163e267b60
DATABASE_NAME: mydatabase
STATE: FINISHED
PROGRESS: ETL:100%; LOAD:100%
TYPE: INSERT
PRIORITY: NORMAL
SCAN_ROWS: 10000000
FILTERED_ROWS: 0
UNSELECTED_ROWS: 0
SINK_ROWS: 10000000
ETL_INFO:
TASK_INFO: resource:N/A; timeout(s):300; max_filter_ratio:0.0
CREATE_TIME: 2023-11-09 11:56:01
ETL_START_TIME: 2023-11-09 11:56:01
ETL_FINISH_TIME: 2023-11-09 11:56:01
LOAD_START_TIME: 2023-11-09 11:56:01
LOAD_FINISH_TIME: 2023-11-09 11:56:44
JOB_DETAILS: {"All backends":{"e3b882f5-7eb3-11ee-ae77-00163e267b60":[10142]},"FileNumber":0,"FileSize":0,"InternalTableLoadBytes":311710786,"InternalTableLoadRows":10000000,"ScanBytes":581574034,"ScanRows":10000000,"TaskNumber":1,"Unfinished backends":{"e3b882f5-7eb3-11ee-ae77-00163e267b60":[]}}
ERROR_MSG: NULL
TRACKING_URL: NULL
TRACKING_SQL: NULL
REJECTED_RECORD_PATH: NULL

loads ビューで提供されるフィールドについての情報は、Information Schema を参照してください。

NOTE

INSERT は同期コマンドです。INSERT ジョブがまだ実行中の場合、その実行ステータスを確認するには別のセッションを開く必要があります。

Broker Load の使用

非同期の Broker Load プロセスは、S3 への接続を確立し、データを取得し、CelerData にデータを保存する処理を行います。

このメソッドは、Parquet、ORC、CSV、JSON ファイル形式をサポートしています。

Broker Load の利点

  • Broker Load はバックグラウンドで実行され、クライアントはジョブが続行するために接続を維持する必要がありません。
  • Broker Load は長時間実行されるジョブに適しており、デフォルトのタイムアウトは 4 時間です。
  • Parquet と ORC ファイル形式に加えて、Broker Load は CSV と JSON ファイル形式をサポートしています。

典型的な例

テーブルを作成し、S3 からサンプルデータセット s3://starrocks-examples/user_behavior_ten_million_rows.parquet を取得するロードプロセスを開始し、データロードの進捗と成功を確認します。

データベースとテーブルの作成

データベースを作成し、切り替えます。

CREATE DATABASE IF NOT EXISTS mydatabase;
USE mydatabase;

手動でテーブルを作成します(AWS S3 からロードする Parquet ファイルと同じスキーマを持つことをお勧めします)。

CREATE TABLE user_behavior
(
UserID int(11),
ItemID int(11),
CategoryID int(11),
BehaviorType varchar(65533),
Timestamp datetime
);

Broker Load の開始

以下のコマンドを実行して、サンプルデータセット s3://starrocks-examples/user_behavior_ten_million_rows.parquet から user_behavior テーブルにデータをロードする Broker Load ジョブを開始します。

NOTE

以下のコマンドで AAABBB の部分をあなたの資格情報に置き換えてください。オブジェクトは AWS 認証済みユーザーであれば読み取り可能ですので、有効な aws.s3.access_keyaws.s3.secret_key を使用できます。

LOAD LABEL user_behavior
(
DATA INFILE("s3://starrocks-examples/user_behavior_ten_million_rows.parquet")
INTO TABLE user_behavior
FORMAT AS "parquet"
)
WITH BROKER
(
"aws.s3.enable_ssl" = "true",
"aws.s3.use_instance_profile" = "false",
"aws.s3.region" = "us-east-1",
"aws.s3.access_key" = "AAAAAAAAAAAAAAAAAAAA",
"aws.s3.secret_key" = "BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB"
)
PROPERTIES
(
"timeout" = "72000"
);

このジョブには 4 つの主要なセクションがあります。

  • LABEL: ロードジョブの状態をクエリする際に使用される文字列。
  • LOAD 宣言: ソース URI、ソースデータ形式、および宛先テーブル名。
  • BROKER: ソースの接続詳細。
  • PROPERTIES: タイムアウト値およびロードジョブに適用するその他のプロパティ。

詳細な構文とパラメータの説明については、BROKER LOAD を参照してください。

ロード進捗の確認

information_schema.loads ビューをクエリしてロード進捗を追跡します。

SELECT * FROM information_schema.loads;

loads ビューで提供されるフィールドについての情報は、Information Schema を参照してください。

複数のロードジョブを送信した場合は、ジョブに関連付けられた LABEL でフィルタリングできます。

SELECT * FROM information_schema.loads WHERE LABEL = 'user_behavior';

以下の出力には、ロードジョブ user_behavior に対する 2 つのエントリがあります。

  • 最初のレコードは CANCELLED 状態を示しています。ERROR_MSG までスクロールすると、ジョブが listPath failed により失敗したことがわかります。
  • 2 番目のレコードは FINISHED 状態を示しており、ジョブが成功したことを意味します。
JOB_ID|LABEL                                      |DATABASE_NAME|STATE    |PROGRESS           |TYPE  |PRIORITY|SCAN_ROWS|FILTERED_ROWS|UNSELECTED_ROWS|SINK_ROWS|ETL_INFO|TASK_INFO                                           |CREATE_TIME        |ETL_START_TIME     |ETL_FINISH_TIME    |LOAD_START_TIME    |LOAD_FINISH_TIME   |JOB_DETAILS                                                                                                                                                                                                                                                    |ERROR_MSG                             |TRACKING_URL|TRACKING_SQL|REJECTED_RECORD_PATH|
------+-------------------------------------------+-------------+---------+-------------------+------+--------+---------+-------------+---------------+---------+--------+----------------------------------------------------+-------------------+-------------------+-------------------+-------------------+-------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------+------------+------------+--------------------+
10121|user_behavior |mydatabase |CANCELLED|ETL:N/A; LOAD:N/A |BROKER|NORMAL | 0| 0| 0| 0| |resource:N/A; timeout(s):72000; max_filter_ratio:0.0|2023-08-10 14:59:30| | | |2023-08-10 14:59:34|{"All backends":{},"FileNumber":0,"FileSize":0,"InternalTableLoadBytes":0,"InternalTableLoadRows":0,"ScanBytes":0,"ScanRows":0,"TaskNumber":0,"Unfinished backends":{}} |type:ETL_RUN_FAIL; msg:listPath failed| | | |
10106|user_behavior |mydatabase |FINISHED |ETL:100%; LOAD:100%|BROKER|NORMAL | 86953525| 0| 0| 86953525| |resource:N/A; timeout(s):72000; max_filter_ratio:0.0|2023-08-10 14:50:15|2023-08-10 14:50:19|2023-08-10 14:50:19|2023-08-10 14:50:19|2023-08-10 14:55:10|{"All backends":{"a5fe5e1d-d7d0-4826-ba99-c7348f9a5f2f":[10004]},"FileNumber":1,"FileSize":1225637388,"InternalTableLoadBytes":2710603082,"InternalTableLoadRows":86953525,"ScanBytes":1225637388,"ScanRows":86953525,"TaskNumber":1,"Unfinished backends":{"a5| | | | |

ロードジョブが完了したことを確認した後、宛先テーブルのサブセットを確認してデータが正常にロードされたかどうかを確認できます。例:

SELECT * from user_behavior LIMIT 3;

以下のクエリ結果が返され、データが正常にロードされたことを示します。

+--------+--------+------------+--------------+---------------------+
| UserID | ItemID | CategoryID | BehaviorType | Timestamp |
+--------+--------+------------+--------------+---------------------+
| 58 | 158350 | 2355072 | pv | 2017-11-27 13:06:51 |
| 58 | 158590 | 3194735 | pv | 2017-11-27 02:21:04 |
| 58 | 215073 | 3002561 | pv | 2017-11-30 10:55:42 |
+--------+--------+------------+--------------+---------------------+