Colocate Join
For shuffle join and broadcast join, the data rows of the two joining tables that satisfy the join condition must be gathered on a single node to complete the join. Neither of these two join methods can avoid the latency or overhead caused by transferring data between nodes over the network.
The core idea of Colocate Join is to keep the bucketing key, the number of replicas, and the replica placement consistent for tables in the same Colocation Group. If the join column is the bucketing key, the computing node only needs to do a local join, without fetching data from other nodes.
This document introduces the principle, implementation, usage, and considerations of Colocate Join.
Terminology
- Colocation Group (CG): A CG will contain one or more Tables. The Tables within a CG have the same bucketing and replica placement, and are described using the Colocation Group Schema.
- Colocation Group Schema (CGS): A CGS contains the bucketing key, number of buckets, and number of replicas of a CG.
Principle
Colocate Join forms a CG from a set of tables that have the same CGS and ensures that the corresponding bucket replicas of these tables fall on the same set of BE nodes. When the tables in the CG perform joins on the bucketing columns, the local data can be joined directly, which saves the time of transferring data between nodes.
The Bucket Seq is obtained by hash(key) mod buckets. Suppose a table has 8 buckets; then the Bucket Seqs are [0, 1, 2, 3, 4, 5, 6, 7]. Each bucket contains one or more tablets, and the number of tablets depends on the number of partitions: a multi-partition table has multiple tablets per bucket.
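As a conceptual illustration only (the engine's internal bucketing hash is not exposed here; murmur_hash3_32 is used purely for demonstration), the mapping from a bucketing key value to a Bucket Seq looks like this:
-- Conceptual sketch: a row's Bucket Seq is a hash of its bucketing key value
-- modulo the number of buckets. The actual hash used internally may differ.
SELECT abs(murmur_hash3_32('2023')) % 8 AS bucket_seq;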
In order to have the same data distribution, tables within the same CG must comply with the following.
- Tables within the same CG must have an identical bucketing key (same type, number, and order of columns) and the same number of buckets, so that the data slices of multiple tables can be distributed and controlled one to one. The bucketing key consists of the columns specified in the table creation statement DISTRIBUTED BY HASH(col1, col2, ...); it determines which Bucket Seq each data row is hashed into. The names of the bucketing columns can vary across tables within the same CG, but the data types of the corresponding columns in DISTRIBUTED BY HASH(col1, col2, ...) must be exactly the same and in the same order (see the sketch after this list).
- Tables within the same CG must have the same number of partition replicas. Otherwise, a tablet replica may have no corresponding replica in the partition on the same BE.
- Tables within the same CG may have different numbers of partitions and different partition keys.
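For example, the following minimal sketch (with hypothetical tables t1 and t2 and a hypothetical group group_a) shows two tables that can share a group: the bucketing columns have different names but the same single BIGINT type, and the bucket counts match.
CREATE TABLE t1 (id BIGINT, city VARCHAR(32))
DISTRIBUTED BY HASH(id) BUCKETS 8
PROPERTIES ("colocate_with" = "group_a");

CREATE TABLE t2 (uid BIGINT, amount DOUBLE)
DISTRIBUTED BY HASH(uid) BUCKETS 8
PROPERTIES ("colocate_with" = "group_a");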
When creating a table, the CG is specified by the attribute "colocate_with" = "group_name" in the table PROPERTIES. If the CG does not exist, the table is the first table of the CG and is called the Parent Table. The data distribution of the Parent Table (the type, number, and order of the bucketing key columns, the number of replicas, and the number of buckets) determines the CGS. If the CG already exists, the system checks whether the data distribution of the table is consistent with the CGS.
The replica placement of tables within the same CG satisfies the following:
- The mapping between the Bucket Seq and BE nodes of all the Tables is the same as that of the Parent Table.
- The mapping between the Bucket Seq and BE nodes of all the Partitions in the Parent Table is the same as that of the first Partition.
- The mapping between the Bucket Seq and BE nodes of the first Partition of the Parent Table is determined using the native Round Robin algorithm.
The consistent data distribution and mapping guarantee that data rows with the same bucketing key value fall on the same BE. Therefore, when the bucketing key is used as the join column, only local joins are required.
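Continuing the hypothetical sketch above, a join between t1 and t2 on their bucketing columns can then be executed as local joins on each BE:
-- Both sides are bucketed on the join column with the same bucket count and placement,
-- so each BE joins its local buckets without shuffling data across the network.
SELECT t1.id, t1.city, t2.amount
FROM t1 JOIN t2 ON t1.id = t2.uid;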
Usage
Table creation
When creating a table, you can specify the attribute "colocate_with" = "group_name" in PROPERTIES to indicate that the table is a Colocate Join table and belongs to a specified Colocation Group.
For example:
CREATE TABLE tbl (k1 int, v1 int sum)
DISTRIBUTED BY HASH(k1)
BUCKETS 8
PROPERTIES(
"colocate_with" = "group1"
);
If the specified Group does not exist, CelerData automatically creates a Group that contains only the current table. If the Group exists, CelerData checks whether the current table meets the Colocation Group Schema. If it does, the table is created and added to the Group, and partitions and tablets of the table are created according to the data distribution rules of the existing Group.
A Group belongs to a Database, and its name is unique within the Database. The full name of the Group in internal storage is dbId_groupName, but users only need to be aware of the groupName.
Delete
A complete deletion is a deletion from the Recycle Bin. Normally, after a table is deleted with the DROP TABLE command, it stays in the recycle bin for one day by default before being completely deleted. When the last table in a Group is completely deleted, the Group is also deleted automatically.
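For example, a minimal sketch (the FORCE option is an assumption here; it deletes a table immediately instead of moving it to the recycle bin):
-- Default: the table is moved to the recycle bin and completely deleted after about one day.
DROP TABLE tbl;
-- Assumption: the FORCE option deletes the table immediately, skipping the recycle bin,
-- so the Group can be removed right away if this was its last table.
-- DROP TABLE tbl FORCE;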
View group information
The following command allows you to view the Group information that already exists in the cluster.
SHOW PROC '/colocation_group';
+-------------+--------------+--------------+------------+----------------+----------+----------+
| GroupId | GroupName | TableIds | BucketsNum | ReplicationNum | DistCols | IsStable |
+-------------+--------------+--------------+------------+----------------+----------+----------+
| 10005.10008 | 10005_group1 | 10007, 10040 | 10 | 3 | int(11) | true |
+-------------+--------------+--------------+------------+----------------+----------+----------+
- GroupId: The cluster-wide unique identifier of a Group, with the first half being the db id and the second half being the group id.
- GroupName: The full name of the Group.
- TableIds: The list of IDs of the tables in the Group.
- BucketsNum: The number of buckets.
- ReplicationNum: The number of replicas.
- DistCols: Distribution columns, that is, the types of the bucketing columns.
- IsStable: Whether the Group is stable (for the definition of stability, see the section of Colocation Replica Balancing and Repair).
You can further view the data distribution of a Group with the following command.
SHOW PROC '/colocation_group/10005.10008';
+-------------+---------------------+
| BucketIndex | BackendIds |
+-------------+---------------------+
| 0 | 10004, 10002, 10001 |
| 1 | 10003, 10002, 10004 |
| 2 | 10002, 10004, 10001 |
| 3 | 10003, 10002, 10004 |
| 4 | 10002, 10004, 10003 |
| 5 | 10003, 10002, 10001 |
| 6 | 10003, 10004, 10001 |
| 7 | 10003, 10004, 10002 |
+-------------+---------------------+
- BucketIndex: The index of the bucket in the bucket sequence.
- BackendIds: The IDs of the BE nodes where the data slices of the bucket are located.
Note: The above command requires the ADMIN privilege. Regular users cannot access it.
Modifying Table Group Properties
You can modify the Colocation Group properties of a table. For example:
ALTER TABLE tbl SET ("colocate_with" = "group2");
If the table has not been assigned to a Group before, the command will check the Schema and add the table to the Group (the Group will be created first if it does not exist). If the table has been previously assigned to another Group, the command will remove the table from the original Group and add it to the new Group (the Group will be created first if it does not exist).
You can also remove the Colocation properties of a table with the following command.
ALTER TABLE tbl SET ("colocate_with" = "");
Other related operations
When you add a partition with ADD PARTITION or modify the number of replicas of a table that has the Colocation attribute, CelerData checks whether the operation violates the Colocation Group Schema and rejects it if it does.
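For example, a sketch using the range-partitioned table tbl1 from the query example below: adding a partition keeps the bucketing key, bucket count, and replica count unchanged, so it passes the check, whereas changing the replica count of a partition would be rejected.
-- Allowed: the new partition follows the Group's existing distribution rules.
ALTER TABLE tbl1 ADD PARTITION p3 VALUES LESS THAN ('2019-07-31');
-- Changing the replication number of a partition would violate the Colocation
-- Group Schema and be rejected.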
Colocation replica balancing and repair
The replica distribution of a Colocation table must follow the distribution rules specified in the Group schema, so it differs from normal sharding in terms of replica repair and balancing.
The Group itself has a stable property. When stable is true, no changes are being made to the table slices in the Group and the Colocation feature works properly. When stable is false, some table slices in the Group are being repaired or migrated, and the Colocate Join of the affected tables degrades to a normal join.
Replica repair
Replicas can only be stored on the specified BE nodes. CelerData looks for the least-loaded BE to replace an unavailable BE (for example, one that is down or decommissioned). After the replacement, the bucket data slices that were on the old BE are repaired on the new one. During the migration, the Group is marked as Unstable.
Replica balancing
CelerData tries to distribute the Colocation table slices evenly across all BE nodes. Balancing for normal tables happens at the replica level, that is, each replica individually finds a lower-load BE node. Balancing for Colocation tables happens at the Bucket level, that is, all replicas within a Bucket are migrated together. The system uses a simple balancing algorithm that distributes the BucketsSequence evenly across all BE nodes, considering only the number of replicas and not their actual size. The exact algorithm can be found in the code comments in ColocateTableBalancer.java.
Note 1: The current Colocation replica balancing and repair algorithm may not work well for CelerData clusters with heterogeneous deployment, that is, clusters in which the disk capacity, number of disks, or disk type (SSD or HDD) differs across BE nodes. In that case, a small-capacity BE node may store the same number of replicas as a large-capacity BE node.
Note 2: When a Group is in the Unstable state, the join of its tables degrades to a normal join, which may significantly degrade the query performance of the cluster. If you do not want the system to balance replicas automatically, set the FE configuration disable_colocate_balance to disable automatic balancing, and enable it again at an appropriate time. (See the Advanced operations section for details.)
Query
A Colocation table is queried in the same way as a normal table. If the Group that the Colocation table belongs to is in the Unstable state, the join automatically degrades to a normal join, as illustrated by the following example.
Table 1:
CREATE TABLE `tbl1` (
`k1` date NOT NULL COMMENT "",
`k2` int(11) NOT NULL COMMENT "",
`v1` int(11) SUM NOT NULL COMMENT ""
) ENGINE=OLAP
AGGREGATE KEY(`k1`, `k2`)
PARTITION BY RANGE(`k1`)
(
PARTITION p1 VALUES LESS THAN ('2019-05-31'),
PARTITION p2 VALUES LESS THAN ('2019-06-30')
)
DISTRIBUTED BY HASH(`k2`) BUCKETS 8
PROPERTIES (
"colocate_with" = "group1"
);
Table 2:
CREATE TABLE `tbl2` (
`k1` datetime NOT NULL COMMENT "",
`k2` int(11) NOT NULL COMMENT "",
`v1` double SUM NOT NULL COMMENT ""
) ENGINE=OLAP
AGGREGATE KEY(`k1`, `k2`)
DISTRIBUTED BY HASH(`k2`) BUCKETS 8
PROPERTIES (
"colocate_with" = "group1"
);
View query plan:
DESC SELECT * FROM tbl1 INNER JOIN tbl2 ON (tbl1.k2 = tbl2.k2);
+----------------------------------------------------+
| Explain String |
+----------------------------------------------------+
| PLAN FRAGMENT 0 |
| OUTPUT EXPRS:`tbl1`.`k1` | |
| PARTITION: RANDOM |
| |
| RESULT SINK |
| |
| 2:HASH JOIN |
| | join op: INNER JOIN |
| | hash predicates: |
| | colocate: true |
| | `tbl1`.`k2` = `tbl2`.`k2` |
| | tuple ids: 0 1 |
| | |
| |----1:OlapScanNode |
| | TABLE: tbl2 |
| | PREAGGREGATION: OFF. Reason: null |
| | partitions=0/1 |
| | rollup: null |
| | buckets=0/0 |
| | cardinality=-1 |
| | avgRowSize=0.0 |
| | numNodes=0 |
| | tuple ids: 1 |
| | |
| 0:OlapScanNode |
| TABLE: tbl1 |
| PREAGGREGATION: OFF. Reason: No AggregateInfo |
| partitions=0/2 |
| rollup: null |
| buckets=0/0 |
| cardinality=-1 |
| avgRowSize=0.0 |
| numNodes=0 |
| tuple ids: 0 |
+----------------------------------------------------+
If the Colocate Join takes effect, the Hash Join node displays colocate: true.
If it doesn’t take effect, the query plan is as follows:
+----------------------------------------------------+
| Explain String |
+----------------------------------------------------+
| PLAN FRAGMENT 0 |
| OUTPUT EXPRS:`tbl1`.`k1` | |
| PARTITION: RANDOM |
| |
| RESULT SINK |
| |
| 2:HASH JOIN |
| | join op: INNER JOIN (BROADCAST) |
| | hash predicates: |
| | colocate: false, reason: group is not stable |
| | `tbl1`.`k2` = `tbl2`.`k2` |
| | tuple ids: 0 1 |
| | |
| |----3:EXCHANGE |
| | tuple ids: 1 |
| | |
| 0:OlapScanNode |
| TABLE: tbl1 |
| PREAGGREGATION: OFF. Reason: No AggregateInfo |
| partitions=0/2 |
| rollup: null |
| buckets=0/0 |
| cardinality=-1 |
| avgRowSize=0.0 |
| numNodes=0 |
| tuple ids: 0 |
| |
| PLAN FRAGMENT 1 |
| OUTPUT EXPRS: |
| PARTITION: RANDOM |
| |
| STREAM DATA SINK |
| EXCHANGE ID: 03 |
| UNPARTITIONED |
| |
| 1:OlapScanNode |
| TABLE: tbl2 |
| PREAGGREGATION: OFF. Reason: null |
| partitions=0/1 |
| rollup: null |
| buckets=0/0 |
| cardinality=-1 |
| avgRowSize=0.0 |
| numNodes=0 |
| tuple ids: 1 |
+----------------------------------------------------+
The HASH JOIN node shows the corresponding reason: colocate: false, reason: group is not stable. An EXCHANGE node is generated at the same time.
Advanced operations
FE configuration items
- disable_colocate_relocate
Whether to disable automatic Colocation replica repair for CelerData. The default is false, which means automatic repair is enabled. This parameter only affects the replica repair of Colocation tables, not normal tables.
- disable_colocate_balance
Whether to disable automatic Colocation replica balancing for CelerData. The default is false, which means automatic balancing is enabled. This parameter only affects the replica balancing of Colocation tables, not normal tables.
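Both configuration items can be changed at runtime; the following is a minimal sketch, assuming FE configurations can be modified with the ADMIN SET FRONTEND CONFIG statement:
-- Temporarily disable automatic Colocation replica balancing.
ADMIN SET FRONTEND CONFIG ("disable_colocate_balance" = "true");
-- Re-enable it at an appropriate time.
ADMIN SET FRONTEND CONFIG ("disable_colocate_balance" = "false");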
Session variable
- disable_colocate_join
You can disable Colocate Join at session granularity by changing this variable.
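For example, a minimal sketch for the current session:
-- Disable Colocate Join for the current session only; set it back to false to re-enable.
SET disable_colocate_join = true;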
HTTP Restful API
CelerData provides several HTTP Restful APIs related to Colocate Join for viewing and modifying Colocation Groups.
These APIs are implemented on the FE and can be accessed at fe_host:fe_http_port with ADMIN permissions.
View all Colocation information of a cluster
GET /api/colocate
// Returns the internal Colocation information in JSON format.
{
  "colocate_meta": {
    "groupName2Id": { "g1": { "dbId": 10005, "grpId": 10008 } },
    "group2Tables": {},
    "table2Group": {
      "10007": { "dbId": 10005, "grpId": 10008 },
      "10040": { "dbId": 10005, "grpId": 10008 }
    },
    "group2Schema": {
      "10005.10008": {
        "groupId": { "dbId": 10005, "grpId": 10008 },
        "distributionColTypes": [{ "type": "INT", "len": -1, "isAssignedStrLenInColDefinition": false, "precision": 0, "scale": 0 }],
        "bucketsNum": 10,
        "replicationNum": 2
      }
    },
    "group2BackendsPerBucketSeq": {
      "10005.10008": [
        [10004, 10002], [10003, 10002], [10002, 10004], [10003, 10002],
        [10002, 10004], [10003, 10002], [10003, 10004], [10003, 10004],
        [10003, 10004], [10002, 10004]
      ]
    },
    "unstableGroups": []
  },
  "status": "OK"
}
Mark the Group as Stable or Unstable
Mark as Stable
POST /api/colocate/group_stable?db_id=10005&group_id=10008
Returns: 200
Mark as Unstable
DELETE /api/colocate/group_stable?db_id=10005&group_id=10008
Returns: 200
Set the data distribution of a Group
This interface allows you to forcibly set the data distribution (BucketsSequence) of a Group.
POST /api/colocate/bucketseq?db_id=10005&group_id=10008
Body:
[[10004,10002],[10003,10002],[10002,10004],[10003,10002],[10002,10004],[10003,10002],[10003,10004],[10003,10004],[10003,10004],[10002,10004]]
Returns: 200
Here, the Body is the BucketsSequence represented as a nested array: the IDs of the BEs on which the data slices of each bucket are located.
Note that to use this command, you may need to set the FE configurations disable_colocate_relocate and disable_colocate_balance to true, that is, disable automatic Colocation replica repair and balancing. Otherwise, the distribution may be automatically reset by the system after the modification.