Loading from Apache Kafka/Confluent Cloud
CelerData Cloud Serverless can continuously load data from Kafka services, such as self-managed Apache Kafka, Confluent Cloud, and Amazon MSK. This tutorial uses Confluent Cloud as an example to demonstrate how to load data from a Kafka service into CelerData.
NOTICE
Loading data into CelerData from other Kafka services, such as a self-managed Kafka cluster or Amazon MSK, may involve more complex concepts and operations, such as VPC configuration. If you are not familiar with these, we recommend going through this tutorial first: it uses a Confluent cluster that CelerData has already prepared and made publicly accessible.
After reading this tutorial, you will be able to load data by using Routine Load.
This tutorial loads JSON-formatted data. For loading data in other formats, see CREATE ROUTINE LOAD; a brief CSV sketch is also shown below.
Interface: SQL
Estimated time: 20 minutes
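For data in other formats, the job definition changes only slightly. As a reference, here is a minimal sketch of a Routine Load job for CSV messages; the job name example_csv_job, the topic users_csv, and the broker address are hypothetical placeholders, not part of the prepared cluster used in this tutorial:
-- Hypothetical CSV job; broker address and topic are placeholders.
CREATE ROUTINE LOAD mydatabase.example_csv_job ON myusers
COLUMNS TERMINATED BY ',',
COLUMNS (userid, regionid, gender, registertime)
PROPERTIES (
    'format' = 'csv'
)
FROM KAFKA (
    'kafka_broker_list' = '<bootstrap_server>:9092',
    'kafka_topic' = 'users_csv'
);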
Prerequisites
The tutorial assumes the following:
You have a CelerData cloud account and a user with the privileges needed to create databases and tables. If you do not fulfill these prerequisites, follow the instructions in Sign up for CelerData Cloud Serverless.
To guide you through this tutorial, CelerData has already prepared a Confluent cluster with sample data. You only need to create a database and a table before you start loading data.
Sample data
The sample data from Confluent Cloud is generated based on the Confluent Cloud tutorial. Each message is a JSON object representing a row of data to be loaded. One sample message is as follows:
{
  "registertime": 1493987388929,
  "userid": "User_3",
  "regionid": "Region_8",
  "gender": "OTHER"
}
Create a database and a table
Create a database mydatabase and a table myusers in CelerData.
CREATE DATABASE IF NOT EXISTS mydatabase;
USE mydatabase;
CREATE TABLE myusers (
    userid VARCHAR(50),
    regionid VARCHAR(50),
    gender VARCHAR(50),
    registertime BIGINT
)
PROPERTIES (
    "replication_num" = "3"
);
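Optionally, you can double-check the schema of the new table before loading. The DESCRIBE statement (covered in the SQL reference) lists each column with its type:
DESCRIBE myusers;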
Load data
Load data using Routine Load
Execute the CREATE ROUTINE LOAD command to create a Routine Load job named tutorials_users_test1. This job runs continuously, consuming messages from the topic users of the Confluent cluster and loading the data into the table myusers. To ensure a smooth experience, CelerData has prepared the Confluent cluster. You can access it by using the following connection and authentication parameters.
CREATE ROUTINE LOAD mydatabase.tutorials_users_test1 ON myusers
COLUMNS(userid, regionid, gender, registertime)
PROPERTIES (
    'format' = 'json',
    'jsonpaths' = '["$.userid","$.regionid","$.gender", "$.registertime"]'
)
FROM KAFKA (
    'kafka_broker_list' = 'pkc-n98pk.us-west-2.aws.confluent.cloud:9092',
    'kafka_topic' = 'users',
    'property.kafka_default_offsets' = 'OFFSET_BEGINNING',
    'property.security.protocol' = 'SASL_SSL',
    'property.sasl.mechanism' = 'PLAIN',
    'property.sasl.username' = 'HALNAFT2X4IKVWYY',
    'property.sasl.password' = 'PF1L6aZ5kEXG5/q8heq5onDmLs6BeekC9X3pFWyuc+AZxw5DYUqBihJjXhKjFJw/'
);
Configurations:
- 'format' = 'json': specifies that the messages are in JSON format.
- jsonpaths: the names of the fields that you want to load from the JSON-formatted messages into the table myusers. The fields must map one to one onto the columns listed in COLUMNS (see the sketch after this list).
- kafka_broker_list: the bootstrap server address of the Confluent cluster.
- kafka_topic: the topic that contains the JSON-formatted messages.
- 'property.kafka_default_offsets' = 'OFFSET_BEGINNING': specifies that data is consumed from the beginning of each partition.
- property.security.protocol and property.sasl.*: the security and authentication information used by CelerData to connect to the Confluent cluster.
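To make the one-to-one mapping concrete, the following sketch (a hypothetical job name, not part of this tutorial; the Kafka connection parameters are abbreviated) declares the fields in a different order. The i-th JSON path always feeds the i-th column in COLUMNS:
-- Hypothetical variant: same data, different declaration order.
-- '$.registertime' feeds registertime, '$.userid' feeds userid, and so on.
CREATE ROUTINE LOAD mydatabase.mapping_example ON myusers
COLUMNS(registertime, userid, regionid, gender)
PROPERTIES (
    'format' = 'json',
    'jsonpaths' = '["$.registertime","$.userid","$.regionid","$.gender"]'
)
FROM KAFKA (
    'kafka_broker_list' = '<bootstrap_server>:9092',
    'kafka_topic' = 'users'
    -- plus the same property.* security settings as in the job above
);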
Check the load status
Execute the SHOW ROUTINE LOAD command to check the status of that Routine Load job.
SHOW ROUTINE LOAD FOR tutorials_users_test1;
The returned result displays the job execution information:
+-------+-----------------------+---------------------+-----------+---------+------------+-----------+---------+----------------+----------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------+----------------------+--------------+----------+
| Id | Name | CreateTime | PauseTime | EndTime | DbName | TableName | State | DataSourceType | CurrentTaskNum | JobProperties | DataSourceProperties | CustomProperties | Statistic | Progress | ReasonOfStateChanged | ErrorLogUrls | OtherMsg |
+-------+-----------------------+---------------------+-----------+---------+------------+-----------+---------+----------------+----------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------+----------------------+--------------+----------+
| 32070 | tutorials_users_test1 | 2023-10-18 01:34:51 | NULL | NULL | mydatabase | myusers | RUNNING | KAFKA | 1 | {"partitions":"*","partial_update":"false","columnToColumnExpr":"userid,regionid,gender,registertime","maxBatchIntervalS":"10","partial_update_mode":"null","whereExpr":"*","dataFormat":"json","timezone":"America/Los_Angeles","format":"json","log_rejected_record_num":"0","taskTimeoutSecond":"60","json_root":"","maxFilterRatio":"1.0","strict_mode":"false","jsonpaths":"[\"$.userid\",\"$.regionid\",\"$.gender\", \"$.registertime\"]","taskConsumeSecond":"15","desireTaskConcurrentNum":"5","maxErrorNum":"0","strip_outer_array":"false","currentTaskConcurrentNum":"1","maxBatchRows":"200000"} | {"topic":"users","currentKafkaPartitions":"0","brokerList":"pkc-n98pk.us-west-2.aws.confluent.cloud:9092"} | {"security.protocol":"SASL_SSL","sasl.username":"HALNAFT2X4IKVWYY","sasl.mechanism":"PLAIN","kafka_default_offsets":"OFFSET_BEGINNING","group.id":"tutorials_users_test1_e6d9543c-fdca-47ac-a9af-30c779c84fb6","sasl.password":"******"} | {"receivedBytes":756781,"errorRows":0,"committedTaskNum":1,"loadedRows":8699,"loadRowsRate":14000,"abortedTaskNum":0,"totalRows":8699,"unselectedRows":0,"receivedBytesRate":1246000,"taskExecuteTimeMs":607} | {"0":"8698"} | | | |
+-------+-----------------------+---------------------+-----------+---------+------------+-----------+---------+----------------+----------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------+----------------------+--------------+----------+
- The Statistic field shows the current loading information of the Routine Load job. For example, the loadedRows key shows the number of rows loaded so far.
- The Progress field shows the offset currently being consumed in each Kafka partition.
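The job keeps consuming new messages until you pause or stop it. If you need to manage its lifecycle, the following statements apply (see PAUSE ROUTINE LOAD, RESUME ROUTINE LOAD, and STOP ROUTINE LOAD for details; note that a stopped job cannot be resumed):
PAUSE ROUTINE LOAD FOR mydatabase.tutorials_users_test1;
RESUME ROUTINE LOAD FOR mydatabase.tutorials_users_test1;
STOP ROUTINE LOAD FOR mydatabase.tutorials_users_test1;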
When the job is running successfully, you can query the data loaded into the table.
SELECT * FROM myusers LIMIT 3;
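You can also run ordinary analytical queries against the loaded data right away. For example, a simple sanity check that counts the loaded users per region (the exact numbers depend on how many messages have been consumed so far):
SELECT regionid, COUNT(*) AS user_count
FROM myusers
GROUP BY regionid
ORDER BY user_count DESC;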
Clean up the environment
Congratulations! You have completed your first data load on CelerData Cloud Serverless!
Use the following statement to drop the database mydatabase that you created for this test run.
DROP DATABASE IF EXISTS mydatabase;