跳到主要内容
版本: 最新版本-3.5

通过 Arrow Flight SQL 与 StarRocks 交互

从 v3.5.1 版本开始,StarRocks 支持通过 Apache Arrow Flight SQL 协议进行连接。

概述

通过 Arrow Flight SQL 协议,您可以执行常规的 DDL、DML、DQL 语句,并使用 Python 代码或 Java 代码通过 Arrow Flight SQL ADBC 或 JDBC 驱动程序读取大规模数据。

此解决方案建立了一个从 StarRocks 列式执行引擎到客户端的完全列式数据传输管道,消除了传统 JDBC 和 ODBC 接口中常见的频繁行列转换和序列化开销。这使 StarRocks 能够以零拷贝、低延迟和高吞吐量传输数据。

使用场景

Arrow Flight SQL 集成使 StarRocks 特别适合以下应用场景:

  • 数据科学工作流程,其中 Pandas 和 Apache Arrow 等工具期望列式数据。
  • 数据湖分析,需要高吞吐量、低延迟地访问海量数据集。
  • 机器学习,其中快速迭代和处理速度至关重要。
  • 必须以最小延迟交付数据的实时分析平台。

通过 Arrow Flight SQL,您可以受益于:

  • 端到端列式数据传输,消除了列式和行式格式之间昂贵的转换。
  • 零拷贝数据移动,降低了 CPU 和内存开销。
  • 低延迟和极高的吞吐量,加速了分析和响应能力。

技术方案

传统上,StarRocks 在内部以列式 Block 结构组织查询结果。但是,当使用 JDBC、ODBC 或 MySQL 协议时,数据必须:

  1. 在服务器上序列化为基于行的字节。
  2. 通过网络传输。
  3. 反序列化回目标结构(通常需要重新转换为列式格式)。

这三个步骤会导致:

  • 高序列化/反序列化开销。
  • 复杂的数据转换。
  • 延迟随数据量增长。

与 Arrow Flight SQL 的集成通过以下方式解决了这些问题:

  • 保留端到端列式格式,从 StarRocks 执行引擎直接到客户端。
  • 利用 Apache Arrow 的内存列式表示,该表示针对分析工作负载进行了优化。
  • 使用 Arrow Flight 的协议进行高速传输,实现高效的流式传输,无需中间转换。

Arrow Flight

这种设计提供了真正的零拷贝传输,比传统方法更快、更节省资源。

此外,StarRocks 为 Arrow Flight SQL 提供了通用的 JDBC 驱动程序,因此应用程序可以在不牺牲 JDBC 兼容性或与其他启用 Arrow Flight 的系统的互操作性的情况下采用这种高性能传输路径。

性能比较

全面的测试表明数据检索速度显着提高。在各种数据类型(整数、浮点数、字符串、布尔值和混合列)中,Arrow Flight SQL 的性能始终优于传统的 PyMySQL 和 Pandas read_sql 接口。主要结果包括:

  • 对于读取 1000 万个整数行,执行时间从约 35 秒降至 0.4 秒(快约 85 倍)。
  • 对于混合列表,性能提升达到了 160 倍的加速。
  • 即使在不太复杂的查询(例如,单字符串列)中,性能提升也超过了 12 倍。

平均而言,Arrow Flight SQL 实现了:

  • 20 倍到 160 倍更快的传输时间,具体取决于查询复杂性和数据类型。
  • 由于消除了冗余的序列化步骤,CPU 和内存使用量明显减少。

这些性能提升直接转化为更快的仪表板、更具响应性的数据科学工作流程,以及实时分析更大规模数据集的能力。

用法

请按照以下步骤使用 Python ADBC 驱动程序通过 Arrow Flight SQL 协议连接并与 StarRocks 交互。有关完整的代码示例,请参阅附录

注意

Python 3.9 或更高版本是先决条件。

步骤 1. 安装库

使用 pip 从 PyPI 安装 adbc_driver_manageradbc_driver_flightsql

pip install adbc_driver_manager
pip install adbc_driver_flightsql

将以下模块或库导入到您的代码中

  • 必需库
import adbc_driver_manager
import adbc_driver_flightsql.dbapi as flight_sql
  • 可选模块,用于更好的可用性和调试
import pandas as pd       # Optional: for better result display using DataFrame
import traceback # Optional: for detailed error traceback during SQL execution
import time # Optional: for measuring SQL execution time

步骤 2. 连接到 StarRocks

注意
  • 如果要使用命令行启动 FE 服务,可以使用以下任一方式:

    • 指定环境变量 JAVA_TOOL_OPTIONS

      export JAVA_TOOL_OPTIONS="--add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED"
    • fe.conf 中指定 FE 配置项 JAVA_OPTS。这样,您可以附加其他 JAVA_OPTS 值。

      JAVA_OPTS="--add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED ..."
  • 如果要在 IntelliJ IDEA 中运行该服务,则必须将以下选项添加到 Run/Debug Configurations 中的 Build and run

    --add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED

