
Accelerate COUNT(DISTINCT) and Joins with AUTO INCREMENT and Global Dictionary

This topic describes how to accelerate COUNT(DISTINCT) calculation and Joins using AUTO INCREMENT columns and Global Dictionary.

Use Cases

  • Scenario one: Suppose you need to perform exact deduplication on massive data (such as retail or delivery orders). However, the column for deduplication is of STRING type, which may lead to suboptimal performance during counting. For instance, in the orders table, the order_uuid column representing order ID is of STRING type, typically 32 to 36 bytes in size, generated by UUID() or similar functions. In this case, COUNT(DISTINCT) on the STRING column order_uuid with a query like SELECT count(DISTINCT order_uuid) FROM orders WHERE create_date >= CURDATE(); might not deliver satisfactory performance. Using an INTEGER column for exact deduplication would significantly enhance performance.

  • Scenario two: Suppose you want to accelerate exact deduplication in multi-dimensional analysis using bitmap functions. The bitmap_count() function requires INTEGER input, but if the column to be deduplicated is of STRING type, you need to use the bitmap_hash() function (see the sketch after this list). Because of hash collisions, this may result in approximate, slightly lower deduplication counts, and it can also reduce query performance and increase storage requirements, since the INTEGER values generated by bitmap_hash() are more dispersed than sequentially assigned INTEGER values.

  • Scenario three: Suppose you need to query the number of orders with a short time span between placing and paying for the order, where the order placement and payment times might be stored in different tables maintained by different business teams. You might need to join these tables based on the order ID and then deduplicate the orders. For example:

    SELECT count(DISTINCT t1.order_uuid)
    FROM orders_t1 AS t1 JOIN orders_t2 AS t2
    ON t1.order_uuid = t2.order_uuid
    WHERE t2.payment_time - t1.create_time <= 3600
    AND t1.create_date >= CURDATE();

    However, using a STRING type order_uuid column for joins is less efficient than using an INTEGER column.
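
For reference, the following is a minimal sketch of the hash-based approach mentioned in scenario two. It assumes the orders table from scenario one; because bitmap_hash() hashes each STRING value, collisions can make the result slightly lower than the exact count.

-- Sketch: approximate deduplication of a STRING column with bitmap functions.
-- The `orders` table and its columns are assumed from scenario one.
SELECT bitmap_count(bitmap_union(bitmap_hash(order_uuid)))
FROM orders
WHERE create_date >= CURDATE();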

Optimization Approach

To address the issues in the above scenarios, the optimization approach is to load the order data into a target table and establish a mapping between STRING and INTEGER values, so that subsequent query analysis can be based on the INTEGER column. This approach can be divided into the following stages:

  1. Stage 1: Create a global dictionary and establish a mapping between STRING and INTEGER values. In this dictionary, the key column is of STRING type, and the value column is an AUTO INCREMENT INTEGER type. When data is loaded, the system automatically generates a unique ID for each STRING value, creating a mapping between STRING and INTEGER values.

  2. Stage 2: Load the mapping relationship between order data and the global dictionary into the target table.

  3. Stage 3: Use the INTEGER column from the target table for exact deduplication or joins during subsequent query analysis, which can significantly improve performance.

  4. Stage 4: For further performance optimization, you can use bitmap functions on the INTEGER column to accelerate exact deduplication.

Solution

Before v3.2.5, Stage 2 could be implemented through two methods:

  • Using an external table or an internal table as an intermediate table, and joining it with the dictionary table to obtain the corresponding dictionary ID before loading.
  • Using a Primary Key table for data loading, and then using an UPDATE statement with a JOIN operation to back-fill the dictionary ID (a rough sketch follows this list). However, this data loading process is inconvenient and comes with many constraints.
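
For context, here is a rough sketch of the second legacy method. It assumes a hypothetical Primary Key target table dest_pk_table that already contains the loaded order_uuid values and a nullable order_id_int column; it is not part of the recommended workflow below.

-- Legacy sketch (before v3.2.5): back-fill the dictionary ID with a
-- multi-table UPDATE after loading. `dest_pk_table` is a hypothetical
-- Primary Key target table.
UPDATE dest_pk_table
SET order_id_int = dict.order_id_int
FROM dict
WHERE dest_pk_table.order_uuid = dict.order_uuid;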

From v3.2.5 onwards, StarRocks provides the dict_mapping() function, which lets you define the dictionary ID column in the target table as a generated column whose expression is dict_mapping(). Subsequent load jobs are then handled like regular loads: during loading, the system automatically looks up each value in the dictionary table and writes the corresponding dictionary ID, so no UPDATE statements with JOIN operations are needed. This greatly simplifies loading with a global dictionary table; it works regardless of the target table type and supports the various loading methods.

Business Scenario

The following example uses two CSV files, batch1.csv and batch2.csv, each containing two columns: id and order_uuid.

  • batch1.csv

    1, a1
    2, a2
    3, a3
    11, a1
    11, a2
    12, a1

  • batch2.csv

    1, a2
    2, a2
    3, a2
    11, a2
    12, a101
    12, a102
    13, a102

Process

Stage one

Create a global dictionary table and load the order ID column values from the CSV files to establish a mapping between STRING and INTEGER values.

  1. Create a Primary Key table to serve as the global dictionary. Define the Primary Key, order_uuid (of STRING type), and the value column, order_id_int (of AUTO INCREMENT INTEGER type).

    info

    The dict_mapping function requires the global dictionary table to be a Primary Key table.

    CREATE TABLE dict (
        order_uuid STRING,
        order_id_int BIGINT AUTO_INCREMENT -- Automatically assign an ID to each order_uuid value.
    )
    PRIMARY KEY (order_uuid)
    DISTRIBUTED BY HASH (order_uuid)
    PROPERTIES("replicated_storage" = "true");
  2. Use Stream Load to batch load the order_uuid column from the two CSV files into the order_uuid column of the dictionary table dict. Make sure to enable partial updates (the partial_update: true header in the commands below).

    curl --location-trusted -u root: \
    -H "partial_update: true" \
    -H "format: CSV" -H "column_separator:," -H "columns: id, order_uuid" \
    -T batch1.csv \
    -XPUT http://<fe_host>:<fe_http_port>/api/example_db/dict/_stream_load

    curl --location-trusted -u root: \
    -H "partial_update: true" \
    -H "format: CSV" -H "column_separator:," -H "columns: id, order_uuid" \
    -T batch2.csv \
    -XPUT http://<fe_host>:<fe_http_port>/api/example_db/dict/_stream_load

NOTE

If new data is added to the data source before proceeding to the next stage, all new data must be loaded into the dictionary table to ensure the mapping exists.
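
Optionally, you can run a quick sanity check on the mapping produced in Stage one. These queries are only illustrative and are not part of the loading workflow; the exact IDs you see depend on the order in which values were loaded.

-- Each distinct order_uuid loaded above should now have a unique,
-- automatically assigned INTEGER ID.
SELECT * FROM dict ORDER BY order_id_int;

-- dict_mapping() can also be called directly to look up the ID of a single key.
SELECT dict_mapping('dict', 'a1');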

Stage two

Create a target table that includes a dictionary ID column with the dict_mapping attribute. When order data is loaded into the target table, the system will automatically associate it with the dictionary table and insert the corresponding dictionary ID.

  1. Create a table dest_table that includes all columns from the CSV files. You also need to define an INTEGER (typically BIGINT) column order_id_int that maps to the STRING-type order_uuid column and carries the dict_mapping column attribute. Future query analysis will be based on this order_id_int column.

    -- In the target table, define a BIGINT dict_mapping column `order_id_int` to map with the STRING-type column `order_uuid`.
    CREATE TABLE dest_table (
        id BIGINT,
        order_uuid STRING, -- This column records the STRING-type order ID.
        batch INT COMMENT 'Used to distinguish different load batches',
        order_id_int BIGINT AS dict_mapping('dict', order_uuid) -- The dictionary ID column, generated with dict_mapping() and mapped to `order_uuid`.
    )
    DUPLICATE KEY (id, order_uuid)
    DISTRIBUTED BY HASH(id);
  2. Load data into the target table with Stream Load or any other available methods. Since the order_id_int column has the dict_mapping attribute, the system will automatically fetch the dictionary ID from dict during loading.

    curl --location-trusted -u root: \
    -H "format: CSV" -H "column_separator:," -H "columns: id, order_uuid, batch=1" \
    -T batch1.csv \
    -XPUT http://<fe_host>:<fe_http_port>/api/example_db/dest_table/_stream_load

    curl --location-trusted -u root: \
    -H "format: CSV" -H "column_separator:," -H "columns: id, order_uuid, batch=2" \
    -T batch2.csv \
    -XPUT http://<fe_host>:<fe_http_port>/api/example_db/dest_table/_stream_load
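
To confirm that the dictionary IDs were filled in automatically, you can query the target table. This is only a sanity check; the exact IDs depend on the order in which values were loaded into dict.

-- Each row should carry the dictionary ID that corresponds to its order_uuid.
SELECT id, order_uuid, order_id_int FROM dest_table ORDER BY id, order_uuid;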

Stage three

During query analysis, you can perform exact deduplication or joins on the INTEGER column order_id_int, significantly enhancing performance compared to using the STRING column order_uuid.

-- Exact deduplication based on BIGINT-type order_id_int.
SELECT id, COUNT(DISTINCT order_id_int) FROM dest_table GROUP BY id ORDER BY id;
-- For comparison: exact deduplication based on STRING-type order_uuid.
SELECT id, COUNT(DISTINCT order_uuid) FROM dest_table GROUP BY id ORDER BY id;
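
Similarly, for the join case in scenario three, joining on the INTEGER dictionary ID is cheaper than joining on the STRING column. The following is a minimal sketch that assumes two hypothetical target tables, orders_t1 and orders_t2, both created with the same dict_mapping generated column against the shared dictionary dict.

-- Sketch: join and deduplicate on the INTEGER dictionary ID instead of the
-- STRING-type order_uuid. `orders_t1` and `orders_t2` are hypothetical target
-- tables that both define `order_id_int AS dict_mapping('dict', order_uuid)`.
SELECT count(DISTINCT t1.order_id_int)
FROM orders_t1 AS t1 JOIN orders_t2 AS t2
ON t1.order_id_int = t2.order_id_int
WHERE t2.payment_time - t1.create_time <= 3600
AND t1.create_date >= CURDATE();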

You can also use bitmap functions to accelerate exact deduplication.

Use bitmap functions to accelerate exact deduplication

To further accelerate the calculation, you can insert the INTEGER column values of the dictionary table directly into a bitmap column after creating a global dictionary. Subsequently, you can use bitmap functions on this bitmap column for exact deduplication.

Method one

If you have built a global dictionary and already imported the order data into dest_table, follow these steps:

  1. Create an Aggregate table dest_table_bitmap with two columns: a BITMAP-type column order_id_bitmap that is aggregated with the bitmap_union() function, and an INTEGER-type column id. This table does not include the original STRING-type column; otherwise, each bitmap would contain only one value, negating the acceleration benefit.

    CREATE TABLE dest_table_bitmap (
        id BIGINT,
        order_id_bitmap BITMAP BITMAP_UNION
    )
    AGGREGATE KEY (id)
    DISTRIBUTED BY HASH(id) BUCKETS 6;
  2. Insert data into dest_table_bitmap. Insert the id column values from dest_table into the id column, and insert the dictionary IDs (looked up from dict with dict_mapping() and converted with the to_bitmap() function) into the order_id_bitmap column.

    INSERT INTO dest_table_bitmap (id, order_id_bitmap)
    SELECT id, to_bitmap(dict_mapping('dict', order_uuid))
    FROM dest_table
    WHERE dest_table.batch = 1; -- Indicates different batches.

    INSERT INTO dest_table_bitmap (id, order_id_bitmap)
    SELECT id, to_bitmap(dict_mapping('dict', order_uuid))
    FROM dest_table
    WHERE dest_table.batch = 2;
  3. Use the BITMAP_UNION_COUNT() function on the BITMAP column for exact deduplication.

    SELECT id, BITMAP_UNION_COUNT(order_id_bitmap) FROM dest_table_bitmap
    GROUP BY id ORDER BY id;

Method two

If you do not need to retain specific order data after creating a global dictionary, and you want to load data directly into the dest_table_bitmap table, follow these steps:

  1. Create an Aggregate table dest_table_bitmap with two columns: a BITMAP-type column order_id_bitmap that is aggregated with the bitmap_union() function, and an INTEGER-type column id. This table does not include the original STRING-type column; otherwise, each bitmap would contain only one value, negating the acceleration benefit.

    CREATE TABLE dest_table_bitmap (
        id BIGINT,
        order_id_bitmap BITMAP BITMAP_UNION
    )
    AGGREGATE KEY (id)
    DISTRIBUTED BY HASH(id) BUCKETS 6;
  2. Load data into the Aggregate table using Stream Load. Insert the id column values from the CSV files into the id column, and insert the dictionary IDs (looked up from dict with dict_mapping() and converted with the to_bitmap() function) into the order_id_bitmap column.

    curl --location-trusted -u root: \
    -H "format: CSV" -H "column_separator:," \
    -H "columns: id, order_uuid, order_id_bitmap=to_bitmap(dict_mapping('dict', order_uuid))" \
    -T batch1.csv \
    -XPUT http://<fe_host>:<fe_http_port>/api/example_db/dest_table_bitmap/_stream_load

    curl --location-trusted -u root: \
    -H "format: CSV" -H "column_separator:," \
    -H "columns: id, order_uuid, order_id_bitmap=to_bitmap(dict_mapping('dict', order_uuid))" \
    -T batch2.csv \
    -XPUT http://<fe_host>:<fe_http_port>/api/example_db/dest_table_bitmap/_stream_load
  3. Use the BITMAP_UNION_COUNT() function on the BITMAP column for exact deduplication.

    SELECT id, BITMAP_UNION_COUNT(order_id_bitmap) FROM dest_table_bitmap
    GROUP BY id ORDER BY id;