メインコンテンツまでスキップ

Arrow Flight を CelerData クラスタで有効化する

CelerData は、Apache Arrow Flight SQL プロトコルを介した接続をサポートしています。

Apache Arrow Flight プロトコルは、高性能なデータ転送のためのリモートプロシージャコール(RPC)フレームワークであり、通常は gRPC プロトコルを基盤として実装されます。Arrow Flight SQL は、SQL シナリオに対応する Arrow Flight の拡張です。Arrow のメモリフォーマットと Flight RPC を組み合わせることで、データベース間の効率的なデータ交換を実現します。これにより、ユーザーは SQL クエリを実行し、この高速なチャネルを通じてカラムナー形式のデータを取得できます。

Arrow Flight を有効化する

important
  • Arrow Flight は v4.0.7 以降でサポートされています。クラスタがそれ以前のバージョンで稼働している場合は、作業を進める前にアップグレードしてください。
  • Arrow Flight の有効化または無効化は、クラスタ(クラスタ内のすべてのウェアハウスを含む)が Running 状態のときのみ実行できます。

以下の手順に従って Arrow Flight を有効化します。

  1. CelerData Cloud BYOC コンソール にサインインします。
  2. Clusters ページで、Arrow Flight を有効化したいクラスタをクリックします。
  3. クラスタ詳細ページの Cluster parameters タブを開き、Cluster Configuration セクションの Arrow Flight フィールドのスイッチをオンにします。

Arrow Flight を使用してクラスタとやり取りする

以下の手順では、Arrow Flight SQL プロトコルを使用し、Python ADBC Driver を通じて CelerData クラスタに接続し操作する方法を説明します。完全なコード例については 付録 を参照してください。

注記

Python 3.9 以降が必要です。

ステップ 1. ライブラリをインストールする

pip を使用して、PyPI から adbc_driver_manageradbc_driver_flightsql をインストールします。

pip install adbc_driver_manager
pip install adbc_driver_flightsql

次のモジュールまたはライブラリをコードにインポートします。

  • 必須ライブラリ:
import adbc_driver_manager
import adbc_driver_flightsql.dbapi as flight_sql
  • 使いやすさやデバッグを向上させるためのオプションモジュール:
import pandas as pd       # オプション: DataFrame を使用した結果表示の改善
import traceback # オプション: SQL 実行中の詳細なエラーのトレースバックを表示する場合
import time # オプション: SQL 実行時間の測定用

ステップ 2. クラスタに接続する

クライアント側で、次の情報を使用して Arrow Flight SQL クライアントを作成します。

  • CelerData クラスタのエンドポイント
  • ポート 443
  • 必要な権限を持つデータベースユーザーのユーザー名とパスワード

例:

FE_HOST = "xxxxxxxx.cloud-app.celerdata.org"
FE_PORT = 443

conn = flight_sql.connect(
uri=f"grpcs://{FE_HOST}:{FE_PORT}",
db_kwargs={
adbc_driver_manager.DatabaseOptions.USERNAME.value: "johndoe",
adbc_driver_manager.DatabaseOptions.PASSWORD.value: "my_1st_password",
}
)
cursor = conn.cursor()

接続が確立されると、返された Cursor を使用して SQL 文を実行し、CelerData クラスタとやり取りできます。

付録

以下のコード例では、Arrow Flight を介して CelerData クラスタとやり取りする一般的な手順を説明します。

important

コード例を使用する際は、<cluster_endpoint> を実際のクラスタのエンドポイントに、<username><password> をクラスタにアクセスするデータベースユーザーのユーザー名とパスワードに置き換えてください。また、そのユーザーに必要な権限が付与されていることを確認してください。

# =============================================================================
# CelerData Arrow Flight SQL Test Script
# =============================================================================
# pip install adbc_driver_manager adbc_driver_flightsql pandas
# =============================================================================

# =============================================================================
# Required core modules for connecting to CelerData via Arrow Flight SQL
# =============================================================================
import adbc_driver_manager
import adbc_driver_flightsql.dbapi as flight_sql

# =============================================================================
# Optional modules for better usability and debugging
# =============================================================================
import pandas as pd # Optional: for better result display using DataFrame
import traceback # Optional: for detailed error traceback during SQL execution
import time # Optional: for measuring SQL execution time

# =============================================================================
# CelerData Flight SQL Configuration
# =============================================================================

