使用 SQLAlchemy 和 Alembic 进行 Schema 管理和迁移
本指南介绍如何使用 Python 生态系统(包括 SQLAlchemy、Alembic 和 sqlacodegen)通过 starrocks SQLAlchemy 方言来管理 StarRocks 模式(Schema)。它旨在帮助您了解模式迁移的用途以及如何与 StarRocks 有效地使用它。
概览
许多用户直接使用 SQL DDL 来管理 StarRocks 表、视图和物化视图。然而,随着项目的增长,手动维护 ALTER TABLE 语句很容易出错且难以跟踪。
StarRocks SQLAlchemy 方言 (starrocks) 提供了
- StarRocks 表、视图和物化视图的完整 SQLAlchemy 模型层
- 表结构和表属性(包括视图和物化视图)的声明式定义
- 与 Alembic 的集成,以便可以自动检测和生成模式更改
- 与 sqlacodegen 等工具的兼容性,用于反向生成模型
这使得 Python 用户能够以声明式、版本控制和自动化的方式维护 StarRocks 模式。
主要优势
尽管模式迁移传统上与 OLTP 数据库相关,但它在 StarRocks 等数据仓库系统中也很有价值。团队将 Alembic 与 StarRocks 方言一起使用,是因为以下好处。
声明式模式定义
一旦您以 Python ORM 模型或 SQLAlchemy 核心样式定义模式,就不再需要手动编写 ALTER TABLE 语句了。
自动差异比较和自动生成
Alembic 会比较当前的 StarRocks 模式和您的 SQLAlchemy 模型,并自动生成迁移脚本(CREATE/DROP/ALTER)。
可审查、版本控制的迁移
每个模式更改都会成为一个迁移文件(Python),因此用户可以跟踪更改并在需要时回滚。
跨环境的一致工作流程
可以使用相同的流程将模式更改应用于开发、分级和生产环境。
安装和连接
先决条件**
- StarRocks Python 客户端:1.3.2 或更高版本
SQLAlchemy:1.4 或更高版本(推荐使用 SQLAlchemy 2.0,并且使用sqlacodegen需要它)Alembic:1.16 或更高版本
安装 StarRocks Python 客户端
运行以下命令安装 StarRocks Python 客户端。
pip install starrocks
连接到 StarRocks
使用以下 URL 连接到您的 StarRocks 集群。
starrocks://<user>:<password>@<FE_host>:<query_port>/[<catalog>.]<database>
user:用于连接到集群的用户名。password:用户密码。FE_host:FE 的 IP 地址。query_port:FE 的query_port(默认值:9030)。catalog:您的数据库所在的目录的名称。database:您想要连接的数据库的名称。
安装后,您可以使用以下代码示例快速验证连接
from sqlalchemy import create_engine, text
# you need to create `mydatabase` first
engine = create_engine("starrocks://root@localhost:9030/mydatabase")
with engine.connect() as conn:
conn.execute(text("SELECT 1")).fetchall()
print("Connection successful!")
定义 StarRocks 模型(声明式 ORM)
StarRocks 方言支持
- 表
- 视图
- 物化视图
它还支持 StarRocks 特定的表属性,例如
ENGINE(OLAP)- 关键模型(
DUPLICATE KEY、PRIMARY KEY、UNIQUE KEY、AGGREGATE KEY) PARTITION BY变体(RANGE / LIST / Expression 分区)DISTRIBUTED BY变体(HASH / RANDOM)ORDER BY- 表属性(例如
replication_num,storage_medium)
- StarRocks 方言选项作为以
starrocks_为前缀的关键字参数传递。 starrocks_前缀必须是小写。后缀可以是任何大小写(例如,PRIMARY_KEY和primary_key)。- 如果您指定了表键(例如
starrocks_primary_key="id"),则所涉及的列必须在Column(...)中也标记为primary_key=True,以便 SQLAlchemy 元数据和 Alembic 自动生成能够正确运行。
下面的示例反映了真实的公共 API 和参数名称。
表示例
StarRocks 表选项可以通过 ORM(通过 __table_args__)和核心(通过 Table(..., starrocks_...=...))样式指定。
ORM(声明式)样式
from sqlalchemy import create_engine
from sqlalchemy.orm import Mapped, declarative_base, mapped_column
from starrocks import INTEGER, STRING
# with the same engine as the quick test
engine = create_engine("starrocks://root@localhost:9030/mydatabase")
Base = declarative_base()
class MyTable(Base):
__tablename__ = 'my_orm_table'
id: Mapped[int] = mapped_column(INTEGER, primary_key=True)
name: Mapped[str] = mapped_column(STRING)
__table_args__ = {
'comment': 'table comment',
'starrocks_primary_key': 'id',
'starrocks_distributed_by': 'HASH(id) BUCKETS 10',
'starrocks_properties': {'replication_num': '1'}
}
# Create the table in the database
Base.metadata.create_all(engine)
核心样式
from sqlalchemy import Column, MetaData, Table, create_engine
from starrocks import INTEGER, VARCHAR
# with the same engine as the quick test
engine = create_engine("starrocks://root@localhost:9030/mydatabase")
metadata = MetaData()
my_core_table = Table(
'my_core_table',
metadata,
Column('id', INTEGER, primary_key=True),
Column('name', VARCHAR(50)),
# StarRocks-specific arguments
starrocks_primary_key='id',
starrocks_distributed_by='HASH(id) BUCKETS 10',
starrocks_properties={"replication_num": "1"}
)
# Create the table in the database
metadata.create_all(engine)
有关表属性和数据类型的全面参考,请参阅 参考 [4]。
视图示例
以下是推荐的视图定义样式,使用 columns 作为字典列表(name/comment)。此示例基于现有表 my_core_table。
from starrocks.schema import View
# Reuse the metadata from the Core table example above
metadata = my_core_table.metadata
user_view = View(
"user_view",
metadata,
definition="SELECT id, name FROM my_core_table WHERE name IS NOT NULL",
columns=[
{"name": "id", "comment": "ID"},
{"name": "name", "comment": "Name"},
],
comment="Active users",
)
有关更多视图选项和限制,请参阅 参考 [5]。
物化视图示例
物化视图的定义方式类似。starrocks_refresh 属性是一个字符串,指示刷新策略。
from starrocks.schema import MaterializedView
# Reuse the metadata from the Core table example above
metadata = my_core_table.metadata
# Create a simple Materialized View (asynchronous refresh)
user_stats_ = MaterializedView(
'user_stats_',
metadata,
definition='SELECT id, COUNT(*) AS cnt FROM my_core_table GROUP BY id',
starrocks_refresh='ASYNC'
)
有关更多选项和 ALTER 限制,请参阅 参考 [6]。
Alembic 集成
StarRocks SQLAlchemy 方言完全支持
- 创建/删除表
- 创建/删除视图
- 创建/删除物化视图
- 检测 StarRocks 特有属性上的支持的更改(例如,表属性和分布)
这使得 Alembic 的自动生成能够正常工作。
初始化 Alembic
-
初始化 Alembic
alembic init migrations -
在
alembic.ini中配置数据库 URL# alembic.ini
sqlalchemy.url = starrocks://<user>:<password>@<FE_host>:<query_port>/[<catalog>.]<database> -
启用 StarRocks 方言日志记录(可选)
您可以在
alembic.ini中启用starrocks日志记录器,以通过日志观察表的检测到的更改。有关详细信息,请参阅 参考 [2]。编辑
env.py(配置离线和在线路径)from alembic import context
from starrocks.alembic import render_column_type, include_object_for_view_
from starrocks.alembic.starrocks import StarRocksImpl # noqa: F401 (ensure impl registered)
from myapp.models import Base # adjust to your project
target_metadata = Base.metadata
def run_migrations_offline() -> None:
url = context.config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
render_item=render_column_type,
include_object=include_object_for_view_
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online() -> None:
# ... create engine and connect as in alembic default env.py ...
with connectable.connect() as connection:
context.configure(
connection=connection,
target_metadata=target_metadata,
render_item=render_column_type,
include_object=include_object_for_view_
)
with context.begin_transaction():
context.run_migrations()
自动生成迁移
alembic revision --autogenerate -m "initial schema"
Alembic 将比较 SQLAlchemy 模型与实际的 StarRocks 模式,并输出正确的 DDL。
应用迁移
alembic upgrade head
也支持降级(可逆的情况下)。
StarRocks DDL 在多个语句之间不是事务性的。如果升级中途失败,您可能需要检查已应用的更改并执行手动修复(例如,编写补偿性迁移或运行手动 DDL),然后再重新运行。
支持的模式更改操作
该方言支持 Alembic 自动生成以下内容:
- 表:创建/删除,以及通过
starrocks_*声明的 StarRocks 特定属性的差异比较(在 StarRocks ALTER 支持范围内) - 视图:创建/删除/修改(主要是与定义相关的更改;某些属性是不可变的)
- 物化视图:创建/删除/修改(仅限于可变子句,如刷新策略和属性)
某些 StarRocks DDL 更改是不可逆或不可更改的。您只能通过删除并重新创建表/视图/物化视图来进行这些更改。如果您在方言中指定了这些更改,自动生成将发出警告或抛出错误。
端到端示例(初学者推荐阅读)
本节展示了一个可运行的端到端工作流程,包括在哪里暂停和审查生成的文件。
步骤 1. 创建项目目录并初始化 Alembic
mkdir my_sr_alembic_project
cd my_sr_alembic_project
alembic init alembic
步骤 2. 配置 alembic.ini
编辑 alembic.ini 中的 URL
sqlalchemy.url = starrocks://root@localhost:9030/mydatabase
步骤 3. 定义模型
为您的模型创建一个包
mkdir -p myapp
touch myapp/__init__.py
创建 myapp/models.py 并将您的表/视图/物化视图定义放在包中
使用 Alembic 迁移时,请勿在模型模块中调用 metadata.create_all(engine)。
from sqlalchemy import Column, Table
from sqlalchemy.orm import Mapped, declarative_base, mapped_column
from starrocks import INTEGER, STRING, VARCHAR
from starrocks.schema import MaterializedView, View
Base = declarative_base()
# --- ORM table ---
class MyOrmTable(Base):
__tablename__ = "my_orm_table"
id: Mapped[int] = mapped_column(INTEGER, primary_key=True)
name: Mapped[str] = mapped_column(STRING)
__table_args__ = {
"comment": "table comment",
"starrocks_primary_key": "id",
"starrocks_distributed_by": "HASH(id) BUCKETS 10",
"starrocks_properties": {"replication_num": "1"},
}
# --- Core table on the same metadata (important for Alembic target_metadata) ---
my_core_table = Table(
"my_core_table",
Base.metadata,
Column("id", INTEGER, primary_key=True),
Column("name", VARCHAR(50)),
comment="core table comment",
starrocks_primary_key="id",
starrocks_distributed_by="HASH(id) BUCKETS 10",
starrocks_properties={"replication_num": "1"},
)
# --- View ---
user_view = View(
"user_view",
Base.metadata,
definition="SELECT id, name FROM my_core_table WHERE name IS NOT NULL",
columns=[
{"name": "id", "comment": "ID"},
{"name": "name", "comment": "Name"},
],
comment="Active users",
)
# --- Materialized View ---
user_stats_mv = MaterializedView(
"user_stats_mv",
Base.metadata,
definition="SELECT id, COUNT(*) AS cnt FROM my_core_table GROUP BY id",
starrocks_refresh="ASYNC",
)
步骤 4. 配置 env.py 以进行自动生成
编辑 alembic/env.py
- 导入
myapp.models以设置target_metadata。 - 导入
render_column_type和include_object_for_view_mv以在run_migrations_offline()和run_migrations_online()中设置它们,以便正确处理视图和 MV,并正确渲染 StarRocks 列类型。
您需要向 env.py 添加或修改这些行,而不是替换生成的 env.py 文件。
from alembic import context
from starrocks.alembic import render_column_type, include_object_for_view_mv
from starrocks.alembic.starrocks import StarRocksImpl # noqa: F401
from myapp.models import Base
target_metadata = Base.metadata
# Optional: set version table replication for single-BE dev clusters
version_table_kwargs = {"starrocks_properties": {"replication_num": "1"}}
# In both run_migrations_offline() and run_migrations_online(), ensure:
def run_migrations_offline() -> None:
url = context.config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
render_item=render_column_type,
include_object=include_object_for_view_mv,
version_table_kwargs=version_table_kwargs,
)
def run_migrations_online() -> None:
# ... create engine and connect as in alembic default env.py ...
with connectable.connect() as connection:
context.configure(
connection=connection,
target_metadata=target_metadata,
render_item=render_column_type,
include_object=include_object_for_view_mv,
version_table_kwargs=version_table_kwargs,
)
步骤 5. 自动生成第一个修订版
alembic revision --autogenerate -m "create initial schema"
暂停并审查
- 检查
alembic/versions/下生成的迁移文件。 - 确保它包含预期的操作(例如
create_table、create_view、create_materialized_view)。 - 确保它不包含意外的删除或更改。
步骤 6. 预览 SQL 并应用
预览 SQL
alembic upgrade head --sql
暂停并审查
- 确认 DDL 的顺序符合您的预期。
- 识别任何潜在的繁重操作,并在需要时考虑拆分迁移。
应用
alembic upgrade head
StarRocks DDL 在多个语句之间不是事务性的。如果升级中途失败,您可能需要在重新运行前检查已应用的更改并执行手动修复。
步骤 7. 进行更改并再次自动生成
更新 myapp/models.py 以
- 修改现有表(
my_core_table):添加列,或更新表注释,并更改一个表属性。 - 添加新表(
my_new_table)。
添加列可能是一个耗时的模式更改。StarRocks 每次只允许一个正在运行的表模式更改作业。在实践中,建议将“添加/删除/修改列”的更改与其它繁重更改(例如,额外的添加/删除列或批量属性更改)分开,并在需要时将它们拆分到多个 Alembic 修订版中。
from sqlalchemy import Column, Table
from starrocks import INTEGER, VARCHAR
# Modify an existing table (add a column)
# (Update the existing my_core_table definition in-place.)
my_core_table = Table(
"my_core_table",
Base.metadata,
Column("id", INTEGER, primary_key=True),
Column("name", VARCHAR(50)),
Column("age", INTEGER), # added column only
starrocks_primary_key='id',
starrocks_distributed_by='HASH(id) BUCKETS 10',
starrocks_properties={"replication_num": "1"},
)
my_new_table = Table(
"my_new_table",
Base.metadata,
Column("id", INTEGER, primary_key=True),
Column("name", VARCHAR(50)),
starrocks_primary_key="id",
starrocks_distributed_by="HASH(id) BUCKETS 10",
starrocks_properties={"replication_num": "1"},
)
alembic revision --autogenerate -m "add a new table, change a old table"
暂停并审查
确保新迁移包含
my_new_table的create_table(...),以及my_core_table更改的预期操作(例如,添加列/设置注释/设置属性)。
预览 SQL 并应用
alembic upgrade head --sql
alembic upgrade head
使用 sqlacodegen
sqlacodegen 可以直接从 StarRocks 反向生成 SQLAlchemy 模型
sqlacodegen --options include_dialect_options,keep_dialect_types \
--generator tables \
starrocks://<user>:<password>@<FE_host>:<query_port>/[catalog.]<database> > models.py
支持的对象
- 表
- 视图
- 物化视图
- 分区、分布和排序子句以及属性
这在将现有 StarRocks 模式导入 Alembic 时非常有用。
您可以直接使用以上命令来生成 端到端示例 部分中定义的表/视图/物化视图的 Python 脚本。
- 建议在生成核心样式模型时添加
--generator tables(ORM 生成器可能会根据NOT NULL/NULL属性重新排序列)。 - 关键列可能会生成为
NOT NULL。如果希望它们可为空,请手动调整生成的模型。
限制和最佳实践
- 某些 StarRocks DDL 操作需要删除并重新创建表;自动生成将发出警告或抛出错误,而不是静默地产生不可用的 SQL。
- 键模型更改(例如,将 DUPLICATE KEY 更改为 PRIMARY KEY)不支持通过
ALTER TABLE;使用明确的计划(通常是删除并使用回填重新创建)。 - StarRocks 不提供跨多个语句的事务性 DDL;请审查生成的迁移并操作性地应用它们。如果迁移中途失败,您可能需要手动处理回滚。
- 对于分布,如果您省略
BUCKETS子句,StarRocks 可能会自动分配桶计数;在这种情况下,该方言旨在避免产生不必要的差异。
总结
使用 StarRocks SQLAlchemy 方言和 Alembic 集成,您可以
- ✔ 使用声明式模型定义 StarRocks 模式
- ✔ 自动检测和生成模式迁移脚本
- ✔ 对模式演进使用版本控制
- ✔ 声明式地管理视图和物化视图
- ✔ 使用 sqlacodegen 反向工程现有模式
这使 StarRocks 模式管理进入现代 Python 数据工程生态系统,并大大简化了跨环境的模式一致性。
参考资料
[1]: starrocks-python-client README
[2]: Alembic 集成
[3]: SQLAlchemy 详情
[4]: 表支持
[5]: 视图支持
[6]: 物化视图支持