BROKER LOAD

Description

CelerData provides the MySQL-based loading method Broker Load. After you submit a load job, CelerData asynchronously runs the job. You need to use SHOW LOAD or curl to check the job result.

You can use a Broker Load job to load data from your AWS S3 bucket. See Batch load data from Amazon S3.

Syntax

LOAD LABEL [<database_name>.]<label_name>
(
    data_desc[, data_desc ...]
)
WITH BROKER
(
    StorageCredentialParams
)
[PROPERTIES
(
    opt_properties
)
]

Note that in CelerData some literals are used as reserved keywords by the SQL language. Do not directly use these keywords in SQL statements. If you want to use such a keyword in an SQL statement, enclose it in a pair of backticks (`). See Keywords.

Parameters

database_name and label_name

label_name specifies the label of the load job.

database_name optionally specifies the name of the database to which the destination table belongs.

Each load job has a label that is unique across the entire database. You can use the label of a load job to view the execution status of the load job and prevent repeatedly loading the same data. When a load job enters the FINISHED state, its label cannot be reused. Only the label of a load job that has entered the CANCELLED state can be reused. In most cases, the label of a load job is reused to retry that load job and load the same data, thereby implementing Exactly-Once semantics.
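
For example, assuming a load job was submitted with the label label1 in the database test_db (illustrative names; substitute your own), you can check its execution status with a statement like the following:

SHOW LOAD FROM test_db WHERE LABEL = "label1";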

data_desc

The description of a batch of data to be loaded. Each data_desc descriptor declares information such as the data source, ETL functions, destination table, and destination partitions.

Batch loading supports loading multiple data files at a time. In one load job, you can use multiple data_desc descriptors to declare multiple data files you want to load, or use one data_desc descriptor to declare one file path from which all data files in that path are loaded. Batch loading also ensures the transactional atomicity of each job that loads multiple data files: the loading of the data files in one load job must either all succeed or all fail. It never happens that the loading of some data files succeeds while the loading of the other files fails.

data_desc supports the following syntax:

DATA INFILE ("<file_path>"[, "<file_path>" ...])
[NEGATIVE]
INTO TABLE <table_name>
[PARTITION (<partition_name>[, <partition_name> ...])]
[FORMAT AS "CSV | Parquet | ORC"]
[COLUMNS TERMINATED BY "<column_separator>"]
[(column_list)]
[COLUMNS FROM PATH AS (<partition_field_name>[, <partition_field_name> ...])]
[SET <destination_column_name=function(temp_column_name)>[, <destination_column_name=function(temp_column_name)> ...]]
[WHERE predicate]

data_desc must include the following parameters:

  • DATA INFILE

    Specifies the save path of one or more data files you want to load.

    AWS S3 can be accessed according to the S3 or S3A protocol. Therefore, you can include s3:// or s3a:// as the prefix in the S3 URI that you pass as the file path (DATA INFILE).

    For example, you can specify this parameter as "s3a://<bucket_name>/<folder_name>/<folder_name>/<file_name>" to load a single data file from the specified path. You can also specify this parameter as "s3a://<bucket_name>/<folder_name>/<folder_name>/<wildcard>" to load all data files from the specified path. The supported wildcards are ?, *, [], {}, and ^. For more information, see Wildcard reference. A sketch that uses wildcards is shown after this parameter list.

    NOTE

    Wildcards can also be used to specify intermediate paths.

    The parameters in file_path are as follows:

    • bucket_name: the name of the S3 bucket in which your data files are stored.

    • folder_name: the name of the folder in which your data files are stored.

    • file_name: the name of the data file that you want to load.

    • wildcard: the wildcard that you want to use in the file path.

  • INTO TABLE

    Specifies the name of the destination table.
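
For example, the following sketch loads all Parquet files whose names start with part- from every sub-folder under folder_a by using wildcards in DATA INFILE. The bucket, path, and table names are hypothetical:

DATA INFILE("s3a://bucket_name/folder_a/*/part-*.parquet")
INTO TABLE example_table
FORMAT AS "Parquet"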

data_desc can also optionally include the following parameters:

  • NEGATIVE

    Revokes the loading of a specific batch of data. To achieve this, you need to load the same batch of data with the NEGATIVE keyword specified.

    NOTE

    This parameter is valid only when the destination table uses the Aggregate table and all its value columns are computed by the sum function.

  • PARTITION

    Specifies the partitions into which you want to load data. By default, if you do not specify this parameter, the source data will be loaded into all partitions of the destination table.

  • FORMAT AS

    Specifies the format of the data file. Valid values: CSV, Parquet, and ORC. If you include the filename extension .csv, .parquet, or .orc in <file_path>, you can skip this parameter. Otherwise, you must specify this parameter.

  • COLUMNS TERMINATED BY

    Specifies the column separator used in the data file. If you do not specify this parameter, this parameter defaults to \t, indicating tab. The column separator you specify must be the same as the column separator used in the data file. Otherwise, the load job fails due to inadequate data quality, and its State is displayed as CANCELLED.

    NOTE

    • You need to set this parameter only when you load CSV data.
    • For CSV data, you can use a UTF-8 string whose length does not exceed 50 bytes, such as a comma (,), tab, or pipe (|), as the column separator.
  • column_list

    Specifies the column mapping between the data file and the destination table. Syntax: (<column_name>[, <column_name> ...]). The columns declared in column_list are mapped by name onto the destination table columns.

    NOTE

    If the columns of the data file are mapped in sequence onto the columns of the destination table, you do not need to specify column_list. For more information, see the "Column mapping" section of this topic.

    If you want to skip a specific column of the data file, you only need to temporarily name that column as different from any of the destination table columns.

  • SET

    Specifies one or more functions that you want to use to convert a column of the data file. Examples:

    • The destination table consists of three columns, which are col1, col2, and col3 in sequence. The data file consists of four columns, among which the first two columns are mapped in sequence onto col1 and col2 of the destination table and the sum of the last two columns is mapped onto col3 of the destination table. In this case, you can specify column_list as (col1,col2,tmp_col3,tmp_col4) and pass (col3=tmp_col3+tmp_col4) into the SET clause to implement data conversion.
    • The destination table consists of three columns, which are year, month, and day in sequence. The data file consists of only one column that accommodates date and time values in yyyy-mm-dd hh:mm:ss format. In this case, you can specify column_list as (tmp_time) and pass (year = year(tmp_time), month=month(tmp_time), day=day(tmp_time)) into the SET clause to implement data conversion.
  • WHERE

    Specifies the conditions based on which you want to filter the source data. CelerData loads only the source data that meets the filter conditions specified in the WHERE clause.
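
For example, the following data_desc sketch combines several of the clauses described above: it maps the first two columns of a CSV file to col1 and col2, computes col3 as the sum of the last two file columns, and loads only the rows whose col1 value is positive. The bucket, path, table, and column names are hypothetical:

DATA INFILE("s3a://bucket_name/folder_a/*.csv")
INTO TABLE example_table
COLUMNS TERMINATED BY ","
(col1, col2, tmp_col3, tmp_col4)
SET (col3 = tmp_col3 + tmp_col4)
WHERE col1 > 0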

StorageCredentialParams

A set of parameters about how the system integrates with your S3 bucket.

  • To choose the instance profile-based authentication method, configure StorageCredentialParams as follows:

    "aws.s3.use_instance_profile" = "true",
    "aws.s3.region" = "<aws_s3_region>"
  • To choose the assumed role-based authentication method, configure StorageCredentialParams as follows:

    "aws.s3.use_instance_profile" = "true",
    "aws.s3.iam_role_arn" = "<iam_role_arn>",
    "aws.s3.region" = "<aws_s3_region>"
  • To choose the IAM user-based authentication method, configure StorageCredentialParams as follows:

    "aws.s3.use_instance_profile" = "false",
    "aws.s3.access_key" = "<iam_user_access_key>",
    "aws.s3.secret_key" = "<iam_user_secret_key>",
    "aws.s3.region" = "<aws_s3_region>"

The following table describes the parameters you need to configure in StorageCredentialParams.

Parameter | Required | Description
--------- | -------- | -----------
aws.s3.use_instance_profile | Yes | Specifies whether to enable the credential methods instance profile and assumed role. Valid values: true and false. Default value: false.
aws.s3.iam_role_arn | No | The ARN of the IAM role that has privileges on your AWS S3 bucket. If you choose assumed role as the credential method for accessing AWS S3, you must specify this parameter.
aws.s3.region | Yes | The region in which your AWS S3 bucket resides. Example: us-west-1.
aws.s3.access_key | No | The access key of your IAM user. If you choose IAM user as the credential method for accessing AWS S3, you must specify this parameter.
aws.s3.secret_key | No | The secret key of your IAM user. If you choose IAM user as the credential method for accessing AWS S3, you must specify this parameter.

For information about how to choose a credential method for accessing AWS S3 and how to configure an access control policy in AWS IAM Console, see Authentication parameters for accessing AWS S3.

opt_properties

Specifies some optional parameters whose settings are applied to the entire load job. Syntax:

PROPERTIES ("<key>" = "<value>"[, "<key>" = "<value>" ...])

The following parameters are supported:

  • timeout

    Specifies the timeout period of the load job. Unit: second. The default timeout period is 4 hours. We recommend that you specify a timeout period shorter than 6 hours. If the load job does not finish within the timeout period, CelerData cancels the load job and the status of the load job becomes CANCELLED.

    NOTE

    In most cases, you do not need to set the timeout period. We recommend that you set the timeout period only when the load job cannot finish within the default timeout period.

    Use the following formula to infer the timeout period:

    Timeout period > (Total size of the data files to be loaded x Total number of the data files to be loaded and the materialized views created on the data files)/(Average load speed x Maximum number of concurrent instances allowed per task)

    NOTE

    • "Average load speed" is the average load speed aat the account level. The average load speed varies for each CelerData cloud account depending on the server configuration and the maximum number of concurrent query tasks allowed for the CelerData cloud account. You can infer the average load speed based on the load speeds of historical load jobs.

    • "Maximum number of concurrent instances allowed per task" is specified by the FE dynamic parameter max_broker_concurrency.

    Suppose that you want to load a 1-GB data file on which two materialized views are created into a CelerData cloud account whose average load speed is 10 MB/s and maximum number of concurrent instances allowed per task is 3. The amount of time required for the data load is approximately 102 seconds.

    (1 x 1024 x 3)/(10 x 3) ≈ 102 (seconds)

    For this example, we recommend that you set the timeout period to a value greater than 102 seconds.

  • max_filter_ratio

    Specifies the maximum error tolerance of the load job. The maximum error tolerance is the maximum percentage of rows that can be filtered out as a result of inadequate data quality. Valid values: 0 to 1. Default value: 0.

    • If you set this parameter to 0, CelerData does not ignore unqualified rows during loading. As such, if the source data contains unqualified rows, the load job fails. This helps ensure the correctness of the data loaded into CelerData.

    • If you set this parameter to a value greater than 0, CelerData can ignore unqualified rows during loading. As such, the load job can succeed even if the source data contains unqualified rows.

      NOTE

      Rows that are filtered out due to inadequate data quality do not include rows that are filtered out by the WHERE clause.

    If the load job fails because the maximum error tolerance is set to 0, you can use SHOW LOAD to view the job result. Then, determine whether unqualified rows can be filtered out. If unqualified rows can be filtered out, calculate the maximum error tolerance based on the values returned for dpp.abnorm.ALL and dpp.norm.ALL in the job result, adjust the maximum error tolerance, and submit the load job again. The formula for calculating the maximum error tolerance is as follows:

    max_filter_ratio = [dpp.abnorm.ALL/(dpp.abnorm.ALL + dpp.norm.ALL)]

    The sum of the values returned for dpp.abnorm.ALL and dpp.norm.ALL is the total number of rows to be loaded.
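
    For example, suppose SHOW LOAD returns dpp.abnorm.ALL = 47 and dpp.norm.ALL = 9953 (hypothetical values):

    max_filter_ratio = 47/(47 + 9953) = 0.0047

    In this case, setting max_filter_ratio to a value no less than 0.0047 (for example, 0.005) and submitting the load job again allows the 47 unqualified rows to be filtered out instead of causing the job to fail.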

  • log_rejected_record_num

    Specifies the maximum number of unqualified data rows that can be logged. Valid values: 0, -1, and any non-zero positive integer. Default value: 0.

    • The value 0 specifies that no data rows that are filtered out will be logged.
    • The value -1 specifies that all data rows that are filtered out will be logged.
    • A non-zero positive integer such as n specifies that up to n data rows that are filtered out can be logged on each BE.
  • load_mem_limit

    Specifies the maximum amount of memory that can be provided to the load job. Unit: bytes. The default memory limit is 2 GB.

  • strict_mode

    Specifies whether to enable the strict mode. Valid values: true and false. Default value: false. true specifies to enable the strict mode, and false specifies to disable the strict mode.

    If you enable the strict mode, CelerData returns errors when data conversions fail. If you disable the strict mode, CelerData fills NULL values into the destination table when data conversions fail.

    The strict mode does not take effect in the following situations:

    • The data from a source column is generated by functions.
    • The data type of a source column imposes range limits on the column values. As a result, the values from that source column can be properly converted to other data types, but cannot pass the range limit checks. For example, a source column is defined as the DECIMAL(1,0) data type, but a value in that source column is 10, which exceeds the allowed range.
  • timezone

    Specifies the time zone of the load job. Default value: Asia/Shanghai. The time zone setting affects the results returned by functions such as strftime, alignment_timestamp, and from_unixtime. The time zone specified in the timezone parameter is a session-level time zone.

  • priority

    Specifies the priority of the load job. Valid values: LOWEST, LOW, NORMAL, HIGH, and HIGHEST. Default value: NORMAL. Batch loading provides a specific task pool size, which determines the maximum number of tasks that can be concurrently run within a specific time period. If the number of tasks to run for jobs that are submitted within the specified time period exceeds the maximum number, the jobs in the task pool will be waiting to be scheduled based on their priorities.

  • merge_condition

    Specifies the name of the column you want to use as the condition to determine whether updates can take effect. The update from a source record to a destination record takes effect only when the value of the specified column in the source data record is greater than or equal to the value of that column in the destination data record.

    NOTE

    The column that you specify cannot be a primary key column. Additionally, only tables that use the Primary Key table support conditional updates.
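
For example, a PROPERTIES clause that combines several of the optional parameters described above might look like the following. The values are illustrative only; adjust them to your own workload:

PROPERTIES
(
    "timeout" = "7200",
    "max_filter_ratio" = "0.1",
    "strict_mode" = "true",
    "timezone" = "America/Los_Angeles",
    "priority" = "HIGH"
)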

For the loading of JSON data, CelerData Cloud Serverless provides the following parameters:

  • jsonpaths

    The names of the keys that you want to load from the JSON data file. You need to specify this parameter only when you load JSON data by using the matched mode. The value of this parameter is in JSON format. See Configure column mapping for JSON data loading.

  • strip_outer_array

    Specifies whether to strip the outermost array structure. Valid values: true and false. Default value: false.

    In real-world business scenarios, the JSON data may have an outermost array structure as indicated by a pair of square brackets []. In this situation, we recommend that you set this parameter to true, so CelerData Cloud Serverless removes the outermost square brackets [] and loads each element of the outermost array as a separate data record. If you set this parameter to false, CelerData Cloud Serverless parses the entire JSON data file into one array and loads the array as a single data record. For example, the JSON data is [ {"category" : 1, "author" : 2}, {"category" : 3, "author" : 4} ]. If you set this parameter to true, {"category" : 1, "author" : 2} and {"category" : 3, "author" : 4} are parsed into separate data records that are loaded into separate rows of the destination table.

  • json_root

    The root element of the JSON data that you want to load from the JSON data file. You need to specify this parameter only when you load JSON data by using the matched mode. The value of this parameter is a valid JsonPath string. By default, the value of this parameter is empty, indicating that all data of the JSON data file will be loaded.

When you load JSON data, also note that the size per JSON object cannot exceed 4 GB. If an individual JSON object in the JSON data file exceeds 4 GB in size, an error "This parser can't support a document that big." is reported.

Column mapping

Configure column mapping for CSV data loading

If the columns of the data file can be mapped one-to-one in sequence to the columns of the destination table, you do not need to configure the column mapping between the data file and the destination table.

If the columns of the data file cannot be mapped one-to-one in sequence to the columns of the destination table, you need to use the columns parameter to configure the column mapping between the data file and the destination table. This includes the following two use cases:

  • Same number of columns but different column sequence. Also, the data from the data file does not need to be computed by functions before it is loaded into the matching destination table columns.

    In the columns parameter, you need to specify the names of the destination table columns in the same sequence as the data file columns are arranged.

    For example, the destination table consists of three columns, which are col1, col2, and col3 in sequence, and the data file also consists of three columns, which can be mapped to the destination table columns col3, col2, and col1 in sequence. In this case, you need to specify "columns: col3, col2, col1".

  • Different number of columns and different column sequence. Also, the data from the data file needs to be computed by functions before it is loaded into the matching destination table columns.

    In the columns parameter, you need to specify the names of the destination table columns in the same sequence as the data file columns are arranged and specify the functions you want to use to compute the data. Two examples are as follows:

    • The destination table consists of three columns, which are col1, col2, and col3 in sequence. The data file consists of four columns, among which the first three columns can be mapped in sequence to the destination table columns col1, col2, and col3 and the fourth column cannot be mapped to any of the destination table columns. In this case, you need to temporarily specify a name for the fourth column of the data file, and the temporary name must be different from any of the destination table column names. For example, you can specify "columns: col1, col2, col3, temp", in which the fourth column of the data file is temporarily named temp.
    • The destination table consists of three columns, which are year, month, and day in sequence. The data file consists of only one column that accommodates date and time values in yyyy-mm-dd hh:mm:ss format. In this case, you can specify "columns: col, year = year(col), month=month(col), day=day(col)", in which col is the temporary name of the data file column and the functions year = year(col), month=month(col), and day=day(col) are used to extract data from the data file column col and load the data into the matching destination table columns. For example, year = year(col) is used to extract the yyyy data from the data file column col and load the data into the destination table column year.
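
In Broker Load syntax, the second example above corresponds roughly to a column_list combined with a SET clause, for example (the bucket, path, table, and column names are hypothetical):

DATA INFILE("s3a://bucket_name/folder_a/example_file.csv")
INTO TABLE example_table
COLUMNS TERMINATED BY ","
(col)
SET (year = year(col), month = month(col), day = day(col))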

Configure column mapping for JSON data loading

If the keys of the JSON document have the same names as the columns of the destination table, you can load the JSON-formatted data by using the simple mode. In simple mode, you do not need to specify the jsonpaths parameter. This mode requires that the JSON-formatted data must be an object as indicated by curly brackets {}, such as {"category": 1, "author": 2, "price": "3"}. In this example, category, author, and price are key names, and these keys can be mapped one-to-one by name to the columns category, author, and price of the destination table.

If the keys of the JSON document have different names than the columns of the destination table, you can load the JSON-formatted data by using the matched mode. In matched mode, you need to use the jsonpaths and COLUMNS parameters to specify the column mapping between the JSON document and the destination table:

  • In the jsonpaths parameter, specify the JSON keys in the same sequence as they are arranged in the JSON document.
  • In the COLUMNS parameter, specify the mapping between the JSON keys and the destination table columns:
    • The column names specified in the COLUMNS parameter are mapped one-to-one in sequence to the JSON keys.
    • The column names specified in the COLUMNS parameter are mapped one-to-one by name to the destination table columns.
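
For example, suppose the JSON document is {"cat": 1, "author_id": 2, "price": 3} and the destination table consists of the columns category, author, and price (all names here are hypothetical). Conceptually, the matched-mode mapping pairs the keys and columns as follows; declare the two parameters in the positions described earlier in this topic:

jsonpaths: ["$.cat", "$.author_id", "$.price"]
COLUMNS:   (category, author, price)

The key extracted by "$.cat" is mapped to the first name in COLUMNS (category), which in turn is mapped by name to the destination table column category, and so on for the remaining keys.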

Examples

The following examples assume that you choose instance profile as the credential method for accessing AWS S3.

Perform a load

Your CelerData database test_db contains a table named table1. The table consists of three columns, which are col1, col2, and col3 in sequence.

Your s3a://celerdata_bucket/folder_a/folder_b path stores a number of CSV data files, one of which is test_file.csv. These data files each also consist of three columns, which are mapped in sequence onto col1, col2, and col3 of table1.

Create a load job with the label label1 to load only the data records whose values in the first column are greater than 20180601 from test_file.csv into partitions p1 and p2 of table1, and set the timeout period of the load job to 3600 seconds:

LOAD LABEL test_db.label1
(
    DATA INFILE("s3a://celerdata_bucket/folder_a/folder_b/test_file.csv")
    INTO TABLE table1
    COLUMNS TERMINATED BY ","
    PARTITION (p1, p2)
    (col1, col2, col3)
    WHERE col1 > 20180601
)
WITH BROKER
(
    "aws.s3.use_instance_profile" = "true",
    "aws.s3.region" = "<aws_s3_region>"
)
PROPERTIES
(
    "timeout" = "3600"
);

Revoke a load

Create a load job with the label label2 to revoke the previous load whose label is label1:

LOAD LABEL test_db.label2
(
    DATA INFILE("s3a://celerdata_bucket/folder_a/folder_b/test_file.csv")
    NEGATIVE
    INTO TABLE table1
    COLUMNS TERMINATED BY ","
    PARTITION (p1, p2)
    (col1, col2, col3)
    WHERE col1 > 20180601
)
WITH BROKER
(
    "aws.s3.use_instance_profile" = "true",
    "aws.s3.region" = "<aws_s3_region>"
);

Load data into a table containing HLL-type columns

Your CelerData database test_db contains a table named table2. The table consists of four columns, which are id, col1, col2, and col3 in sequence. col1 and col2 are defined as HLL-type columns.

Your data file test_file.csv consists of three columns, among which the first column is mapped onto id of table2 and the second and third columns are mapped in sequence onto col1 and col2 of table2. The values in the second and third columns of test_file.csv can be converted into HLL-type data by using functions before they are loaded into col1 and col2 of table2.

Create a load job with the label label3 to load all data from test_file.csv into table2:

LOAD LABEL test_db.label3
(
    DATA INFILE("hdfs://<hdfs_host>:<hdfs_port>/user/celerdata/data/input/test_file.csv")
    INTO TABLE table2
    COLUMNS TERMINATED BY ","
    (id, temp1, temp2)
    SET
    (
        col1 = hll_hash(temp1),
        col2 = hll_hash(temp2),
        col3 = empty_hll()
    )
)
WITH BROKER
(
    "aws.s3.use_instance_profile" = "true",
    "aws.s3.region" = "<aws_s3_region>"
);

NOTE

In the preceding example, the three columns of test_file.csv are named id, temp1, and temp2 in sequence by using column_list. Then, functions are used to convert data as follows:

  • The hll_hash function is used to convert the values in temp1 and temp2 of test_file.csv into HLL-type data and map temp1 and temp2 of test_file.csv onto col1 and col2 of table2.

  • The empty_hll function is used to fill the specified default value into col3 of table2.