CREATE ROUTINE LOAD
Description
Routine Load is an asynchronous loading method based on the MySQL protocol. It continuously consumes messages from Apache Kafka® and loads data into CelerData. Routine Load can consume CSV, JSON, and Avro data from a Kafka cluster and access Kafka via SSL encryption, SASL authentication, or unsecured authentication. This topic describes the syntax, parameters, and examples of the CREATE ROUTINE LOAD statement.
NOTE
You can load data into tables only as a user who has the INSERT privilege on those tables. If you do not have the INSERT privilege, follow the instructions provided in GRANT to grant the INSERT privilege to the user that you use to connect to your CelerData cluster.
Syntax
CREATE ROUTINE LOAD [<database_name>.]<job_name> ON <table_name>
[load_properties]
[job_properties]
FROM data_source
[data_source_properties]
Parameters
database_name, job_name, table_name
database_name
Optional. The name of the database.
job_name
Required. The name of the Routine Load job. A table may receive data from multiple Routine Load jobs. We recommend that you set a meaningful Routine Load job name by using identifiable information, for example, Kafka topic name and the approximate job creation time, to distinguish multiple Routine Load jobs. The name of the Routine Load job must be unique within the same database.
table_name
Required. The name of the table to which data is loaded.
load_properties
Optional. The properties of the data. Syntax:
[COLUMNS TERMINATED BY '<column_separator>'],
[ROWS TERMINATED BY '<row_separator>'],
[COLUMNS (<column1_name>[, <column2_name>, <column_assignment>, ... ])],
[WHERE <expr>],
[PARTITION (<partition1_name>[, <partition2_name>, ...])]
[TEMPORARY PARTITION (<temporary_partition1_name>[, <temporary_partition2_name>, ...])]
COLUMNS TERMINATED BY
The column separator for CSV-formatted data. The default column separator is \t (Tab). For example, you can use COLUMNS TERMINATED BY "," to specify the column separator as a comma.
Note
- Ensure that the column separator specified here is the same as the column separator in the data to be ingested.
- You can use a UTF-8 string, such as a comma (,), tab, or pipe (|), whose length does not exceed 50 bytes as a text delimiter.
- Null values are denoted by using \N. For example, a data record consists of three columns, and the data record holds data in the first and third columns but does not hold data in the second column. In this situation, you need to use \N in the second column to denote a null value. This means the record must be compiled as a,\N,b instead of a,,b. The record a,,b denotes that the second column of the record holds an empty string.
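The difference between \N and an empty field can be illustrated with a minimal Python sketch. It is not CelerData's parser: the function name parse_csv_record is hypothetical, and the sketch ignores quoting and escaping.

```python
NULL_MARKER = r"\N"  # the two characters backslash and N

def parse_csv_record(line, column_separator=","):
    """Split one record and turn the null marker into None.

    Illustrative only: a real CSV parser also handles enclose and
    escape characters, which this sketch ignores.
    """
    fields = line.split(column_separator)
    return [None if field == NULL_MARKER else field for field in fields]

# The second column is NULL.
print(parse_csv_record(r"a,\N,b"))
# The second column is an empty string, not NULL.
print(parse_csv_record("a,,b"))
```

In the first call the second field becomes None (a null value); in the second call it stays an empty string.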
ROWS TERMINATED BY
The row separator for CSV-formatted data. The default row separator is \n.
COLUMNS
The mapping between the columns in the source data and the columns in the table. For more information, see Column mapping in this topic.
- column_name: If a column of the source data can be mapped to a column of the table without any computation, you only need to specify the column name. These columns can be referred to as mapped columns.
- column_assignment: If a column of the source data cannot be directly mapped to a column of the table, and the column's values must be computed by using functions before data loading, you must specify the computation function in expr. These columns can be referred to as derived columns. It is recommended to place derived columns after mapped columns because CelerData first parses mapped columns.
WHERE
The filter condition. Only data that meets the filter condition can be loaded into CelerData. For example, if you only want to ingest rows whose col1 value is greater than 100 and col2 value is equal to 1000, you can use WHERE col1 > 100 and col2 = 1000.
NOTE
The columns specified in the filter condition can be source columns or derived columns.
PARTITION
If a table is distributed on partitions p0, p1, p2, and p3, and you want to load the data only to p1, p2, and p3 in CelerData and filter out the data that would be stored in p0, you can specify PARTITION(p1, p2, p3) as a filter condition. By default, if you do not specify this parameter, the data is loaded into all the partitions. Example:
PARTITION (p1, p2, p3)
TEMPORARY PARTITION
The name of the temporary partition into which you want to load data. You can specify multiple temporary partitions, which must be separated by commas (,).
job_properties
Optional. The properties of the load job. Syntax:
PROPERTIES ("<key1>" = "<value1>"[, "<key2>" = "<value2>" ...])
Property | Required | Description |
---|---|---|
desired_concurrent_number | No | The expected task parallelism of a single Routine Load job. Default value: 3. The actual task parallelism is the minimum of the following parameters: min(alive_be_number, partition_number, desired_concurrent_number, max_routine_load_task_concurrent_num). |
max_batch_interval | No | The scheduling interval for a task, that is, how often a task is executed. Unit: seconds. Value range: 5 ~ 60. Default value: 10. It is recommended to set a value larger than 10. If the scheduling interval is shorter than 10 seconds, too many tablet versions are generated due to an excessively high loading frequency. |
max_batch_rows | No | This property is only used to define the window of error detection. The window is the number of rows of data consumed by a single Routine Load task. The value is 10 * max_batch_rows. The default value is 10 * 200000 = 2000000. The Routine Load task detects error data in the error detection window. Error data refers to data that CelerData cannot parse, such as invalid JSON-formatted data. |
max_error_number | No | The maximum number of error data rows allowed within an error detection window. If the number of error data rows exceeds this value, the load job will pause. You can execute SHOW ROUTINE LOAD and view the error logs by using ErrorLogUrls. After that, you can correct the error in Kafka according to the error logs. The default value is 0, which means error rows are not allowed. NOTE: Error data rows do not include data rows that are filtered out by the WHERE clause. |
strict_mode | No | Specifies whether to enable the strict mode. Valid values: true and false. Default value: false. When the strict mode is enabled, if the value for a column in the loaded data is NULL but the destination table does not allow a NULL value for this column, the data row will be filtered out. |
log_rejected_record_num | No | Specifies the maximum number of unqualified data rows that can be logged. Valid values: 0, -1, and any non-zero positive integer. Default value: 0. |
timezone | No | The time zone used by the load job. Default value: Asia/Shanghai . The value of this parameter affects the results returned by functions such as strftime(), alignment_timestamp(), and from_unixtime(). The time zone specified by this parameter is a session-level time zone. |
merge_condition | No | Specifies the name of the column you want to use as the condition to determine whether to update data. Data will be updated only when the value of the data to be loaded into this column is greater than or equal to the current value of this column. NOTE Only Primary Key tables support conditional updates. The column that you specify cannot be a primary key column. |
format | No | The format of the data to be loaded. Valid values: CSV, JSON, and Avro. Default value: CSV. |
trim_space | No | Specifies whether to remove spaces preceding and following column separators from the data file when the data file is in CSV format. Type: BOOLEAN. Default value: false. For some databases, spaces are added to column separators when you export data as a CSV-formatted data file. Such spaces are called leading spaces or trailing spaces depending on their locations. By setting the trim_space parameter, you can enable CelerData to remove such unnecessary spaces during data loading. Note that CelerData does not remove the spaces (including leading spaces and trailing spaces) within a field wrapped in a pair of enclose-specified characters. For example, the following field values use pipe (|) as the column separator and double quotation marks (") as the enclose-specified character: | "Love CelerData" |. If you set trim_space to true, CelerData processes the preceding field values as |"Love CelerData"|. |
enclose | No | Specifies the character that is used to wrap the field values in the data file according to RFC4180 when the data file is in CSV format. Type: single-byte character. Default value: NONE. The most prevalent characters are single quotation mark (') and double quotation mark ("). All special characters (including row separators and column separators) wrapped by using the enclose-specified character are considered normal symbols. CelerData can do more than RFC4180 as it allows you to specify any single-byte character as the enclose-specified character. If a field value contains an enclose-specified character, you can use the same character to escape that enclose-specified character. For example, you set enclose to ", and a field value is a "quoted" c. In this case, you can enter the field value as "a ""quoted"" c" into the data file. |
escape | No | Specifies the character that is used to escape various special characters, such as row separators, column separators, escape characters, and enclose-specified characters, which are then considered by CelerData to be common characters and are parsed as part of the field values in which they reside. Type: single-byte character. Default value: NONE. The most prevalent character is backslash (\), which must be written as double backslashes (\\) in SQL statements. NOTE: The character specified by escape is applied to both inside and outside of each pair of enclose-specified characters. |
strip_outer_array | No | Specifies whether to strip the outermost array structure of the JSON-formatted data. Valid values: true and false. Default value: false. In real-world business scenarios, JSON-formatted data may have an outermost array structure as indicated by a pair of square brackets []. In this situation, we recommend that you set this parameter to true, so CelerData removes the outermost square brackets [] and loads each inner element as a separate data record. If you set this parameter to false, CelerData parses the entire JSON-formatted data into one array and loads the array as a single data record. Use the JSON-formatted data [{"category" : 1, "author" : 2}, {"category" : 3, "author" : 4} ] as an example. If you set this parameter to true, {"category" : 1, "author" : 2} and {"category" : 3, "author" : 4} are parsed as two separate data records and are loaded into two CelerData data rows. |
jsonpaths | No | The names of the fields that you want to load from JSON-formatted data. The value of this parameter is a valid JsonPath expression. For more information, see Configure column mapping for loading JSON-formatted data in this topic. |
json_root | No | The root element of the JSON-formatted data to load. CelerData extracts the elements of the root node through json_root for parsing. By default, the value of this parameter is empty, indicating that all JSON-formatted data will be loaded. For more information, see Specify the root element of the JSON-formatted data to be loaded in this topic. |
task_consume_second | No | The maximum time for each Routine Load task within the specified Routine Load job to consume data. Unit: second. Unlike the FE dynamic parameter routine_load_task_consume_second (which applies to all Routine Load jobs within the CelerData cluster), this parameter is specific to an individual Routine Load job, which is more flexible. |
task_timeout_second | No | The timeout duration for each Routine Load task within the specified Routine Load job. Unit: second. Unlike the FE dynamic parameter routine_load_task_timeout_second (which applies to all Routine Load jobs within the CelerData cluster), this parameter is specific to an individual Routine Load job, which is more flexible. |
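The enclose behavior described in the table can be approximated with Python's standard csv module, which applies RFC 4180-style quoting. This is only a sketch of the quoting rule, not CelerData's parser, and the helper name parse_enclosed is hypothetical.

```python
import csv
import io

def parse_enclosed(line, column_separator=",", enclose='"'):
    """Parse one CSV record whose fields may be wrapped in `enclose`.

    A doubled enclose character inside a wrapped field stands for one
    literal enclose character, as in the table's example.
    """
    reader = csv.reader(
        io.StringIO(line),
        delimiter=column_separator,
        quotechar=enclose,
        doublequote=True,
    )
    return next(reader)

# The second field contains quotes; the third contains the column separator.
row = parse_enclosed('1,"a ""quoted"" c","x,y"')
print(row)
```

The wrapped field "a ""quoted"" c" is read back as a "quoted" c, and the comma inside "x,y" is treated as ordinary text rather than as a column separator.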
data_source, data_source_properties
Required. The data source and relevant properties.
FROM <data_source>
("<key1>" = "<value1>"[, "<key2>" = "<value2>" ...])
data_source
Required. The source of the data you want to load. Valid value: KAFKA.
data_source_properties
The properties of the data source.
Property | Required | Description |
---|---|---|
kafka_broker_list | Yes | Kafka's broker connection information. The format is <kafka_broker_ip>:<broker_port>. Multiple brokers are separated by commas (,). The default port used by Kafka brokers is 9092. Example: "kafka_broker_list" = "xxx.xx.xxx.xx:9092,xxx.xx.xxx.xx:9092". |
kafka_topic | Yes | The Kafka topic to be consumed. A Routine Load job can only consume messages from one topic. |
kafka_partitions | No | The Kafka partitions to be consumed, for example, "kafka_partitions" = "0, 1, 2, 3" . If this property is not specified, all partitions are consumed by default. |
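kafka_offsets | No | The starting offset from which to consume data in a Kafka partition as specified in kafka_partitions. If this property is not specified, the Routine Load job consumes data starting from the latest offsets in kafka_partitions. Valid values: a specific offset, OFFSET_BEGINNING (consume from the earliest available offset), and OFFSET_END (consume from the latest offset). Multiple starting offsets are separated by commas (,), for example, "kafka_offsets" = "1000, OFFSET_BEGINNING, OFFSET_END, 2000". |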
property.kafka_default_offsets | No | The default starting offset for all consumer partitions. The supported values for this property are the same as those for the kafka_offsets property. |
confluent.schema.registry.url | No | The URL of the Schema Registry where the Avro schema is registered. CelerData retrieves the Avro schema by using this URL. The format is as follows:confluent.schema.registry.url = http[s]://[<schema-registry-api-key>:<schema-registry-api-secret>@]<hostname or ip address>[:<port>] |
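Because each entry in kafka_offsets applies to the partition at the same position in kafka_partitions, the two lists should have the same length. The Python sketch below pairs them up before a job is created; it assumes a strict one-to-one positional pairing, and the function name pair_partitions_with_offsets is hypothetical (it is not part of CelerData).

```python
def pair_partitions_with_offsets(kafka_partitions, kafka_offsets):
    """Map each Kafka partition to its starting offset by position.

    Assumption: the two comma-separated lists pair up one to one, so a
    length mismatch is reported as an error.
    """
    partitions = [int(p) for p in kafka_partitions.split(",")]
    offsets = [o.strip() for o in kafka_offsets.split(",")]
    if len(partitions) != len(offsets):
        raise ValueError(
            f"{len(partitions)} partitions but {len(offsets)} offsets"
        )
    # Numeric offsets become integers; keywords such as OFFSET_END stay strings.
    parsed = [int(o) if o.isdigit() else o for o in offsets]
    return dict(zip(partitions, parsed))

pairs = pair_partitions_with_offsets(
    "0, 1, 2, 3", "1000, OFFSET_BEGINNING, OFFSET_END, 2000"
)
print(pairs)
```

A check like this catches a mismatch, such as five partitions with four offsets, before the statement is submitted.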
More data source-related properties
You can specify additional data source (Kafka) related properties, which are equivalent to using the Kafka command line --property. For more supported properties, see the properties for a Kafka consumer client in librdkafka configuration properties.
- Specify the default initial offset for all the partitions to be consumed
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
- Specify the ID of the consumer group used by the Routine Load job
"property.group.id" = "group_id_0"
If property.group.id is not specified, CelerData generates a random value based on the name of the Routine Load job, in the format of {job_name}_{random uuid}, such as simple_job_0a64fe25-3983-44b2-a4d8-f52d3af4c3e8.
- Specify the authentication mechanism used by BEs when accessing Kafka
Access Kafka with SASL authentication:
-- enable SASL mechanism and non-encrypted channel
"property.security.protocol" = "SASL_PLAINTEXT"
-- specify the SASL mechanism as PLAIN which is a simple username/password authentication mechanism
"property.sasl.mechanism" = "PLAIN"
-- SASL username
"property.sasl.username" = "admin"
-- SASL password
"property.sasl.password" = "xxxxxx"
Column mapping
Configure column mapping for loading CSV-formatted data
If the columns of the CSV-formatted data can be mapped one on one in sequence to the columns of the table, you do not need to configure the column mapping between the data and the table.
If the columns of the CSV-formatted data cannot be mapped one on one in sequence to the columns of the table, you need to use the columns parameter to configure the column mapping between the data file and the table. This includes the following two use cases:
- Same number of columns but different column sequence. Also, the data from the data file does not need to be computed by functions before it is loaded into the matching table columns.
  In the columns parameter, you need to specify the names of the table columns in the same sequence as how the data file columns are arranged.
  For example, the table consists of three columns, which are col1, col2, and col3 in sequence, and the data file also consists of three columns, which can be mapped to the table columns col3, col2, and col1 in sequence. In this case, you need to specify "columns: col3, col2, col1".
- Different number of columns and different column sequence. Also, the data from the data file needs to be computed by functions before it is loaded into the matching table columns.
  In the columns parameter, you need to specify the names of the table columns in the same sequence as how the data file columns are arranged and specify the functions you want to use to compute the data. Two examples are as follows:
  - The table consists of three columns, which are col1, col2, and col3 in sequence. The data file consists of four columns, among which the first three columns can be mapped in sequence to the table columns col1, col2, and col3 and the fourth column cannot be mapped to any of the table columns. In this case, you need to temporarily specify a name for the fourth column of the data file, and the temporary name must be different from any of the table column names. For example, you can specify "columns: col1, col2, col3, temp", in which the fourth column of the data file is temporarily named temp.
  - The table consists of three columns, which are year, month, and day in sequence. The data file consists of only one column that accommodates date and time values in yyyy-mm-dd hh:mm:ss format. In this case, you can specify "columns: col, year = year(col), month=month(col), day=day(col)", in which col is the temporary name of the data file column and the functions year = year(col), month=month(col), and day=day(col) are used to extract data from the data file column col and load the data into the matching table columns. For example, year = year(col) is used to extract the yyyy data from the data file column col and load the data into the table column year.
For more examples, see Configure column mapping.
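The derived-column case (one source column col feeding the table columns year, month, and day) can be mimicked in a few lines of Python. This is only an illustration of what the mapping "columns: col, year = year(col), month=month(col), day=day(col)" computes; the function name derive_date_columns is hypothetical, and the real computation is done by CelerData's own SQL functions.

```python
from datetime import datetime

def derive_date_columns(col):
    """Compute the derived columns year, month, and day from one
    source value in yyyy-mm-dd hh:mm:ss format."""
    parsed = datetime.strptime(col, "%Y-%m-%d %H:%M:%S")
    return {
        "year": parsed.year,    # year = year(col)
        "month": parsed.month,  # month = month(col)
        "day": parsed.day,      # day = day(col)
    }

print(derive_date_columns("2020-05-08 10:15:30"))
```

The temporary column col itself is not stored; only the three derived values reach the table.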
Configure column mapping for loading JSON-formatted or Avro-formatted data
NOTE
When you load JSON or Avro data, the configuration for column mapping and transformation is the same. Therefore, in this section, JSON data is used as an example to introduce the configuration.
If the keys of the JSON-formatted data have the same names as the columns of the table, you can load the JSON-formatted data by using the simple mode. In simple mode, you do not need to specify the jsonpaths parameter. This mode requires that the JSON-formatted data must be an object as indicated by curly brackets {}, such as {"category": 1, "author": 2, "price": "3"}. In this example, category, author, and price are key names, and these keys can be mapped one on one by name to the columns category, author, and price of the table. For examples, see simple mode.
If the keys of the JSON-formatted data have different names than the columns of the table, you can load the JSON-formatted data by using the matched mode. In matched mode, you need to use the jsonpaths and COLUMNS parameters to specify the column mapping between the JSON-formatted data and the table:
- In the jsonpaths parameter, specify the JSON keys in the sequence as how they are arranged in the JSON-formatted data.
- In the COLUMNS parameter, specify the mapping between the JSON keys and the table columns:
  - The column names specified in the COLUMNS parameter are mapped one on one in sequence to the JSON-formatted data.
  - The column names specified in the COLUMNS parameter are mapped one on one by name to the table columns.
For examples, see matched mode.
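The two-step mapping of matched mode (jsonpaths selects values in order, COLUMNS names them in the same order) can be sketched in Python. The sketch handles only top-level paths of the form $.key, the function name load_matched and the column names cat_id and unit_price are hypothetical, and none of this reflects how CelerData evaluates JsonPath internally.

```python
import json

def load_matched(message, jsonpaths, columns):
    """Extract values by jsonpaths, then name them by position with columns."""
    record = json.loads(message)
    values = []
    for path in jsonpaths:
        if not path.startswith("$."):
            raise ValueError(f"unsupported path in this sketch: {path}")
        # "$.category" -> key "category"; a missing key yields None.
        values.append(record.get(path[2:]))
    return dict(zip(columns, values))

row = load_matched(
    '{"category": 1, "author": 2, "price": "3"}',
    jsonpaths=["$.category", "$.price"],
    columns=["cat_id", "unit_price"],
)
print(row)
```

Here the JSON key category feeds the hypothetical table column cat_id and price feeds unit_price, while author is not loaded because no path selects it.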
Examples
Load CSV-formatted data
This section uses CSV-formatted data as an example to describe how you can employ various parameter settings and combinations to meet your diverse loading requirements.
Prepare a dataset
Suppose you want to load CSV-formatted data from a Kafka topic named ordertest1. Every message in the dataset includes six columns: order ID, payment date, customer name, nationality, gender, and price.
2020050802,2020-05-08,Johann Georg Faust,Deutschland,male,895
2020050802,2020-05-08,Julien Sorel,France,male,893
2020050803,2020-05-08,Dorian Grey,UK,male,1262
2020050901,2020-05-09,Anna Karenina,Russia,female,175
2020051001,2020-05-10,Tess Durbeyfield,US,female,986
2020051101,2020-05-11,Edogawa Conan,japan,male,8924
Create a table
According to the columns of the CSV-formatted data, create a table named example_tbl1 in the database example_db.
CREATE TABLE example_db.example_tbl1 (
`order_id` bigint NOT NULL COMMENT "Order ID",
`pay_dt` date NOT NULL COMMENT "Payment date",
`customer_name` varchar(26) NULL COMMENT "Customer name",
`nationality` varchar(26) NULL COMMENT "Nationality",
`gender` varchar(26) NULL COMMENT "Gender",
`price` double NULL COMMENT "Price")
DUPLICATE KEY (order_id,pay_dt)
DISTRIBUTED BY HASH(`order_id`);
Consume data starting from specified offsets for specified partitions
If the Routine Load job needs to consume data starting from specified partitions and offsets, you need to configure the parameters kafka_partitions and kafka_offsets.
CREATE ROUTINE LOAD example_db.example_tbl1_ordertest1 ON example_tbl1
COLUMNS TERMINATED BY ",",
COLUMNS (order_id, pay_dt, customer_name, nationality, gender, price)
FROM KAFKA
(
"kafka_broker_list" ="<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>",
"kafka_topic" = "ordertest1",
"kafka_partitions" ="0,1,2,3", -- partitions to be consumed
"kafka_offsets" = "1000, OFFSET_BEGINNING, OFFSET_END, 2000" -- corresponding initial offsets
);
Improve loading performance by increasing task parallelism
To improve loading performance and avoid a growing backlog of unconsumed messages, you can increase task parallelism by increasing the desired_concurrent_number value when you create the Routine Load job. Task parallelism allows splitting one Routine Load job into as many parallel tasks as possible.
Note that the actual task parallelism is determined by the minimum value among the following multiple parameters:
min(alive_be_number, partition_number, desired_concurrent_number, max_routine_load_task_concurrent_num)
Note
The upper bound of the actual task parallelism is the smaller of the number of alive BE nodes and the number of partitions to be consumed.
Therefore, when both the number of alive BE nodes and the number of partitions to be consumed are greater than the values of the other two parameters, max_routine_load_task_concurrent_num and desired_concurrent_number, you can increase the values of those two parameters to increase the actual task parallelism.
Assume that the number of partitions to be consumed is 7, the number of alive BE nodes is 5, and max_routine_load_task_concurrent_num is the default value 5. If you want to increase the actual task parallelism, you can set desired_concurrent_number to 5 (the default value is 3). In this case, the actual task parallelism is min(5,7,5,5) = 5.
CREATE ROUTINE LOAD example_db.example_tbl1_ordertest1 ON example_tbl1
COLUMNS TERMINATED BY ",",
COLUMNS (order_id, pay_dt, customer_name, nationality, gender, price)
PROPERTIES
(
"desired_concurrent_number" = "5" -- set the value of desired_concurrent_number to 5
)
FROM KAFKA
(
"kafka_broker_list" ="<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>",
"kafka_topic" = "ordertest1"
);
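The arithmetic behind the actual task parallelism can be checked with a short Python sketch. The function only restates the min(...) formula from this topic; its name actual_task_parallelism is hypothetical, and the defaults of 3 and 5 are the default values stated above.

```python
def actual_task_parallelism(
    alive_be_number,
    partition_number,
    desired_concurrent_number=3,             # default per this topic
    max_routine_load_task_concurrent_num=5,  # default per this topic
):
    """Return the minimum of the four limits, as in the documented formula."""
    return min(
        alive_be_number,
        partition_number,
        desired_concurrent_number,
        max_routine_load_task_concurrent_num,
    )

# 5 alive BE nodes, 7 partitions, default desired_concurrent_number (3).
print(actual_task_parallelism(5, 7))
# Same cluster after setting desired_concurrent_number to 5.
print(actual_task_parallelism(5, 7, desired_concurrent_number=5))
```

With the defaults the job runs 3 parallel tasks; raising desired_concurrent_number to 5 lifts the result to 5, at which point the number of alive BE nodes and max_routine_load_task_concurrent_num become the limiting values.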
Configure column mapping
If the sequence of columns in the CSV-formatted data is inconsistent with the columns in the destination table, you need to specify the column mapping between the CSV-formatted data and the destination table through the COLUMNS parameter. This example assumes that the fifth column in the CSV-formatted data does not need to be imported to the destination table.
Destination database and table
Create the destination table example_tbl2 in the destination database example_db according to the columns in the CSV-formatted data. In this scenario, you need to create five columns corresponding to the six columns in the CSV-formatted data, excluding the fifth column, which stores gender.
CREATE TABLE example_db.example_tbl2 (
`order_id` bigint NOT NULL COMMENT "Order ID",
`pay_dt` date NOT NULL COMMENT "Payment date",
`customer_name` varchar(26) NULL COMMENT "Customer name",
`nationality` varchar(26) NULL COMMENT "Nationality",
`price` double NULL COMMENT "Price"
)
DUPLICATE KEY (order_id,pay_dt)
DISTRIBUTED BY HASH(order_id);
Routine Load job
In this example, since the fifth column in the CSV-formatted data does not need to be loaded to the destination table, the fifth column is temporarily named temp_gender in COLUMNS, and the other columns are directly mapped to the table example_tbl2.
CREATE ROUTINE LOAD example_db.example_tbl2_ordertest1 ON example_tbl2
COLUMNS TERMINATED BY ",",
COLUMNS (order_id, pay_dt, customer_name, nationality, temp_gender, price)
FROM KAFKA
(
"kafka_broker_list" ="<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>",
"kafka_topic" = "ordertest1"
);
Set filter conditions
If you want to load only data that meets certain conditions, you can set filter conditions in the WHERE clause, for example, price > 100.
CREATE ROUTINE LOAD example_db.example_tbl2_ordertest1 ON example_tbl2
COLUMNS TERMINATED BY ",",
COLUMNS (order_id, pay_dt, customer_name, nationality, gender, price),
WHERE price > 100 -- set the filter condition
FROM KAFKA
(
"kafka_broker_list" ="<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>",
"kafka_topic" = "ordertest1"
);
Enable strict mode to filter out rows with NULL values
In PROPERTIES
, you can set "strict_mode" = "true"
, which means that the Routine Load job is in strict mode. If there is a NULL
value in a source column, but the destination table column does not allow NULL values, the row that holds a NULL value in the source column is filtered out.
CREATE ROUTINE LOAD example_db.example_tbl1_ordertest1 ON example_tbl1
COLUMNS TERMINATED BY ",",
COLUMNS (order_id, pay_dt, customer_name, nationality, gender, price)
PROPERTIES
(
"strict_mode" = "true" -- enable the strict mode
)
FROM KAFKA
(
"kafka_broker_list" ="<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>",
"kafka_topic" = "ordertest1"
);
Set error tolerance
If your business scenario has low tolerance for unqualified data, you need to set the error detection window and the maximum number of error data rows by configuring the parameters max_batch_rows
and max_error_number
. When the number of error data rows within an error detection window exceeds the value of max_error_number
, the Routine Load job pauses.
CREATE ROUTINE LOAD example_db.example_tbl1_ordertest1 ON example_tbl1
COLUMNS TERMINATED BY ",",
COLUMNS (order_id, pay_dt, customer_name, nationality, gender, price)
PROPERTIES
(
"max_batch_rows" = "100000", -- The value of max_batch_rows multiplied by 10 equals the error detection window.
"max_error_number" = "100" -- The maximum number of error data rows allowed within an error detection window.
)
FROM KAFKA
(
"kafka_broker_list" ="<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>",
"kafka_topic" = "ordertest1"
);
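The arithmetic behind these two parameters can be checked with a short Python sketch. The function names are hypothetical and the sketch only restates the rule given in this section: the window is max_batch_rows multiplied by 10, and the job pauses when the error rows in a window exceed max_error_number.

```python
# Illustrative arithmetic only; the function names are hypothetical.
def error_detection_window(max_batch_rows):
    """The error detection window is max_batch_rows multiplied by 10."""
    return max_batch_rows * 10


def should_pause(error_rows_in_window, max_error_number):
    """The job pauses only when the error count exceeds the limit."""
    return error_rows_in_window > max_error_number


window = error_detection_window(100000)
print(window)                    # 1000000
print(should_pause(100, 100))    # False
print(should_pause(101, 100))    # True
```

With the values in the example, up to 100 error rows are tolerated in every window of 1,000,000 rows.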
Set trim_space, enclose, and escape
Suppose you want to load CSV-formatted data from a Kafka topic named test_csv
. Every message in the dataset includes six columns: order ID, payment date, customer name, nationality, gender, and price.
"2020050802" , "2020-05-08" , "Johann Georg Faust" , "Deutschland" , "male" , "895"
"2020050802" , "2020-05-08" , "Julien Sorel" , "France" , "male" , "893"
"2020050803" , "2020-05-08" , "Dorian Grey\,Lord Henry" , "UK" , "male" , "1262"
"2020050901" , "2020-05-09" , "Anna Karenina" , "Russia" , "female" , "175"
"2020051001" , "2020-05-10" , "Tess Durbeyfield" , "US" , "female" , "986"
"2020051101" , "2020-05-11" , "Edogawa Conan" , "japan" , "male" , "8924"
If you want to load all data from the Kafka topic test_csv
into example_tbl1
, remove the spaces preceding and following column separators, and set enclose
to "
and escape
to \
, run the following command:
CREATE ROUTINE LOAD example_db.example_tbl1_test_csv ON example_tbl1
COLUMNS TERMINATED BY ",",
COLUMNS (order_id, pay_dt, customer_name, nationality, gender, price)
PROPERTIES
(
"trim_space"="true",
"enclose"="\"",
"escape"="\\"
)
FROM KAFKA
(
"kafka_broker_list" ="<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>",
"kafka_topic"="test_csv",
"property.kafka_default_offsets"="OFFSET_BEGINNING"
);
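To see what the three properties are expected to do to one line of the dataset, the following Python sketch parses a row under simplified rules. It is an assumption-laden model, not the loader's parser: `parse_line` and `_finish` are hypothetical helpers, the enclose character is assumed to protect everything inside it, and the escape character is assumed to make the next character literal.

```python
# Simplified model of trim_space, enclose, and escape; not the real parser.
def _finish(chars, trim_space):
    # chars holds (character, protected) pairs. Protected characters came
    # from inside the enclosure or directly after the escape character.
    if trim_space:
        while chars and chars[0] == (" ", False):
            chars.pop(0)
        while chars and chars[-1] == (" ", False):
            chars.pop()
    return "".join(ch for ch, _ in chars)


def parse_line(line, separator=",", enclose='"', escape="\\", trim_space=True):
    fields, chars, enclosed = [], [], False
    i = 0
    while i < len(line):
        ch = line[i]
        if ch == escape and i + 1 < len(line):
            chars.append((line[i + 1], True))  # keep the escaped character
            i += 2
            continue
        if ch == enclose:
            enclosed = not enclosed            # enter or leave the enclosure
        elif ch == separator and not enclosed:
            fields.append(_finish(chars, trim_space))
            chars = []
        else:
            chars.append((ch, enclosed))
        i += 1
    fields.append(_finish(chars, trim_space))
    return fields


line = r'"2020050803" , "2020-05-08" , "Dorian Grey\,Lord Henry" , "UK" , "male" , "1262"'
parsed = parse_line(line)
print(parsed)
```

Under these assumptions, the escaped comma stays inside the customer name instead of splitting it into two fields.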
Load JSON-formatted data
Table column names consistent with JSON key names
Prepare a dataset
For example, the following JSON-formatted data exists in the Kafka topic ordertest2
.
{"commodity_id": "1", "customer_name": "Mark Twain", "country": "US","pay_time": 1589191487,"price": 875}
{"commodity_id": "2", "customer_name": "Oscar Wilde", "country": "UK","pay_time": 1589191487,"price": 895}
{"commodity_id": "3", "customer_name": "Antoine de Saint-Exupéry","country": "France","pay_time": 1589191487,"price": 895}
Note Each JSON object must be in one Kafka message. Otherwise, an error that indicates a failure in parsing JSON-formatted data occurs.
Destination database and table
Create table example_tbl3
in the destination database example_db
in the CelerData cluster. The column names are consistent with the key names in the JSON-formatted data.
CREATE TABLE example_db.example_tbl3 (
commodity_id varchar(26) NULL,
customer_name varchar(26) NULL,
country varchar(26) NULL,
pay_time bigint(20) NULL,
price double SUM NULL COMMENT "Price")
AGGREGATE KEY(commodity_id,customer_name,country,pay_time)
DISTRIBUTED BY HASH(commodity_id);
Routine Load job
You can use the simple mode for the Routine Load job. That is, you do not need to specify jsonpaths
and COLUMNS
parameters when creating the Routine Load job. CelerData extracts the keys of JSON-formatted data in the topic ordertest2
of the Kafka cluster according to the column names of the destination table example_tbl3
and loads the JSON-formatted data into the destination table.
CREATE ROUTINE LOAD example_db.example_tbl3_ordertest2 ON example_tbl3
PROPERTIES
(
"format" = "json"
)
FROM KAFKA
(
"kafka_broker_list" = "<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>",
"kafka_topic" = "ordertest2"
);
Note
- If the outermost layer of the JSON-formatted data is an array structure, you need to set "strip_outer_array"="true" in PROPERTIES to strip the outermost array structure. Additionally, when you need to specify jsonpaths, the root element of the entire JSON-formatted data is the flattened JSON object because the outermost array structure of the JSON-formatted data is stripped.
- You can use json_root to specify the root element of the JSON-formatted data.
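The simple mode can be pictured as a lookup of each table column name among the keys of one JSON message. The Python sketch below is a rough model with a hypothetical helper `simple_mode_row`. Treating a missing key as a null value is an assumption of the sketch, not something this section states.

```python
import json

# Column names of example_tbl3, which match the JSON keys.
TABLE_COLUMNS = ["commodity_id", "customer_name", "country", "pay_time", "price"]


def simple_mode_row(message):
    """Pick the value of each table column from the JSON object by key name."""
    record = json.loads(message)
    # Assumption of this sketch: a key that is absent yields None.
    return {col: record.get(col) for col in TABLE_COLUMNS}


message = (
    '{"commodity_id": "1", "customer_name": "Mark Twain", '
    '"country": "US","pay_time": 1589191487,"price": 875}'
)
simple_row = simple_mode_row(message)
print(simple_row)
```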
Table column names different from JSON key names
Prepare a dataset
For example, the following JSON-formatted data exists in the topic ordertest2
of the Kafka cluster.
{"commodity_id": "1", "customer_name": "Mark Twain", "country": "US","pay_time": 1589191487,"price": 875}
{"commodity_id": "2", "customer_name": "Oscar Wilde", "country": "UK","pay_time": 1589191487,"price": 895}
{"commodity_id": "3", "customer_name": "Antoine de Saint-Exupéry","country": "France","pay_time": 1589191487,"price": 895}
Destination database and table
Create a table named example_tbl4
in the database example_db
in the CelerData cluster. The column pay_dt
is a derived column whose values are generated by computing values of the key pay_time
in the JSON-formatted data.
CREATE TABLE example_db.example_tbl4 (
`commodity_id` varchar(26) NULL,
`customer_name` varchar(26) NULL,
`country` varchar(26) NULL,
`pay_time` bigint(20) NULL,
`pay_dt` date NULL,
`price` double SUM NULL)
AGGREGATE KEY(`commodity_id`,`customer_name`,`country`,`pay_time`,`pay_dt`)
DISTRIBUTED BY HASH(`commodity_id`);
Routine Load job
You can use the matched mode for the Routine Load job. That is, you need to specify jsonpaths
and COLUMNS
parameters when creating the Routine Load job.
You need to specify the keys of the JSON-formatted data and arrange them in sequence in the jsonpaths
parameter.
Because the values in the key pay_time
of the JSON-formatted data need to be converted to the DATE type before the values are stored in the pay_dt
column of the example_tbl4
table, you need to specify the computation by using pay_dt=from_unixtime(pay_time,'%Y%m%d')
in COLUMNS
. The values of other keys in the JSON-formatted data can be directly mapped to the example_tbl4
table.
CREATE ROUTINE LOAD example_db.example_tbl4_ordertest2 ON example_tbl4
COLUMNS(commodity_id, customer_name, country, pay_time, pay_dt=from_unixtime(pay_time, '%Y%m%d'), price)
PROPERTIES
(
"format" = "json",
"jsonpaths" = "[\"$.commodity_id\",\"$.customer_name\",\"$.country\",\"$.pay_time\",\"$.price\"]"
)
FROM KAFKA
(
"kafka_broker_list" = "<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>",
"kafka_topic" = "ordertest2"
);
Note
- If the outermost layer of the JSON data is an array structure, you need to set "strip_outer_array"="true" in PROPERTIES to strip the outermost array structure. Additionally, when you need to specify jsonpaths, the root element of the entire JSON data is the flattened JSON object because the outermost array structure of the JSON data is stripped.
- You can use json_root to specify the root element of the JSON-formatted data.
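The matched mode combines two steps: extract values in the order given by jsonpaths, then compute the derived column. The Python sketch below models both steps. The helpers `extract` and `matched_mode_row` are hypothetical, the sketch assumes the extracted values are bound in order to the non-derived names in COLUMNS, and it formats the date in UTC, whereas the time zone that from_unixtime uses in a real cluster may differ.

```python
import json
from datetime import datetime, timezone

JSONPATHS = ["$.commodity_id", "$.customer_name", "$.country", "$.pay_time", "$.price"]
# Non-derived names from COLUMNS, in the same order as JSONPATHS.
SOURCE_COLUMNS = ["commodity_id", "customer_name", "country", "pay_time", "price"]


def extract(record, path):
    """Resolve only the "$.key" and "$.a.b" path forms used in this section."""
    value = record
    for key in path[2:].split("."):
        value = value[key]
    return value


def matched_mode_row(message):
    record = json.loads(message)
    row = {col: extract(record, path) for col, path in zip(SOURCE_COLUMNS, JSONPATHS)}
    # Stand-in for pay_dt=from_unixtime(pay_time, '%Y%m%d'); UTC is assumed.
    pay_time = datetime.fromtimestamp(row["pay_time"], tz=timezone.utc)
    row["pay_dt"] = pay_time.strftime("%Y%m%d")
    return row


message = (
    '{"commodity_id": "1", "customer_name": "Mark Twain", '
    '"country": "US","pay_time": 1589191487,"price": 875}'
)
matched_row = matched_mode_row(message)
print(matched_row["pay_dt"])  # 20200511
```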
Specify the root element of the JSON-formatted data to be loaded
You need to use json_root
to specify the root element of the JSON-formatted data to be loaded and the value must be a valid JsonPath expression.
Prepare a dataset
For example, the following JSON-formatted data exists in the topic ordertest3
of the Kafka cluster. And the root element of the JSON-formatted data to be loaded is $.RECORDS
.
{"RECORDS":[{"commodity_id": "1", "customer_name": "Mark Twain", "country": "US","pay_time": 1589191487,"price": 875},{"commodity_id": "2", "customer_name": "Oscar Wilde", "country": "UK","pay_time": 1589191487,"price": 895},{"commodity_id": "3", "customer_name": "Antoine de Saint-Exupéry","country": "France","pay_time": 1589191487,"price": 895}]}
Destination database and table
Create a table named example_tbl3
in the database example_db
in the CelerData cluster.
CREATE TABLE example_db.example_tbl3 (
commodity_id varchar(26) NULL,
customer_name varchar(26) NULL,
country varchar(26) NULL,
pay_time bigint(20) NULL,
price double SUM NULL)
AGGREGATE KEY(commodity_id,customer_name,country,pay_time)
ENGINE=OLAP
DISTRIBUTED BY HASH(commodity_id);
Routine Load job
You can set "json_root" = "$.RECORDS"
in PROPERTIES
to specify the root element of the JSON-formatted data to be loaded. Also, since the JSON-formatted data to be loaded is in an array structure, you must also set "strip_outer_array" = "true"
to strip the outermost array structure.
CREATE ROUTINE LOAD example_db.example_tbl3_ordertest3 ON example_tbl3
PROPERTIES
(
"format" = "json",
"json_root" = "$.RECORDS",
"strip_outer_array" = "true"
)
FROM KAFKA
(
"kafka_broker_list" = "<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>",
"kafka_topic" = "ordertest3"
);
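The combined effect of json_root and strip_outer_array on one Kafka message can be sketched in Python as follows. The helper `rows_from_message` is hypothetical and handles only a single top-level key such as RECORDS rather than a general JsonPath expression. The message is a shortened version of the dataset above.

```python
import json


def rows_from_message(message, json_root_key="RECORDS", strip_outer_array=True):
    """Select the root element, then turn the outer array into separate rows."""
    document = json.loads(message)
    root = document[json_root_key]          # plays the role of "$.RECORDS"
    if strip_outer_array and isinstance(root, list):
        return root                         # each array element becomes one row
    return [root]                           # otherwise the root is a single row


message = (
    '{"RECORDS":['
    '{"commodity_id": "1", "customer_name": "Mark Twain", "country": "US",'
    '"pay_time": 1589191487,"price": 875},'
    '{"commodity_id": "2", "customer_name": "Oscar Wilde", "country": "UK",'
    '"pay_time": 1589191487,"price": 895}'
    ']}'
)
rows = rows_from_message(message)
print(len(rows))                  # 2
print(rows[1]["customer_name"])   # Oscar Wilde
```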
Load Avro-formatted data
Avro schema is simple
Suppose the Avro schema is relatively simple, and you need to load all fields of the Avro data.
Prepare a dataset
Avro schema
Create the following Avro schema file avro_schema1.avsc:
{
    "type": "record",
    "name": "sensor_log",
    "fields" : [
        {"name": "id", "type": "long"},
        {"name": "name", "type": "string"},
        {"name": "checked", "type" : "boolean"},
        {"name": "data", "type": "double"},
        {"name": "sensor_type", "type": {"type": "enum", "name": "sensor_type_enum", "symbols" : ["TEMPERATURE", "HUMIDITY", "AIR-PRESSURE"]}}
    ]
}
Register the Avro schema in the Schema Registry.
Avro data
Prepare the Avro data and send it to the Kafka topic topic_1
.
Destination database and table
According to the fields of Avro data, create a table sensor_log1
in the destination database sensor
in the CelerData cluster. The column names of the table must match the field names in the Avro data. For the data types mapping when Avro data is loaded into CelerData, see [Data types mapping](#Data types mapping).
CREATE TABLE sensor.sensor_log1 (
`id` bigint NOT NULL COMMENT "sensor id",
`name` varchar(26) NOT NULL COMMENT "sensor name",
`checked` boolean NOT NULL COMMENT "checked",
`data` double NULL COMMENT "sensor data",
`sensor_type` varchar(26) NOT NULL COMMENT "sensor type"
)
ENGINE=OLAP
DUPLICATE KEY (id)
DISTRIBUTED BY HASH(`id`);
Routine Load job
You can use the simple mode for the Routine Load job. That is, you do not need to specify the parameter jsonpaths
when creating the Routine Load job. Execute the following statement to submit a Routine Load job named sensor_log_load_job1
to consume the Avro messages in the Kafka topic topic_1
and load the data into the table sensor_log1
in the database sensor
.
CREATE ROUTINE LOAD sensor.sensor_log_load_job1 ON sensor_log1
PROPERTIES
(
"format" = "avro"
)
FROM KAFKA
(
"kafka_broker_list" = "<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>,...",
"confluent.schema.registry.url" = "http://172.xx.xxx.xxx:8081",
"kafka_topic"= "topic_1",
"kafka_partitions" = "0,1,2,3,4,5",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
Avro schema contains a nested record-type field
Suppose the Avro schema contains a nested record-type field, and you need to load the subfield in a nested record-type field into CelerData.
Prepare a dataset
Avro schema
Create the following Avro schema file avro_schema2.avsc. The outer Avro record includes five fields, which are id, name, checked, sensor_type, and data, in sequence. The field data has a nested record data_record.
{
    "type": "record",
    "name": "sensor_log",
    "fields" : [
        {"name": "id", "type": "long"},
        {"name": "name", "type": "string"},
        {"name": "checked", "type" : "boolean"},
        {"name": "sensor_type", "type": {"type": "enum", "name": "sensor_type_enum", "symbols" : ["TEMPERATURE", "HUMIDITY", "AIR-PRESSURE"]}},
        {"name": "data", "type":
            {
                "type": "record",
                "name": "data_record",
                "fields" : [
                    {"name": "data_x", "type" : "boolean"},
                    {"name": "data_y", "type": "long"}
                ]
            }
        }
    ]
}
Register the Avro schema in the Schema Registry.
Avro data
Prepare the Avro data and send it to the Kafka topic topic_2
.
Destination database and table
According to the fields of Avro data, create a table sensor_log2
in the destination database sensor
in the CelerData cluster.
Suppose that in addition to loading the fields id
, name
, checked
, and sensor_type
of the outer Record, you also need to load the subfield data_y
in the nested Record data_record
.
CREATE TABLE sensor.sensor_log2 (
`id` bigint NOT NULL COMMENT "sensor id",
`name` varchar(26) NOT NULL COMMENT "sensor name",
`checked` boolean NOT NULL COMMENT "checked",
`sensor_type` varchar(26) NOT NULL COMMENT "sensor type",
`data_y` bigint NULL COMMENT "sensor data"
)
ENGINE=OLAP
DUPLICATE KEY (id)
DISTRIBUTED BY HASH(`id`);
Routine Load job
Submit the load job and use jsonpaths
to specify the fields of the Avro data that need to be loaded. Note that for the subfield data_y
in the nested Record, you need to specify its jsonpath
as "$.data.data_y"
.
CREATE ROUTINE LOAD sensor.sensor_log_load_job2 ON sensor_log2
PROPERTIES
(
"format" = "avro",
"jsonpaths" = "[\"$.id\",\"$.name\",\"$.checked\",\"$.sensor_type\",\"$.data.data_y\"]"
)
FROM KAFKA
(
"kafka_broker_list" = "<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>,...",
"confluent.schema.registry.url" = "http://172.xx.xxx.xxx:8081",
"kafka_topic" = "topic_2",
"kafka_partitions" = "0,1,2,3,4,5",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
Avro schema contains a Union field
Suppose the Avro schema contains a Union field, and you need to load the Union field into CelerData.
Prepare a dataset
Avro schema
Create the following Avro schema file avro_schema3.avsc. The outer Avro record includes five fields, which are id, name, checked, sensor_type, and data, in sequence. The field data is of Union type and includes two elements, null and a nested record data_record.
{
    "type": "record",
    "name": "sensor_log",
    "fields" : [
        {"name": "id", "type": "long"},
        {"name": "name", "type": "string"},
        {"name": "checked", "type" : "boolean"},
        {"name": "sensor_type", "type": {"type": "enum", "name": "sensor_type_enum", "symbols" : ["TEMPERATURE", "HUMIDITY", "AIR-PRESSURE"]}},
        {"name": "data", "type": ["null",
            {
                "type": "record",
                "name": "data_record",
                "fields" : [
                    {"name": "data_x", "type" : "boolean"},
                    {"name": "data_y", "type": "long"}
                ]
            }
        ]}
    ]
}
Register the Avro schema in the Schema Registry.
Avro data
Prepare the Avro data and send it to the Kafka topic topic_3
.
Destination database and table
According to the fields of Avro data, create a table sensor_log3
in the destination database sensor
in the CelerData cluster.
Suppose that in addition to loading the fields id
, name
, checked
, and sensor_type
of the outer Record, you also need to load the field data_y
of the element data_record
in the Union type field data
.
CREATE TABLE sensor.sensor_log3 (
`id` bigint NOT NULL COMMENT "sensor id",
`name` varchar(26) NOT NULL COMMENT "sensor name",
`checked` boolean NOT NULL COMMENT "checked",
`sensor_type` varchar(26) NOT NULL COMMENT "sensor type",
`data_y` bigint NULL COMMENT "sensor data"
)
ENGINE=OLAP
DUPLICATE KEY (id)
DISTRIBUTED BY HASH(`id`);
Routine Load job
Submit the load job and use jsonpaths
to specify the fields that need to be loaded in the Avro data. Note that for the field data_y
, you need to specify its jsonpath
as "$.data.data_y"
.
CREATE ROUTINE LOAD sensor.sensor_log_load_job3 ON sensor_log3
PROPERTIES
(
"format" = "avro",
"jsonpaths" = "[\"$.id\",\"$.name\",\"$.checked\",\"$.sensor_type\",\"$.data.data_y\"]"
)
FROM KAFKA
(
"kafka_broker_list" = "<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>,...",
"confluent.schema.registry.url" = "http://172.xx.xxx.xxx:8081",
"kafka_topic" = "topic_3",
"kafka_partitions" = "0,1,2,3,4,5",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
When the value for the Union type field data
is null
, the value loaded into the column data_y
in the table is null
. When the value for the Union type field data
is a data record, the value loaded into the column data_y
is of Long type.
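The two outcomes for the Union field can be modeled by resolving the path "$.data.data_y" against a decoded record. The Python sketch below uses plain dictionaries as a stand-in for decoded Avro messages. The helper `resolve` and the sample values are hypothetical.

```python
def resolve(record, path):
    """Walk a "$.a.b" path; a null along the way yields a null result."""
    value = record
    for key in path[2:].split("."):
        if value is None:
            return None
        value = value[key]
    return value


# Hypothetical decoded messages: one with a data record, one with null.
with_record = {"id": 1, "name": "sensor_a", "checked": True,
               "sensor_type": "HUMIDITY",
               "data": {"data_x": True, "data_y": 42}}
with_null = {"id": 2, "name": "sensor_b", "checked": False,
             "sensor_type": "TEMPERATURE",
             "data": None}

print(resolve(with_record, "$.data.data_y"))  # 42
print(resolve(with_null, "$.data.data_y"))    # None
```

The same path resolution applies to the nested record in avro_schema2.avsc, where the field data is always a record.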