使用物化视图加速数据湖查询
本主题介绍如何使用 StarRocks 的异步物化视图来优化数据湖中的查询性能。
StarRocks 提供开箱即用的数据湖查询功能,对于湖中数据的探索性查询和分析非常有效。在大多数情况下,数据缓存可以提供块级文件缓存,避免远程存储抖动和大量 I/O 操作导致的性能下降。
但是,当需要使用湖中的数据构建复杂高效的报告或进一步加速这些查询时,您仍然可能会遇到性能挑战。借助异步物化视图,您可以为湖上的报告和数据应用程序实现更高的并发性和更好的查询性能。
概述
StarRocks 支持基于外部目录(如 Hive catalog、Iceberg catalog、Hudi catalog、JDBC catalog 和 Paimon catalog)构建异步物化视图。基于外部目录的物化视图在以下场景中特别有用
-
透明加速数据湖报告
为了确保数据湖报告的查询性能,数据工程师通常需要与数据分析师密切合作,以深入了解报告加速层的构建逻辑。如果加速层需要进一步更新,他们必须相应地更新构建逻辑、处理计划和查询语句。
通过物化视图的查询改写能力,可以使报告加速对用户透明且不易察觉。当识别到慢查询时,数据工程师可以分析慢查询的模式并按需创建物化视图。然后,应用程序端的查询会被物化视图智能改写和透明加速,从而无需修改业务应用程序或查询语句的逻辑即可快速提高查询性能。
-
与历史数据关联的实时数据的增量计算
假设您的业务应用程序需要关联 StarRocks 本地表中的实时数据和数据湖中的历史数据以进行增量计算。在这种情况下,物化视图可以提供一个简单的解决方案。例如,如果实时事实表是 StarRocks 中的本地表,而维度表存储在数据湖中,您可以通过构建将本地表与外部数据源中的表关联的物化视图来轻松执行增量计算。
-
快速构建指标层
在处理数据的高维度时,计算和处理指标可能会遇到挑战。您可以使用物化视图(允许您执行数据预聚合和汇总)来创建相对轻量级的指标层。此外,物化视图可以自动刷新,从而进一步降低了指标计算的复杂性。
StarRocks 中的物化视图、数据缓存和本地表都是实现查询性能显着提升的有效方法。下表比较了它们的主要区别
数据缓存 | 物化视图 | 本地表 | |
---|---|---|---|
数据加载和更新 | 查询自动触发数据缓存。 | 刷新任务自动触发。 | 支持多种导入方法,但需要手动维护导入任务 |
数据缓存粒度 |
| 存储预计算的查询结果 | 基于表模式存储数据 |
查询性能 | 数据缓存 ≤ 物化视图 = 本地表 | ||
查询语句 |
|
| 需要修改查询语句以查询本地表 |
与直接查询湖数据或将数据加载到本地表相比,物化视图具有几个独特的优势
- 本地存储加速:物化视图可以利用 StarRocks 的本地存储加速优势,例如索引、分区、分桶和 Colocate Group,从而与直接从数据湖查询数据相比,可以获得更好的查询性能。
- 加载任务零维护:物化视图通过自动刷新任务透明地更新数据。无需维护加载任务来执行计划的数据更新。此外,基于 Hive、Iceberg 和 Paimon catalog 的物化视图可以检测数据更改并在分区级别执行增量刷新。
- 智能查询改写:查询可以透明地改写为使用物化视图。您可以立即从加速中受益,而无需修改应用程序使用的查询语句。
因此,我们建议在以下情况下使用物化视图
- 即使启用了数据缓存,查询性能也无法满足您对查询延迟和并发性的要求。
- 查询涉及可重用组件,例如固定的聚合函数或 Join 模式。
- 数据按分区组织,而查询涉及相对较高的级别上的聚合(例如,按天聚合)。
在以下情况下,我们建议优先通过数据缓存进行加速
- 查询没有很多可重用组件,并且可能会扫描数据湖中的任何数据。
- 远程存储具有显着波动或不稳定,这可能会影响访问。
创建基于外部 Catalog 的物化视图
在外部 Catalog 中的表上创建物化视图类似于在 StarRocks 本地表上创建物化视图。您只需要根据您正在使用的数据源设置合适的刷新策略,并手动启用基于外部 Catalog 的物化视图的查询改写。
选择合适的刷新策略
目前,StarRocks 无法检测 Hudi Catalog 中的分区级数据更改。因此,一旦触发任务,就会执行全量刷新。
对于 Hive Catalog、Iceberg Catalog(从 v3.1.4 开始)、JDBC catalog(从 v3.1.4 开始,仅适用于 MySQL 范围分区表)和 Paimon Catalog(从 v3.2.1 开始),StarRocks 支持检测分区级别的数据更改。因此,StarRocks 可以
-
仅刷新具有数据更改的分区,以避免全量刷新,从而减少刷新引起的资源消耗。
-
在查询改写期间,在一定程度上确保数据一致性。如果数据湖中的基表发生数据更改,则查询不会被改写为使用物化视图。
您仍然可以通过在创建物化视图时设置属性 mv_rewrite_staleness_second
来选择容忍一定程度的数据不一致。有关更多信息,请参见CREATE MATERIALIZED VIEW。
请注意,如果您需要按分区刷新,则物化视图的分区键必须包含在基表的分区键中。
从 v3.2.3 开始,StarRocks 支持在具有 分区转换的 Iceberg 表上创建分区物化视图,并且物化视图按转换后的列进行分区。目前,仅支持具有 identity
、year
、month
、day
或 hour
转换的 Iceberg 表。
以下示例显示了具有 day
分区转换的 Iceberg 表的定义,并在其上创建了具有对齐分区的物化视图
-- Iceberg table definition.
CREATE TABLE spark_catalog.test.iceberg_sample_datetime_day (
id BIGINT,
data STRING,
category STRING,
ts TIMESTAMP)
USING iceberg
PARTITIONED BY (days(ts))
-- Create a materialized view upon the Iceberg table.
CREATE MATERIALIZED VIEW `test_iceberg_datetime_day_mv` (`id`, `data`, `category`, `ts`)
PARTITION BY (`ts`)
DISTRIBUTED BY HASH(`id`)
REFRESH MANUAL
AS
SELECT
`iceberg_sample_datetime_day`.`id`,
`iceberg_sample_datetime_day`.`data`,
`iceberg_sample_datetime_day`.`category`,
`iceberg_sample_datetime_day`.`ts`
FROM `iceberg`.`test`.`iceberg_sample_datetime_day`;
对于 Hive Catalog,您可以启用 Hive 元数据缓存刷新功能,以允许 StarRocks 检测分区级别的数据更改。启用此功能后,StarRocks 会定期访问 Hive Metastore Service (HMS) 或 AWS Glue,以检查最近查询的热数据的元数据信息。
要启用 Hive 元数据缓存刷新功能,您可以使用 ADMIN SET FRONTEND CONFIG 设置以下 FE 动态配置项
配置项
enable_background_refresh_connector_metadata
默认值:v3.0 中为 true,v2.5 中为 false
说明:是否启用定期 Hive 元数据缓存刷新。启用后,StarRocks 会轮询 Hive 集群的 metastore(Hive Metastore 或 AWS Glue),并刷新频繁访问的 Hive Catalog 的缓存元数据以感知数据更改。True 表示启用 Hive 元数据缓存刷新,false 表示禁用它。
background_refresh_metadata_interval_millis
默认值:600000(10 分钟)
说明:两次连续 Hive 元数据缓存刷新之间的间隔。单位:毫秒。
background_refresh_metadata_time_secs_since_last_access_secs
默认值:86400(24 小时)
说明:Hive 元数据缓存刷新任务的到期时间。对于已访问的 Hive Catalog,如果超过指定时间未访问,StarRocks 将停止刷新其缓存的元数据。对于尚未访问的 Hive Catalog,StarRocks 将不会刷新其缓存的元数据。单位:秒。
从 v3.1.4 开始,StarRocks 支持检测 Iceberg Catalog 的分区级别的数据更改。目前,仅支持 Iceberg V1 表。
为基于外部 Catalog 的物化视图启用查询改写
默认情况下,StarRocks 不支持为基于 Hudi 和 JDBC Catalog 构建的物化视图进行查询改写,因为在这种情况下查询改写无法确保结果的强一致性。您可以通过在创建物化视图时将属性 force_external_table_query_rewrite
设置为 true
来启用此功能。对于基于 Hive Catalog 中的表构建的物化视图,默认情况下会启用查询改写。
示例
CREATE MATERIALIZED VIEW ex_mv_par_tbl
PARTITION BY emp_date
DISTRIBUTED BY hash(empid)
PROPERTIES (
"force_external_table_query_rewrite" = "true"
)
AS
select empid, deptno, emp_date
from `hive_catalog`.`emp_db`.`emps_par_tbl`
where empid < 5;
在涉及查询改写的场景中,如果您使用非常复杂的查询语句来构建物化视图,我们建议您拆分查询语句并以嵌套方式构建多个简单的物化视图。嵌套物化视图更通用,可以容纳更广泛的查询模式。
最佳实践
在实际业务场景中,您可以通过分析审计日志或大查询日志来识别具有高执行延迟和资源消耗的查询。您可以进一步使用 查询 Profile 来查明查询缓慢的特定阶段。以下部分提供了有关如何使用物化视图提高数据湖查询性能的说明和示例。
案例一:加速数据湖中的 Join 计算
您可以使用物化视图来加速数据湖中的 Join 查询。
假设您的 Hive Catalog 上的以下查询特别慢
--Q1
SELECT SUM(lo_extendedprice * lo_discount) AS REVENUE
FROM hive.ssb_1g_csv.lineorder, hive.ssb_1g_csv.dates
WHERE
lo_orderdate = d_datekey
AND d_year = 1993
AND lo_discount BETWEEN 1 AND 3
AND lo_quantity < 25;
--Q2
SELECT SUM(lo_extendedprice * lo_discount) AS REVENUE
FROM hive.ssb_1g_csv.lineorder, hive.ssb_1g_csv.dates
WHERE
lo_orderdate = d_datekey
AND d_yearmonth = 'Jan1994'
AND lo_discount BETWEEN 4 AND 6
AND lo_quantity BETWEEN 26 AND 35;
--Q3
SELECT SUM(lo_revenue), d_year, p_brand
FROM hive.ssb_1g_csv.lineorder, hive.ssb_1g_csv.dates, hive.ssb_1g_csv.part, hive.ssb_1g_csv.supplier
WHERE
lo_orderdate = d_datekey
AND lo_partkey = p_partkey
AND lo_suppkey = s_suppkey
AND p_brand BETWEEN 'MFGR#2221' AND 'MFGR#2228'
AND s_region = 'ASIA'
GROUP BY d_year, p_brand
ORDER BY d_year, p_brand;
通过分析它们的查询 Profile,您可能会注意到查询执行时间主要花费在表 lineorder
和其他维度表在列 lo_orderdate
之间的哈希 Join 上。
在这里,Q1 和 Q2 在 Join lineorder
和 dates
之后执行聚合,而 Q3 在 Join lineorder
、dates
、part
和 supplier
之后执行聚合。
因此,您可以利用 StarRocks 的 View Delta Join 改写功能来构建一个 Join lineorder
、dates
、part
和 supplier
的物化视图。
CREATE MATERIALIZED VIEW lineorder_flat_mv
DISTRIBUTED BY HASH(LO_ORDERDATE, LO_ORDERKEY) BUCKETS 48
PARTITION BY LO_ORDERDATE
REFRESH ASYNC EVERY(INTERVAL 1 DAY)
PROPERTIES (
-- Specify the unique constraints.
"unique_constraints" = "
hive.ssb_1g_csv.supplier.s_suppkey;
hive.ssb_1g_csv.part.p_partkey;
hive.ssb_1g_csv.dates.d_datekey",
-- Specify the Foreign Keys.
"foreign_key_constraints" = "
hive.ssb_1g_csv.lineorder(lo_partkey) REFERENCES hive.ssb_1g_csv.part(p_partkey);
hive.ssb_1g_csv.lineorder(lo_suppkey) REFERENCES hive.ssb_1g_csv.supplier(s_suppkey);
hive.ssb_1g_csv.lineorder(lo_orderdate) REFERENCES hive.ssb_1g_csv.dates(d_datekey)",
-- Enable query rewrite for the external catalog-based materialized view.
"force_external_table_query_rewrite" = "TRUE"
)
AS SELECT
l.LO_ORDERDATE AS LO_ORDERDATE,
l.LO_ORDERKEY AS LO_ORDERKEY,
l.LO_PARTKEY AS LO_PARTKEY,
l.LO_SUPPKEY AS LO_SUPPKEY,
l.LO_QUANTITY AS LO_QUANTITY,
l.LO_EXTENDEDPRICE AS LO_EXTENDEDPRICE,
l.LO_DISCOUNT AS LO_DISCOUNT,
l.LO_REVENUE AS LO_REVENUE,
s.S_REGION AS S_REGION,
p.P_BRAND AS P_BRAND,
d.D_YEAR AS D_YEAR,
d.D_YEARMONTH AS D_YEARMONTH
FROM hive.ssb_1g_csv.lineorder AS l
INNER JOIN hive.ssb_1g_csv.supplier AS s ON s.S_SUPPKEY = l.LO_SUPPKEY
INNER JOIN hive.ssb_1g_csv.part AS p ON p.P_PARTKEY = l.LO_PARTKEY
INNER JOIN hive.ssb_1g_csv.dates AS d ON l.LO_ORDERDATE = d.D_DATEKEY;
案例二:加速数据湖中的聚合和 Join 上的聚合
物化视图可用于加速聚合查询,无论它们是针对单个表还是涉及多个表。
-
单表聚合查询
对于单个表上的典型查询,它们的查询 Profile 将显示 AGGREGATE 节点消耗大量时间。您可以使用通用聚合运算符来构建物化视图。
假设以下是一个慢查询
--Q4
SELECT
lo_orderdate, count(distinct lo_orderkey)
FROM hive.ssb_1g_csv.lineorder
GROUP BY lo_orderdate
ORDER BY lo_orderdate limit 100;Q4 计算每日唯一订单数。由于 Count Distinct 计算可能在计算上很昂贵,您可以创建以下两种类型的物化视图来加速
CREATE MATERIALIZED VIEW mv_2_1
DISTRIBUTED BY HASH(lo_orderdate)
PARTITION BY LO_ORDERDATE
REFRESH ASYNC EVERY(INTERVAL 1 DAY)
AS
SELECT
lo_orderdate, count(distinct lo_orderkey)
FROM hive.ssb_1g_csv.lineorder
GROUP BY lo_orderdate;
CREATE MATERIALIZED VIEW mv_2_2
DISTRIBUTED BY HASH(lo_orderdate)
PARTITION BY LO_ORDERDATE
REFRESH ASYNC EVERY(INTERVAL 1 DAY)
AS
SELECT
-- lo_orderkey must be the BIGINT type so that it can be used for query rewrite.
lo_orderdate, bitmap_union(to_bitmap(lo_orderkey))
FROM hive.ssb_1g_csv.lineorder
GROUP BY lo_orderdate;请注意,在这种情况下,不要创建带有 LIMIT 和 ORDER BY 子句的物化视图,以避免改写失败。有关查询改写限制的更多信息,请参见 使用物化视图进行查询改写 - 限制。
-
多表聚合查询
在涉及 Join 结果的聚合的场景中,您可以在现有 Join 表的物化视图上创建嵌套物化视图,以进一步聚合 Join 结果。例如,基于案例一中的示例,您可以创建以下物化视图来加速 Q1 和 Q2,因为它们的聚合模式相似
CREATE MATERIALIZED VIEW mv_2_3
DISTRIBUTED BY HASH(lo_orderdate)
PARTITION BY LO_ORDERDATE
REFRESH ASYNC EVERY(INTERVAL 1 DAY)
AS
SELECT
lo_orderdate, lo_discount, lo_quantity, d_year, d_yearmonth, SUM(lo_extendedprice * lo_discount) AS REVENUE
FROM lineorder_flat_mv
GROUP BY lo_orderdate, lo_discount, lo_quantity, d_year, d_yearmonth;当然,可以在单个物化视图中执行 Join 和聚合计算。虽然这些类型的物化视图可能具有更少的查询改写机会(由于它们的特定计算),但它们在聚合后通常占用更少的存储空间。您的选择可以基于您的特定用例。
CREATE MATERIALIZED VIEW mv_2_4
DISTRIBUTED BY HASH(lo_orderdate)
PARTITION BY LO_ORDERDATE
REFRESH ASYNC EVERY(INTERVAL 1 DAY)
PROPERTIES (
"force_external_table_query_rewrite" = "TRUE"
)
AS
SELECT lo_orderdate, lo_discount, lo_quantity, d_year, d_yearmonth, SUM(lo_extendedprice * lo_discount) AS REVENUE
FROM hive.ssb_1g_csv.lineorder, hive.ssb_1g_csv.dates
WHERE lo_orderdate = d_datekey
GROUP BY lo_orderdate, lo_discount, lo_quantity, d_year, d_yearmonth;
案例三:加速数据湖中聚合上的 Join
在某些情况下,可能需要首先对一个表执行聚合计算,然后执行与其他表的 Join 查询。为了充分利用 StarRocks 的查询改写功能,我们建议构建嵌套物化视图。例如
--Q5
SELECT * FROM (
SELECT
l.lo_orderkey, l.lo_orderdate, c.c_custkey, c_region, sum(l.lo_revenue)
FROM
hive.ssb_1g_csv.lineorder l
INNER JOIN (
SELECT distinct c_custkey, c_region
from
hive.ssb_1g_csv.customer
WHERE
c_region IN ('ASIA', 'AMERICA')
) c ON l.lo_custkey = c.c_custkey
GROUP BY l.lo_orderkey, l.lo_orderdate, c.c_custkey, c_region
) c1
WHERE
lo_orderdate = '19970503';
Q5 首先对 customer
表执行聚合查询,然后执行与 lineorder
表的 Join 和聚合。类似的查询可能涉及 c_region
和 lo_orderdate
上的不同过滤器。为了利用查询改写功能,您可以创建两个物化视图,一个用于聚合,另一个用于 Join。
--mv_3_1
CREATE MATERIALIZED VIEW mv_3_1
DISTRIBUTED BY HASH(c_custkey)
REFRESH ASYNC EVERY(INTERVAL 1 DAY)
PROPERTIES (
"force_external_table_query_rewrite" = "TRUE"
)
AS
SELECT distinct c_custkey, c_region from hive.ssb_1g_csv.customer;
--mv_3_2
CREATE MATERIALIZED VIEW mv_3_2
DISTRIBUTED BY HASH(lo_orderdate)
PARTITION BY LO_ORDERDATE
REFRESH ASYNC EVERY(INTERVAL 1 DAY)
PROPERTIES (
"force_external_table_query_rewrite" = "TRUE"
)
AS
SELECT l.lo_orderdate, l.lo_orderkey, mv.c_custkey, mv.c_region, sum(l.lo_revenue)
FROM hive.ssb_1g_csv.lineorder l
INNER JOIN mv_3_1 mv
ON l.lo_custkey = mv.c_custkey
GROUP BY l.lo_orderkey, l.lo_orderdate, mv.c_custkey, mv.c_region;
案例四:为数据湖中的实时数据和历史数据分离冷热数据
考虑以下场景:过去三天内更新的数据直接写入 StarRocks,而不太新的数据会被检查并批量写入 Hive。但是,仍然有一些查询可能涉及过去七天的数据。在这种情况下,您可以使用物化视图创建一个简单的模型来自动使数据过期。
CREATE MATERIALIZED VIEW mv_4_1
DISTRIBUTED BY HASH(lo_orderdate)
PARTITION BY LO_ORDERDATE
REFRESH ASYNC EVERY(INTERVAL 1 DAY)
AS
SELECT lo_orderkey, lo_orderdate, lo_revenue
FROM hive.ssb_1g_csv.lineorder
WHERE lo_orderdate<=current_date()
AND lo_orderdate>=date_add(current_date(), INTERVAL -4 DAY);
您可以基于上层应用程序的逻辑进一步构建视图或物化视图。