跳到主要内容
版本: 最新版-4.0

使用 SQLAlchemy 和 Alembic 进行 Schema 管理和迁移

本指南介绍如何使用 Python 生态系统(包括 SQLAlchemy、Alembic 和 sqlacodegen)通过 starrocks SQLAlchemy 方言来管理 StarRocks 模式(Schema)。它旨在帮助您了解模式迁移的用途以及如何与 StarRocks 有效地使用它

概览

许多用户直接使用 SQL DDL 来管理 StarRocks 表、视图和物化视图。然而,随着项目的增长,手动维护 ALTER TABLE 语句很容易出错且难以跟踪。

StarRocks SQLAlchemy 方言 (starrocks) 提供了

  • StarRocks 视图物化视图的完整 SQLAlchemy 模型层
  • 表结构和表属性(包括视图和物化视图)的声明式定义
  • Alembic 的集成,以便可以自动检测生成模式更改
  • sqlacodegen 等工具的兼容性,用于反向生成模型

这使得 Python 用户能够以声明式版本控制自动化的方式维护 StarRocks 模式。

主要优势

尽管模式迁移传统上与 OLTP 数据库相关,但它在 StarRocks 等数据仓库系统中也很有价值。团队将 Alembic 与 StarRocks 方言一起使用,是因为以下好处。

声明式模式定义

一旦您以 Python ORM 模型或 SQLAlchemy 核心样式定义模式,就不再需要手动编写 ALTER TABLE 语句了。

自动差异比较和自动生成

Alembic 会比较当前的 StarRocks 模式您的 SQLAlchemy 模型,并自动生成迁移脚本(CREATE/DROP/ALTER)。

可审查、版本控制的迁移

每个模式更改都会成为一个迁移文件(Python),因此用户可以跟踪更改并在需要时回滚。

跨环境的一致工作流程

可以使用相同的流程将模式更改应用于开发、分级和生产环境。

安装和连接

先决条件**

  • StarRocks Python 客户端:1.3.2 或更高版本
  • SQLAlchemy:1.4 或更高版本(推荐使用 SQLAlchemy 2.0,并且使用 sqlacodegen 需要它)
  • Alembic:1.16 或更高版本

安装 StarRocks Python 客户端

运行以下命令安装 StarRocks Python 客户端。

pip install starrocks

连接到 StarRocks

使用以下 URL 连接到您的 StarRocks 集群。

starrocks://<user>:<password>@<FE_host>:<query_port>/[<catalog>.]<database>
  • user:用于连接到集群的用户名。
  • password:用户密码。
  • FE_host:FE 的 IP 地址。
  • query_port:FE 的 query_port(默认值:9030)。
  • catalog:您的数据库所在的目录的名称。
  • database:您想要连接的数据库的名称。

安装后,您可以使用以下代码示例快速验证连接

from sqlalchemy import create_engine, text

# you need to create `mydatabase` first
engine = create_engine("starrocks://root@localhost:9030/mydatabase")

with engine.connect() as conn:
conn.execute(text("SELECT 1")).fetchall()
print("Connection successful!")

定义 StarRocks 模型(声明式 ORM)

StarRocks 方言支持

  • 视图
  • 物化视图

