CREATE ROUTINE LOAD
Routine Load は Apache Kafka® からメッセージを継続的に消費し、StarRocks にデータをロードできます。Routine Load は Kafka クラスターから CSV、JSON、Avro (v3.0.1 以降でサポート) データを消費し、plaintext
、ssl
、sasl_plaintext
、sasl_ssl
などの複数のセキュリティプロトコルを介して Kafka にアクセスできます。
このトピックでは、CREATE ROUTINE LOAD ステートメントの構文、パラメーター、および例について説明します。
NOTE
- Routine Load のアプリケーションシナリオ、原則、および基本操作については、Load data using Routine Load を参照してください。
- StarRocks テーブルにデータをロードするには、StarRocks テーブルに対して INSERT 権限を持つユーザーとしてのみ可能です。INSERT 権限がない場合は、GRANT に従って、使用する StarRocks クラスターに接続するユーザーに INSERT 権限を付与してください。
Syntax
CREATE ROUTINE LOAD <database_name>.<job_name> ON <table_name>
[load_properties]
[job_properties]
FROM data_source
[data_source_properties]
Parameters
database_name
, job_name
, table_name
database_name
オプション。StarRocks データベースの名前。
job_name
必須。Routine Load ジョブの名前。テーブルは複数の Routine Load ジョブからデータを受け取ることができます。識別可能な情報、例えば Kafka トピック名やおおよそのジョブ作成時間を使用して、意味のある Routine Load ジョブ 名を設定することをお勧めします。同じデータベース内で Routine Load ジョブの名前は一意でなければなりません。
table_name
必須。データがロードされる StarRocks テーブルの名前。
load_properties
オプション。データのプロパティ。構文:
[COLUMNS TERMINATED BY '<column_separator>'],
[ROWS TERMINATED BY '<row_separator>'],
[COLUMNS (<column1_name>[, <column2_name>, <column_assignment>, ... ])],
[WHERE <expr>],
[PARTITION (<partition1_name>[, <partition2_name>, ...])]
[TEMPORARY PARTITION (<temporary_partition1_name>[, <temporary_partition2_name>, ...])]
COLUMNS TERMINATED BY
CSV 形式データのカラム区切り文字。デフォルトのカラム区切り文字は \t
(タブ) です。例えば、カラム区切り文字をカンマに指定するには COLUMNS TERMINATED BY ","
を使用します。
Note
- ここで指定したカラム区切り文字が、取り込むデータのカラム区切り文字と同じであることを確認してください。
- カンマ (,) やタブ、パイプ (|) などの UTF-8 文字列をテキストデリミタとして使用できますが、その長さは 50 バイトを超えてはいけません。
- Null 値は
\N
を使用して示されます。例えば、データレコードが 3 つのカラムで構成され、データレコードが最初と 3 番目のカラムにデータを保持し、2 番目のカラムにデータを保持しない場合、この状況では 2 番目のカラムに\N
を使用して Null 値を示す必要があります。これは、レコードをa,\N,b
としてコンパイルする必要があり、a,,b
ではないことを意味します。a,,b
は、レコードの 2 番目のカラムが空の文字列を保持していることを示します。
ROWS TERMINATED BY
CSV 形式データの行区切り文字。デフォルトの行区切り文字は \n
です。
COLUMNS
ソースデータのカラムと StarRocks テーブルのカラム間のマッピング。詳細については、このトピックの Column mapping を参照してください。
column_name
: ソースデータのカラムが StarRocks テーブルのカラムに計算なしでマッピングできる場合、カラム名を指定するだけで済みます。これらのカラムはマッピングされたカラムと呼ばれます。column_assignment
: ソースデータのカラムが StarRocks テーブルのカラムに直接マッピングできず、データロード前に関数を使用してカラムの値を計算する必要がある場合、expr
に計算関数を指定する必要があります。これらのカラムは派生カラムと呼ばれます。StarRocks は最初にマッピングされたカラムを解析するため、派生カラムはマッピングされたカラムの後に配置することをお勧めします。
WHERE
フィルター条件。フィルター条件を満たすデータのみが StarRocks にロードされます。例えば、col1
の値が 100
より大きく、col2
の値が 1000
に等しい行のみを取り込みたい場合、WHERE col1 > 100 and col2 = 1000
を使用できます。
NOTE
フィルター条件で指定されたカラムは、ソースカラムまたは派生カラムであることができます。
PARTITION
StarRocks テーブルがパーティション p0, p1, p2, p3 に分散されており、StarRocks で p1, p2, p3 にのみデータをロードし、p0 に保存されるデータをフィルタリングしたい場合、フィルター条件として PARTITION(p1, p2, p3)
を指定できます。デフォルトでは、このパラメーターを指定しない場合、データはすべてのパーティションにロードされます。例:
PARTITION (p1, p2, p3)
TEMPORARY PARTITION
データをロードしたい temporary partition の名前。複数の一時パーティションを指定する場合、カンマ (,) で区切る必要があります。
job_properties
必須。ロードジョブのプロパティ。構文:
PROPERTIES ("<key1>" = "<value1>"[, "<key2>" = "<value2>" ...])
Property | Required | Description |
---|---|---|
desired_concurrent_number | No | 単一の Routine Load ジョブの期待されるタスク並行性。デフォルト値: 3 。実際のタスク並行性は、複数のパラメーターの最小値によって決まります: min(alive_be_number, partition_number, desired_concurrent_number, max_routine_load_task_concurrent_num) 。
|
max_batch_interval | No | タスクのスケジューリング間隔、つまりタスクが実行される頻度。単位: 秒。値の範囲: 5 ~ 60 。デフォルト値: 10 。10 より大きい値を設定することをお勧めします。スケジューリングが 10 秒未満の場合、ロード頻度が高すぎるために多くのタブレットバージョンが生成されます。 |
max_batch_rows | No | このプロパティは、エラーデータ検出ウィンドウを定義するためにのみ使用されます。ウィンドウは、単一の Routine Load タスクによって消費されるデータ行数です。値は 10 * max_batch_rows です。デフォルト値は 10 * 200000 = 2000000 です。Routine Load タスクは、エラーデータ検出ウィンドウ内でエラーデータを検出します。エラーデータとは、StarRocks が解析できないデータを指します。例えば、無効な JSON 形式のデータです。 |
max_error_number | No | エラーデータ検出ウィンドウ内で許容されるエラーデータ行の最大数。この値を超えると、ロードジョブは一時停止します。SHOW ROUTINE LOAD を実行し、ErrorLogUrls を使用してエラーログを表示できます。その後、エラーログに従って Kafka のエラーを修正できます。デフォルト値は 0 で、エラーデータ行は許可されません。 NOTE
|
max_filter_ratio | No | ロードジョブの最大エラー許容度。エラー許容度は、ロードジョブによって要求されたすべてのデータレコードの中で、不適格なデータ品質のためにフィルタリングされるデ ータレコードの最大割合です。有効な値: 0 から 1 。デフォルト値: 1 (実際には効果を発揮しません)。0 に設定することをお勧めします。これにより、不適格なデータレコードが検出された場合、ロードジョブが一時停止し、データの正確性が保証されます。不適格なデータレコードを無視したい場合は、このパラメーターを 0 より大きい値に設定できます。これにより、データファイルに不適格なデータレコードが含まれていても、ロードジョブは成功します。 NOTE
|
strict_mode | No | strict mode を有効にするかどうかを指定します。有効な値: true と false 。デフォルト値: false 。strict mode が有効な場合、ロードされたデータのカラムの値が NULL であり、ターゲットテーブルがそのカラムに NULL 値を許可しない場合、データ行はフィルタリングされます。 |
log_rejected_record_num | No | ログに記録できる不適格なデータ行の最大数を指定します。このパラメーターは v3.1 以降でサポートされています。有効な値: 0 、-1 、および任意の非ゼロ正の整数。デフォルト値: 0 。
|
timezone | No | ロードジョブで使用されるタイムゾーン。デフォルト値: Asia/Shanghai 。このパラメーターの値は、strftime()、alignment_timestamp()、from_unixtime() などの関数によって返される結果に影響します。このパラメーターで指定されたタイムゾーンはセッションレベルのタイムゾーンです。詳細については、Configure a time zone を参照してください。 |
partial_update | No | 部分更新を使用するかどうか。 有効な値: TRUE と FALSE 。 デフォルト値: FALSE 、この機能を無効にすることを示します。 |
merge_condition | No | データを更新するかどうかを判断する条件として使用するカラムの名前 を指定します。このカラムにロードされるデータの値がこのカラムの現在の値以上の場合にのみデータが更新されます。 NOTE 条件付き更新をサポートするのは主キーテーブルのみです。指定するカラムは主キーのカラムであってはなりません。 |
format | No | ロードされるデータの形式。有効な値: CSV 、JSON 、および Avro (v3.0.1 以降でサポート)。デフォルト値: CSV 。 |
trim_space | No | データファイルが CSV 形式の場合、カラム区切り文字の前後のスペースを削除するかどうかを指定します。タイプ: BOOLEAN。デフォルト値: false 。一部のデータベースでは、データを CSV 形式のデータファイルとしてエクスポートする際にカラム区切り文字にスペースが追加されます。これらのスペースは、位置に応じて先行スペースまたは後続スペースと呼ばれます。 trim_space パラメーターを設定することで、StarRocks がデータロード中にこれらの不要なスペースを削除できるようにします。StarRocks は、 enclose で指定された文字で囲まれたフィールド内のスペース (先行スペースと後続スペースを含む) を削除しないことに注意してください。例えば、次のフィールド値はパイプ (| ) をカラム区切り文字として使用し、二重引用符 (" ) を enclose で指定された文字として使用しています: | "Love StarRocks" | 。trim_space を true に設定すると、StarRocks は前述のフィールド値を |"Love StarRocks"| として 処理します。 |
enclose | No | データファイルが CSV 形式の場合、RFC4180 に従ってフィールド値を囲むために使用される文字を指定します。タイプ: 単一バイト文字。デフォルト値: NONE 。最も一般的な文字は単一引用符 (' ) と二重引用符 (" ) です。enclose で指定された文字で囲まれたすべての特殊文字 (行区切り文字やカラム区切り文字を含む) は通常の記号と見なされます。StarRocks は、enclose で指定された文字として任意の単一バイト文字を指定できるため、RFC4180 よりも多くのことができます。フィールド値に enclose で指定された文字が含まれている場合、同じ文字を使用してその enclose で指定された文字をエスケープできます。例えば、enclose を " に設定し、フィールド値が a "quoted" c の場合、このフィールド値をデータファイルに "a ""quoted"" c" として入力できます。 |
escape | No | 行区切り文字、カラム区切り文字、エスケープ文字、enclose で指定された文字などのさまざまな特殊文字をエスケープするために使用される文字を指定します。これらは通常の文字と見なされ、存在するフィールド値の一部として解析されます。タイプ: 単一バイト文字。デフォルト値: NONE 。最も一般的な文字はスラッシュ (\ ) で、SQL ステートメントではダブルスラッシュ (\\ ) として記述する必要があります。NOTE escape で指定された文字は、各ペアの enclose で指定された文字の内側と外側の両方に適用されます。以下の 2 つの例を示します:
|
strip_outer_array | No | JSON 形式のデータの最外部の配列構造を削除するかどうかを指定します。有効な値: true と false 。デフォルト値: false 。実際のビジネスシナリオでは、JSON 形式のデータには [] で示される最外部の配列構造がある場合があります。この状況では、このパラメーターを true に設定することをお勧めします。これにより、StarRocks は最外部の角括弧 [] を削除し、各内部配列を個別のデータレコードとしてロードします。このパラメーターを false に設定すると、StarRocks は JSON 形式のデータ全体を 1 つの配列として解析し、その配列を単一のデータレコードとしてロードします。JSON 形式のデータ [{"category" : 1, "author" : 2}, {"category" : 3, "author" : 4} ] を例にとります。このパラメーターを true に設定すると、{"category" : 1, "author" : 2} と {"category" : 3, "author" : 4} は 2 つの別々のデータレコ ードとして解析され、2 つの StarRocks データ行にロードされます。 |
jsonpaths | No | JSON 形式のデータからロードしたいフィールドの名前。このパラメーターの値は有効な JsonPath 式です。詳細については、StarRocks table contains derived columns whose values are generated by using expressions を参照してください。 |
json_root | No | ロードする JSON 形式のデータのルート要素。StarRocks は json_root を通じてルートノードの要素を抽出して解析します。デフォルトでは、このパラメーターの値は空で、すべての JSON 形式のデータがロードされることを示します。詳細については、Specify the root element of the JSON-formatted data to be loaded を参照してください。 |
task_consume_second | No | 指定された Routine Load ジョブ内の各 Routine Load タスクがデータを消費する最大時間。単位: 秒。FE dynamic parameters routine_load_task_consume_second (クラスター内のすべての Routine Load ジョブに適用される) とは異なり、このパラメーターは個々の Routine Load ジョブに特有であり、より柔軟です。このパラメーターは v3.1.0 以降でサポートされています。
|
task_timeout_second | No | 指定された Routine Load ジョブ内の各 Routine Load タスクのタイムアウト期間。単位: 秒。FE dynamic parameter routine_load_task_timeout_second (クラスター内のすべての Routine Load ジョブに適用される) とは異なり、このパラメーターは個々の Routine Load ジョブに特有であり、より柔軟です。このパラメーターは v3.1.0 以降でサポートされています。
|
pause_on_fatal_parse_error | NO | 回復不能なデータ解析エラーが発生した場合にジョブを自動的に一時停止するかどうかを指定します。有効な値: true と false 。デフォルト値: false 。このパラメーターは v3.3.12/v3.4.2 以降でサポートされています。 このような解析エラーは通常、 strip_outer_array を設定せずに JSON 配列をインポートする場合や、Kafka メッセージに abcd のような不正な JSON が含まれている場合など、違法なデータ形式によって引き起こされます。 |
data_source
, data_source_properties
必須。データソースと関連するプロパティ。
FROM <data_source>
("<key1>" = "<value1>"[, "<key2>" = "<value2>" ...])
data_source
必須。ロードしたいデータのソース。有効な値: KAFKA
。
data_source_properties
データソースのプロパティ。
Property | Required | Description |
---|---|---|
kafka_broker_list | Yes | Kafka のブローカー接続情報。形式は <kafka_broker_ip>:<broker_ port> 。複数のブローカーはカンマ (,) で区切られます。Kafka ブローカーで使用されるデフォルトポートは 9092 です。例: "kafka_broker_list" = ""xxx.xx.xxx.xx:9092,xxx.xx.xxx.xx:9092" 。 |
kafka_topic | Yes | 消費される Kafka トピック。Routine Load ジョブは 1 つのトピックからのみメッセージを消費できます。 |
kafka_partitions | No | 消費される Kafka パーティション。例: "kafka_partitions" = "0, 1, 2, 3" 。このプロパティが指定されていない場合、デフォルトですべてのパーティションが消費されます。 |
kafka_offsets | No | kafka_partitions で指定された Kafka パーティションでデータを消費し始めるオフセット。指定されていない場合、Routine Load ジョブは kafka_partitions の最新のオフセットからデータを消費します。有効な値:
"kafka_offsets" = "1000, OFFSET_BEGINNING, OFFSET_END, 2000" 。 |
property.kafka_default_offsets | No | すべての消費者パーティションのデフォルトの開始オフセット。このプロパティでサポートされる値は kafka_offsets プロパティと同じです。 |
confluent.schema.registry.url | No | Avro スキーマが登録されている Schema Registry の URL。StarRocks はこの URL を使用して Avro スキーマを取得します。形式は次のとおりです:confluent.schema.registry.url = http[s]://[<schema-registry-api-key>:<schema-registry-api-secret>@]<hostname or ip address>[:<port>] |