メインコンテンツまでスキップ

Colocate Join

Shuffle Join と Broadcast Join では、ジョイン条件が満たされると、2 つのジョインテーブルのデータ行が単一ノードにマージされてジョインが完了します。これら 2 つのジョイン方法のいずれも、ノード間のデータネットワーク伝送によって引き起こされる遅延やオーバーヘッドを回避することはできません。

コアアイデアは、同じ Colocation Group 内のテーブルに対してバケッティングキー、コピー数、およびコピー配置を一貫して保つことです。ジョイン列がバケッティングキーである場合、計算ノードは他のノードからデータを取得せずにローカルジョインを行うだけで済みます。

このドキュメントでは、Colocate Join の原理、実装、使用法、および考慮事項を紹介します。

用語

  • Colocation Group (CG): CG は 1 つ以上のテーブルを含みます。CG 内のテーブルは同じバケッティングとレプリカ配置を持ち、Colocation Group Schema を使用して説明されます。
  • Colocation Group Schema (CGS): CGS には CG のバケッティングキー、バケット数、およびレプリカ数が含まれます。

原理

Colocate Join は、同じ CGS を持つ一連のテーブルで CG を形成し、これらのテーブルの対応するバケットコピーが同じ BE ノードセットに配置されることを保証します。CG 内のテーブルがバケッティング列でジョイン操作を行うとき、ローカルデータは直接ジョインされ、ノード間でデータを転送する時間を節約できます。

バケットシーケンスは hash(key) mod buckets によって取得されます。例えば、テーブルが 8 バケットを持つ場合、[0, 1, 2, 3, 4, 5, 6, 7] の 8 バケットがあり、各バケットには 1 つ以上のサブテーブルがあります。サブテーブルの数はパーティションの数に依存します。マルチパーティションテーブルの場合、複数のタブレットが存在します。

同じデータ分布を持つために、同じ CG 内のテーブルは次の条件を満たす必要があります。

  1. 同じ CG 内のテーブルは、同一のバケッティングキー(タイプ、数、順序)と同じ数のバケットを持たなければなりません。これにより、複数のテーブルのデータスライスが一対一で分配および制御されます。バケッティングキーは、テーブル作成文 DISTRIBUTED BY HASH(col1, col2, ...) で指定された列です。バケッティングキーは、データのどの列が異なるバケットシーケンスにハッシュされるかを決定します。同じ CG 内のテーブルでバケッティングキーの名前は異なる場合があります。作成文でバケッティング列は異なる場合がありますが、DISTRIBUTED BY HASH(col1, col2, ...) の対応するデータ型の順序は完全に同じである必要があります。
  2. 同じ CG 内のテーブルは、同じ数のパーティションコピーを持たなければなりません。そうでない場合、タブレットコピーが同じ BE のパーティションに対応するコピーを持たないことがあるかもしれません。
  3. 同じ CG 内のテーブルは、異なる数のパーティションと異なるパーティションキーを持つことができます。

テーブルを作成するとき、CG はテーブルプロパティで属性 "colocate_with" = "group_name" によって指定されます。CG が存在しない場合、それは CG の最初のテーブルであり、親テーブルと呼ばれます。親テーブルのデータ分布(スプリットバケットキーのタイプ、数、順序、コピー数、スプリットバケット数)が CGS を決定します。CG が存在する場合、テーブルのデータ分布が CGS と一致しているかどうかを確認します。

同じ CG 内のテーブルのコピー配置は次の条件を満たします。

  1. すべてのテーブルのバケットシーケンスと BE ノードの間のマッピングは、親テーブルと同じです。
  2. 親テーブル内のすべてのパーティションのバケットシーケンスと BE ノードの間のマッピングは、最初のパーティションと同じです。
  3. 親テーブルの最初のパーティションのバケットシーケンスと BE ノードの間のマッピングは、ネイティブのラウンドロビンアルゴリズムを使用して決定されます。

一貫したデータ分布とマッピングは、バケッティングキーによって取得された同じ値を持つデータ行が同じ BE に落ちることを保証します。したがって、バケッティングキーを使用して列をジョインする場合、ローカルジョインのみが必要です。

使用法

テーブル作成

テーブルを作成するとき、PROPERTIES で属性 "colocate_with" = "group_name" を指定して、テーブルが Colocate Join テーブルであり、指定された Colocation Group に属することを示すことができます。

例えば:

CREATE TABLE tbl (k1 int, v1 int sum)
DISTRIBUTED BY HASH(k1)
BUCKETS 8
PROPERTIES(
"colocate_with" = "group1"
);