配置 StarRocks

在通过 Arrow Flight SQL 连接到 StarRocks 之前,必须首先配置 FE 和 BE 节点,以确保已启用 Arrow Flight SQL 服务并在指定端口上监听。

在 FE 配置文件 fe.conf 和 BE 配置文件 be.conf 中,将 arrow_flight_port 设置为可用端口。修改配置文件后,重新启动 FE 和 BE 服务以使修改生效。

注意

您必须为 FE 和 BE 设置不同的 arrow_flight_port

示例

// fe.conf
arrow_flight_port = 9408
// be.conf
arrow_flight_port = 9419

建立连接

在客户端,使用以下信息创建 Arrow Flight SQL 客户端:

  • StarRocks FE 的主机地址
  • Arrow Flight 用于监听 StarRocks FE 的端口
  • 具有必要权限的 StarRocks 用户的用户名和密码

示例

FE_HOST = "127.0.0.1"
FE_PORT = 9408

conn = flight_sql.connect(
uri=f"grpc://{FE_HOST}:{FE_PORT}",
db_kwargs={
adbc_driver_manager.DatabaseOptions.USERNAME.value: "root",
adbc_driver_manager.DatabaseOptions.PASSWORD.value: "",
}
)
cursor = conn.cursor()

建立连接后,您可以通过执行通过返回的 Cursor 执行 SQL 语句来与 StarRocks 交互。

步骤 3. (可选)预定义实用程序函数

这些函数用于格式化输出、标准化格式和简化调试。您可以选择在代码中定义它们以进行测试。

# =============================================================================
# Utility functions for better output formatting and SQL execution
# =============================================================================

# Print a section header
def print_header(title: str):
"""
Print a section header for better readability.
"""
print("\n" + "=" * 80)
print(f"🟢 {title}")
print("=" * 80)

# Print the SQL statement being executed
def print_sql(sql: str):
"""
Print the SQL statement before execution.
"""
print(f"\n🟡 SQL:\n{sql.strip()}")

# Print the result DataFrame
def print_result(df: pd.DataFrame):
"""
Print the result DataFrame in a readable format.
"""
if df.empty:
print("\n🟢 Result: (no rows returned)\n")
else:
print("\n🟢 Result:\n")
print(df.to_string(index=False))

# Print the error traceback
def print_error(e: Exception):
"""
Print the error traceback if SQL execution fails.
"""
print("\n🔴 Error occurred:")
traceback.print_exc()

# Execute a SQL statement and print the result
def execute(sql: str):
"""
Execute a SQL statement and print the result and execution time.
"""
print_sql(sql)
try:
start = time.time() # Optional: start time for execution time measurement
cursor.execute(sql)
result = cursor.fetchallarrow() # Arrow Table
df = result.to_pandas() # Optional: convert to DataFrame for better display
print_result(df)
print(f"\n⏱️ Execution time: {time.time() - start:.3f} seconds")
except Exception as e:
print_error(e)

步骤 4. 与 StarRocks 交互

本节将指导您完成一些基本操作,例如创建表、加载数据、检查表元数据、设置变量和运行查询。

注意

