Java UDF
从 v2.2.0 开始,您可以使用 Java 编程语言编译用户定义的函数 (UDF) 以满足您的特定业务需求。
从 v3.0 开始,StarRocks 支持全局 UDF,您只需要在相关的 SQL 语句 (CREATE/SHOW/DROP) 中包含 GLOBAL
关键字。
本主题介绍如何开发和使用各种 UDF。
目前,StarRocks 支持标量 UDF、用户定义聚合函数 (UDAF)、用户定义窗口函数 (UDWF) 和用户定义表函数 (UDTF)。
前提条件
-
您已安装 Apache Maven,因此您可以创建和编译 Java 项目。
-
您的服务器上已安装 JDK 1.8。
-
Java UDF 功能已启用。 您可以在 FE 配置文件 fe/conf/fe.conf 中将 FE 配置项
enable_udf
设置为true
以启用此功能,然后重启 FE 节点以使设置生效。 有关更多信息,请参见参数配置。
开发和使用 UDF
您需要创建一个 Maven 项目并使用 Java 编程语言编译您需要的 UDF。
步骤 1:创建一个 Maven 项目
创建一个 Maven 项目,其基本目录结构如下
project
|--pom.xml
|--src
| |--main
| | |--java
| | |--resources
| |--test
|--target
步骤 2:添加依赖
将以下依赖项添加到 pom.xml 文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>udf</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.76</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.10</version>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/lib</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.3.0</version>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
</plugins>
</build>
</project>
步骤 3:编译 UDF
使用 Java 编程语言来编译 UDF。
编译标量 UDF
标量 UDF 对单行数据进行操作并返回单个值。 当您在查询中使用标量 UDF 时,每一行对应于结果集中的单个值。 典型的标量函数包括 UPPER
、LOWER
、ROUND
和 ABS
。
假设您的 JSON 数据中字段的值是 JSON 字符串而不是 JSON 对象。 当您使用 SQL 语句提取 JSON 字符串时,您需要运行两次 GET_JSON_STRING
,例如,GET_JSON_STRING(GET_JSON_STRING('{"key":"{\\"k0\\":\\"v0\\"}"}', "$.key"), "$.k0")
。
为了简化 SQL 语句,您可以编译一个可以直接提取 JSON 字符串的标量 UDF,例如,MY_UDF_JSON_GET('{"key":"{\\"k0\\":\\"v0\\"}"}', "$.key.k0")
。
package com.starrocks.udf.sample;
import com.alibaba.fastjson.JSONPath;
public class UDFJsonGet {
public final String evaluate(String obj, String key) {
if (obj == null || key == null) return null;
try {
// The JSONPath library can be fully expanded even if the values of a field are JSON strings.
return JSONPath.read(obj, key).toString();
} catch (Exception e) {
return null;
}
}
}
用户定义的类必须实现下表中描述的方法。
注意
方法中的请求参数和返回参数的数据类型必须与步骤 6中要执行的
CREATE FUNCTION
语句中声明的数据类型相同,并且符合本主题的“SQL 数据类型和 Java 数据类型之间的映射”部分中提供的映射。
方法 | 描述 |
---|---|
TYPE1 evaluate(TYPE2, ...) | 运行 UDF。 evaluate() 方法需要 public 成员访问级别。 |
编译 UDAF
UDAF 对多行数据进行操作并返回单个值。 典型的聚合函数包括 SUM
、COUNT
、MAX
和 MIN
,它们聚合每个 GROUP BY 子句中指定的多个数据行并返回单个值。
假设您要编译一个名为 MY_SUM_INT
的 UDAF。 与内置聚合函数 SUM
(返回 BIGINT 类型的值)不同,MY_SUM_INT
函数仅支持 INT 数据类型的请求参数和返回参数。
package com.starrocks.udf.sample;
public class SumInt {
public static class State {
int counter = 0;
public int serializeLength() { return 4; }
}
public State create() {
return new State();
}
public void destroy(State state) {
}
public final void update(State state, Integer val) {
if (val != null) {
state.counter+= val;
}
}
public void serialize(State state, java.nio.ByteBuffer buff) {
buff.putInt(state.counter);
}
public void merge(State state, java.nio.ByteBuffer buffer) {
int val = buffer.getInt();
state.counter += val;
}
public Integer finalize(State state) {
return state.counter;
}
}
用户定义的类必须实现下表中描述的方法。
注意
方法中的请求参数和返回参数的数据类型必须与步骤 6中要执行的
CREATE FUNCTION
语句中声明的数据类型相同,并且符合本主题的“SQL 数据类型和 Java 数据类型之间的映射”部分中提供的映射。
方法 | 描述 |
---|---|
State create() | 创建一个状态。 |
void destroy(State) | 销毁一个状态。 |
void update(State, ...) | 更新一个状态。 除了第一个参数 State 之外,您还可以在 UDF 声明中指定一个或多个请求参数。 |
void serialize(State, ByteBuffer) | 将状态序列化到字节缓冲区中。 |
void merge(State, ByteBuffer) | 从字节缓冲区反序列化一个状态,并将该字节缓冲区作为第一个参数合并到状态中。 |
TYPE finalize(State) | 从一个状态中获取 UDF 的最终结果。 |
在编译期间,您还必须使用缓冲区类 java.nio.ByteBuffer
和局部变量 serializeLength
,如下表所述。
类和局部变量 | 描述 |
---|---|
java.nio.ByteBuffer() | 缓冲区类,用于存储中间结果。 当中间结果在节点之间传输以供执行时,可能会被序列化或反序列化。 因此,您还必须使用 serializeLength 变量来指定允许反序列化中间结果的长度。 |
serializeLength() | 允许反序列化中间结果的长度。 单位:字节。 将此局部变量设置为 INT 类型的值。 例如,State { int counter = 0; public int serializeLength() { return 4; }} 指定中间结果为 INT 数据类型,反序列化长度为 4 字节。 您可以根据您的业务需求调整这些设置。 例如,如果要将中间结果的数据类型指定为 LONG,并将反序列化的长度指定为 8 个字节,则传递 State { long counter = 0; public int serializeLength() { return 8; }} 。 |
请注意以下几点,以便反序列化存储在 java.nio.ByteBuffer
类中的中间结果
- 不能调用依赖于
ByteBuffer
类的 remaining() 方法来反序列化状态。 - 不能在
ByteBuffer
类上调用 clear() 方法。 serializeLength
的值必须与写入数据的长度相同。 否则,在序列化和反序列化期间会生成不正确的结果。
编译 UDWF
与常规聚合函数不同,UDWF 对多行数据(统称为窗口)进行操作,并为每一行返回一个值。 典型的窗口函数包括一个 OVER
子句,该子句将行划分为多个集合。 它对每组行执行计算,并为每一行返回一个值。
假设您要编译一个名为 MY_WINDOW_SUM_INT
的 UDWF。 与内置聚合函数 SUM
(返回 BIGINT 类型的值)不同,MY_WINDOW_SUM_INT
函数仅支持 INT 数据类型的请求参数和返回参数。
package com.starrocks.udf.sample;
public class WindowSumInt {
public static class State {
int counter = 0;
public int serializeLength() { return 4; }
@Override
public String toString() {
return "State{" +
"counter=" + counter +
'}';
}
}
public State create() {
return new State();
}
public void destroy(State state) {
}
public void update(State state, Integer val) {
if (val != null) {
state.counter+=val;
}
}
public void serialize(State state, java.nio.ByteBuffer buff) {
buff.putInt(state.counter);
}
public void merge(State state, java.nio.ByteBuffer buffer) {
int val = buffer.getInt();
state.counter += val;
}
public Integer finalize(State state) {
return state.counter;
}
public void reset(State state) {
state.counter = 0;
}
public void windowUpdate(State state,
int peer_group_start, int peer_group_end,
int frame_start, int frame_end,
Integer[] inputs) {
for (int i = (int)frame_start; i < (int)frame_end; ++i) {
state.counter += inputs[i];
}
}
}
用户定义的类必须实现 UDAF 所需的方法(因为 UDWF 是一种特殊的聚合函数)和下表中描述的 windowUpdate() 方法。
注意
方法中的请求参数和返回参数的数据类型必须与步骤 6中要执行的
CREATE FUNCTION
语句中声明的数据类型相同,并且符合本主题的“SQL 数据类型和 Java 数据类型之间的映射”部分中提供的映射。
方法 | 描述 |
---|---|
void windowUpdate(State state, int, int, int , int, ...) | 更新窗口的数据。 有关 UDWF 的更多信息,请参阅窗口函数。 每次将一行作为输入输入时,此方法都会获取窗口信息并相应地更新中间结果。
|
编译 UDTF
UDTF 读取一行数据并返回多个可以视为表的值。 表值函数通常用于将行转换为列。
注意
StarRocks 允许 UDTF 返回一个由多行和一列组成的表。
假设您要编译一个名为 MY_UDF_SPLIT
的 UDTF。 MY_UDF_SPLIT
函数允许您使用空格作为分隔符,并支持 STRING 数据类型的请求参数和返回参数。
package com.starrocks.udf.sample;
public class UDFSplit{
public String[] process(String in) {
if (in == null) return null;
return in.split(" ");
}
}
用户定义的类定义的方法必须满足以下要求
注意
方法中的请求参数和返回参数的数据类型必须与步骤 6中要执行的
CREATE FUNCTION
语句中声明的数据类型相同,并且符合本主题的“SQL 数据类型和 Java 数据类型之间的映射”部分中提供的映射。
方法 | 描述 |
---|---|
TYPE[] process() | 运行 UDTF 并返回一个数组。 |
步骤 4:打包 Java 项目
运行以下命令以打包 Java 项目
mvn package
以下 JAR 文件在 target 文件夹中生成:udf-1.0-SNAPSHOT.jar 和 udf-1.0-SNAPSHOT-jar-with-dependencies.jar。
步骤 5:上传 Java 项目
将 JAR 文件 udf-1.0-SNAPSHOT-jar-with-dependencies.jar 上传到持续运行并且 StarRocks 集群中所有 FE 和 BE 都可以访问的 HTTP 服务器。 然后,运行以下命令以部署该文件
mvn deploy
您可以使用 Python 设置一个简单的 HTTP 服务器,并将 JAR 文件上传到该 HTTP 服务器。
注意
在步骤 6中,FE 将检查包含 UDF 代码的 JAR 文件并计算校验和,BE 将下载并执行 JAR 文件。
步骤 6:在 StarRocks 中创建 UDF
StarRocks 允许您在两种类型的命名空间中创建 UDF:数据库命名空间和全局命名空间。
- 如果您对 UDF 没有可见性或隔离要求,您可以将其创建为全局 UDF。 然后,您可以通过使用函数名称来引用全局 UDF,而无需在函数名称中包含目录和数据库名称作为前缀。
- 如果您对 UDF 有可见性或隔离要求,或者如果您需要在不同的数据库中创建相同的 UDF,则可以在每个单独的数据库中创建它。 这样,如果您的会话连接到目标数据库,则可以使用函数名称来引用 UDF。 如果您的会话连接到目标数据库以外的其他目录或数据库,则需要通过在函数名称中包含目录和数据库名称作为前缀来引用 UDF,例如,
catalog.database.function
。
注意
在创建和使用全局 UDF 之前,您必须联系系统管理员授予您所需的权限。 有关更多信息,请参见GRANT。
上传 JAR 包后,您可以在 StarRocks 中创建 UDF。 对于全局 UDF,您必须在创建语句中包含 GLOBAL
关键字。
语法
CREATE [GLOBAL][AGGREGATE | TABLE] FUNCTION function_name
(arg_type [, ...])
RETURNS return_type
PROPERTIES ("key" = "value" [, ...])
参数
参数 | 必需 | 描述 |
---|---|---|
GLOBAL | 否 | 是否创建全局 UDF,从 v3.0 开始支持。 |
AGGREGATE | 否 | 是否创建 UDAF 或 UDWF。 |
TABLE | 否 | 是否创建 UDTF。 如果未指定 AGGREGATE 和 TABLE ,则会创建一个标量函数。 |
function_name | 是 | 您要创建的函数的名称。 您可以在此参数中包含数据库的名称,例如db1.my_func 。 如果 function_name 包含数据库名称,则 UDF 在该数据库中创建。 否则,UDF 在当前数据库中创建。 新函数的名称及其参数不能与目标数据库中已存在的名称相同。 否则,无法创建该函数。 如果函数名称相同但参数不同,则创建成功。 |
arg_type | 是 | 函数的参数类型。 添加的参数可以用 , ... 表示。 对于支持的数据类型,请参见SQL 数据类型和 Java 数据类型之间的映射。 |
return_type | 是 | 函数的返回类型。 对于支持的数据类型,请参见Java UDF。 |
PROPERTIES | 是 | 函数的属性,这些属性因要创建的 UDF 的类型而异。 |
创建标量 UDF
运行以下命令以创建您在前面的示例中编译的标量 UDF
CREATE [GLOBAL] FUNCTION MY_UDF_JSON_GET(string, string)
RETURNS string
PROPERTIES (
"symbol" = "com.starrocks.udf.sample.UDFJsonGet",
"type" = "StarrocksJar",
"file" = "http://http_host:http_port/udf-1.0-SNAPSHOT-jar-with-dependencies.jar"
);
参数 | 描述 |
---|---|
symbol | UDF 所属的 Maven 项目的类的名称。 此参数的值采用 <package_name>.<class_name> 格式。 |
type | UDF 的类型。 将该值设置为 StarrocksJar ,该值指定 UDF 是基于 Java 的函数。 |
file | 您可以从中下载包含 UDF 代码的 JAR 文件的 HTTP URL。 此参数的值采用 http://<http_server_ip>:<http_server_port>/<jar_package_name> 格式。 |
isolation | (可选)要在 UDF 执行中共享函数实例并支持静态变量,请将其设置为“shared”。 |
创建 UDAF
运行以下命令以创建您在前面的示例中编译的 UDAF
CREATE [GLOBAL] AGGREGATE FUNCTION MY_SUM_INT(INT)
RETURNS INT
PROPERTIES
(
"symbol" = "com.starrocks.udf.sample.SumInt",
"type" = "StarrocksJar",
"file" = "http://http_host:http_port/udf-1.0-SNAPSHOT-jar-with-dependencies.jar"
);
PROPERTIES 中参数的描述与创建标量 UDF中的描述相同。
创建 UDWF
运行以下命令以创建您在前面的示例中编译的 UDWF
CREATE [GLOBAL] AGGREGATE FUNCTION MY_WINDOW_SUM_INT(Int)
RETURNS Int
properties
(
"analytic" = "true",
"symbol" = "com.starrocks.udf.sample.WindowSumInt",
"type" = "StarrocksJar",
"file" = "http://http_host:http_port/udf-1.0-SNAPSHOT-jar-with-dependencies.jar"
);
analytic
:UDF 是否为窗口函数。 将该值设置为 true
。 其他属性的描述与创建标量 UDF中的描述相同。
创建 UDTF
运行以下命令以创建您在前面的示例中编译的 UDTF
CREATE [GLOBAL] TABLE FUNCTION MY_UDF_SPLIT(string)
RETURNS string
properties
(
"symbol" = "com.starrocks.udf.sample.UDFSplit",
"type" = "StarrocksJar",
"file" = "http://http_host:http_port/udf-1.0-SNAPSHOT-jar-with-dependencies.jar"
);
PROPERTIES 中参数的描述与创建标量 UDF中的描述相同。
步骤 7:使用 UDF
创建 UDF 后,您可以根据您的业务需求对其进行测试和使用。
使用标量 UDF
运行以下命令以使用您在前面的示例中创建的标量 UDF
SELECT MY_UDF_JSON_GET('{"key":"{\\"in\\":2}"}', '$.key.in');
使用 UDAF
运行以下命令以使用您在前面的示例中创建的 UDAF
SELECT MY_SUM_INT(col1);
使用 UDWF
运行以下命令以使用您在前面的示例中创建的 UDWF
SELECT MY_WINDOW_SUM_INT(intcol)
OVER (PARTITION BY intcol2
ORDER BY intcol3
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)
FROM test_basic;
使用 UDTF
运行以下命令以使用您在前面的示例中创建的 UDTF
-- Suppose that you have a table named t1, and the information about its columns a, b, and c1 is as follows:
SELECT t1.a,t1.b,t1.c1 FROM t1;
> output:
1,2.1,"hello world"
2,2.2,"hello UDTF."
-- Run the MY_UDF_SPLIT() function.
SELECT t1.a,t1.b, MY_UDF_SPLIT FROM t1, MY_UDF_SPLIT(t1.c1);
> output:
1,2.1,"hello"
1,2.1,"world"
2,2.2,"hello"
2,2.2,"UDTF."
注意
- 前面的代码段中的第一个
MY_UDF_SPLIT
是第二个MY_UDF_SPLIT
(一个函数)返回的列的别名。- 您不能使用
AS t2(f1)
来指定要返回的表及其列的别名。
查看 UDF
运行以下命令以查询 UDF
SHOW [GLOBAL] FUNCTIONS;
有关更多信息,请参见SHOW FUNCTIONS。
删除 UDF
运行以下命令以删除 UDF
DROP [GLOBAL] FUNCTION <function_name>(arg_type [, ...]);
有关更多信息,请参见DROP FUNCTION。
SQL 数据类型和 Java 数据类型之间的映射
注意
目前,Scalar UDF 仅支持非嵌套的 ARRAY 和 MAP 参数/返回类型。
SQL TYPE | Java TYPE |
---|---|
BOOLEAN | java.lang.Boolean |
TINYINT | java.lang.Byte |
SMALLINT | java.lang.Short |
INT | java.lang.Integer |
BIGINT | java.lang.Long |
FLOAT | java.lang.Float |
DOUBLE | java.lang.Double |
STRING/VARCHAR | java.lang.String |
ARRAY | java.util.List |
Map | java.util.Map |
参数设置
在 StarRocks 集群中每个 Java 虚拟机 (JVM) 的 be/conf/be.conf 文件中配置以下环境变量,以控制内存使用情况。
JAVA_OPTS="-Xmx12G"
常见问题
创建 UDF 时是否可以使用静态变量? 不同 UDF 的静态变量是否会相互影响?
是的,您可以在编译 UDF 时使用静态变量。 不同 UDF 的静态变量是相互隔离的,即使 UDF 具有名称相同的类,也不会相互影响。