指定されたグループが存在しない場合、CelerData は現在のテーブルのみを含むグループを自動的に作成します。グループが存在する場合、CelerData は現在のテーブルが Colocation Group Schema を満たしているかどうかを確認します。満たしている場合、テーブルを作成してグループに追加します。同時に、テーブルは既存のグループのデータ分布ルールに基づいてパーティションとタブレットを作成します。

グループはデータベースに属し、グループの名前はデータベース内で一意です。グループのフルネームは内部ストレージでは dbId_groupName ですが、ユーザーは groupName のみを必要とします。

削除

完全な削除は、リサイクルビンからの削除です。通常、DROP TABLE コマンドでテーブルが削除されると、デフォルトで 1 日間リサイクルビンに残り、その後削除されます。グループ内の最後のテーブルが完全に削除されると、グループも自動的に削除されます。

グループ情報の表示

次のコマンドを使用して、クラスター内に既に存在するグループ情報を表示できます。

SHOW PROC '/colocation_group';

+-------------+--------------+--------------+------------+----------------+----------+----------+
| GroupId | GroupName | TableIds | BucketsNum | ReplicationNum | DistCols | IsStable |
+-------------+--------------+--------------+------------+----------------+----------+----------+
| 10005.10008 | 10005_group1 | 10007, 10040 | 10 | 3 | int(11) | true |
+-------------+--------------+--------------+------------+----------------+----------+----------+
  • GroupId: クラスター全体で一意のグループ識別子で、前半がデータベース ID、後半がグループ ID です。
  • GroupName: グループのフルネーム。
  • TabletIds: グループ内のテーブルの ID リスト。
  • BucketsNum: バケットの数。
  • ReplicationNum: レプリカの数。
  • DistCols: 分布列、つまりバケッティング列のタイプ。
  • IsStable: グループが安定しているかどうか(安定性の定義については、Colocation Replica Balancing and Repair のセクションを参照)。

次のコマンドを使用して、グループのデータ分布をさらに表示できます。

SHOW PROC '/colocation_group/10005.10008';

+-------------+---------------------+
| BucketIndex | BackendIds |
+-------------+---------------------+
| 0 | 10004, 10002, 10001 |
| 1 | 10003, 10002, 10004 |
| 2 | 10002, 10004, 10001 |
| 3 | 10003, 10002, 10004 |
| 4 | 10002, 10004, 10003 |
| 5 | 10003, 10002, 10001 |
| 6 | 10003, 10004, 10001 |
| 7 | 10003, 10004, 10002 |
+-------------+---------------------+
  • BucketIndex: バケットシーケンスの添字。
  • BackendIds: バケッティングデータスライスが配置されている BE ノードの ID。

注: 上記のコマンドは AMDIN 権限を必要とします。通常のユーザーはアクセスできません。

テーブルグループプロパティの変更

テーブルの Colocation Group プロパティを変更できます。例えば:

ALTER TABLE tbl SET ("colocate_with" = "group2");

テーブルが以前にグループに割り当てられていない場合、コマンドはスキーマを確認し、テーブルをグループに追加します(グループが存在しない場合は最初に作成されます)。テーブルが以前に別のグループに割り当てられていた場合、コマンドはテーブルを元のグループから削除し、新しいグループに追加します(グループが存在しない場合は最初に作成されます)。

次のコマンドを使用して、テーブルの Colocation プロパティを削除することもできます。

ALTER TABLE tbl SET ("colocate_with" = "");

その他の関連操作

Colocation 属性を持つテーブルに ADD PARTITION を使用してパーティションを追加したり、コピー数を変更したりする場合、CelerData はその操作が Colocation Group Schema に違反するかどうかを確認し、違反する場合は拒否します。

Colocation レプリカのバランスと修復

Colocation テーブルのレプリカ分布は、グループスキーマで指定された分布ルールに従う必要があるため、通常のシャーディングとはレプリカの修復とバランスの点で異なります。

グループ自体には stable プロパティがあります。stabletrue の場合、グループ内のテーブルスライスに変更が加えられておらず、Colocation 機能が正常に動作していることを意味します。stablefalse の場合、現在のグループ内のいくつかのテーブルスライスが修復または移行されており、影響を受けたテーブルの Colocate Join は通常のジョインに劣化します。

レプリカ修復

レプリカは指定された BE ノードにのみ保存できます。CelerData は、使用できない BE(例: ダウン、退役)を置き換えるために最も負荷の少ない BE を探します。置き換え後、古い BE 上のすべてのバケッティングデータスライスが修復されます。移行中、グループは Unstable とマークされます。

レプリカのバランス