它还支持 StarRocks 特定的表属性,例如

  • ENGINE (OLAP)
  • 关键模型(DUPLICATE KEYPRIMARY KEYUNIQUE KEYAGGREGATE KEY
  • PARTITION BY 变体(RANGE / LIST / Expression 分区)
  • DISTRIBUTED BY 变体(HASH / RANDOM)
  • ORDER BY
  • 表属性(例如 replication_num, storage_medium
重要
  • StarRocks 方言选项作为以 starrocks_ 为前缀的关键字参数传递。
  • starrocks_ 前缀必须是小写。后缀可以是任何大小写(例如,PRIMARY_KEYprimary_key)。
  • 如果您指定了表键(例如 starrocks_primary_key="id"),则所涉及的列必须Column(...) 中也标记为 primary_key=True,以便 SQLAlchemy 元数据和 Alembic 自动生成能够正确运行。

下面的示例反映了真实的公共 API 和参数名称。

表示例

StarRocks 表选项可以通过 ORM(通过 __table_args__)和核心(通过 Table(..., starrocks_...=...))样式指定。

ORM(声明式)样式

from sqlalchemy import create_engine
from sqlalchemy.orm import Mapped, declarative_base, mapped_column
from starrocks import INTEGER, STRING

# with the same engine as the quick test
engine = create_engine("starrocks://root@localhost:9030/mydatabase")

Base = declarative_base()

class MyTable(Base):
__tablename__ = 'my_orm_table'
id: Mapped[int] = mapped_column(INTEGER, primary_key=True)
name: Mapped[str] = mapped_column(STRING)

__table_args__ = {
'comment': 'table comment',

'starrocks_primary_key': 'id',
'starrocks_distributed_by': 'HASH(id) BUCKETS 10',
'starrocks_properties': {'replication_num': '1'}
}

# Create the table in the database
Base.metadata.create_all(engine)

核心样式

from sqlalchemy import Column, MetaData, Table, create_engine
from starrocks import INTEGER, VARCHAR

# with the same engine as the quick test
engine = create_engine("starrocks://root@localhost:9030/mydatabase")

metadata = MetaData()

my_core_table = Table(
'my_core_table',
metadata,
Column('id', INTEGER, primary_key=True),
Column('name', VARCHAR(50)),

# StarRocks-specific arguments
starrocks_primary_key='id',
starrocks_distributed_by='HASH(id) BUCKETS 10',
starrocks_properties={"replication_num": "1"}
)

# Create the table in the database
metadata.create_all(engine)
注意

有关表属性和数据类型的全面参考,请参阅 参考 [4]

视图示例

以下是推荐的视图定义样式,使用 columns 作为字典列表(name/comment)。此示例基于现有表 my_core_table

from starrocks.schema import View

# Reuse the metadata from the Core table example above
metadata = my_core_table.metadata

user_view = View(
"user_view",
metadata,
definition="SELECT id, name FROM my_core_table WHERE name IS NOT NULL",
columns=[
{"name": "id", "comment": "ID"},
{"name": "name", "comment": "Name"},
],
comment="Active users",
)
注意

有关更多视图选项和限制,请参阅 参考 [5]

物化视图示例

物化视图的定义方式类似。starrocks_refresh 属性是一个字符串,指示刷新策略。

from starrocks.schema import MaterializedView

# Reuse the metadata from the Core table example above
metadata = my_core_table.metadata

# Create a simple Materialized View (asynchronous refresh)
user_stats_ = MaterializedView(
'user_stats_',
metadata,
definition='SELECT id, COUNT(*) AS cnt FROM my_core_table GROUP BY id',
starrocks_refresh='ASYNC'
)
注意

有关更多选项和 ALTER 限制,请参阅 参考 [6]

Alembic 集成

StarRocks SQLAlchemy 方言完全支持

  • 创建/删除表
  • 创建/删除视图
  • 创建/删除物化视图
  • 检测 StarRocks 特有属性上的支持的更改(例如,表属性和分布)

这使得 Alembic 的自动生成能够正常工作。

初始化 Alembic

  1. 初始化 Alembic

    alembic init migrations
  2. alembic.ini 中配置数据库 URL

    # alembic.ini
    sqlalchemy.url = starrocks://<user>:<password>@<FE_host>:<query_port>/[<catalog>.]<database>
  3. 启用 StarRocks 方言日志记录(可选)

    您可以在 alembic.ini 中启用 starrocks 日志记录器,以通过日志观察表的检测到的更改。有关详细信息,请参阅 参考 [2]

    编辑 env.py(配置离线和在线路径)

    from alembic import context
    from starrocks.alembic import render_column_type, include_object_for_view_
    from starrocks.alembic.starrocks import StarRocksImpl # noqa: F401 (ensure impl registered)

    from myapp.models import Base # adjust to your project

    target_metadata = Base.metadata


    def run_migrations_offline() -> None:
    url = context.config.get_main_option("sqlalchemy.url")
    context.configure(
    url=url,
    target_metadata=target_metadata,
    render_item=render_column_type,
    include_object=include_object_for_view_
    )

    with context.begin_transaction():
    context.run_migrations()


    def run_migrations_online() -> None:
    # ... create engine and connect as in alembic default env.py ...
    with connectable.connect() as connection:
    context.configure(
    connection=connection,
    target_metadata=target_metadata,
    render_item=render_column_type,
    include_object=include_object_for_view_
    )

    with context.begin_transaction():
    context.run_migrations()

自动生成迁移

alembic revision --autogenerate -m "initial schema"

Alembic 将比较 SQLAlchemy 模型与实际的 StarRocks 模式,并输出正确的 DDL。

应用迁移

alembic upgrade head

也支持降级(可逆的情况下)。

重要

StarRocks DDL 在多个语句之间不是事务性的。如果升级中途失败,您可能需要检查已应用的更改并执行手动修复(例如,编写补偿性迁移或运行手动 DDL),然后再重新运行。

支持的模式更改操作

该方言支持 Alembic 自动生成以下内容:

  • :创建/删除,以及通过 starrocks_* 声明的 StarRocks 特定属性的差异比较(在 StarRocks ALTER 支持范围内)
  • 视图:创建/删除/修改(主要是与定义相关的更改;某些属性是不可变的)
  • 物化视图:创建/删除/修改(仅限于可变子句,如刷新策略和属性)

某些 StarRocks DDL 更改是不可逆或不可更改的。您只能通过删除并重新创建表/视图/物化视图来进行这些更改。如果您在方言中指定了这些更改,自动生成将发出警告或抛出错误

本节展示了一个可运行的端到端工作流程,包括在哪里暂停和审查生成的文件。

步骤 1. 创建项目目录并初始化 Alembic

mkdir my_sr_alembic_project
cd my_sr_alembic_project

alembic init alembic

步骤 2. 配置 alembic.ini

编辑 alembic.ini 中的 URL

sqlalchemy.url = starrocks://root@localhost:9030/mydatabase

步骤 3. 定义模型

为您的模型创建一个包

mkdir -p myapp
touch myapp/__init__.py

创建 myapp/models.py 并将您的表/视图/物化视图定义放在包中

注意

使用 Alembic 迁移时,请勿在模型模块中调用 metadata.create_all(engine)

from sqlalchemy import Column, Table
from sqlalchemy.orm import Mapped, declarative_base, mapped_column

from starrocks import INTEGER, STRING, VARCHAR
from starrocks.schema import MaterializedView, View

Base = declarative_base()


# --- ORM table ---
class MyOrmTable(Base):
__tablename__ = "my_orm_table"

id: Mapped[int] = mapped_column(INTEGER, primary_key=True)
name: Mapped[str] = mapped_column(STRING)

__table_args__ = {
"comment": "table comment",
"starrocks_primary_key": "id",
"starrocks_distributed_by": "HASH(id) BUCKETS 10",
"starrocks_properties": {"replication_num": "1"},
}


# --- Core table on the same metadata (important for Alembic target_metadata) ---
my_core_table = Table(
"my_core_table",
Base.metadata,
Column("id", INTEGER, primary_key=True),
Column("name", VARCHAR(50)),
comment="core table comment",
starrocks_primary_key="id",
starrocks_distributed_by="HASH(id) BUCKETS 10",
starrocks_properties={"replication_num": "1"},
)


# --- View ---
user_view = View(
"user_view",
Base.metadata,
definition="SELECT id, name FROM my_core_table WHERE name IS NOT NULL",
columns=[
{"name": "id", "comment": "ID"},
{"name": "name", "comment": "Name"},
],
comment="Active users",
)


# --- Materialized View ---
user_stats_mv = MaterializedView(
"user_stats_mv",
Base.metadata,
definition="SELECT id, COUNT(*) AS cnt FROM my_core_table GROUP BY id",
starrocks_refresh="ASYNC",
)

步骤 4. 配置 env.py 以进行自动生成

编辑 alembic/env.py

  1. 导入 myapp.models 以设置 target_metadata
  2. 导入 render_column_typeinclude_object_for_view_mv 以在 run_migrations_offline()run_migrations_online() 中设置它们,以便正确处理视图和 MV,并正确渲染 StarRocks 列类型。
注意

您需要向 env.py 添加或修改这些行,而不是替换生成的 env.py 文件。

from alembic import context
from starrocks.alembic import render_column_type, include_object_for_view_mv
from starrocks.alembic.starrocks import StarRocksImpl # noqa: F401

from myapp.models import Base

target_metadata = Base.metadata

# Optional: set version table replication for single-BE dev clusters
version_table_kwargs = {"starrocks_properties": {"replication_num": "1"}}

# In both run_migrations_offline() and run_migrations_online(), ensure:
def run_migrations_offline() -> None:
url = context.config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
render_item=render_column_type,
include_object=include_object_for_view_mv,
version_table_kwargs=version_table_kwargs,
)


def run_migrations_online() -> None:
# ... create engine and connect as in alembic default env.py ...
with connectable.connect() as connection:
context.configure(
connection=connection,
target_metadata=target_metadata,
render_item=render_column_type,
include_object=include_object_for_view_mv,
version_table_kwargs=version_table_kwargs,
)

步骤 5. 自动生成第一个修订版

alembic revision --autogenerate -m "create initial schema"

暂停并审查

  1. 检查 alembic/versions/ 下生成的迁移文件。
  2. 确保它包含预期的操作(例如 create_tablecreate_viewcreate_materialized_view)。
  3. 确保它不包含意外的删除或更改。

步骤 6. 预览 SQL 并应用

预览 SQL

alembic upgrade head --sql

暂停并审查

  1. 确认 DDL 的顺序符合您的预期。
  2. 识别任何潜在的繁重操作,并在需要时考虑拆分迁移。

应用

alembic upgrade head
重要

StarRocks DDL 在多个语句之间不是事务性的。如果升级中途失败,您可能需要在重新运行前检查已应用的更改并执行手动修复。

步骤 7. 进行更改并再次自动生成

更新 myapp/models.py

  • 修改现有表my_core_table):添加列,或更新表注释,并更改一个表属性。
  • 添加新表my_new_table)。
