Colocate Join
Shuffle Join と Broadcast Join では、ジョイン条件が満たされると、2 つのジョインテーブルのデータ行が単一ノードにマージされてジョインが完了します。これら 2 つのジョイン方法のいずれも、ノード間のデータネットワーク伝送によって引き起こされる遅延やオーバーヘッドを回避することはできません。
コアアイデアは、同じ Colocation Group 内のテーブルに対してバケッティングキー、コピー数、およびコピー配置を一貫して保つことです。ジョイン列がバケッティングキーである場合、計算ノードは他のノードからデータを取得せずにローカルジョインを行うだけで済みます。
このドキュメントでは、Colocate Join の原理、実装、使用法、および考慮事項を紹介します。
用語
- Colocation Group (CG): CG は 1 つ以上のテーブルを含みます。CG 内のテーブルは同じバケッティングとレプリカ配置を持ち、Colocation Group Schema を使用して説明されます。
- Colocation Group Schema (CGS): CGS には CG のバケッティングキー、バケット数、およびレプリカ数が含まれます。
原理
Colocate Join は、同じ CGS を持つ一連のテーブルで CG を形成し、これらのテーブルの対応するバケットコピーが同じ BE ノードセットに配置されることを保証します。CG 内のテーブルがバケッティング列でジョイン操作を行うとき、ローカルデータは直接ジョインされ、ノード間でデータを転送する時間を節約できます。
バケットシーケンスは hash(key) mod buckets
によって取得されます。例えば、テーブルが 8 バケットを持つ場合、[0, 1, 2, 3, 4, 5, 6, 7] の 8 バケットがあり、各バケットには 1 つ以上のサブテーブルがあります。サブテーブルの数はパーティションの数に依存します。マルチパーティションテーブルの場合、複数のタブレットが存在します。
同じデータ分布を持つために、同じ CG 内のテーブルは次の条件を満たす必要があります。
- 同じ CG 内のテーブルは、同一のバケッティングキー(タイプ、数、順序)と同じ数のバケットを持たなければなりません。これにより、複数のテーブルのデータスライスが一対一で分配および制御されます。バケッティングキーは、テーブル作成文
DISTRIBUTED BY HASH(col1, col2, ...)
で指定された列です。バケッティングキーは、データのど の列が異なるバケットシーケンスにハッシュされるかを決定します。同じ CG 内のテーブルでバケッティングキーの名前は異なる場合があります。作成文でバケッティング列は異なる場合がありますが、DISTRIBUTED BY HASH(col1, col2, ...)
の対応するデータ型の順序は完全に同じである必要があります。 - 同じ CG 内のテーブルは、同じ数のパーティションコピーを持たなければなりません。そうでない場合、タブレットコピーが同じ BE のパーティションに対応するコピーを持たないことがあるかもしれません。
- 同じ CG 内のテーブルは、異なる数のパーティションと異なるパーティションキーを持つことができます。
テーブルを作成するとき、CG はテーブルプロパティで属性 "colocate_with" = "group_name"
によって指定されます。CG が存在しない場合、それは CG の最初のテーブルであり、親テーブルと呼ばれます。親テーブルのデータ分布(スプリットバケットキーのタイプ、数、順序、コピー数、スプリットバケット数)が CGS を決定します。CG が存在する場合、テーブルのデータ分布が CGS と一致しているかどうかを確認します。
同じ CG 内のテーブルのコピー配置は次の条件を満たします。
- すべてのテーブルのバケットシーケンスと BE ノードの間のマッピングは、親テーブルと同じです。
- 親テーブル内のすべてのパーティションのバケットシーケンスと BE ノードの間のマッピングは、最初のパーティションと同じです。
- 親テーブルの最初のパーティションのバケットシーケンスと BE ノードの間のマッピングは、ネイティブのラウンドロビン アルゴリズムを使用して決定されます。
一貫したデータ分布とマッピングは、バケッティングキーによって取得された同じ値を持つデータ行が同じ BE に落ちることを保証します。したがって、バケッティングキーを使用して列をジョインする場合、ローカルジョインのみが必要です。
使用法
テーブル作成
テーブルを作成するとき、PROPERTIES で属性 "colocate_with" = "group_name"
を指定して、テーブルが Colocate Join テーブルであり、指定された Colocation Group に属することを示すことができます。
例えば:
CREATE TABLE tbl (k1 int, v1 int sum)
DISTRIBUTED BY HASH(k1)
BUCKETS 8
PROPERTIES(
"colocate_with" = "group1"
);
指定されたグループが存在しない場合、CelerData は現在のテーブルのみを含むグループを自動的に作成します。グループが存在する場合、CelerData は現在のテーブルが Colocation Group Schema を満たしているかどうかを確認します。満たしている場合、テーブルを作成してグループに追加します。同時に、テーブルは既存のグループのデータ分布ルールに基づいてパーティションとタブレットを作成します。
グループはデータベースに属し、グループの名前はデータベース内で一意です。グループのフルネームは内部ストレージでは dbId_groupName ですが、ユーザーは groupName のみを必要とします。
削除
完全な削除は、リサイクルビンからの削除です。通常、DROP TABLE
コマンドでテーブルが削除されると、デフォルトで 1 日間リサイクルビンに残り、その後削除されます。グループ内の最後のテーブルが完全に削除されると、グループも自動的に削除されます。
グループ情報の表示
次のコマンドを使用して、クラスター内に既に存在するグループ情報を表示できます。
SHOW PROC '/colocation_group';
+-------------+--------------+--------------+------------+----------------+----------+----------+
| GroupId | GroupName | TableIds | BucketsNum | ReplicationNum | DistCols | IsStable |
+-------------+--------------+--------------+------------+----------------+----------+----------+
| 10005.10008 | 10005_group1 | 10007, 10040 | 10 | 3 | int(11) | true |
+-------------+--------------+--------------+------------+----------------+----------+----------+
- GroupId: クラスター全体で一意のグループ識別子で、前半がデータベース ID、後半がグループ ID です。
- GroupName: グループのフルネーム。
- TabletIds: グループ内のテーブルの ID リスト。
- BucketsNum: バケットの数。
- ReplicationNum: レプリカの数。
- DistCols: 分布列、つまりバケッティング列のタイプ。
- IsStable: グループが安定しているかどうか(安定性の定義については、Colocation Replica Balancing and Repair のセクションを参照)。
次のコマンドを使用して、グループのデータ分布をさらに表示できます。
SHOW PROC '/colocation_group/10005.10008';
+-------------+---------------------+
| BucketIndex | BackendIds |
+-------------+---------------------+
| 0 | 10004, 10002, 10001 |
| 1 | 10003, 10002, 10004 |
| 2 | 10002, 10004, 10001 |
| 3 | 10003, 10002, 10004 |
| 4 | 10002, 10004, 10003 |
| 5 | 10003, 10002, 10001 |
| 6 | 10003, 10004, 10001 |
| 7 | 10003, 10004, 10002 |
+-------------+---------------------+
- BucketIndex: バケットシーケンスの添字。
- BackendIds: バケッティングデータスライスが配置されている BE ノードの ID。
注: 上記のコマンドは AMDIN 権 限を必要とします。通常のユーザーはアクセスできません。
テーブルグループプロパティの変更
テーブルの Colocation Group プロパティを変更できます。例えば:
ALTER TABLE tbl SET ("colocate_with" = "group2");
テーブルが以前にグループに割り当てられていない場合、コマンドはスキーマを確認し、テーブルをグループに追加します(グループが存在しない場合は最初に作成されます)。テーブルが以前に別のグループに割り当てられていた場合、コマンドはテーブルを元のグループから削除し、新しいグループに追加します(グループが存在しない場合は最初に作成されます)。
次のコマンドを使用して、テーブルの Colocation プロパティを削除することもできます。
ALTER TABLE tbl SET ("colocate_with" = "");
その他の関連操作
Colocation 属性を持つテーブルに ADD PARTITION
を使用してパーティションを追加したり、コピー数を変更したりする場合、CelerData はその操作が Colocation Group Schema に違反するかどうかを確認し、違反する場合は拒否します。
Colocation レプリカのバランスと修復
Colocation テーブルのレプリカ分布は、グループスキーマで指定された分布ルールに従う必要があるため、通常のシャーディングとはレプリカの修復とバランスの点で異なります。
グループ自体には stable
プロパティがあります。stable
が true
の場合、グループ内のテーブルスライスに変更が加えられておらず、Colocation 機能が正常に動作していることを意味します。stable
が false
の場合、現在のグループ内のいくつかのテーブルスライスが修復または移行されており、影響を受けたテーブルの Colocate Join は通常のジョインに劣化します。
レプリカ修復
レプリカは指定された BE ノードにのみ保存できます。CelerData は、使用できない BE(例: ダウン、退役)を置き換えるために最も負荷の少ない BE を探します。置き換え後、古い BE 上のすべてのバケッティングデータスライスが修復されます。移行中、グループは Unstable とマークされます。
レプリカのバランス
CelerData は、Colocation テーブルスライスをすべての BE ノードに均等に分配しようとします。通常のテーブルのバランスはレプリカレベルで行われ、各レプリカが個別に負荷の低い BE ノードを見つけます。Colocation テーブルのバランスはバケットレベルで行われ、バケット内のすべてのレプリカが一緒に移行されます。BucketsSequence
をすべての BE ノードに均等に分配する単純なバランスアルゴリズムを使用し、レプリカの実際のサイズではなくレプリカの数のみを考慮しま す。正確なアルゴリズムは ColocateTableBalancer.java
のコードコメントに記載されています。
注 1: 現在の Colocation レプリカのバランスと修復アルゴリズムは、異種展開を持つ CelerData クラスターではうまく機能しない可能性があります。いわゆる異種展開とは、BE ノードのディスク容量、ディスクの数、およびディスクタイプ(SSD と HDD)が一貫していないことを意味します。異種展開の場合、小容量の BE ノードが大容量の BE ノードと同じ数のレプリカを保存することがあるかもしれません。
注 2: グループが Unstable 状態にあるとき、そのテーブルのジョインは通常のジョインに劣化し、クラスターのクエリパフォーマンスが大幅に低下する可能性があります。システムが自動的にバランスを取らないようにしたい場合は、FE 構成
disable_colocate_balance
を設定して自動バランスを無効にし、適切な時期に再度有効にします。(詳細は Advanced Operations (#Advanced Operations) セクションを参照)
クエリ
Colocation テーブルは通常のテーブルと同じ方法でクエリされます。Colocation テーブルがあるグループが Unstable 状態にある場合、それは自動的に通常のジョインに劣化します。以下の例で示されるように。
テーブル 1:
CREATE TABLE `tbl1` (
`k1` date NOT NULL COMMENT "",
`k2` int(11) NOT NULL COMMENT "",
`v1` int(11) SUM NOT NULL COMMENT ""
) ENGINE=OLAP
AGGREGATE KEY(`k1`, `k2`)
PARTITION BY RANGE(`k1`)
(
PARTITION p1 VALUES LESS THAN ('2019-05-31'),
PARTITION p2 VALUES LESS THAN ('2019-06-30')
)
DISTRIBUTED BY HASH(`k2`) BUCKETS 8
PROPERTIES (
"colocate_with" = "group1"
);
テーブル 2:
CREATE TABLE `tbl2` (
`k1` datetime NOT NULL COMMENT "",
`k2` int(11) NOT NULL COMMENT "",
`v1` double SUM NOT NULL COMMENT ""
) ENGINE=OLAP
AGGREGATE KEY(`k1`, `k2`)
DISTRIBUTED BY HASH(`k2`) BUCKETS 8
PROPERTIES (
"colocate_with" = "group1"
);
クエリプランの表示:
DESC SELECT * FROM tbl1 INNER JOIN tbl2 ON (tbl1.k2 = tbl2.k2);
+----------------------------------------------------+
| Explain String |
+----------------------------------------------------+
| PLAN FRAGMENT 0 |
| OUTPUT EXPRS:`tbl1`.`k1` | |
| PARTITION: RANDOM |
| |
| RESULT SINK |
| |
| 2:HASH JOIN |
| | join op: INNER JOIN |
| | hash predicates: |
| | colocate: true |
| | `tbl1`.`k2` = `tbl2`.`k2` |
| | tuple ids: 0 1 |
| | |
| |----1:OlapScanNode |
| | TABLE: tbl2 |
| | PREAGGREGATION: OFF. Reason: null |
| | partitions=0/1 |
| | rollup: null |
| | buckets=0/0 |
| | cardinality=-1 |
| | avgRowSize=0.0 |
| | numNodes=0 |
| | tuple ids: 1 |
| | |
| 0:OlapScanNode |
| TABLE: tbl1 |
| PREAGGREGATION: OFF. Reason: No AggregateInfo |
| partitions=0/2 |
| rollup: null |
| buckets=0/0 |
| cardinality=-1 |
| avgRowSize=0.0 |
| numNodes=0 |
| tuple ids: 0 |
+----------------------------------------------------+
Colocate Join が有効になると、Hash Join ノードに colocate: true
が表示されます。
有効でない場合、クエリプランは次のようになります:
+----------------------------------------------------+
| Explain String |
+----------------------------------------------------+
| PLAN FRAGMENT 0 |
| OUTPUT EXPRS:`tbl1`.`k1` | |
| PARTITION: RANDOM |
| |
| RESULT SINK |
| |
| 2:HASH JOIN |
| | join op: INNER JOIN (BROADCAST) |
| | hash predicates: |
| | colocate: false, reason: group is not stable |
| | `tbl1`.`k2` = `tbl2`.`k2` |
| | tuple ids: 0 1 |
| | |
| |----3:EXCHANGE |
| | tuple ids: 1 |
| | |
| 0:OlapScanNode |
| TABLE: tbl1 |
| PREAGGREGATION: OFF. Reason: No AggregateInfo |
| partitions=0/2 |
| rollup: null |
| buckets=0/0 |
| cardinality=-1 |
| avgRowSize=0.0 |
| numNodes=0 |
| tuple ids: 0 |
| |
| PLAN FRAGMENT 1 |
| OUTPUT EXPRS: |
| PARTITION: RANDOM |
| |
| STREAM DATA SINK |
| EXCHANGE ID: 03 |
| UNPARTITIONED |
| |
| 1:OlapScanNode |
| TABLE: tbl2 |
| PREAGGREGATION: OFF. Reason: null |
| partitions=0/1 |
| rollup: null |
| buckets=0/0 |
| cardinality=-1 |
| avgRowSize=0.0 |
| numNodes=0 |
| tuple ids: 1 |
+----------------------------------------------------+