Load data from Amazon MSK
This topic introduces how to create a Routine Load job to stream messages (events) from Amazon Managed Streaming for Apache Kafka (Amazon MSK) into CelerData.
To continuously load messages of a stream from an Amazon MSK cluster into CelerData, you can store the message stream in a Kafka topic on the Amazon MSK cluster and create a Routine Load job to consume the messages. The Routine Load job persists in CelerData, generates a series of load tasks to consume the messages in all or some of the partitions in the topic, and loads the messages into CelerData.
A Routine Load job supports exactly-once delivery semantics to ensure the data loaded into CelerData is neither lost nor duplicated.
Supported data file formats
Routine Load now supports consuming CSV and JSON formatted data from an MSK cluster.
NOTE
For data in CSV format, CelerData supports UTF-8-encoded strings no longer than 50 bytes as column separators. Commonly used column separators include the comma (,), tab, and pipe (|).
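For illustration only, a hypothetical comma-separated message with six fields might look like the line below; the separator itself is declared in the load job with COLUMNS TERMINATED BY ",".
10001,2023-05-16,Tom,US,male,895.5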
How it works
Loading data from an MSK cluster and loading data from a Confluent Cloud cluster both use Routine Load. For the basic concepts and principles of Routine Load, see Load data from Confluent Cloud through Routine Load.
Before you begin
Prepare an MSK cluster. You can quickly create an MSK cluster by following the instructions provided in MSK getting started. You can also use an existing MSK cluster.
Notice
The VPC and subnets of an MSK cluster cannot be changed after the cluster is created.
If the MSK cluster and CelerData cluster are in the same VPC:
Make sure that the security group to which the CelerData cluster resources belong can accept traffic that comes from the security group to which the MSK cluster resources belong.
- Sign in to the Amazon VPC console.
- In the left-side navigation pane, choose Security Groups. Find the security group to which the CelerData cluster resources belong.
- On the Inbound Rules tab, choose Edit inbound rules.
- Choose Add rule.
- Set Type to All traffic.
- For Source, select Custom and then select the security group to which the MSK cluster resources belong.
- Click Save rules.
Configure the authentication and encryption for the CelerData cluster to access the MSK cluster:
Configure the security settings for the MSK cluster.
Choose the MSK cluster.
Select Actions > Edit security settings.
In the Edit security settings dialog box, select SASL/SCRAM for Access control method.
Note
- If you select SASL/SCRAM for the Access control method, Amazon MSK turns on TLS encryption for all traffic between clients and brokers.
- You can also select Unauthenticated access for Access control method and PLAINTEXT for Encryption method for a simple test. Because these settings do not use any authentication or encryption, they are not safe for production environments.
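If you choose that unauthenticated PLAINTEXT setup for a quick test, a minimal sketch of the corresponding FROM KAFKA clause in a Routine Load job simply omits the SASL/SSL properties (placeholder broker address and topic name; librdkafka defaults to plaintext when no security protocol is specified):
FROM KAFKA
(
    "kafka_broker_list" = "<kafka_broker_ip>:9092",
    "kafka_topic" = "<topic_name>"
)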
Create a customer managed key and a secret for authentication and encryption for the MSK cluster.
Create a customer managed key on Key Management Service (KMS) by following the instructions provided in Creating keys.
Create a secret on AWS Secrets Manager by following the instructions provided in Set up SASL/SCRAM authentication. Take note of the following configurations:
In the Choose secret type section, configure the parameters as follows:
Secret type: Select Other type of secrets.
Key/value pairs: Enter your secret. The secret is used as the property.sasl.username and property.sasl.password for SASL/SCRAM authentication when you create a Routine Load job.
Encryption Key: Associate the secret with the customer managed key created above for encryption.
Notice
Do not select aws/secretsmanager as the Encryption Key, because Secrets Manager uses the default AWS KMS key for a secret by default, and a secret created with the default AWS KMS key cannot be associated with an Amazon MSK cluster.
In the Configure secret section, check that Secret name begins with the prefix AmazonMSK_.
After you create the secret, copy the Secret ARN of the secret. You need to use the Secret ARN when associating the secret with the MSK cluster.
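As a sketch of the Key/value pairs mentioned above, an MSK SASL/SCRAM secret typically stores a JSON object like the following (the user name and password values shown here are placeholders of your own choice):
{
  "username": "<msk_username>",
  "password": "<msk_password>"
}
These values are later supplied to the Routine Load job as property.sasl.username and property.sasl.password.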
Associate the secret with the MSK cluster.
Choose the MSK cluster.
Select Actions > Edit security settings.
Click Associate secrets, and paste the Secret ARN copied above.
If the MSK cluster and the CelerData cluster are in different VPCs, configure the networking, authentication, and encryption for the CelerData cluster to access the MSK cluster as follows:
Networking
Option 1: The CelerData cluster connects to the MSK cluster within AWS. You can use AWS VPC peering. For other connection options within AWS, see other methods.
Option 2: The CelerData cluster connects to the MSK cluster over the Internet
Configure the MSK networking
Turn on public access.
Notice
Make sure that the MSK cluster meets all of the conditions listed for public access.
Configure the CelerData networking
Make sure the subnet of the CelerData cluster can connect to the Internet.
- If the CelerData cluster is in a private subnet, make sure that a NAT gateway is configured for the private subnet so that the CelerData cluster can connect to the internet.
- If the CelerData cluster is in a public subnet, the CelerData cluster can connect to the Internet by default.
For security purposes, add the following outbound rule to the security group for the CelerData cluster, so that the CelerData cluster can access the MSK cluster.
- Set Port range. You can find the port in the Public endpoint in the Bootstrap servers section on the MSK cluster's View client information page. The port can be 9092 or 9096, depending on the MSK cluster's authentication settings.
- For Destination, select Custom and then enter the public IP address of the MSK cluster, which can be found in the Public endpoint. You can also use 0.0.0.0/0.
Authentication and encryption
The authentication and encryption configuration when the MSK cluster and the CelerData cluster are in different VPCs is the same as when the two clusters are in the same VPC.
Basic operations
Because loading data from an MSK cluster and loading data from a Confluent Cloud cluster both use Routine Load, the following only covers the points that need special attention when you load data from an MSK cluster. For more details about Routine Load, see Load data from Confluent Cloud.
Create a Routine Load job
Load CSV-formatted data
This section describes how to create a Routine Load job to consume CSV-formatted data from an MSK cluster, and load the data into CelerData.
Notice
Before creating a Routine Load job, you need to prepare a dataset as the source data and create a table in the CelerData cluster.
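For example, a table matching the columns consumed in the statement below could be created as follows. This is only a sketch: the column types, key, and distribution are assumptions for your own dataset, and temp_gender is assumed to be a placeholder for a CSV field that is not loaded into the table.
CREATE TABLE example_db.example_tbl1
(
    order_id BIGINT NOT NULL COMMENT "order ID",
    pay_dt DATE NOT NULL COMMENT "payment date",
    customer_name VARCHAR(32) COMMENT "customer name",
    nationality VARCHAR(30) COMMENT "nationality",
    price DOUBLE COMMENT "price"
)
DUPLICATE KEY (order_id, pay_dt)
DISTRIBUTED BY HASH(order_id) BUCKETS 5;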
Execute the following statement to create a Routine Load job named example_tbl1_ordertest1 to consume the messages from the topic ordertest1 and load the data into the table example_tbl1. The load job consumes the messages from the initial offset in the specified partitions of the topic. Pay attention to the kafka_broker_list property and the authentication and encryption properties, which are explained after the statement.
CREATE ROUTINE LOAD example_db.example_tbl1_ordertest1 ON example_tbl1
COLUMNS TERMINATED BY ",",
COLUMNS (order_id, pay_dt, customer_name, nationality, temp_gender, price)
FROM KAFKA
(
"kafka_broker_list" ="<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>",
"kafka_topic" = "ordertest1",
"kafka_partitions" ="0,1,2,3,4",
"property.kafka_default_offsets" = "OFFSET_BEGINNING",
"property.security.protocol"="SASL_SSL",
"property.sasl.mechanism"="SCRAM-SHA-512",
"property.sasl.username"="<username>",
"property.sasl.password"="<password>"
);
Kafka broker list
"kafka_broker_list" ="<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>",
You need to use the kafka_broker_list property to specify a list of host and port pairs for establishing the connection to the MSK cluster. You can find the list of host and port pairs in Private endpoint in the Bootstrap servers section on the MSK cluster's View client information page.
Notice
If the CelerData cluster accesses the MSK cluster through the Internet, you need to use the Public endpoint in the Bootstrap servers section on the MSK cluster's View client information page.
Authentication and encryption
"property.security.protocol"="SASL_SSL", "property.sasl.mechanism"="SCRAM-SHA-512", "property.sasl.username"="***", "property.sasl.password"="***",
property.security.protocol
: You can useSASL_SSL
.Notice
We do not recommend using
SASL_PLAINTEXT
in production environments.property.sasl.mechanism
: You can useSCRAM-SHA-512
.property.sasl.username
andproperty.sasl.password
: the secret created above which is associated with the MSK cluster.
Load JSON-formatted data
This section describes how to create a Routine Load job to consume JSON-formatted data in an MSK cluster, and load the data into CelerData.
Notice
Before creating a Routine Load job, you need to prepare a dataset as the source data and create a table in the CelerData cluster.
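For example, a table matching the columns in the statement below could be created as follows. This is only a sketch; the column types, key, and distribution are assumptions, and pay_dt stores the date derived from pay_time by the load job.
CREATE TABLE example_db.example_tbl2
(
    commodity_id VARCHAR(26) COMMENT "commodity ID",
    customer_name VARCHAR(26) COMMENT "customer name",
    country VARCHAR(26) COMMENT "country",
    pay_time BIGINT COMMENT "payment time as a Unix timestamp",
    pay_dt DATE COMMENT "payment date derived from pay_time",
    price DOUBLE COMMENT "price"
)
DUPLICATE KEY (commodity_id, customer_name)
DISTRIBUTED BY HASH(commodity_id) BUCKETS 5;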
Execute the following statement to create a Routine Load job named example_tbl2_ordertest2 to consume the messages from the topic ordertest2 and load the data into the table example_tbl2. The load job consumes the messages from the initial offset in the specified partitions of the topic.
CREATE ROUTINE LOAD example_db.example_tbl2_ordertest2 ON example_tbl2
COLUMNS(commodity_id, customer_name, country, pay_time, price, pay_dt=from_unixtime(pay_time, '%Y%m%d'))
PROPERTIES
(
"format" ="json",
"jsonpaths" ="[\"$.commodity_id\",\"$.customer_name\",\"$.country\",\"$.pay_time\",\"$.price\"]"
)
FROM KAFKA
(
"kafka_broker_list" ="<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>",
"kafka_topic" = "ordertest2",
"kafka_partitions" ="0,1,2,3,4",
"property.kafka_default_offsets" = "OFFSET_BEGINNING",
"property.security.protocol"="SASL_SSL",
"property.sasl.mechanism"="SCRAM-SHA-512",
"property.sasl.username"="<username>",
"property.sasl.password"="<password>"
);
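For reference, a hypothetical JSON message that matches the jsonpaths above might look like this; pay_time is a Unix timestamp in seconds, from which pay_dt is derived via from_unixtime:
{"commodity_id": "1", "customer_name": "Mark Twain", "country": "US", "pay_time": 1589191487, "price": 875}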
Check a load job and task
Check a load job
Execute the SHOW ROUTINE LOAD statement to check the status of the load job.
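For example, to check the job created above (the FOR clause narrows the output to one job):
SHOW ROUTINE LOAD FOR example_tbl2_ordertest2;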
Check a load task
Execute the SHOW ROUTINE LOAD TASK statement to check the load tasks of the load job.
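For example, to list the load tasks of the job created above:
SHOW ROUTINE LOAD TASK WHERE JobName = "example_tbl2_ordertest2";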
Pause a load job
You can execute the PAUSE ROUTINE LOAD statement to pause a load job.
PAUSE ROUTINE LOAD FOR example_tbl2_ordertest2;
Resume a load job
You can execute the RESUME ROUTINE LOAD statement to resume a paused load job.
RESUME ROUTINE LOAD FOR example_tbl2_ordertest2;
Alter a load job
Before altering a load job, you must pause it with the PAUSE ROUTINE LOAD statement. Then you can execute the ALTER ROUTINE LOAD statement to alter the load job. After altering the load job, you can execute the RESUME ROUTINE LOAD statement to resume it, and check its status by using the SHOW ROUTINE LOAD statement.
ALTER ROUTINE LOAD FOR example_tbl2_ordertest2
PROPERTIES
(
"desired_concurrent_number" = "6"
)
FROM kafka
(
"kafka_partitions" = "0,1,2,3,4,5,6,7",
"kafka_offsets" = "OFFSET_BEGINNING,OFFSET_BEGINNING,OFFSET_BEGINNING,OFFSET_BEGINNING,OFFSET_END,OFFSET_END,OFFSET_END,OFFSET_END"
);
Stop a load job
You can execute the STOP ROUTINE LOAD statement to stop a load job.
STOP ROUTINE LOAD FOR example_tbl2_ordertest2;
FAQ
Issue description: When you run the SHOW ROUTINE LOAD statement on a Routine Load job, the job reports the error failed to create kafka consumer: sasl.username and sasl.password must be set in the ReasonOfStateChanged field.
Solutions: Set the property.sasl.username and property.sasl.password properties when using SASL/SCRAM as the authentication mechanism.
Issue description: When you run the SHOW ROUTINE LOAD statement on a Routine Load job, the job reports the error failed to get partition meta: Local: Broker transport failure in the ReasonOfStateChanged field.
Error 1064: failed to send proxy request to TNetworkAddress(hostname:10.0.1.2, port:8060) err failed to send proxy request to TNetworkAddress(hostname:10.0.1.2, port:8060) err [failed to get partition meta: Local: Broker transport failure]
Possible causes: The TLS channel cannot be established.
Solutions:
Make sure the Kafka broker list is correct.
Temporarily shut down the TLS channel by disabling the SSL certificate verification from the CelerData cluster to the MSK cluster.
"property.enable.ssl.certificate.verification"="false"
Notice
- Disabling the SSL certificate verification is not safe. Do not leave it disabled in your production environments.
- For security purposes, you should generate a temporary secret for the MSK cluster to use as the property.sasl.username and property.sasl.password properties, because information such as property.sasl.username and property.sasl.password is not secure when the CelerData cluster connects to the MSK cluster without SSL certificate verification.
If the Routine Load job then successfully loads data into CelerData, there may be some problems with the SSL certificate. For example, CelerData cannot find the CA root certificate automatically. In this situation, you can manually specify the CA root certificate for CelerData by adding the property below. For more information about the CA root certificate, see librdkafka - INTRODUCTION.md # SSL.
"property.ssl.ca.location"="/etc/ssl/certs/ca-bundle.crt"