Load streaming data from Confluent Cloud using Kafka connector
This topic describes how to use a Kafka connector, starrocks-kafka-connector, to stream messages (events) from Confluent into CelerData. The Kafka connector guarantees at-least-once semantics.
The Kafka connector integrates seamlessly with Kafka Connect, which allows CelerData to be better integrated with the Kafka ecosystem. It is a good choice if you want to load real-time data into CelerData. Compared with Routine Load, the Kafka connector is recommended in the following scenarios:
- Load data in formats that Routine Load does not support, such as Protobuf. Routine Load supports only the CSV, JSON, and Avro formats. As long as the data can be converted into the JSON or CSV format using Kafka Connect's converters, it can be loaded into CelerData via the Kafka connector.
- Customize data transformation, for example, for Debezium-formatted CDC data.
- Load data from multiple Kafka topics.
- Load data from Confluent Cloud.
- Exercise finer control over load batch sizes, parallelism, and other parameters to balance load speed against resource utilization.
Basic steps
The following example demonstrates how to load Avro-formatted records from Confluent Cloud into CelerData.
- Use a source connector to generate data into a topic of the Confluent cluster. In the following example, the source connector is Datagen Source and the data format is Avro.
- Create a CelerData table.
- Use a sink connector (a custom connector, starrocks-kafka-connector, needs to be used) to load data from the topic of the Confluent cluster into the CelerData table.
Generate data into a topic of a Confluent cluster
Choose the Confluent cluster, enter its Connectors page, and click + Add Connector. Then choose Datagen Source as the source connector.
Configure the Datagen Source connector.
In the Topic selection section:
Click + Add new topic and specify the name and the partition number of the topic. In this example, the topic is named datagen_topic.
In the Kafka credentials section:
This example is a simple guide to quickly familiarize you with the loading process, so you can choose Global access and then click Generate API key & download.
In the Configuration section:
Select the output record value format and template. In this example, specify the format as AVRO and the template as orders.
In the Sizing section:
Use the default configurations.
In the Review and launch section:
Check the configurations for the Datagen source connector, and click Continue once you validate all the configurations.
On the Connectors page, check the Datagen source connector you have added.
After the Datagen source connector is running, you can verify that messages are populating the topic datagen_topic.
Create a table in CelerData
Create a table in the CelerData cluster according to the schema of the Avro-formatted records in the topic datagen_topic.
CREATE TABLE test123 (
    ordertime  LARGEINT,
    orderid    INT,
    itemid     STRING,
    orderunits STRING,
    city       STRING,
    state      STRING,
    zipcode    BIGINT
)
DISTRIBUTED BY HASH(orderid);
Load data into the CelerData table
In this example, the sink connector is the custom Kafka connector starrocks-kafka-connector. It loads Avro-formatted records from the Confluent topic into the CelerData table.
Note
The Kafka connector named starrocks-kafka-connector is a custom connector, and custom connectors can only be created in AWS regions supported by Confluent Cloud. For more information about the capabilities and limitations of custom connectors, see Confluent Documentation.
Enter the Connectors page of the Confluent cluster, click + Add Connector, and then click Add plugin.
Upload the archive of the Kafka connector.
Connector plugin details:
- Connector plugin name: Enter the name of the Kafka connector, such as starrocks-kafka-connector.
- Custom plugin description: Enter a description for the Kafka connector.
- Connector class: Enter the Java class for the Kafka connector, which is com.starrocks.connector.kafka.StarRocksSinkConnector.
Connector type: Select the connector type as Sink.
Connector archive: Click Select connector archive and upload the ZIP file of the Kafka connector.
You can download the TAR file of the Kafka connector from GitHub and extract it. Then compress all the extracted files into a ZIP file and upload the ZIP file.
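The repackaging step can also be scripted. The following is a minimal Python sketch, assuming the downloaded archive is a TAR file that Python's tarfile module can read. The function name repack_tar_as_zip and the file names in the usage note are hypothetical and are not part of the connector distribution.

```python
import tarfile
import tempfile
import zipfile
from pathlib import Path


def repack_tar_as_zip(tar_path: str, zip_path: str) -> list:
    """Extract a TAR archive and compress all of its files into a ZIP file.

    Returns the names of the files written into the ZIP archive.
    """
    written = []
    with tempfile.TemporaryDirectory() as tmp:
        # "r:*" lets tarfile detect the compression (none, gzip, bz2, xz).
        with tarfile.open(tar_path, "r:*") as tar:
            # Use the safer "data" extraction filter where Python provides it.
            kwargs = {"filter": "data"} if hasattr(tarfile, "data_filter") else {}
            tar.extractall(tmp, **kwargs)
        with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
            for path in sorted(Path(tmp).rglob("*")):
                if path.is_file():
                    # Store each file under its path relative to the archive root.
                    arcname = path.relative_to(tmp).as_posix()
                    zf.write(path, arcname)
                    written.append(arcname)
    return written
```

For example, repack_tar_as_zip("starrocks-kafka-connector.tar.gz", "starrocks-kafka-connector.zip") would produce a ZIP file to upload, where both file names are placeholders for the files you actually downloaded.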
Configure and launch the Kafka connector.
Enter the Connectors page of the Confluent cluster, click + Add Connector, and select starrocks-kafka-connector.
In the Kafka credentials section:
This example is a simple guide to quickly familiarize you with the loading process, so you can choose Global access and click Generate API key & download.
In the Configuration section:
Add the configurations in key-value pair or JSON format. This example adds configurations in the JSON format.
The complete JSON-formatted configurations:
{
    "topics": "datagen_topic",
    "confluent.custom.schema.registry.auto": "true",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "connect.timeoutms": "6000",
    "starrocks.http.url": "https://ingest-xxxxxx.celerdata.com:443",
    "starrocks.username": "xxxxxx",
    "starrocks.password": "xxxxxx",
    "starrocks.database.name": "test",
    "starrocks.topic2table.map": "datagen_topic:test123",
    "sink.properties.strip_outer_array": "true",
    "sink.properties.jsonpaths": "[\"$.ordertime\",\"$.orderid\",\"$.itemid\",\"$.orderunits\",\"$.address.city\",\"$.address.state\",\"$.address.zipcode\"]"
}
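The value of sink.properties.jsonpaths is a JSON array serialized as a string, which is why its quotes are escaped in the configuration above. Writing that escaping by hand is error-prone, so the following minimal Python sketch generates the configuration instead. It assumes the same placeholder endpoint, username, and password as the example; the function name build_sink_config is hypothetical.

```python
import json

# Paths copied from the example configuration, in the column order of the
# target table test123.
json_paths = [
    "$.ordertime", "$.orderid", "$.itemid", "$.orderunits",
    "$.address.city", "$.address.state", "$.address.zipcode",
]


def build_sink_config(endpoint, username, password):
    """Build the connector configuration for the Avro example as a dict."""
    return {
        "topics": "datagen_topic",
        "confluent.custom.schema.registry.auto": "true",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "connect.timeoutms": "6000",
        "starrocks.http.url": f"https://{endpoint}:443",
        "starrocks.username": username,
        "starrocks.password": password,
        "starrocks.database.name": "test",
        "starrocks.topic2table.map": "datagen_topic:test123",
        "sink.properties.strip_outer_array": "true",
        # Serialize the list once here; the outer json.dumps call below
        # adds the backslash escaping seen in the example.
        "sink.properties.jsonpaths": json.dumps(json_paths, separators=(",", ":")),
    }


# Placeholder values, as in the example; replace them with your own.
config = build_sink_config("ingest-xxxxxx.celerdata.com", "xxxxxx", "xxxxxx")
print(json.dumps(config, indent=4))
```

The printed JSON can be pasted into the Configuration section after you substitute real values for the placeholders.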
Configurations:
For the supported configurations and descriptions, see Parameters. Take special notice of the following configurations:
starrocks.http.url: Enter the HTTP URL of your CelerData cloud in the format of https://<endpoint>:443. The endpoint can be found on the Streaming API page of the Integration section from the left-side navigation tree in the CelerData Cloud Serverless console.
NOTICE
The Streaming API page is hidden by default. If you need access to the Streaming API page, contact the support team.
starrocks.username: The username you use to log in to CelerData, in the format of <account_id>.<username>. The user needs the INSERT privilege on the CelerData table.
starrocks.password: The password you use to log in to CelerData.
Other configurations:
Some configurations need to be added according to the source data format. In this example, the source data is Avro-formatted records and Confluent Cloud Schema Registry is used, so the required configurations also need to include confluent.custom.schema.registry.auto and value.converter. For more information, see Confluent Documentation.
When the source data is JSON-formatted records without Schema Registry, the required configurations also need to include:
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
The complete configurations can be as follows:
{
    "topics": "datagen_topic",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "connect.timeoutms": "6000",
    "starrocks.http.url": "https://ingest-xxxxxx.celerdata.com:443",
    "starrocks.username": "xxxxxx",
    "starrocks.password": "xxxxxx",
    "starrocks.database.name": "test",
    "starrocks.topic2table.map": "datagen_topic:test123",
    "sink.properties.strip_outer_array": "true",
    "sink.properties.jsonpaths": "[\"$.ordertime\",\"$.orderid\",\"$.itemid\",\"$.orderunits\",\"$.address.city\",\"$.address.state\",\"$.address.zipcode\"]"
}
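To see why the sink.properties.jsonpaths entries for city, state, and zipcode start with $.address, it may help to trace one record by hand. The following Python sketch is only an illustration: the sample record is hypothetical and merely mirrors the field names used in the paths, and the extract helper handles only simple dotted paths. It is not how Stream Load evaluates JSON paths internally.

```python
import json


def extract(record, path):
    """Resolve a simple path such as "$.address.city" against a dict."""
    if not path.startswith("$."):
        raise ValueError(f"unsupported path: {path}")
    value = record
    for key in path[2:].split("."):
        value = value[key]
    return value


# The jsonpaths value from the example, parsed back into a list.
json_paths = json.loads(
    '["$.ordertime","$.orderid","$.itemid","$.orderunits",'
    '"$.address.city","$.address.state","$.address.zipcode"]'
)

# Column names of the table test123, in declaration order.
columns = ["ordertime", "orderid", "itemid", "orderunits",
           "city", "state", "zipcode"]

# Hypothetical record with a nested "address" object.
record = {
    "ordertime": 1497014222380,
    "orderid": 18,
    "itemid": "Item_184",
    "orderunits": "8",
    "address": {"city": "Mountain View", "state": "CA", "zipcode": 94041},
}

# Each path supplies the value for the column at the same position.
row = dict(zip(columns, (extract(record, p) for p in json_paths)))
print(row)
```

The nested address fields end up in the flat city, state, and zipcode columns because the paths are matched to the table columns by position.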
In the Networking section:
Enter the connection endpoint in the format of <endpoint>:443:TCP. The endpoint is the same as the one in starrocks.http.url, whose format is https://<endpoint>:443.
In the Sizing section:
Use the default settings.
In the Review and launch section:
Check the configurations for the Kafka connector, and click Continue once you validate all the configurations.
On the Connectors page, check the Kafka connector you have launched.
Once the Kafka connector completes provisioning, the status changes to running.
After both connectors are running, you can check the data in the CelerData table.
Parameters
name
- Required: YES
- Default value:
- Description: Name for this Kafka connector. It must be globally unique among all Kafka connectors within this Kafka Connect cluster. For example, starrocks-kafka-connector.
connector.class
- Required: YES
- Default value: com.starrocks.connector.kafka.StarRocksSinkConnector
- Description: Class used by this Kafka connector's sink.
topics
- Required: YES
- Default value:
- Description: One or more topics to subscribe to, where each topic corresponds to a CelerData table. By default, CelerData assumes that the topic name matches the name of the CelerData table, so it determines the target table from the topic name. Fill in either topics or topics.regex (below), but not both. If the CelerData table name differs from the topic name, use the optional starrocks.topic2table.map parameter (below) to specify the mapping from topic name to table name.
topics.regex
- Required:
- Default value:
- Description: Regular expression to match the one or more topics to subscribe to. For more description, see topics. Fill in either topics.regex or topics (above), but not both.
starrocks.topic2table.map
- Required: NO
- Default value:
- Description: Mapping from topic names to CelerData table names, used when a topic name is different from the CelerData table name. The format is <topic-1>:<table-1>,<topic-2>:<table-2>,....
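The mapping format and the default of using the topic name as the table name can be illustrated with a short Python sketch. The helper names parse_topic2table_map and target_table are hypothetical, and tolerating whitespace around entries is an assumption of this sketch, not documented connector behavior.

```python
def parse_topic2table_map(value):
    """Parse "<topic-1>:<table-1>,<topic-2>:<table-2>" into a dict."""
    mapping = {}
    for pair in value.split(","):
        pair = pair.strip()
        if not pair:
            continue
        topic, sep, table = pair.partition(":")
        topic, table = topic.strip(), table.strip()
        if not sep or not topic or not table:
            raise ValueError(f"expected <topic>:<table>, got {pair!r}")
        mapping[topic] = table
    return mapping


def target_table(topic, mapping):
    """Return the mapped table, or the topic name if no mapping exists."""
    return mapping.get(topic, topic)


mapping = parse_topic2table_map("datagen_topic:test123")
print(target_table("datagen_topic", mapping))  # mapped explicitly
print(target_table("orders", mapping))         # falls back to the topic name
```

With the mapping from the example, records from datagen_topic go to the table test123, while a topic without an entry would be expected to load into a table of the same name.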
starrocks.http.url
- Required: YES
- Default value:
- Description: HTTP URL of your CelerData cloud in the format of https://<endpoint>:443. The endpoint can be found on the Streaming API page of the Integration section from the left-side navigation tree in the CelerData Cloud Serverless console.
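The Networking section of the Confluent connector setup expects the same endpoint in the format <endpoint>:443:TCP. As a minimal sketch, the following Python helper derives that value from starrocks.http.url so the two settings cannot drift apart. The function name networking_endpoint is hypothetical, and the URL is the placeholder from the example.

```python
from urllib.parse import urlsplit


def networking_endpoint(http_url):
    """Turn "https://<endpoint>:443" into "<endpoint>:443:TCP"."""
    parts = urlsplit(http_url)
    if parts.scheme != "https" or parts.hostname is None:
        raise ValueError(f"expected an https URL, got {http_url!r}")
    # Fall back to 443 when the URL does not state the port explicitly.
    port = parts.port or 443
    return f"{parts.hostname}:{port}:TCP"


print(networking_endpoint("https://ingest-xxxxxx.celerdata.com:443"))
```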
starrocks.database.name
- Required: YES
- Default value:
- Description: Name of the CelerData database.
starrocks.username
- Required: YES
- Default value:
- Description: Username you use to log in to CelerData, in the format of <account_id>.<username>. The user needs the INSERT privilege on the CelerData table.
starrocks.password
- Required: YES
- Default value:
- Description: Password you use to log in to CelerData.
key.converter
- Required: NO
- Default value: Key converter used by Kafka Connect cluster
- Description: Key converter for the sink connector (kafka-connector-starrocks), which is used to deserialize the keys of Kafka data. The default key converter is the one used by the Kafka Connect cluster.
value.converter
- Required: NO
- Default value: Value converter used by Kafka Connect cluster
- Description: Value converter for the sink connector (kafka-connector-starrocks), which is used to deserialize the values of Kafka data. The default value converter is the one used by the Kafka Connect cluster.
key.converter.schema.registry.url
- Required: NO
- Default value:
- Description: Schema registry URL for the key converter.
value.converter.schema.registry.url
- Required: NO
- Default value:
- Description: Schema registry URL for the value converter.
tasks.max
- Required: NO
- Default value: 1
- Description: Maximum number of task threads that the Kafka connector can create, which is usually the same as the number of CPU cores on the worker nodes in the Kafka Connect cluster. You can tune this parameter to control load performance.
bufferflush.maxbytes
- Required: NO
- Default value: 94371840 (90 MB)
- Description: Maximum size of data that can be accumulated in memory before being sent to CelerData at a time. The maximum size ranges from 64 MB to 10 GB. Keep in mind that the Stream Load SDK buffer may create multiple Stream Load jobs to buffer data. Therefore, the threshold mentioned here refers to the total data size.
bufferflush.intervalms
- Required: NO
- Default value: 300000
- Description: Interval for sending a batch of data which controls the load latency. Range: [1000, 3600000].
connect.timeoutms
- Required: NO
- Default value: 1000
- Description: Timeout for connecting to the HTTP URL. Range: [100, 60000].
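The documented ranges for bufferflush.maxbytes, bufferflush.intervalms, and connect.timeoutms can be checked before a configuration is submitted. The following Python sketch is one way to do that. It assumes that MB and GB are binary units, which is inferred from the default of 94371840 being exactly 90 × 1024 × 1024; the helper name out_of_range is hypothetical.

```python
MB = 1024 * 1024  # assumption: binary megabytes, since 90 * MB == 94371840

# Inclusive (low, high) bounds taken from the parameter descriptions.
RANGES = {
    "bufferflush.maxbytes": (64 * MB, 10 * 1024 * MB),
    "bufferflush.intervalms": (1000, 3_600_000),
    "connect.timeoutms": (100, 60_000),
}


def out_of_range(config):
    """Return a message for every configured value outside its range."""
    problems = []
    for key, (low, high) in RANGES.items():
        if key not in config:
            continue  # unset parameters fall back to their defaults
        value = int(config[key])
        if not low <= value <= high:
            problems.append(f"{key}={value} is outside [{low}, {high}]")
    return problems


# The example configuration sets only connect.timeoutms explicitly.
print(out_of_range({"connect.timeoutms": "6000"}))
print(out_of_range({"bufferflush.maxbytes": "1048576"}))
```

The first call reports no problems because 6000 lies within [100, 60000]; the second reports one because 1 MB is below the 64 MB minimum.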
sink.properties.*
- Required:
- Default value:
- Description: Stream Load parameters to control load behavior. For example, the parameter sink.properties.format specifies the format used for Stream Load, such as CSV or JSON. For a list of supported parameters and their descriptions, see STREAM LOAD.
sink.properties.format
- Required: NO
- Default value: json
- Description: Format used for Stream Load. The Kafka connector transforms each batch of data into this format before sending it to CelerData. Valid values: csv and json. For more information, see CSV parameters and JSON parameters.
Limits
- Flattening a single message from a Kafka topic into multiple data rows and loading them into CelerData is not supported.
- The Kafka connector's sink guarantees at-least-once semantics.