CelerData は、Colocation テーブルスライスをすべての BE ノードに均等に分配しようとします。通常のテーブルのバランスはレプリカレベルで行われ、各レプリカが個別に負荷の低い BE ノードを見つけます。Colocation テーブルのバランスはバケットレベルで行われ、バケット内のすべてのレプリカが一緒に移行されます。BucketsSequence をすべての BE ノードに均等に分配する単純なバランスアルゴリズムを使用し、レプリカの実際のサイズではなくレプリカの数のみを考慮します。正確なアルゴリズムは ColocateTableBalancer.java のコードコメントに記載されています。

注 1: 現在の Colocation レプリカのバランスと修復アルゴリズムは、異種展開を持つ CelerData クラスターではうまく機能しない可能性があります。いわゆる異種展開とは、BE ノードのディスク容量、ディスクの数、およびディスクタイプ(SSD と HDD)が一貫していないことを意味します。異種展開の場合、小容量の BE ノードが大容量の BE ノードと同じ数のレプリカを保存することがあるかもしれません。

注 2: グループが Unstable 状態にあるとき、そのテーブルのジョインは通常のジョインに劣化し、クラスターのクエリパフォーマンスが大幅に低下する可能性があります。システムが自動的にバランスを取らないようにしたい場合は、FE 構成 disable_colocate_balance を設定して自動バランスを無効にし、適切な時期に再度有効にします。(詳細は Advanced Operations (#Advanced Operations) セクションを参照)

クエリ

Colocation テーブルは通常のテーブルと同じ方法でクエリされます。Colocation テーブルがあるグループが Unstable 状態にある場合、それは自動的に通常のジョインに劣化します。以下の例で示されるように。

テーブル 1:

CREATE TABLE `tbl1` (
`k1` date NOT NULL COMMENT "",
`k2` int(11) NOT NULL COMMENT "",
`v1` int(11) SUM NOT NULL COMMENT ""
) ENGINE=OLAP
AGGREGATE KEY(`k1`, `k2`)
PARTITION BY RANGE(`k1`)
(
PARTITION p1 VALUES LESS THAN ('2019-05-31'),
PARTITION p2 VALUES LESS THAN ('2019-06-30')
)
DISTRIBUTED BY HASH(`k2`) BUCKETS 8
PROPERTIES (
"colocate_with" = "group1"
);

テーブル 2:

CREATE TABLE `tbl2` (
`k1` datetime NOT NULL COMMENT "",
`k2` int(11) NOT NULL COMMENT "",
`v1` double SUM NOT NULL COMMENT ""
) ENGINE=OLAP
AGGREGATE KEY(`k1`, `k2`)
DISTRIBUTED BY HASH(`k2`) BUCKETS 8
PROPERTIES (
"colocate_with" = "group1"
);

クエリプランの表示:

DESC SELECT * FROM tbl1 INNER JOIN tbl2 ON (tbl1.k2 = tbl2.k2);

+----------------------------------------------------+
| Explain String |
+----------------------------------------------------+
| PLAN FRAGMENT 0 |
| OUTPUT EXPRS:`tbl1`.`k1` | |
| PARTITION: RANDOM |
| |
| RESULT SINK |
| |
| 2:HASH JOIN |
| | join op: INNER JOIN |
| | hash predicates: |
| | colocate: true |
| | `tbl1`.`k2` = `tbl2`.`k2` |
| | tuple ids: 0 1 |
| | |
| |----1:OlapScanNode |
| | TABLE: tbl2 |
| | PREAGGREGATION: OFF. Reason: null |
| | partitions=0/1 |
| | rollup: null |
| | buckets=0/0 |
| | cardinality=-1 |
| | avgRowSize=0.0 |
| | numNodes=0 |
| | tuple ids: 1 |
| | |
| 0:OlapScanNode |
| TABLE: tbl1 |
| PREAGGREGATION: OFF. Reason: No AggregateInfo |
| partitions=0/2 |
| rollup: null |
| buckets=0/0 |
| cardinality=-1 |
| avgRowSize=0.0 |
| numNodes=0 |
| tuple ids: 0 |
+----------------------------------------------------+

Colocate Join が有効になると、Hash Join ノードに colocate: true が表示されます。

有効でない場合、クエリプランは次のようになります:

+----------------------------------------------------+
| Explain String |
+----------------------------------------------------+
| PLAN FRAGMENT 0 |
| OUTPUT EXPRS:`tbl1`.`k1` | |
| PARTITION: RANDOM |
| |
| RESULT SINK |
| |
| 2:HASH JOIN |
| | join op: INNER JOIN (BROADCAST) |
| | hash predicates: |
| | colocate: false, reason: group is not stable |
| | `tbl1`.`k2` = `tbl2`.`k2` |
| | tuple ids: 0 1 |
| | |
| |----3:EXCHANGE |
| | tuple ids: 1 |
| | |
| 0:OlapScanNode |
| TABLE: tbl1 |
| PREAGGREGATION: OFF. Reason: No AggregateInfo |
| partitions=0/2 |
| rollup: null |
| buckets=0/0 |
| cardinality=-1 |
| avgRowSize=0.0 |
| numNodes=0 |
| tuple ids: 0 |
| |
| PLAN FRAGMENT 1 |
| OUTPUT EXPRS: |
| PARTITION: RANDOM |
| |
| STREAM DATA SINK |
| EXCHANGE ID: 03 |
| UNPARTITIONED |
| |
| 1:OlapScanNode |
| TABLE: tbl2 |
| PREAGGREGATION: OFF. Reason: null |
| partitions=0/1 |
| rollup: null |
| buckets=0/0 |
| cardinality=-1 |
| avgRowSize=0.0 |
| numNodes=0 |
| tuple ids: 1 |
+----------------------------------------------------+

HASH JOIN ノードは対応する理由を示します: colocate: false, reason: group is not stable。同時に EXCHANGE ノードが生成されます。

高度な操作

FE 構成項目

  • disable_colocate_relocate

CelerData の自動 Colocation レプリカ修復を無効にするかどうか。デフォルトは false で、オンになっています。このパラメータは、Colocation テーブルのレプリカ修復にのみ影響し、通常のテーブルには影響しません。

  • disable_colocate_balance

CelerData の自動 Colocation レプリカバランスを無効にするかどうか。デフォルトは false で、オンになっています。このパラメータは、Colocation テーブルのレプリカバランスにのみ影響し、通常のテーブルには影響しません。

  • disable_colocate_join

    セッション単位で Colocate join を無効にすることができます。

  • disable_colocate_join

    Colocate join 機能を無効にすることができます。

HTTP Restful API

CelerData は、Colocate Join に関連する Colocation Group の表示および変更のためのいくつかの HTTP Restful API を提供しています。

この API は FE 上に実装されており、fe_host:fe_http_port を使用して ADMIN 権限でアクセスできます。

  1. クラスターのすべての Colocation 情報を表示

    GET /api/colocate

    // Json 形式で内部の Colocation 情報を返します。
    {
    "colocate_meta": {
    "groupName2Id": {
    "g1": {
    "dbId": 10005,
    "grpId": 10008
    }
    },
    "group2Tables": {},
    "table2Group": {
    "10007": {
    "dbId": 10005,
    "grpId": 10008
    },
    "10040": {
    "dbId": 10005,
    "grpId": 10008
    }
    },
    "group2Schema": {
    "10005.10008": {
    "groupId": {
    "dbId": 10005,
    "grpId": 10008
    },
    "distributionColTypes": [{
    "type": "INT",
    "len": -1,
    "isAssignedStrLenInColDefinition": false,
    "precision": 0,
    "scale": 0
    }],
    "bucketsNum": 10,
    "replicationNum": 2
    }
    },
    "group2BackendsPerBucketSeq": {
    "10005.10008": [
    [10004, 10002],
    [10003, 10002],
    [10002, 10004],
    [10003, 10002],
    [10002, 10004],
    [10003, 10002],
    [10003, 10004],
    [10003, 10004],
    [10003, 10004],
    [10002, 10004]
    ]
    },
    "unstableGroups": []
    },
    "status": "OK"
    }
  2. グループを Stable または Unstable にマーク

    • Stable にマーク POST /api/colocate/group_stable?db_id=10005&group_id=10008

      Return:200

    • Unstable にマーク

      DELETE /api/colocate/group_stable?db_id=10005&group_id=10008

      Return:200

  3. グループのデータ分布を設定

    このインターフェースを使用して、グループの数の分布を強制的に設定できます。

    POST /api/colocate/bucketseq?db_id=10005&group_id= 10008

    Body:

    [[10004,10002],[10003,10002],[10002,10004],[10003,10002],[10002,10004],[10003,10002],[10003,10004],[10003,10004],[10003,10004],[10002,10004]]

    returns: 200

    BodyBucketsSequence をネストされた配列として表し、バケッティングスライスが配置されている BE の ID です。

    このコマンドを使用するには、FE 構成 disable_colocate_relocate および disable_colocate_balance を true に設定し、システムが自動的に Colocation レプリカの修復とバランスを行わないようにする必要があります。そうしないと、変更後にシステムによって自動的にリセットされる可能性があります。