Load streaming data from Confluent Cloud using Kafka connector

This topic describes how to use a Kafka connector, starrocks-kafka-connector, to stream messages (events) from Confluent Cloud into CelerData. The Kafka connector guarantees at-least-once semantics.

The Kafka connector seamlessly integrates with Kafka Connect, which allows CelerData to integrate better with the Kafka ecosystem. It is a good choice if you want to load real-time data into CelerData. Compared with Routine Load, the Kafka connector is recommended in the following scenarios:

  • Load data in formats that Routine Load does not support, such as Protobuf. Routine Load supports only the CSV, JSON, and Avro formats. As long as data can be converted into JSON or CSV format using Kafka Connect's converters, it can be loaded into CelerData via the Kafka connector.
  • Customize data transformation, for example, for Debezium-formatted CDC data.
  • Load data from multiple Kafka topics (see the sketch after this list).
  • Load data from Confluent Cloud.
  • Exercise finer control over load batch sizes, parallelism, and other parameters to strike a balance between load speed and resource utilization.
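
As an illustration of the multi-topic scenario, the following configuration fragment subscribes to two topics and maps each one to its own CelerData table through starrocks.topic2table.map (a minimal sketch; the topic and table names are hypothetical):

  {
    "topics": "orders_topic,users_topic",
    "starrocks.topic2table.map": "orders_topic:orders_table,users_topic:users_table"
  }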

Basic steps

The following example demonstrates how to load Avro records from Confluent Cloud into CelerData.

  1. Use a source connector to generate data into a topic of the Confluent cluster. In the following example, the source connector is Datagen Source and the data format is AVRO.
  2. Create a CelerData table.
  3. Use a sink connector (a custom connector, starrocks-kafka-connector, needs to be used) to load data from the topic of the Confluent cluster into the CelerData table.

Generate data into a topic of a Confluent cluster

  1. Choose the Confluent cluster, enter its Connectors page, and click + Add Connector. Then choose Datagen Source as the source connector.

    img1

  2. Configure the Datagen Source connector.

    1. In the Topic selection section:

      Click + Add new topic and specify the name and the partition number of the topic. In this example, the name of the topic is specified as datagen_topic.

      img2

    2. In the Kafka credentials section:

      This example is a simple guide to quickly familiarize you with the loading process, so you can choose Global access and then click Generate API key & download.

      img3

    3. In the Configuration section:

      Select the output record value format and template. In this example, specify the format as AVRO and the template as orders.

      img4

    4. In the Sizing section:

      Use the default configurations.

    5. In the Review and launch section:

      Check the configurations for the Datagen source connector, and click Continue once you validate all the configurations.

      img5

  3. On the Connectors page, check the Datagen source connector you have added.

    img6

    After the Datagen source connector is running, you can verify that messages are populating the topic datagen_topic.

Create a table in CelerData

Create a table in the CelerData cluster according to the schema of Avro-formatted records in the topic datagen_topic.

CREATE TABLE test123 (
    ordertime      LARGEINT,
    orderid        INT,
    itemid         STRING,
    orderunits     STRING,
    city           STRING,
    state          STRING,
    zipcode        BIGINT
)
DISTRIBUTED BY HASH(orderid);
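
Note that the connector configuration later in this topic loads into the database test (see starrocks.database.name). If that database does not exist yet, create it first and run the CREATE TABLE statement inside it. A minimal sketch, assuming your user has the CREATE DATABASE privilege:

CREATE DATABASE IF NOT EXISTS test;
USE test;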

Load data into the CelerData table

In this example, the sink connector is a custom connector (the Kafka connector named starrocks-kafka-connector), which is used to load Avro-formatted records from the Confluent topic into the CelerData table.

Note

The Kafka connector named starrocks-kafka-connector is a custom connector, and custom connectors can only be created in AWS regions supported by Confluent Cloud. For more information about the capabilities and limitations of custom connectors, see the Confluent documentation.

  1. Enter the Connectors page of the Confluent cluster, click + Add Connector and click Add plugin.

  2. Upload the archive of Kafka connector.

    1. Connector plugin details:

      • Connector plugin name: Enter the name of the Kafka connector, such as starrocks-kafka-connector.
      • Custom plugin description: Enter a description for the Kafka connector.
      • Connector class: Enter the Java class for the Kafka connector, which is com.starrocks.connector.kafka.StarRocksSinkConnector.
    2. Connector type: Select the connector type as Sink.

    3. Connector archive: Click Select connector archive and upload the ZIP file of the Kafka connector starrocks-kafka-connector.

      You can download the TAR file of the Kafka connector from GitHub and extract it. Then, compress all the extracted files into a ZIP file and upload that ZIP file.

      img7

  3. Configure and launch the Kafka connector.

    1. Enter the Connectors page of the Confluent cluster, click + Add Connector, and select starrocks-kafka-connector.

      img8

    2. In the Kafka credentials section:

      This example is a simple guide to quickly familiarize you with the loading process, so you can choose Global access and click Generate API key & download.

    3. In the Configuration section:

      Add the configurations in key-value pairs or in JSON format. This example adds the configurations in JSON format.

      img9

      The complete JSON-formatted configurations:

      {
        "topics": "datagen_topic",
        "confluent.custom.schema.registry.auto": "true",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "connect.timeoutms": "6000",
        "starrocks.http.url": "https://ingest-xxxxxx.celerdata.com:443",
        "starrocks.username": "xxxxxx",
        "starrocks.password": "xxxxxx",
        "starrocks.database.name": "test",
        "starrocks.topic2table.map": "datagen_topic:test123",
        "sink.properties.strip_outer_array": "true",
        "sink.properties.jsonpaths": "[\"$.ordertime\",\"$.orderid\",\"$.itemid\",\"$.orderunits\",\"$.address.city\",\"$.address.state\",\"$.address.zipcode\"]"
      }
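
      The sink.properties.jsonpaths setting above flattens the nested address object in each record into the city, state, and zipcode columns. A record produced by the orders template is shaped roughly as follows (illustrative values only; the actual values are randomly generated):

      {
        "ordertime": 1497014222380,
        "orderid": 25,
        "itemid": "Item_441",
        "orderunits": 0.9983,
        "address": {
          "city": "City_61",
          "state": "State_41",
          "zipcode": 60468
        }
      }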

      Configurations:

      For the supported configurations and descriptions, see Parameters. Take special notice of the following configurations:

      • starrocks.http.url: Enter the HTTP URL of your CelerData cloud in the format of https://<endpoint>:443. The endpoint can be found on the Streaming API page of the Integration section from the left-side navigation tree in the CelerData Cloud Serverless console.

        NOTICE

        The Streaming API page is hidden by default. If you need access to the Streaming API page, contact the support team.

        img10

      • starrocks.username: The username you use to log in to CelerData, in the format of <account_id>.<username>. The user needs the INSERT privilege on the CelerData table.

      • starrocks.password: The password you use to log in to CelerData.

      • Other configurations:

        Some configurations need to be added according to the source data format. In this example, the source data consists of Avro-formatted records and Confluent Cloud Schema Registry is used, so the required configurations must also include confluent.custom.schema.registry.auto and value.converter. For more information, see the Confluent documentation.

        When the source data is JSON-formatted records without Schema Registry, the required configurations also need to include:

        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "false",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",

        The complete configurations can be as follows:

        {
          "topics": "datagen_topic",
          "key.converter": "org.apache.kafka.connect.json.JsonConverter",
          "key.converter.schemas.enable": "false",
          "value.converter": "org.apache.kafka.connect.json.JsonConverter",
          "value.converter.schemas.enable": "false",
          "connect.timeoutms": "6000",
          "starrocks.http.url": "https://ingest-xxxxxx.celerdata.com:443",
          "starrocks.username": "xxxxxx",
          "starrocks.password": "xxxxxx",
          "starrocks.database.name": "test",
          "starrocks.topic2table.map": "datagen_topic:test123",
          "sink.properties.strip_outer_array": "true",
          "sink.properties.jsonpaths": "[\"$.ordertime\",\"$.orderid\",\"$.itemid\",\"$.orderunits\",\"$.address.city\",\"$.address.state\",\"$.address.zipcode\"]"
        }
    4. In the Networking section:

      Enter the connection endpoint in the format <endpoint>:443:TCP. The endpoint is the same as the one in starrocks.http.url, whose format is https://<endpoint>:443.

      img11
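
      For example, with the starrocks.http.url value used in this topic, the connection endpoint would be as follows (the hostname is a placeholder):

      ingest-xxxxxx.celerdata.com:443:TCP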

    5. In the Sizing section:

      Use the default settings.

    6. In the Review and launch section:

      Check the configurations for the Kafka connector, and click Continue once you validate all the configurations.

  4. On the Connectors page, check the Kafka connector you have launched.

    img12

    Once the Kafka connector completes provisioning, the status changes to Running.

  5. After both connectors are running, you can check the data in the CelerData table.

    img13
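
    For example, you can run a query against the table from a SQL client connected to CelerData (the returned rows depend on what the Datagen source generated):

    SELECT * FROM test.test123 LIMIT 5;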

Parameters

name

  • Required: YES
  • Default value:
  • Description: Name for this Kafka connector. It must be globally unique among all Kafka connectors within this Kafka Connect cluster. For example, starrocks-kafka-connector.

connector.class

  • Required: YES
  • Default value: com.starrocks.connector.kafka.StarRocksSinkConnector
  • Description: Class used by this Kafka connector's sink.

topics

  • Required: YES
  • Default value:
  • Description: One or more topics to subscribe to, where each topic corresponds to a CelerData table. By default, CelerData assumes that the topic name matches the name of the CelerData table, so CelerData determines the target table by using the topic name. Fill in either topics or topics.regex (below), but not both. If a CelerData table name is not the same as the topic name, use the optional starrocks.topic2table.map parameter (below) to specify the mapping from topic name to table name.

topics.regex

  • Required:
  • Default value:
  • Description: Regular expression to match the one or more topics to subscribe to. For more details, see topics. Fill in either topics.regex or topics (above), but not both.

starrocks.topic2table.map

  • Required: NO
  • Default value:
  • Description: Mapping of the CelerData table name and the topic name when the topic name is different from the CelerData table name. The format is <topic-1>:<table-1>,<topic-2>:<table-2>,....
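
  For example, the mapping used earlier in this topic directs records from the topic datagen_topic to the table test123:

    "starrocks.topic2table.map": "datagen_topic:test123"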

starrocks.http.url

  • Required: YES
  • Default value:
  • Description: HTTP URL of your CelerData cloud in the format of https://<endpoint>:443. The endpoint can be found on the Streaming API page of the Integration section from the left-side navigation tree in the CelerData Cloud Serverless console.

starrocks.database.name

  • Required: YES
  • Default value:
  • Description: Name of the CelerData database.

starrocks.username

  • Required: YES
  • Default value:
  • Description: Username you use to log in to CelerData in the format of <account_id>.<username>. The user needs the INSERT privilege on the CelerData table.

starrocks.password

  • Required: YES
  • Default value:
  • Description: Password you use to log in to CelerData.

key.converter

  • Required: NO
  • Default value: Key converter used by Kafka Connect cluster
  • Description: Key converter for the sink connector (starrocks-kafka-connector), which is used to deserialize the keys of Kafka data. The default key converter is the one used by the Kafka Connect cluster.

value.converter

  • Required: NO
  • Default value: Value converter used by Kafka Connect cluster
  • Description: Value converter for the sink connector (starrocks-kafka-connector), which is used to deserialize the values of Kafka data. The default value converter is the one used by the Kafka Connect cluster.

key.converter.schema.registry.url

  • Required: NO
  • Default value:
  • Description: Schema registry URL for the key converter.

value.converter.schema.registry.url

  • Required: NO
  • Default value:
  • Description: Schema registry URL for the value converter.

tasks.max

  • Required: NO
  • Default value: 1
  • Description: Maximum number of task threads that the Kafka connector can create, which is usually the same as the number of CPU cores on the worker nodes in the Kafka Connect cluster. You can tune this parameter to control load performance.

bufferflush.maxbytes

  • Required: NO
  • Default value: 94371840 (90 MB)
  • Description: Maximum size of data that can be accumulated in memory before being sent to CelerData at a time. The maximum size ranges from 64 MB to 10 GB. Keep in mind that the Stream Load SDK buffer may create multiple Stream Load jobs to buffer data. Therefore, the threshold mentioned here refers to the total data size.

bufferflush.intervalms

  • Required: NO
  • Default value: 300000
  • Description: Interval at which a batch of data is sent, which controls load latency. Unit: ms. Range: [1000, 3600000].

connect.timeoutms

  • Required: NO
  • Default value: 1000
  • Description: Timeout for connecting to the HTTP URL. Unit: ms. Range: [100, 60000].

sink.properties.*

  • Required:
  • Default value:
  • Description: Stream Load parameters to control load behavior. For example, the parameter sink.properties.format specifies the format used for Stream Load, such as CSV or JSON. For a list of supported parameters and their descriptions, see [STREAM LOAD](../sql-reference/sql-statements/data-manipulation/STREAM LOAD.md).
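
  For example, the following fragment switches the Stream Load format to CSV and sets a custom column separator (a sketch; column_separator is a standard Stream Load parameter, and the separator value shown here is an assumption you should adapt to your data):

    "sink.properties.format": "csv",
    "sink.properties.column_separator": "|"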

sink.properties.format

  • Required: NO
  • Default value: json
  • Description: Format used for Stream Load. The Kafka connector transforms each batch of data to this format before sending the data to CelerData. Valid values: csv and json. For more information, see CSV parameters and JSON parameters.

Limits

  • Flattening a single message from a Kafka topic into multiple data rows and loading them into CelerData is not supported.
  • The Kafka connector's sink guarantees at-least-once semantics.