FE_HOST = "<cluster_endpoint>"
FE_PORT = 443

# =============================================================================
# Connect to CelerData
# =============================================================================
conn = flight_sql.connect(
uri=f"grpcs://{FE_HOST}:{FE_PORT}",
db_kwargs={
adbc_driver_manager.DatabaseOptions.USERNAME.value: "<username>",
adbc_driver_manager.DatabaseOptions.PASSWORD.value: "<password>",
}
)

cursor = conn.cursor()

# =============================================================================
# Utility functions for better output formatting and SQL execution
# =============================================================================

def print_header(title: str):
"""
Print a section header for better readability.
"""
print("\n" + "=" * 80)
print(f"🟢 {title}")
print("=" * 80)


def print_sql(sql: str):
"""
Print the SQL statement before execution.
"""
print(f"\n🟡 SQL:\n{sql.strip()}")


def print_result(df: pd.DataFrame):
"""
Print the result DataFrame in a readable format.
"""
if df.empty:
print("\n🟢 Result: (no rows returned)\n")
else:
print("\n🟢 Result:\n")
print(df.to_string(index=False))


def print_error(e: Exception):
"""
Print the error traceback if SQL execution fails.
"""
print("\n🔴 Error occurred:")
traceback.print_exc()


def execute(sql: str):
"""
Execute a SQL statement and print the result and execution time.
"""
print_sql(sql)
try:
start = time.time() # Start time for execution time measurement
cursor.execute(sql)
result = cursor.fetchallarrow() # Arrow Table
df = result.to_pandas() # Convert to DataFrame for better display
print_result(df)
print(f"\n⏱️ Execution time: {time.time() - start:.3f} seconds")
except Exception as e:
print_error(e)

# =============================================================================
# Step 1: Drop and Create Database
# =============================================================================
print_header("Step 1: Drop and Create Database")
execute("DROP DATABASE IF EXISTS sr_arrow_flight_sql FORCE;")
execute("SHOW DATABASES;")
execute("CREATE DATABASE sr_arrow_flight_sql;")
execute("SHOW DATABASES;")
execute("USE sr_arrow_flight_sql;")

# =============================================================================
# Step 2: Create Table
# =============================================================================
print_header("Step 2: Create Table")
execute("""
CREATE TABLE sr_arrow_flight_sql_test
(
k0 INT,
k1 DOUBLE,
k2 VARCHAR(32) NULL DEFAULT "" COMMENT "",
k3 DECIMAL(27,9) DEFAULT "0",
k4 BIGINT NULL DEFAULT '10',
k5 DATE
)
DISTRIBUTED BY HASH(k5) BUCKETS 5
PROPERTIES("replication_num" = "1");
""")

execute("SHOW CREATE TABLE sr_arrow_flight_sql_test;")

# =============================================================================
# Step 3: Insert Data
# =============================================================================
print_header("Step 3: Insert Data")
execute("""
INSERT INTO sr_arrow_flight_sql_test VALUES
(0, 0.1, "ID", 0.0001, 1111111111, '2025-04-21'),
(1, 0.20, "ID_1", 1.00000001, 0, '2025-04-21'),
(2, 3.4, "ID_1", 3.1, 123456, '2025-04-22'),
(3, 4, "ID", 4, 4, '2025-04-22'),
(4, 122345.54321, "ID", 122345.54321, 5, '2025-04-22');
""")

# =============================================================================
# Step 4: Query Data
# =============================================================================
print_header("Step 4: Query Data")
execute("SELECT * FROM sr_arrow_flight_sql_test ORDER BY k0;")

# =============================================================================
# Step 5: Session Variables
# =============================================================================
print_header("Step 5: Session Variables")
execute("SHOW VARIABLES LIKE '%query_mem_limit%';")
execute("SET query_mem_limit = 2147483648;")
execute("SHOW VARIABLES LIKE '%query_mem_limit%';")

# =============================================================================
# Step 6: Aggregation Query
# =============================================================================
print_header("Step 6: Aggregation Query")
execute("""
SELECT k5, SUM(k1) AS total_k1, COUNT(1) AS row_count, AVG(k3) AS avg_k3
FROM sr_arrow_flight_sql_test
GROUP BY k5
ORDER BY k5;
""")

# =============================================================================
# Step 7: Close Connection
# =============================================================================
print_header("Step 7: Close Connection")
cursor.close()
conn.close()
print("✅ Test completed successfully.")