下面列出的输出示例是基于前面步骤中描述的可选模块和实用程序函数实现的。

  1. 创建一个数据库和一个将加载数据的表,并检查表架构。

    # Step 1: Drop and create database
    print_header("Step 1: Drop and Create Database")
    execute("DROP DATABASE IF EXISTS sr_arrow_flight_sql FORCE;")
    execute("SHOW DATABASES;")
    execute("CREATE DATABASE sr_arrow_flight_sql;")
    execute("SHOW DATABASES;")
    execute("USE sr_arrow_flight_sql;")

    # Step 2: Create table
    print_header("Step 2: Create Table")
    execute("""
    CREATE TABLE sr_arrow_flight_sql_test
    (
    k0 INT,
    k1 DOUBLE,
    k2 VARCHAR(32) NULL DEFAULT "" COMMENT "",
    k3 DECIMAL(27,9) DEFAULT "0",
    k4 BIGINT NULL DEFAULT '10',
    k5 DATE
    )
    DISTRIBUTED BY HASH(k5) BUCKETS 5
    PROPERTIES("replication_num" = "1");
    """)
    execute("SHOW CREATE TABLE sr_arrow_flight_sql_test;")

    示例输出

    ================================================================================
    🟢 Step 1: Drop and Create Database
    ================================================================================

    🟡 SQL:
    DROP DATABASE IF EXISTS sr_arrow_flight_sql FORCE;
    /Users/starrocks/test/venv/lib/python3.9/site-packages/adbc_driver_manager/dbapi.py:307: Warning: Cannot disable autocommit; conn will not be DB-API 2.0 compliant
    warnings.warn(

    🟢 Result:

    StatusResult
    0

    ⏱️ Execution time: 0.025 seconds

    🟡 SQL:
    SHOW DATABASES;

    🟢 Result:

    Database
    _statistics_
    hits
    information_schema
    sys

    ⏱️ Execution time: 0.014 seconds

    🟡 SQL:
    CREATE DATABASE sr_arrow_flight_sql;

    🟢 Result:

    StatusResult
    0

    ⏱️ Execution time: 0.012 seconds

    🟡 SQL:
    SHOW DATABASES;

    🟢 Result:

    Database
    _statistics_
    hits
    information_schema
    sr_arrow_flight_sql
    sys

    ⏱️ Execution time: 0.005 seconds

    🟡 SQL:
    USE sr_arrow_flight_sql;

    🟢 Result:

    StatusResult
    0

    ⏱️ Execution time: 0.006 seconds

    ================================================================================
    🟢 Step 2: Create Table
    ================================================================================

    🟡 SQL:
    CREATE TABLE sr_arrow_flight_sql_test
    (
    k0 INT,
    k1 DOUBLE,
    k2 VARCHAR(32) NULL DEFAULT "" COMMENT "",
    k3 DECIMAL(27,9) DEFAULT "0",
    k4 BIGINT NULL DEFAULT '10',
    k5 DATE
    )
    DISTRIBUTED BY HASH(k5) BUCKETS 5
    PROPERTIES("replication_num" = "1");

    🟢 Result:

    StatusResult
    0

    ⏱️ Execution time: 0.021 seconds

    🟡 SQL:
    SHOW CREATE TABLE sr_arrow_flight_sql_test;

    🟢 Result:

    Table Create Table
    sr_arrow_flight_sql_test CREATE TABLE `sr_arrow_flight_sql_test` (\n `k0` int(11) NULL COMMENT "",\n `k1` double NULL COMMENT "",\n `k2` varchar(32) NULL DEFAULT "" COMMENT "",\n `k3` decimal(27, 9) NULL DEFAULT "0" COMMENT "",\n `k4` bigint(20) NULL DEFAULT "10" COMMENT "",\n `k5` date NULL COMMENT ""\n) ENGINE=OLAP \nDUPLICATE KEY(`k0`)\nDISTRIBUTED BY HASH(`k5`) BUCKETS 5 \nPROPERTIES (\n"compression" = "LZ4",\n"fast_schema_evolution" = "true",\n"replicated_storage" = "true",\n"replication_num" = "1"\n);

    ⏱️ Execution time: 0.005 seconds
  2. 插入数据、运行一些查询和设置变量。

    # Step 3: Insert data
    print_header("Step 3: Insert Data")
    execute("""
    INSERT INTO sr_arrow_flight_sql_test VALUES
    (0, 0.1, "ID", 0.0001, 1111111111, '2025-04-21'),
    (1, 0.20, "ID_1", 1.00000001, 0, '2025-04-21'),
    (2, 3.4, "ID_1", 3.1, 123456, '2025-04-22'),
    (3, 4, "ID", 4, 4, '2025-04-22'),
    (4, 122345.54321, "ID", 122345.54321, 5, '2025-04-22');
    """)

    # Step 4: Query data
    print_header("Step 4: Query Data")
    execute("SELECT * FROM sr_arrow_flight_sql_test ORDER BY k0;")

    # Step 5: Session variables
    print_header("Step 5: Session Variables")
    execute("SHOW VARIABLES LIKE '%query_mem_limit%';")
    execute("SET query_mem_limit = 2147483648;")
    execute("SHOW VARIABLES LIKE '%query_mem_limit%';")

    # Step 6: Aggregation query
    print_header("Step 6: Aggregation Query")
    execute("""
    SELECT k5, SUM(k1) AS total_k1, COUNT(1) AS row_count, AVG(k3) AS avg_k3
    FROM sr_arrow_flight_sql_test
    GROUP BY k5
    ORDER BY k5;
    """)

    示例输出

    ================================================================================
    🟢 Step 3: Insert Data
    ================================================================================

    🟡 SQL:
    INSERT INTO sr_arrow_flight_sql_test VALUES
    (0, 0.1, "ID", 0.0001, 1111111111, '2025-04-21'),
    (1, 0.20, "ID_1", 1.00000001, 0, '2025-04-21'),
    (2, 3.4, "ID_1", 3.1, 123456, '2025-04-22'),
    (3, 4, "ID", 4, 4, '2025-04-22'),
    (4, 122345.54321, "ID", 122345.54321, 5, '2025-04-22');

    🟢 Result:

    StatusResult
    0

    ⏱️ Execution time: 0.149 seconds

    ================================================================================
    🟢 Step 4: Query Data
    ================================================================================

    🟡 SQL:
    SELECT * FROM sr_arrow_flight_sql_test ORDER BY k0;

    🟢 Result:

    0 0.10000 ID 0.000100000 1111111111 2025-04-21
    1 0.20000 ID_1 1.000000010 0 2025-04-21
    2 3.40000 ID_1 3.100000000 123456 2025-04-22
    3 4.00000 ID 4.000000000 4 2025-04-22
    4 122345.54321 ID 122345.543210000 5 2025-04-22

    ⏱️ Execution time: 0.019 seconds

    ================================================================================
    🟢 Step 5: Session Variables
    ================================================================================

    🟡 SQL:
    SHOW VARIABLES LIKE '%query_mem_limit%';

    🟢 Result:

    Variable_name Value
    query_mem_limit 0

    ⏱️ Execution time: 0.005 seconds

    🟡 SQL:
    SET query_mem_limit = 2147483648;

    🟢 Result:

    StatusResult
    0

    ⏱️ Execution time: 0.007 seconds

    🟡 SQL:
    SHOW VARIABLES LIKE '%query_mem_limit%';

    🟢 Result:

    Variable_name Value
    query_mem_limit 2147483648

    ⏱️ Execution time: 0.005 seconds

    ================================================================================
    🟢 Step 6: Aggregation Query
    ================================================================================

    🟡 SQL:
    SELECT k5, SUM(k1) AS total_k1, COUNT(1) AS row_count, AVG(k3) AS avg_k3
    FROM sr_arrow_flight_sql_test
    GROUP BY k5
    ORDER BY k5;

    🟢 Result:

    2025-04-21 0.30000 2 0.500050005000
    2025-04-22 122352.94321 3 40784.214403333333

    ⏱️ Execution time: 0.014 second

步骤 5. 关闭连接

在您的代码中包含以下部分以关闭连接。

# Step 7: Close
print_header("Step 7: Close Connection")
cursor.close()
conn.close()
print("✅ Test completed successfully.")

示例输出

================================================================================
🟢 Step 7: Close Connection
================================================================================
✅ Test completed successfully.

Process finished with exit code 0

大规模数据传输的用例

Python

通过 Python 中的 ADBC 驱动程序(支持 Arrow Flight SQL)连接到 StarRocks 后,您可以使用各种 ADBC API 将 Clickbench 数据集从 StarRocks 加载到 Python 中。

代码示例

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import adbc_driver_manager
import adbc_driver_flightsql.dbapi as flight_sql
from datetime import datetime

# ----------------------------------------
# StarRocks Flight SQL Connection Settings
# ----------------------------------------
# Replace the URI and credentials as needed
my_uri = "grpc://127.0.0.1:9408" # Default Flight SQL port for StarRocks
my_db_kwargs = {
adbc_driver_manager.DatabaseOptions.USERNAME.value: "root",
adbc_driver_manager.DatabaseOptions.PASSWORD.value: "",
}

# ----------------------------------------
# SQL Query (ClickBench: hits table)
# ----------------------------------------
# Replace with the actual table and dataset as needed
sql = "SELECT * FROM clickbench.hits LIMIT 1000000;" # Read 1 million rows

# ----------------------------------------
# Method 1: fetchallarrow + to_pandas
# ----------------------------------------
def test_fetchallarrow():
conn = flight_sql.connect(uri=my_uri, db_kwargs=my_db_kwargs)
cursor = conn.cursor()
start = datetime.now()
cursor.execute(sql)
arrow_table = cursor.fetchallarrow()
df = arrow_table.to_pandas()
duration = datetime.now() - start

print("\n[Method 1] fetchallarrow + to_pandas")
print(f"Time taken: {duration}, Arrow table size: {arrow_table.nbytes / 1024 / 1024:.2f} MB, Rows: {len(df)}")
print(df.info(memory_usage='deep'))

# ----------------------------------------
# Method 2: fetch_df (recommended)
# ----------------------------------------
def test_fetch_df():
conn = flight_sql.connect(uri=my_uri, db_kwargs=my_db_kwargs)
cursor = conn.cursor()
start = datetime.now()
cursor.execute(sql)
df = cursor.fetch_df()
duration = datetime.now() - start

print("\n[Method 2] fetch_df (recommended)")
print(f"Time taken: {duration}, Rows: {len(df)}")
print(df.info(memory_usage='deep'))

# ----------------------------------------
# Method 3: adbc_execute_partitions (for parallel read)
# ----------------------------------------
def test_execute_partitions():
conn = flight_sql.connect(uri=my_uri, db_kwargs=my_db_kwargs)
cursor = conn.cursor()
start = datetime.now()
partitions, schema = cursor.adbc_execute_partitions(sql)

# Read the first partition (for demo)
cursor.adbc_read_partition(partitions[0])
arrow_table = cursor.fetchallarrow()
df = arrow_table.to_pandas()
duration = datetime.now() - start

print("\n[Method 3] adbc_execute_partitions (parallel read)")
print(f"Time taken: {duration}, Partitions: {len(partitions)}, Rows: {len(df)}")
print(df.info(memory_usage='deep'))

# ----------------------------------------
# Run All Tests
# ----------------------------------------
if __name__ == "__main__":
test_fetchallarrow()
test_fetch_df()
test_execute_partitions()

结果表明,从 StarRocks 加载 100 万行 Clickbench 数据集(105 列,780 MB)仅需 3 秒。

[Method 1] fetchallarrow + to_pandas
Time taken: 0:00:03.219575, Arrow table size: 717.42 MB, Rows: 1000000
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1000000 entries, 0 to 999999
Columns: 105 entries, CounterID to CLID
dtypes: int16(48), int32(19), int64(6), object(32)
memory usage: 2.4 GB

[Method 2] fetch_df (recommended)
Time taken: 0:00:02.358840, Rows: 1000000
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1000000 entries, 0 to 999999
Columns: 105 entries, CounterID to CLID
dtypes: int16(48), int32(19), int64(6), object(32)
memory usage: 2.4 GB

[Method 3] adbc_execute_partitions (parallel read)
Time taken: 0:00:02.231144, Partitions: 1, Rows: 1000000
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1000000 entries, 0 to 999999
Columns: 105 entries, CounterID to CLID
dtypes: int16(48), int32(19), int64(6), object(32)
memory usage: 2.4 GB

Arrow Flight SQL JDBC 驱动程序

Arrow Flight SQL 协议提供了一个与标准 JDBC 接口兼容的开源 JDBC 驱动程序。您可以轻松地将其集成到各种 BI 工具(例如 Tableau、Power BI、DBeaver 等)中以访问 StarRocks 数据库,就像使用传统的 JDBC 驱动程序一样。此驱动程序的一个显着优势是它支持基于 Apache Arrow 的高速数据传输,这大大提高了查询和数据传输的效率。用法几乎与传统的 MySQL JDBC 驱动程序相同。您只需要在连接 URL 中将 jdbc:mysql 替换为 jdbc:arrow-flight-sql 即可无缝切换。查询结果仍然以标准的 ResultSet 格式返回,从而确保与现有 JDBC 处理逻辑的兼容性。

注意

请注意,如果您使用的是 Java 9 或更高版本,则必须将 --add-opens=java.base/java.nio=ALL-UNNAMED 添加到 Java 代码中,以公开 JDK 的内部结构。否则,您可能会遇到某些错误。

  • 如果要使用命令行启动 FE 服务,可以使用以下任一方式:

    • 指定环境变量 JAVA_TOOL_OPTIONS

      export JAVA_TOOL_OPTIONS="--add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED"
    • fe.conf 中指定 FE 配置项 JAVA_OPTS。这样,您可以附加其他 JAVA_OPTS 值。

      JAVA_OPTS="--add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED ..."
  • 如果要在 IntelliJ IDEA 中进行调试,则必须将以下选项添加到 Run/Debug Configurations 中的 Build and run

    --add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED

Arrow Flight Example

单击此处以查看 POM 依赖项
<properties>
<adbc.version>0.15.0</adbc.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.arrow.adbc</groupId>
<artifactId>adbc-driver-jdbc</artifactId>
<version>${adbc.version}</version>
</dependency>
<dependency>
<groupId>org.apache.arrow.adbc</groupId>
<artifactId>adbc-core</artifactId>
<version>${adbc.version}</version>
</dependency>
<dependency>
<groupId>org.apache.arrow.adbc</groupId>
<artifactId>adbc-driver-manager</artifactId>
<version>${adbc.version}</version>
</dependency>
<dependency>
<groupId>org.apache.arrow.adbc</groupId>
<artifactId>adbc-sql</artifactId>
<version>${adbc.version}</version>
</dependency>
<dependency>
<groupId>org.apache.arrow.adbc</groupId>
<artifactId>adbc-driver-flight-sql</artifactId>
<version>${adbc.version}</version>
</dependency>
</dependencies>

代码示例

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class ArrowFlightSqlIntegrationTest {

private static final String JDBC_URL = "jdbc:arrow-flight-sql://127.0.0.1:9408"
+ "?useEncryption=false"
+ "&useServerPrepStmts=false"
+ "&useSSL=false"
+ "&useArrowFlightSql=true";

private static final String USER = "root";
private static final String PASSWORD = "";

private static int testCaseNum = 1;

public static void main(String[] args) {
try {
// Load Arrow Flight SQL JDBC driver
Class.forName("org.apache.arrow.driver.jdbc.ArrowFlightJdbcDriver");

try (Connection conn = DriverManager.getConnection(JDBC_URL, USER, PASSWORD);
Statement stmt = conn.createStatement()) {

testUpdate(stmt, "DROP DATABASE IF EXISTS sr_arrow_flight_sql FORCE;");
testQuery(stmt, "SHOW PROCESSLIST;");
testUpdate(stmt, "CREATE DATABASE sr_arrow_flight_sql;");
testQuery(stmt, "SHOW DATABASES;");
testUpdate(stmt, "USE sr_arrow_flight_sql;");
testUpdate(stmt, "CREATE TABLE sr_table_test (id INT, name STRING) ENGINE=OLAP PRIMARY KEY (id) " +
"DISTRIBUTED BY HASH(id) BUCKETS 1 " +
"PROPERTIES ('replication_num' = '1');");
testUpdate(stmt, "INSERT INTO sr_table_test VALUES (1, 'Alice'), (2, 'Bob');");
testQuery(stmt, "SELECT * FROM sr_arrow_flight_sql.sr_table_test;");
}
} catch (Exception e) {
e.printStackTrace();
}

}

/**
* Executes a query and prints the result to the console.
*/
private static void testQuery(Statement stmt, String sql) throws Exception {
System.out.println("Test Case: " + testCaseNum);
System.out.println("▶ Executing query: " + sql);
ResultSet rs = stmt.executeQuery(sql);
try {
System.out.println("Result:");
int columnCount = rs.getMetaData().getColumnCount();
while (rs.next()) {
for (int i = 1; i <= columnCount; i++) {
System.out.print(rs.getString(i) + "\t");
}
System.out.println();
}
} finally {
rs.close();
}
testCaseNum++;
System.out.println();
}

/**
* Executes an update (DDL or DML) and prints the result to the console.
*/
private static void testUpdate(Statement stmt, String sql) throws Exception {
System.out.println("Test Case: " + testCaseNum);
System.out.println("▶ Executing update: " + sql);
stmt.executeUpdate(sql);
System.out.println("Result: ✅ Success");
testCaseNum++;
System.out.println();
}
}

执行结果

Test Case: 1
▶ Executing update: DROP DATABASE IF EXISTS sr_arrow_flight_sql FORCE;
Result: ✅ Success

Test Case: 2
▶ Executing query: SHOW PROCESSLIST;
Result:
192.168.124.48_9010_1751449846872 16777217 root Query 2025-07-02 18:46:49 0 OK SHOW PROCESSLIST; false default_warehouse

Test Case: 3
▶ Executing update: CREATE DATABASE sr_arrow_flight_sql;
Result: ✅ Success

Test Case: 4
▶ Executing query: SHOW DATABASES;
Result:
_statistics_
information_schema
sr_arrow_flight_sql
sys

Test Case: 5
▶ Executing update: USE sr_arrow_flight_sql;
Result: ✅ Success

Test Case: 6
▶ Executing update: CREATE TABLE sr_table_test (id INT, name STRING) ENGINE=OLAP PRIMARY KEY (id) DISTRIBUTED BY HASH(id) BUCKETS 1 PROPERTIES ('replication_num' = '1');
Result: ✅ Success

Test Case: 7
▶ Executing update: INSERT INTO sr_table_test VALUES (1, 'Alice'), (2, 'Bob');
Result: ✅ Success

Test Case: 8
▶ Executing query: SELECT * FROM sr_arrow_flight_sql.sr_table_test;
Result:
1 Alice
2 Bob

Java ADBC 驱动程序

Arrow Flight SQL 协议提供了一个与标准 JDBC 接口兼容的开源 JDBC 驱动程序。您可以轻松地将其集成到各种 BI 工具(例如 Tableau、Power BI、DBeaver 等)中以访问 StarRocks 数据库,就像使用传统的 JDBC 驱动程序一样。此驱动程序的一个显着优势是它支持基于 Apache Arrow 的高速数据传输,这大大提高了查询和数据传输的效率。用法几乎与传统的 MySQL JDBC 驱动程序相同。

注意
  • 如果要使用命令行启动 FE 服务,可以使用以下任一方式:

    • 指定环境变量 JAVA_TOOL_OPTIONS

      export JAVA_TOOL_OPTIONS="--add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED"
    • fe.conf 中指定 FE 配置项 JAVA_OPTS。这样,您可以附加其他 JAVA_OPTS 值。

      JAVA_OPTS="--add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED ..."
  • 如果要在 IntelliJ IDEA 中进行调试,则必须将以下选项添加到 Run/Debug Configurations 中的 Build and run

    --add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED
POM 依赖项
<properties>
<adbc.version>0.15.0</adbc.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.arrow.adbc</groupId>
<artifactId>adbc-driver-jdbc</artifactId>
<version>${adbc.version}</version>
</dependency>
<dependency>
<groupId>org.apache.arrow.adbc</groupId>
<artifactId>adbc-core</artifactId>
<version>${adbc.version}</version>
</dependency>
<dependency>
<groupId>org.apache.arrow.adbc</groupId>
<artifactId>adbc-driver-manager</artifactId>
<version>${adbc.version}</version>
</dependency>
<dependency>
<groupId>org.apache.arrow.adbc</groupId>
<artifactId>adbc-sql</artifactId>
<version>${adbc.version}</version>
</dependency>
<dependency>
<groupId>org.apache.arrow.adbc</groupId>
<artifactId>adbc-driver-flight-sql</artifactId>
<version>${adbc.version}</version>
</dependency>
</dependencies>

与 Python 中的情况类似,您还可以在 Java 中直接创建一个 ADBC 客户端以从 StarRocks 读取数据。

在此过程中,您首先需要获取 FlightInfo,然后连接到每个 Endpoint 以获取数据。

代码示例

public static void main(String[] args) throws Exception {
try (BufferAllocator allocator = new RootAllocator()) {
FlightSqlDriver driver = new FlightSqlDriver(allocator);

Map<String, Object> parameters = new HashMap<>();
String host = "localhost";
int port = 9408;
String uri = Location.forGrpcInsecure(host, port).getUri().toString();

AdbcDriver.PARAM_URI.set(parameters, uri);
AdbcDriver.PARAM_USERNAME.set(parameters, "root");
AdbcDriver.PARAM_PASSWORD.set(parameters, "");

try (AdbcDatabase database = driver.open(parameters);
AdbcConnection connection = database.connect();
AdbcStatement statement = connection.createStatement()) {

statement.setSqlQuery("SHOW DATABASES;");

try (AdbcStatement.QueryResult result = statement.executeQuery();
ArrowReader reader = result.getReader()) {

int batchCount = 0;
while (reader.loadNextBatch()) {
batchCount++;
VectorSchemaRoot root = reader.getVectorSchemaRoot();
System.out.println("Batch " + batchCount + ":");
System.out.println(root.contentToTSVString());
}

System.out.println("Total batches: " + batchCount);
}
}
}
}

建议

  • 在上面提到的三种 Java Arrow Flight SQL 连接方法中:

    • 如果后续数据分析依赖于基于行的数据格式,建议使用 jdbc:arrow-flight-sql,它以 JDBC ResultSet 格式返回数据。
    • 如果分析可以直接处理 Arrow 格式或其他列式数据格式,则可以使用 Flight AdbcDriver 或 Flight JdbcDriver。这些选项直接返回 Arrow 格式的数据,避免了行列转换,并利用 Arrow 的功能来加速数据解析。
  • 无论您解析 JDBC ResultSet 还是 Arrow 格式的数据,解析时间通常都比读取数据本身所花费的时间长。如果您发现 Arrow Flight SQL 没有提供预期的性能改进,而不是 jdbc:mysql://,请考虑调查数据解析是否花费了太长时间。

  • 对于所有连接方法,使用 JDK 17 读取数据通常比使用 JDK 1.8 更快。

  • 读取大规模数据集时,与 jdbc:mysql:// 相比,Arrow Flight SQL 通常消耗更少的内存。因此,如果您遇到内存限制,也值得尝试 Arrow Flight SQL。

  • 除了上述三种连接方法之外,您还可以使用本机 FlightClient 连接到 Arrow Flight Server,从而可以更灵活地从多个端点并行读取。Java Flight AdbcDriver 构建于 FlightClient 之上,并提供了比直接使用 FlightClient 更简单的接口。

Spark

目前,官方 Arrow Flight 项目没有支持 Spark 或 Flink 的计划。将来,将逐步添加支持,以允许 starrocks-spark-connector 通过 Arrow Flight SQL 访问 StarRocks,预计读取性能将提高数倍。

使用 Spark 访问 StarRocks 时,除了传统的 JDBC 或 Java 客户端方法外,还可以使用开源 Spark-Flight-Connector 组件作为 Spark DataSource 直接从 StarRocks Flight SQL Server 读取和写入数据。这种基于 Apache Arrow Flight 协议的方法具有以下显着优势:

  • 高性能数据传输 Spark-Flight-Connector 使用 Apache Arrow 作为数据传输格式,从而实现零拷贝、高效的数据交换。StarRocks 的 internal Block 数据格式和 Arrow 之间的转换非常高效,与传统的 CSVJDBC 方法相比,性能提升高达 10 倍,并显着降低了数据传输开销。
  • 原生支持复杂数据类型 Arrow 数据格式原生支持复杂类型(例如 MapArrayStruct 等),与传统的 JDBC 方法相比,可以更好地适应 StarRocks 的复杂数据模型,并增强数据表达能力和兼容性。
  • 支持读、写和流式写入 该组件支持 Spark 作为 Flight SQL 客户端进行高效的读写操作,包括 insertmergeupdatedelete DML 语句,甚至支持流式写入,使其适用于实时数据处理场景。
  • 支持谓词下推和列剪裁 读取数据时,Spark-Flight-Connector 支持谓词下推和列剪裁,从而可以在 StarRocks 端进行数据过滤和列选择,从而显着减少传输的数据量并提高查询性能。
  • 支持聚合下推和并行读取 聚合操作(例如 sumcountmaxmin 等)可以下推到 StarRocks 进行执行,从而减少 Spark 上的计算负载。还支持基于分区的并行读取,从而提高大型数据场景中的读取效率。
  • 更适合大数据场景 与传统的 JDBC 方法相比,Flight SQL 协议更适合大规模、高并发访问场景,从而使 StarRocks 可以充分利用其高性能分析功能。

附录

以下是用法教程中的完整代码示例。

# =============================================================================
# StarRocks Arrow Flight SQL Test Script
# =============================================================================
# pip install adbc_driver_manager adbc_driver_flightsql pandas
# =============================================================================

# =============================================================================
# Required core modules for connecting to StarRocks via Arrow Flight SQL
# =============================================================================
import adbc_driver_manager
import adbc_driver_flightsql.dbapi as flight_sql

# =============================================================================
# Optional modules for better usability and debugging
# =============================================================================
import pandas as pd # Optional: for better result display using DataFrame
import traceback # Optional: for detailed error traceback during SQL execution
import time # Optional: for measuring SQL execution time

# =============================================================================
# StarRocks Flight SQL Configuration
# =============================================================================
FE_HOST = "127.0.0.1"
FE_PORT = 9408

# =============================================================================
# Connect to StarRocks
# =============================================================================
conn = flight_sql.connect(
uri=f"grpc://{FE_HOST}:{FE_PORT}",
db_kwargs={
adbc_driver_manager.DatabaseOptions.USERNAME.value: "root",
adbc_driver_manager.DatabaseOptions.PASSWORD.value: "",
}
)

cursor = conn.cursor()

# =============================================================================
# Utility functions for better output formatting and SQL execution
# =============================================================================

def print_header(title: str):
"""
Print a section header for better readability.
"""
print("\n" + "=" * 80)
print(f"🟢 {title}")
print("=" * 80)


def print_sql(sql: str):
"""
Print the SQL statement before execution.
"""
print(f"\n🟡 SQL:\n{sql.strip()}")


def print_result(df: pd.DataFrame):
"""
Print the result DataFrame in a readable format.
"""
if df.empty:
print("\n🟢 Result: (no rows returned)\n")
else:
print("\n🟢 Result:\n")
print(df.to_string(index=False))


def print_error(e: Exception):
"""
Print the error traceback if SQL execution fails.
"""
print("\n🔴 Error occurred:")
traceback.print_exc()


def execute(sql: str):
"""
Execute a SQL statement and print the result and execution time.
"""
print_sql(sql)
try:
start = time.time() # Start time for execution time measurement
cursor.execute(sql)
result = cursor.fetchallarrow() # Arrow Table
df = result.to_pandas() # Convert to DataFrame for better display
print_result(df)
print(f"\n⏱️ Execution time: {time.time() - start:.3f} seconds")
except Exception as e:
print_error(e)

# =============================================================================
# Step 1: Drop and Create Database
# =============================================================================
print_header("Step 1: Drop and Create Database")
execute("DROP DATABASE IF EXISTS sr_arrow_flight_sql FORCE;")
execute("SHOW DATABASES;")
execute("CREATE DATABASE sr_arrow_flight_sql;")
execute("SHOW DATABASES;")
execute("USE sr_arrow_flight_sql;")

# =============================================================================
# Step 2: Create Table
# =============================================================================
print_header("Step 2: Create Table")
execute("""
CREATE TABLE sr_arrow_flight_sql_test
(
k0 INT,
k1 DOUBLE,
k2 VARCHAR(32) NULL DEFAULT "" COMMENT "",
k3 DECIMAL(27,9) DEFAULT "0",
k4 BIGINT NULL DEFAULT '10',
k5 DATE
)
DISTRIBUTED BY HASH(k5) BUCKETS 5
PROPERTIES("replication_num" = "1");
""")

execute("SHOW CREATE TABLE sr_arrow_flight_sql_test;")

# =============================================================================
# Step 3: Insert Data
# =============================================================================
print_header("Step 3: Insert Data")
execute("""
INSERT INTO sr_arrow_flight_sql_test VALUES
(0, 0.1, "ID", 0.0001, 1111111111, '2025-04-21'),
(1, 0.20, "ID_1", 1.00000001, 0, '2025-04-21'),
(2, 3.4, "ID_1", 3.1, 123456, '2025-04-22'),
(3, 4, "ID", 4, 4, '2025-04-22'),
(4, 122345.54321, "ID", 122345.54321, 5, '2025-04-22');
""")

# =============================================================================
# Step 4: Query Data
# =============================================================================
print_header("Step 4: Query Data")
execute("SELECT * FROM sr_arrow_flight_sql_test ORDER BY k0;")

# =============================================================================
# Step 5: Session Variables
# =============================================================================
print_header("Step 5: Session Variables")
execute("SHOW VARIABLES LIKE '%query_mem_limit%';")
execute("SET query_mem_limit = 2147483648;")
execute("SHOW VARIABLES LIKE '%query_mem_limit%';")

# =============================================================================
# Step 6: Aggregation Query
# =============================================================================
print_header("Step 6: Aggregation Query")
execute("""
SELECT k5, SUM(k1) AS total_k1, COUNT(1) AS row_count, AVG(k3) AS avg_k3
FROM sr_arrow_flight_sql_test
GROUP BY k5
ORDER BY k5;
""")

# =============================================================================
# Step 7: Close Connection
# =============================================================================
print_header("Step 7: Close Connection")
cursor.close()
conn.close()
print("✅ Test completed successfully.")