注意

添加列可能是一个耗时的模式更改。StarRocks 每次只允许一个正在运行的表模式更改作业。在实践中,建议将“添加/删除/修改列”的更改与其它繁重更改(例如,额外的添加/删除列或批量属性更改)分开,并在需要时将它们拆分到多个 Alembic 修订版中。

from sqlalchemy import Column, Table
from starrocks import INTEGER, VARCHAR

# Modify an existing table (add a column)
# (Update the existing my_core_table definition in-place.)
my_core_table = Table(
"my_core_table",
Base.metadata,
Column("id", INTEGER, primary_key=True),
Column("name", VARCHAR(50)),
Column("age", INTEGER), # added column only

starrocks_primary_key='id',
starrocks_distributed_by='HASH(id) BUCKETS 10',
starrocks_properties={"replication_num": "1"},
)

my_new_table = Table(
"my_new_table",
Base.metadata,
Column("id", INTEGER, primary_key=True),
Column("name", VARCHAR(50)),
starrocks_primary_key="id",
starrocks_distributed_by="HASH(id) BUCKETS 10",
starrocks_properties={"replication_num": "1"},
)
alembic revision --autogenerate -m "add a new table, change a old table"

暂停并审查

确保新迁移包含

  • my_new_tablecreate_table(...),以及
  • my_core_table 更改的预期操作(例如,添加列/设置注释/设置属性)。

