Load data from Confluent Cloud using Kafka connector
This topic describes how to use a Kafka connector, starrocks-kafka-connector, to stream messages (events) from Confluent into CelerData. The Kafka connector guarantees at-least-once semantics.
The Kafka connector integrates seamlessly with Kafka Connect, which lets CelerData work more closely with the Kafka ecosystem. It is a good choice if you want to load real-time data into CelerData. Compared with Routine Load, the Kafka connector is recommended in the following scenarios:
- Load data in formats that Routine Load does not support, such as Protobuf. Routine Load supports only CSV, JSON, and Avro formats. As long as data can be converted into JSON or CSV format using Kafka Connect's converters, it can be loaded into CelerData via the Kafka connector.
- Apply custom data transformations, for example to Debezium-formatted CDC data.
- Load data from multiple Kafka topics.
- Load data from Confluent Cloud.
- Exercise finer control over load batch sizes, parallelism, and other parameters to balance load speed against resource utilization.
Before you begin
Make sure the Confluent cluster can connect to the CelerData cluster over the Internet.
Configure the networking for the CelerData cluster
Make sure the CelerData cluster is reachable from the Internet, especially when the CelerData cluster is in a private subnet. If the CelerData cluster is in a public subnet, it is reachable over the Internet by default. This topic uses a CelerData cluster in a public subnet as an example.
Make sure that the security group with which the CelerData cluster resources are associated allows traffic from the Confluent cluster.
Sign in to the CelerData Cloud BYOC console and find the ID of the security group with which the CelerData cluster is associated.
Sign in to the Amazon VPC console.
In the left-side navigation pane, choose Security Groups. On the page that appears, find the security group with which the CelerData cluster resources are associated.
On the Inbound Rules tab, choose Edit inbound rules.
Add the following rules highlighted in the red box:
You must add ports 443 and 9030.
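As a quick sanity check after editing the inbound rules, the following minimal sketch tests whether the two ports accept TCP connections. The function names are hypothetical, and the check only proves reachability from the machine that runs it: a successful result does not guarantee that the Confluent cluster's source addresses are allowed by the security group.

```python
import socket


def port_is_open(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and DNS resolution failures.
        return False


def check_required_ports(host: str, ports=(443, 9030)) -> dict:
    """Map each required port to whether it accepted a TCP connection."""
    return {port: port_is_open(host, port) for port in ports}
```

For example, `check_required_ports("<endpoint>")` returns a dictionary keyed by port number, where `<endpoint>` is the endpoint of your CelerData cluster.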
Basic steps
The following example demonstrates how to load Avro-formatted records from Confluent Cloud into CelerData.
- Use a source connector to generate data into a topic of the Confluent cluster. In the following example, the source connector is Datagen Source and the data format is Avro.
- Create a CelerData table.
- Use a sink connector to load data from the topic of the Confluent cluster into the CelerData table. The sink connector must be the custom connector starrocks-kafka-connector.
Generate data into a topic of a Confluent cluster
Choose the Confluent cluster, enter its Connectors page, and click + Add Connector. Then choose Datagen Source as the source connector.
Configure the Datagen Source connector.
In the Topic selection section:
Click + Add new topic and specify the name and the partition number of the topic. In this example, the name of the topic is specified as datagen_topic.
In the Kafka credentials section:
This example is a simple guide to quickly familiarize you with the loading process, so you can choose Global access and then click Generate API key & download.
In the Configuration section:
Select the output record value format and template. In this example, specify the format as AVRO and the template as orders.
In the Sizing section:
Use the default configurations.
In the Review and launch section:
Check the configurations for the Datagen source connector, and click Continue once you validate all the configurations.
On the Connectors page, check the Datagen source connector you have added.
After the Datagen source connector is running, you can verify that messages are populating the topic datagen_topic.
Create a table in the CelerData cluster
Create a table in the CelerData cluster according to the schema of the Avro-formatted records in the topic datagen_topic.
CREATE TABLE test123 (
ordertime LARGEINT,
orderid int,
itemid string,
orderunits string,
city string,
state string,
zipcode bigint
)
DISTRIBUTED BY HASH(orderid);
Load data into the CelerData table
In this example, the sink connector is the custom Kafka connector starrocks-kafka-connector. It loads Avro-formatted records from the Confluent topic into the CelerData table.
NOTE
The Kafka connector named starrocks-kafka-connector is a custom connector, and custom connectors can only be created in AWS regions supported by Confluent Cloud. For more information about the capabilities and limitations of custom connectors, see Confluent Documentation.
Enter the Connectors page of the Confluent cluster, click + Add Connector and click Add plugin.
Upload the archive of the Kafka connector.
Connector plugin detail:
- Connector plugin name: Enter the name of the Kafka connector, such as StarRocks-kafka-connector.
- Custom plugin description: Enter a description for the Kafka connector.
- Connector class: Enter the Java class for the Kafka connector, which is com.starrocks.connector.kafka.StarRocksSinkConnector.
Connector type: Select the connector type as Sink.
Connector archive: Click Select connector archive and upload the ZIP file of the Kafka connector.
You can download the TAR file of the Kafka connector from GitHub and extract it. You then need to compress all the extracted files into a ZIP file and upload the ZIP file.
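The repackaging step can be scripted. The following is a minimal sketch using only the Python standard library. The function name is hypothetical, and the sketch assumes that the ZIP file should keep the same directory layout as the extracted TAR file. Check the Confluent documentation for the archive layout it expects.

```python
import tarfile
import tempfile
import zipfile
from pathlib import Path


def repackage_tar_as_zip(tar_path: str, zip_path: str) -> list:
    """Extract tar_path into a temporary directory and write its files to zip_path.

    Returns the sorted archive member names written to the ZIP file.
    """
    written = []
    with tempfile.TemporaryDirectory() as tmp:
        # The default read mode auto-detects compression (.tar, .tar.gz, ...).
        # Only extract archives that come from a source you trust.
        with tarfile.open(tar_path) as tar:
            tar.extractall(tmp)
        with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
            for path in sorted(Path(tmp).rglob("*")):
                if path.is_file():
                    # Store paths relative to the extraction root.
                    arcname = path.relative_to(tmp).as_posix()
                    zf.write(path, arcname)
                    written.append(arcname)
    return written
```

Call it with the path of the downloaded TAR file and the path of the ZIP file to create, then upload the resulting ZIP file as the connector archive.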
Configure and launch the Kafka connector.
Enter the Connectors page of the Confluent cluster, click + Add Connector and select the StarRocks-kafka-connector.
In the Kafka credentials section:
This example is a simple guide to quickly familiarize you with the loading process, so you can choose Global access and click Generate API key & download.
In the Configuration section:
Add the configurations in key-value pair or JSON format. This example adds configurations in the JSON format.
The complete JSON-formatted configurations are as follows:
{
  "topics": "datagen_topic",
  "confluent.custom.schema.registry.auto": "true",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "connect.timeoutms": "6000",
  "starrocks.http.url": "https://81xxxxxxx.cloud-app.celerdata.org:443",
  "starrocks.username": "xxxxxx",
  "starrocks.password": "xxxxxx",
  "starrocks.database.name": "test",
  "starrocks.topic2table.map": "datagen_topic:test123",
  "sink.properties.strip_outer_array": "true",
  "sink.properties.jsonpaths": "[\"$.ordertime\",\"$.orderid\",\"$.itemid\",\"$.orderunits\",\"$.address.city\",\"$.address.state\",\"$.address.zipcode\"]"
}
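Note that sink.properties.jsonpaths is a JSON array encoded as a string, which is why its inner quotes are escaped. The following minimal sketch generates the same configuration programmatically, so the escaping is handled for you. The function name and the JSON_PATHS constant are hypothetical. The keys and values are the ones used in this example.

```python
import json

# One JSONPath per column of the example table test123, in column order.
JSON_PATHS = [
    "$.ordertime", "$.orderid", "$.itemid", "$.orderunits",
    "$.address.city", "$.address.state", "$.address.zipcode",
]


def build_sink_config(endpoint: str, username: str, password: str) -> dict:
    """Build the Avro example configuration as a dict of string values."""
    return {
        "topics": "datagen_topic",
        "confluent.custom.schema.registry.auto": "true",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "connect.timeoutms": "6000",
        "starrocks.http.url": f"https://{endpoint}:443",
        "starrocks.username": username,
        "starrocks.password": password,
        "starrocks.database.name": "test",
        "starrocks.topic2table.map": "datagen_topic:test123",
        "sink.properties.strip_outer_array": "true",
        # The value must itself be a JSON string, so encode the list first.
        "sink.properties.jsonpaths": json.dumps(JSON_PATHS, separators=(",", ":")),
    }
```

Passing the result to `json.dumps(config, indent=2)` produces text that can be pasted into the Configuration section. The inner quotes of the jsonpaths value are escaped automatically.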
Configurations:
For the supported configurations and descriptions, see Parameters. Take special notice of the following configurations:
starrocks.http.url: Enter the HTTP URL of your CelerData cluster in the format of https://<endpoint>:443. The endpoint can be found in the connection section on the Overview page of the CelerData cluster.
Also, some configurations need to be added according to the source data format.
In this example, the source data is Avro records and Confluent Cloud Schema Registry is used, so the required configurations also need to include confluent.custom.schema.registry.auto and value.converter. For more information, see Confluent Documentation.
{
  "topics": "datagen_topic",
  "confluent.custom.schema.registry.auto": "true",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "connect.timeoutms": "6000",
  "starrocks.http.url": "https://81xxxxxxx.cloud-app.celerdata.org:443",
  "starrocks.username": "xxxxxx",
  "starrocks.password": "xxxxxx",
  "starrocks.database.name": "test",
  "starrocks.topic2table.map": "datagen_topic:test123",
  "sink.properties.strip_outer_array": "true",
  "sink.properties.jsonpaths": "[\"$.ordertime\",\"$.orderid\",\"$.itemid\",\"$.orderunits\",\"$.address.city\",\"$.address.state\",\"$.address.zipcode\"]"
}
If the source data is JSON records without Schema Registry, the required configurations also need to include:
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
The complete configurations can be as follows:
{
  "topics": "datagen_topic",
  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "key.converter.schemas.enable": "false",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "false",
  "connect.timeoutms": "6000",
  "starrocks.http.url": "https://81xxxxxxx.cloud-app.celerdata.org:443",
  "starrocks.username": "xxxxxx",
  "starrocks.password": "xxxxxx",
  "starrocks.database.name": "test",
  "starrocks.topic2table.map": "datagen_topic:test123",
  "sink.properties.strip_outer_array": "true",
  "sink.properties.jsonpaths": "[\"$.ordertime\",\"$.orderid\",\"$.itemid\",\"$.orderunits\",\"$.address.city\",\"$.address.state\",\"$.address.zipcode\"]"
}
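To see how the sink.properties.jsonpaths setting in these configurations lines up with the columns of test123, the following sketch resolves each path against one sample record. It is an illustration only, not the connector's or CelerData's implementation. The helper names and the sample values are hypothetical. Only simple dot-separated paths are handled, and returning None for a missing key is an assumption made for the sketch.

```python
import json


def resolve_path(record: dict, path: str):
    """Resolve a simple '$.a.b' style path; return None if a key is missing."""
    if not path.startswith("$."):
        raise ValueError(f"Unsupported path: {path!r}")
    value = record
    for key in path[2:].split("."):
        if not isinstance(value, dict) or key not in value:
            return None
        value = value[key]
    return value


def extract_row(record: dict, jsonpaths: str) -> list:
    """Return one value per path, in the order the paths are listed."""
    return [resolve_path(record, p) for p in json.loads(jsonpaths)]


# Hypothetical record shaped like the nested order records in this example.
sample = {
    "ordertime": 1497014222380,
    "orderid": 18,
    "itemid": "Item_184",
    "orderunits": 8.59,
    "address": {"city": "City_1", "state": "State_2", "zipcode": 12345},
}
paths = ('["$.ordertime","$.orderid","$.itemid","$.orderunits",'
         '"$.address.city","$.address.state","$.address.zipcode"]')
row = extract_row(sample, paths)
```

The nested address fields are flattened into the city, state, and zipcode positions, which matches the column order in the CREATE TABLE statement.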
In the Networking section:
Enter the connection endpoint in the format <endpoint>:443:TCP, where the endpoint is the same as the one in starrocks.http.url, whose format is https://<endpoint>:443.
In the Sizing section:
Use the default settings.
In the Review and launch section:
Check the configurations for the Kafka connector, and click Continue once you validate all the configurations.
On the Connectors page, check the Kafka connector you have launched.
Once the Kafka connector completes provisioning, the status changes to running.
After both connectors are running, check the data in the CelerData table.
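One way to run this check from a script is sketched below. It assumes that the cluster accepts MySQL-protocol connections on port 9030 (the port opened in the security group earlier) and that the third-party pymysql package is installed. Both are assumptions rather than statements from this topic, and your cluster may additionally require SSL settings. The function names are hypothetical.

```python
def build_check_queries(database: str, table: str) -> list:
    """Return SQL statements that count the rows and preview a few of them."""
    target = f"`{database}`.`{table}`"
    return [
        f"SELECT COUNT(*) FROM {target}",
        f"SELECT * FROM {target} LIMIT 5",
    ]


def run_check(host: str, user: str, password: str,
              database: str = "test", table: str = "test123",
              port: int = 9030) -> list:
    """Run the check queries and return their result sets."""
    import pymysql  # third-party; imported lazily so the module loads without it

    conn = pymysql.connect(host=host, port=port, user=user,
                           password=password, database=database)
    try:
        results = []
        with conn.cursor() as cur:
            for sql in build_check_queries(database, table):
                cur.execute(sql)
                results.append(cur.fetchall())
        return results
    finally:
        conn.close()
```

A row count that grows between two runs indicates that the sink connector is loading records from datagen_topic into test123.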
Parameters
name
- Required: YES
- Default value:
- Description: Name for this Kafka connector. It must be globally unique among all Kafka connectors within this Kafka Connect cluster. For example, starrocks-kafka-connector.
connector.class
- Required: YES
- Default value: com.starrocks.connector.kafka.StarRocksSinkConnector
- Description: Class used by this Kafka connector's sink.
topics
- Required: YES
- Default value:
- Description: One or more topics to subscribe to, where each topic corresponds to a CelerData table. By default, CelerData assumes that the topic name matches the name of the CelerData table, and therefore determines the target table by using the topic name. Specify either topics or topics.regex (below), but not both. If the CelerData table name differs from the topic name, use the optional starrocks.topic2table.map parameter (below) to specify the mapping from topic name to table name.
topics.regex
- Required:
- Default value:
- Description: Regular expression that matches the one or more topics to subscribe to. For more information, see topics. Specify either topics.regex or topics (above), but not both.
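The either-or rule for topics and topics.regex can be checked before a configuration is submitted. The following is a minimal sketch with a hypothetical function name. It treats a missing key and an empty string the same way, which is an assumption.

```python
def validate_topic_selection(config: dict) -> str:
    """Return the name of the parameter in use, or raise if the rule is broken."""
    has_topics = bool(config.get("topics"))
    has_regex = bool(config.get("topics.regex"))
    # Exactly one of the two must be set: both or neither is an error.
    if has_topics == has_regex:
        raise ValueError("Set exactly one of 'topics' or 'topics.regex'.")
    return "topics" if has_topics else "topics.regex"
```

For example, `validate_topic_selection({"topics": "datagen_topic"})` returns `"topics"`, while a configuration that sets both parameters raises a ValueError.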
starrocks.topic2table.map
- Required: NO
- Default value:
- Description: Mapping between topic names and CelerData table names, used when a topic name differs from its CelerData table name. The format is <topic-1>:<table-1>,<topic-2>:<table-2>,... (comma-separated topic:table pairs).
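The mapping format and the default described for topics (a topic with no mapping targets the table with the same name) can be illustrated with a short sketch. The function names are hypothetical, and tolerating whitespace around entries is an assumption made for the sketch, not documented connector behavior.

```python
def parse_topic2table_map(value: str) -> dict:
    """Parse '<topic-1>:<table-1>,<topic-2>:<table-2>' into a dict."""
    mapping = {}
    for pair in value.split(","):
        pair = pair.strip()
        if not pair:
            continue  # ignore empty entries such as a trailing comma
        topic, sep, table = pair.partition(":")
        topic, table = topic.strip(), table.strip()
        if not sep or not topic or not table:
            raise ValueError(f"Malformed mapping entry: {pair!r}")
        mapping[topic] = table
    return mapping


def target_table(topic: str, mapping: dict) -> str:
    """Fall back to the topic name when no explicit mapping exists."""
    return mapping.get(topic, topic)
```

With the value used in this topic, `parse_topic2table_map("datagen_topic:test123")` maps datagen_topic to test123, and a topic that is absent from the mapping resolves to its own name.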
starrocks.http.url
- Required: YES
- Default value:
- Description: HTTP URL of your CelerData cluster, in the format https://<endpoint>:443. The endpoint can be found in the connection section on the CelerData cluster's Overview page.
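The same endpoint is used twice in this topic: in starrocks.http.url and in the Networking section of the connector setup, which expects <endpoint>:443:TCP. The following minimal sketch derives the second value from the first so that the two cannot drift apart. The function name is hypothetical, and defaulting to port 443 when the URL omits the port is an assumption.

```python
from urllib.parse import urlsplit


def networking_endpoint(http_url: str) -> str:
    """Convert 'https://<endpoint>:443' into '<endpoint>:443:TCP'."""
    parts = urlsplit(http_url)
    if parts.scheme != "https" or parts.hostname is None:
        raise ValueError("Expected a URL of the form https://<endpoint>:443")
    port = parts.port or 443  # assume the HTTPS default when no port is given
    return f"{parts.hostname}:{port}:TCP"
```

For a URL such as `https://example.cloud-app.celerdata.org:443`, where the host name is a placeholder, the function returns `example.cloud-app.celerdata.org:443:TCP`.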
starrocks.database.name
- Required: YES
- Default value:
- Description: Name of the CelerData database.
starrocks.username
- Required: YES
- Default value:
- Description: Username of your CelerData cluster account. The user needs the INSERT privilege on the CelerData table.
starrocks.password
- Required: YES
- Default value:
- Description: Password of your CelerData cluster account.
key.converter
- Required: NO
- Default value: Key converter used by the Kafka Connect cluster
- Description: Key converter for the sink connector (starrocks-kafka-connector), which is used to deserialize the keys of Kafka data. The default key converter is the one used by the Kafka Connect cluster.
value.converter
- Required: NO
- Default value: Value converter used by the Kafka Connect cluster
- Description: Value converter for the sink connector (starrocks-kafka-connector), which is used to deserialize the values of Kafka data. The default value converter is the one used by the Kafka Connect cluster.
key.converter.schema.registry.url
- Required: NO
- Default value:
- Description: Schema registry URL for the key converter.
value.converter.schema.registry.url
- Required: NO
- Default value:
- Description: Schema registry URL for the value converter.
tasks.max
- Required: NO
- Default value: 1
- Description: Maximum number of task threads that the Kafka connector can create, which is usually the same as the number of CPU cores on the worker nodes in the Kafka Connect cluster. You can tune this parameter to control load performance.
bufferflush.maxbytes
- Required: NO
- Default value: 94371840 (90 MB)
- Description: Maximum size of data that can be accumulated in memory before being sent to CelerData at a time. The value ranges from 64 MB to 10 GB. Keep in mind that the Stream Load SDK buffer may create multiple Stream Load jobs to buffer data. Therefore, the threshold mentioned here refers to the total data size.
bufferflush.intervalms
- Required: NO
- Default value: 300000
- Description: Interval at which a batch of data is sent, which controls the load latency. Range: [1000, 3600000].
connect.timeoutms
- Required: NO
- Default value: 1000
- Description: Timeout for connecting to the HTTP URL. Range: [100, 60000].
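The documented ranges and defaults for bufferflush.maxbytes, bufferflush.intervalms, and connect.timeoutms can be checked before a configuration is submitted. The following is a minimal sketch with hypothetical names. It assumes that MB and GB are binary units, which is consistent with the default 94371840 being exactly 90 × 1024 × 1024 bytes but is otherwise an assumption.

```python
MIB = 1024 * 1024

# name -> (minimum, maximum, default), taken from the parameter descriptions.
LIMITS = {
    "bufferflush.maxbytes": (64 * MIB, 10 * 1024 * MIB, 94371840),
    "bufferflush.intervalms": (1000, 3600000, 300000),
    "connect.timeoutms": (100, 60000, 1000),
}


def check_tuning(config: dict) -> dict:
    """Return the effective integer values, raising ValueError when out of range."""
    effective = {}
    for name, (low, high, default) in LIMITS.items():
        # Connector configurations carry strings, so convert before comparing.
        value = int(config.get(name, default))
        if not low <= value <= high:
            raise ValueError(f"{name}={value} is outside [{low}, {high}]")
        effective[name] = value
    return effective
```

For the example configuration in this topic, `check_tuning({"connect.timeoutms": "6000"})` accepts the timeout and fills in the defaults for the two buffer parameters.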
sink.properties.*
- Required:
- Default value:
- Description: Stream Load parameters to control load behavior. For example, the parameter sink.properties.format specifies the format used for Stream Load, such as CSV or JSON. For a list of supported parameters and their descriptions, see STREAM LOAD.
sink.properties.format
- Required: NO
- Default value: json
- Description: Format used for Stream Load. The Kafka connector transforms each batch of data into this format before sending it to CelerData. Valid values: csv and json. For more information, see CSV parameters and JSON parameters.
Limits
- Flattening a single message from a Kafka topic into multiple data rows and loading them into CelerData is not supported.
- The Kafka connector's sink guarantees at-least-once semantics.