
Enable Arrow Flight for a CelerData Cluster

CelerData supports connections over the Apache Arrow Flight SQL protocol.

Apache Arrow Flight is a remote procedure call (RPC) framework for high-performance data transfer, typically built on gRPC. Arrow Flight SQL is an extension of Arrow Flight for SQL databases. It combines the Arrow columnar memory format with Flight RPC so that clients can execute SQL queries and receive the results as columnar data over this high-speed channel.
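To make "columnar data" concrete, the following conceptual sketch transposes a few rows (borrowed from the sample table in the Appendix) into one list per column. Plain Python lists stand in for Arrow arrays here; real Arrow columns are typed, contiguous buffers, and `rows_to_columns` is a hypothetical helper, not part of pyarrow or ADBC.

```python
from typing import Any, Dict, List, Sequence

# Conceptual stand-in only: Arrow stores each column as a typed, contiguous
# buffer; here a plain Python list plays that role.


def rows_to_columns(names: Sequence[str], rows: Sequence[Sequence[Any]]) -> Dict[str, List[Any]]:
    """Transpose row-oriented records into one list per column."""
    columns: Dict[str, List[Any]] = {name: [] for name in names}
    for row in rows:
        if len(row) != len(names):
            raise ValueError(f"expected {len(names)} values, got {len(row)}")
        for name, value in zip(names, row):
            columns[name].append(value)
    return columns


rows = [(0, 0.1, "ID"), (1, 0.2, "ID_1"), (2, 3.4, "ID_1")]
cols = rows_to_columns(["k0", "k1", "k2"], rows)
print(cols["k1"])       # all values of one column sit together
print(sum(cols["k1"]))  # an aggregate over k1 scans only that column
```

In a columnar layout, a query such as `SUM(k1)` reads one contiguous column instead of picking a field out of every row, which is part of why Arrow-based transfer suits analytical results.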

Enable Arrow Flight

important
  • Arrow Flight is supported in v4.0.7 and later. If your cluster runs an earlier version, upgrade it before proceeding.
  • You can enable or disable Arrow Flight only when the cluster and all warehouses in it are in the Running state.
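If you script version checks around the v4.0.7 requirement, compare versions numerically rather than as strings: as text, "4.0.10" sorts before "4.0.7". The helper below is a hypothetical sketch; how you obtain the cluster's version string (for example, from the console) is outside its scope, and the accepted formats ("v" prefix, "-suffix") are assumptions.

```python
from typing import Tuple

# Minimum version for Arrow Flight, taken from the note above (v4.0.7).
MIN_ARROW_FLIGHT_VERSION: Tuple[int, ...] = (4, 0, 7)


def parse_version(version: str) -> Tuple[int, ...]:
    """Turn 'v4.0.10' or '4.0.10' into (4, 0, 10); any '-suffix' is ignored."""
    core = version.strip().lstrip("vV").split("-")[0]
    return tuple(int(part) for part in core.split("."))


def supports_arrow_flight(version: str) -> bool:
    # Tuples compare element by element, so (4, 0, 10) > (4, 0, 7).
    return parse_version(version) >= MIN_ARROW_FLIGHT_VERSION


print(supports_arrow_flight("4.0.7"))   # True
print(supports_arrow_flight("4.0.10"))  # True, although "4.0.10" < "4.0.7" as strings
print(supports_arrow_flight("3.5.2"))   # False
```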

Follow these steps to enable Arrow Flight:

  1. Sign in to the CelerData Cloud BYOC console.
  2. On the Clusters page, click the cluster that you want to enable Arrow Flight for.
  3. On the Cluster parameters tab of the cluster details page, turn on the Arrow Flight switch in the Cluster Configuration section.

Use Arrow Flight to interact with your cluster

Follow these steps to connect to and interact with your CelerData cluster using the Python ADBC driver over the Arrow Flight SQL protocol. For a complete code example, see the Appendix.

note

Python 3.9 or later is a prerequisite.

Step 1. Install libraries

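Use pip to install adbc_driver_manager and adbc_driver_flightsql from PyPI:

pip install adbc_driver_manager
pip install adbc_driver_flightsql

The DB-API interface used in this topic also needs PyArrow, and the optional DataFrame display needs pandas:

pip install pyarrow pandas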

Import the following modules or libraries into your code:

  • Required libraries:
import adbc_driver_manager
import adbc_driver_flightsql.dbapi as flight_sql
  • Optional modules for better usability and debugging:
import pandas as pd     # Optional: for better result display using DataFrame
import traceback        # Optional: for detailed error traceback during SQL execution
import time             # Optional: for measuring SQL execution time
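Before running the rest of the code, you can confirm the prerequisites with a short check. This is a hypothetical convenience script, not part of the ADBC packages: it uses `importlib.util.find_spec` to test whether each top-level module can be found without importing it, and it lists `pyarrow` as required on the understanding that ADBC's DB-API layer (which returns Arrow tables from `fetchallarrow()`) depends on it.

```python
import importlib.util
import sys
from typing import Dict, Iterable, Tuple

REQUIRED = ("adbc_driver_manager", "adbc_driver_flightsql", "pyarrow")
OPTIONAL = ("pandas",)


def python_ok(min_version: Tuple[int, int] = (3, 9)) -> bool:
    """True if the running interpreter meets the minimum (major, minor) version."""
    return sys.version_info[:2] >= min_version


def find_modules(names: Iterable[str]) -> Dict[str, bool]:
    """Map each top-level module name to whether Python can locate it."""
    return {name: importlib.util.find_spec(name) is not None for name in names}


if __name__ == "__main__":
    print(f"Python {sys.version.split()[0]} meets 3.9+: {python_ok()}")
    for name, found in find_modules(REQUIRED + OPTIONAL).items():
        print(f"{name}: {'installed' if found else 'missing'}")
```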

Step 2. Connect to your cluster

On the client side, create an Arrow Flight SQL client using the following information:

  • Endpoint of your CelerData cluster
  • Port 443
  • Username and password of the database user that has the necessary privileges
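One way to supply these values without hard-coding credentials is to read them from environment variables. The sketch below is an assumption, not a CelerData convention: the variable names `CELERDATA_HOST`, `CELERDATA_USER`, and `CELERDATA_PASSWORD` and the helper `connection_settings` are hypothetical. It uses the plain keys `"username"` and `"password"`, the ADBC standard option names that `DatabaseOptions.USERNAME.value` and `DatabaseOptions.PASSWORD.value` resolve to.

```python
import os
from typing import Dict, Mapping, Optional, Tuple

# Hypothetical variable names; pick whatever fits your deployment.
REQUIRED_VARS = ("CELERDATA_HOST", "CELERDATA_USER", "CELERDATA_PASSWORD")
FE_PORT = 443  # port used for Arrow Flight connections in this topic


def connection_settings(env: Optional[Mapping[str, str]] = None) -> Tuple[str, Dict[str, str]]:
    """Build the uri and db_kwargs for flight_sql.connect() from environment variables."""
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise ValueError(f"missing environment variables: {', '.join(missing)}")
    uri = f"grpcs://{env['CELERDATA_HOST']}:{FE_PORT}"
    db_kwargs = {"username": env["CELERDATA_USER"], "password": env["CELERDATA_PASSWORD"]}
    return uri, db_kwargs
```

With the variables exported, connecting becomes `uri, db_kwargs = connection_settings()` followed by `conn = flight_sql.connect(uri=uri, db_kwargs=db_kwargs)`.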

Example:

FE_HOST = "xxxxxxxx.cloud-app.celerdata.org"
FE_PORT = 443

conn = flight_sql.connect(
    uri=f"grpcs://{FE_HOST}:{FE_PORT}",
    db_kwargs={
        adbc_driver_manager.DatabaseOptions.USERNAME.value: "johndoe",
        adbc_driver_manager.DatabaseOptions.PASSWORD.value: "my_1st_password",
    },
)
cursor = conn.cursor()

After the connection is established, you can interact with your CelerData cluster by executing SQL statements through the returned Cursor.
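For one-off statements, a small wrapper can make sure the cursor is closed even if execution fails. This is a sketch that relies only on standard PEP 249 (DB-API 2.0) cursor methods (`execute`, `description`, `fetchall`, `close`), which the ADBC `dbapi` module implements; `fetch_rows` is a hypothetical helper name. It returns Python tuples through `fetchall()`; use `fetchallarrow()`, as the Appendix does, when you want an Arrow table instead.

```python
from typing import Any, List, Sequence, Tuple


def fetch_rows(conn: Any, sql: str) -> Tuple[List[str], List[Sequence[Any]]]:
    """Run one statement and return (column names, rows), always closing the cursor."""
    cursor = conn.cursor()
    try:
        cursor.execute(sql)
        # PEP 249: description is None when the statement produced no result set.
        if cursor.description is None:
            return [], []
        # Otherwise each entry is a sequence whose first item is the column name.
        names = [column[0] for column in cursor.description]
        return names, cursor.fetchall()
    finally:
        cursor.close()
```

For example, `names, rows = fetch_rows(conn, "SHOW DATABASES;")` returns the column names together with the result rows.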

Appendix

The following code example walks through the general process of interacting with your CelerData cluster via Arrow Flight.

important

Before running the code example, replace <cluster_endpoint> with the actual endpoint of your cluster, and replace <username> and <password> with the username and password of the database user you use to access the cluster. Make sure the user has the necessary privileges.
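A cheap safeguard is to check for leftover placeholders before connecting, so the script stops with a clear message instead of a connection error. The sketch below is hypothetical (the `settings` dict and the `unreplaced_placeholders` helper are illustrative) and treats any value written entirely as `<...>` as not yet replaced.

```python
import re
from typing import List, Mapping

# Matches a whole value of the form <something>, such as "<cluster_endpoint>".
PLACEHOLDER = re.compile(r"^<[^<>]+>$")


def unreplaced_placeholders(settings: Mapping[str, str]) -> List[str]:
    """Return the names of settings whose value is still a <placeholder>."""
    return [name for name, value in settings.items() if PLACEHOLDER.match(value.strip())]


settings = {"FE_HOST": "<cluster_endpoint>", "USERNAME": "johndoe", "PASSWORD": "<password>"}
leftover = unreplaced_placeholders(settings)
if leftover:
    print("Replace these values before running:", ", ".join(leftover))
```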

# =============================================================================
# CelerData Arrow Flight SQL Test Script
# =============================================================================
# pip install adbc_driver_manager adbc_driver_flightsql pyarrow pandas
# =============================================================================

# =============================================================================
# Required core modules for connecting to CelerData via Arrow Flight SQL
# =============================================================================
import adbc_driver_manager
import adbc_driver_flightsql.dbapi as flight_sql

# =============================================================================
# Modules used by the helper functions below (display, debugging, timing)
# =============================================================================
import pandas as pd  # Used by print_result(); to_pandas() below also needs pandas installed
import traceback  # For detailed error traceback during SQL execution
import time  # For measuring SQL execution time

# =============================================================================
# CelerData Flight SQL Configuration
# =============================================================================

FE_HOST = "<cluster_endpoint>"
FE_PORT = 443

# =============================================================================
# Connect to CelerData
# =============================================================================
conn = flight_sql.connect(
    uri=f"grpcs://{FE_HOST}:{FE_PORT}",
    db_kwargs={
        adbc_driver_manager.DatabaseOptions.USERNAME.value: "<username>",
        adbc_driver_manager.DatabaseOptions.PASSWORD.value: "<password>",
    },
)

cursor = conn.cursor()

# =============================================================================
# Utility functions for better output formatting and SQL execution
# =============================================================================

def print_header(title: str):
    """
    Print a section header for better readability.
    """
    print("\n" + "=" * 80)
    print(f"🟢 {title}")
    print("=" * 80)


def print_sql(sql: str):
    """
    Print the SQL statement before execution.
    """
    print(f"\n🟡 SQL:\n{sql.strip()}")


def print_result(df: pd.DataFrame):
    """
    Print the result DataFrame in a readable format.
    """
    if df.empty:
        print("\n🟢 Result: (no rows returned)\n")
    else:
        print("\n🟢 Result:\n")
        print(df.to_string(index=False))


def print_error(e: Exception):
    """
    Print the error traceback if SQL execution fails.
    """
    print("\n🔴 Error occurred:")
    traceback.print_exc()


def execute(sql: str):
    """
    Execute a SQL statement and print the result and execution time.
    """
    print_sql(sql)
    try:
        start = time.perf_counter()  # Monotonic clock suited to measuring durations
        cursor.execute(sql)
        result = cursor.fetchallarrow()  # Arrow Table
        df = result.to_pandas()  # Convert to DataFrame for better display
        elapsed = time.perf_counter() - start  # Query, fetch, and conversion; excludes printing
        print_result(df)
        print(f"\n⏱️ Execution time: {elapsed:.3f} seconds")
    except Exception as e:
        print_error(e)

# =============================================================================
# Step 1: Drop and Create Database
# =============================================================================
print_header("Step 1: Drop and Create Database")
execute("DROP DATABASE IF EXISTS sr_arrow_flight_sql FORCE;")
execute("SHOW DATABASES;")
execute("CREATE DATABASE sr_arrow_flight_sql;")
execute("SHOW DATABASES;")
execute("USE sr_arrow_flight_sql;")

# =============================================================================
# Step 2: Create Table
# =============================================================================
print_header("Step 2: Create Table")
execute("""
CREATE TABLE sr_arrow_flight_sql_test
(
k0 INT,
k1 DOUBLE,
k2 VARCHAR(32) NULL DEFAULT "" COMMENT "",
k3 DECIMAL(27,9) DEFAULT "0",
k4 BIGINT NULL DEFAULT '10',
k5 DATE
)
DISTRIBUTED BY HASH(k5) BUCKETS 5
PROPERTIES("replication_num" = "1");
""")

execute("SHOW CREATE TABLE sr_arrow_flight_sql_test;")

# =============================================================================
# Step 3: Insert Data
# =============================================================================
print_header("Step 3: Insert Data")
execute("""
INSERT INTO sr_arrow_flight_sql_test VALUES
(0, 0.1, "ID", 0.0001, 1111111111, '2025-04-21'),
(1, 0.20, "ID_1", 1.00000001, 0, '2025-04-21'),
(2, 3.4, "ID_1", 3.1, 123456, '2025-04-22'),
(3, 4, "ID", 4, 4, '2025-04-22'),
(4, 122345.54321, "ID", 122345.54321, 5, '2025-04-22');
""")

# =============================================================================
# Step 4: Query Data
# =============================================================================
print_header("Step 4: Query Data")
execute("SELECT * FROM sr_arrow_flight_sql_test ORDER BY k0;")

# =============================================================================
# Step 5: Session Variables
# =============================================================================
print_header("Step 5: Session Variables")
execute("SHOW VARIABLES LIKE '%query_mem_limit%';")
execute("SET query_mem_limit = 2147483648;")
execute("SHOW VARIABLES LIKE '%query_mem_limit%';")
execute("SHOW VARIABLES LIKE '%arrow_flight_proxy%';")
execute("SET arrow_flight_proxy_enabled = true;")
execute("SET arrow_flight_proxy = 'fe-proxy.example.com';")
execute("SHOW VARIABLES LIKE '%arrow_flight_proxy%';")

# =============================================================================
# Step 6: Aggregation Query
# =============================================================================
print_header("Step 6: Aggregation Query")
execute("""
SELECT k5, SUM(k1) AS total_k1, COUNT(1) AS row_count, AVG(k3) AS avg_k3
FROM sr_arrow_flight_sql_test
GROUP BY k5
ORDER BY k5;
""")

# =============================================================================
# Step 7: Close Connection
# =============================================================================
print_header("Step 7: Close Connection")
cursor.close()
conn.close()
print("✅ Test completed successfully.")
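To sanity-check the output of Step 6, the same grouping can be recomputed locally from the five rows inserted in Step 3. This sketch is illustrative only: it copies k0, k1, k3, and k5 by hand, uses `float` for the DOUBLE column k1 and `Decimal` for the DECIMAL(27,9) column k3, and the cluster may display the averages with a different number of decimal places.

```python
from collections import defaultdict
from decimal import Decimal

# (k0, k1, k3, k5) copied by hand from the Step 3 INSERT statement.
ROWS = [
    (0, 0.1, Decimal("0.0001"), "2025-04-21"),
    (1, 0.20, Decimal("1.00000001"), "2025-04-21"),
    (2, 3.4, Decimal("3.1"), "2025-04-22"),
    (3, 4.0, Decimal("4"), "2025-04-22"),
    (4, 122345.54321, Decimal("122345.54321"), "2025-04-22"),
]


def aggregate_by_date(rows):
    """Mirror SELECT k5, SUM(k1), COUNT(1), AVG(k3) ... GROUP BY k5 ORDER BY k5."""
    groups = defaultdict(list)
    for _k0, k1, k3, k5 in rows:
        groups[k5].append((k1, k3))
    result = []
    for k5 in sorted(groups):  # ISO dates sort correctly as strings
        values = groups[k5]
        total_k1 = sum(k1 for k1, _ in values)
        avg_k3 = sum(k3 for _, k3 in values) / len(values)
        result.append((k5, total_k1, len(values), avg_k3))
    return result


for row in aggregate_by_date(ROWS):
    print(row)
```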