预览 SQL 并应用

alembic upgrade head --sql
alembic upgrade head

使用 sqlacodegen

sqlacodegen 可以直接从 StarRocks 反向生成 SQLAlchemy 模型

sqlacodegen --options include_dialect_options,keep_dialect_types \
--generator tables \
starrocks://<user>:<password>@<FE_host>:<query_port>/[catalog.]<database> > models.py

支持的对象

  • 视图
  • 物化视图
  • 分区、分布和排序子句以及属性

这在将现有 StarRocks 模式导入 Alembic 时非常有用。

您可以直接使用以上命令来生成 端到端示例 部分中定义的表/视图/物化视图的 Python 脚本。

注意
  • 建议在生成核心样式模型时添加 --generator tables(ORM 生成器可能会根据 NOT NULL / NULL 属性重新排序列)。
  • 关键列可能会生成为 NOT NULL。如果希望它们可为空,请手动调整生成的模型。

限制和最佳实践

  • 某些 StarRocks DDL 操作需要删除并重新创建表;自动生成将发出警告或抛出错误,而不是静默地产生不可用的 SQL。
  • 键模型更改(例如,将 DUPLICATE KEY 更改为 PRIMARY KEY)不支持通过 ALTER TABLE;使用明确的计划(通常是删除并使用回填重新创建)。
  • StarRocks 不提供跨多个语句的事务性 DDL;请审查生成的迁移并操作性地应用它们。如果迁移中途失败,您可能需要手动处理回滚。
  • 对于分布,如果您省略 BUCKETS 子句,StarRocks 可能会自动分配桶计数;在这种情况下,该方言旨在避免产生不必要的差异。

总结

使用 StarRocks SQLAlchemy 方言和 Alembic 集成,您可以

  • ✔ 使用声明式模型定义 StarRocks 模式
  • ✔ 自动检测和生成模式迁移脚本
  • ✔ 对模式演进使用版本控制
  • ✔ 声明式地管理视图和物化视图
  • ✔ 使用 sqlacodegen 反向工程现有模式

这使 StarRocks 模式管理进入现代 Python 数据工程生态系统,并大大简化了跨环境的模式一致性。

参考资料

[1]: starrocks-python-client README

[2]: Alembic 集成

[3]: SQLAlchemy 详情

[4]: 表支持

[5]: 视图支持

[6]: 物化视图支持

开心的水獭 RockyStarRocks 助手

AI 生成的答案基于文档和其他来源。请在非生产环境中测试答案。