跳到主要内容
版本: 最新版本-3.5

Java UDF

从 v2.2.0 开始,您可以使用 Java 编程语言编译用户定义的函数 (UDF) 以满足您的特定业务需求。

从 v3.0 开始,StarRocks 支持全局 UDF,您只需要在相关的 SQL 语句 (CREATE/SHOW/DROP) 中包含 GLOBAL 关键字。

本主题介绍如何开发和使用各种 UDF。

目前,StarRocks 支持标量 UDF、用户定义聚合函数 (UDAF)、用户定义窗口函数 (UDWF) 和用户定义表函数 (UDTF)。

前提条件

  • 您已安装 Apache Maven,因此您可以创建和编译 Java 项目。

  • 您的服务器上已安装 JDK 1.8。

  • Java UDF 功能已启用。 您可以在 FE 配置文件 fe/conf/fe.conf 中将 FE 配置项 enable_udf 设置为 true 以启用此功能,然后重启 FE 节点以使设置生效。 有关更多信息,请参见参数配置

开发和使用 UDF

您需要创建一个 Maven 项目并使用 Java 编程语言编译您需要的 UDF。

步骤 1:创建一个 Maven 项目

创建一个 Maven 项目,其基本目录结构如下

project
|--pom.xml
|--src
| |--main
| | |--java
| | |--resources
| |--test
|--target

步骤 2:添加依赖

将以下依赖项添加到 pom.xml 文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>org.example</groupId>
<artifactId>udf</artifactId>
<version>1.0-SNAPSHOT</version>

<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>

<dependencies>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.76</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.10</version>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/lib</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.3.0</version>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
</plugins>
</build>
</project>

步骤 3:编译 UDF

使用 Java 编程语言来编译 UDF。

编译标量 UDF

标量 UDF 对单行数据进行操作并返回单个值。 当您在查询中使用标量 UDF 时,每一行对应于结果集中的单个值。 典型的标量函数包括 UPPERLOWERROUNDABS

假设您的 JSON 数据中字段的值是 JSON 字符串而不是 JSON 对象。 当您使用 SQL 语句提取 JSON 字符串时,您需要运行两次 GET_JSON_STRING,例如,GET_JSON_STRING(GET_JSON_STRING('{"key":"{\\"k0\\":\\"v0\\"}"}', "$.key"), "$.k0")

为了简化 SQL 语句,您可以编译一个可以直接提取 JSON 字符串的标量 UDF,例如,MY_UDF_JSON_GET('{"key":"{\\"k0\\":\\"v0\\"}"}', "$.key.k0")

package com.starrocks.udf.sample;

import com.alibaba.fastjson.JSONPath;

public class UDFJsonGet {
public final String evaluate(String obj, String key) {
if (obj == null || key == null) return null;
try {
// The JSONPath library can be fully expanded even if the values of a field are JSON strings.
return JSONPath.read(obj, key).toString();
} catch (Exception e) {
return null;
}
}
}

用户定义的类必须实现下表中描述的方法。

注意

方法中的请求参数和返回参数的数据类型必须与步骤 6中要执行的 CREATE FUNCTION 语句中声明的数据类型相同,并且符合本主题的“SQL 数据类型和 Java 数据类型之间的映射”部分中提供的映射。

方法描述
TYPE1 evaluate(TYPE2, ...)运行 UDF。 evaluate() 方法需要 public 成员访问级别。

编译 UDAF

UDAF 对多行数据进行操作并返回单个值。 典型的聚合函数包括 SUMCOUNTMAXMIN,它们聚合每个 GROUP BY 子句中指定的多个数据行并返回单个值。

假设您要编译一个名为 MY_SUM_INT 的 UDAF。 与内置聚合函数 SUM(返回 BIGINT 类型的值)不同,MY_SUM_INT 函数仅支持 INT 数据类型的请求参数和返回参数。

package com.starrocks.udf.sample;

public class SumInt {
public static class State {
int counter = 0;
public int serializeLength() { return 4; }
}

public State create() {
return new State();
}

public void destroy(State state) {
}

public final void update(State state, Integer val) {
if (val != null) {
state.counter+= val;
}
}

public void serialize(State state, java.nio.ByteBuffer buff) {
buff.putInt(state.counter);
}

public void merge(State state, java.nio.ByteBuffer buffer) {
int val = buffer.getInt();
state.counter += val;
}

public Integer finalize(State state) {
return state.counter;
}
}

用户定义的类必须实现下表中描述的方法。

注意

方法中的请求参数和返回参数的数据类型必须与步骤 6中要执行的 CREATE FUNCTION 语句中声明的数据类型相同,并且符合本主题的“SQL 数据类型和 Java 数据类型之间的映射”部分中提供的映射。

方法描述
State create()创建一个状态。
void destroy(State)销毁一个状态。
void update(State, ...)更新一个状态。 除了第一个参数 State 之外,您还可以在 UDF 声明中指定一个或多个请求参数。
void serialize(State, ByteBuffer)将状态序列化到字节缓冲区中。
void merge(State, ByteBuffer)从字节缓冲区反序列化一个状态,并将该字节缓冲区作为第一个参数合并到状态中。
TYPE finalize(State)从一个状态中获取 UDF 的最终结果。

在编译期间,您还必须使用缓冲区类 java.nio.ByteBuffer 和局部变量 serializeLength,如下表所述。

类和局部变量描述
java.nio.ByteBuffer()缓冲区类,用于存储中间结果。 当中间结果在节点之间传输以供执行时,可能会被序列化或反序列化。 因此,您还必须使用 serializeLength 变量来指定允许反序列化中间结果的长度。
serializeLength()允许反序列化中间结果的长度。 单位:字节。 将此局部变量设置为 INT 类型的值。 例如,State { int counter = 0; public int serializeLength() { return 4; }} 指定中间结果为 INT 数据类型,反序列化长度为 4 字节。 您可以根据您的业务需求调整这些设置。 例如,如果要将中间结果的数据类型指定为 LONG,并将反序列化的长度指定为 8 个字节,则传递 State { long counter = 0; public int serializeLength() { return 8; }}

请注意以下几点,以便反序列化存储在 java.nio.ByteBuffer 类中的中间结果

  • 不能调用依赖于 ByteBuffer 类的 remaining() 方法来反序列化状态。
  • 不能在 ByteBuffer 类上调用 clear() 方法。
  • serializeLength 的值必须与写入数据的长度相同。 否则,在序列化和反序列化期间会生成不正确的结果。

编译 UDWF

与常规聚合函数不同,UDWF 对多行数据(统称为窗口)进行操作,并为每一行返回一个值。 典型的窗口函数包括一个 OVER 子句,该子句将行划分为多个集合。 它对每组行执行计算,并为每一行返回一个值。

假设您要编译一个名为 MY_WINDOW_SUM_INT 的 UDWF。 与内置聚合函数 SUM(返回 BIGINT 类型的值)不同,MY_WINDOW_SUM_INT 函数仅支持 INT 数据类型的请求参数和返回参数。

package com.starrocks.udf.sample;

public class WindowSumInt {
public static class State {
int counter = 0;
public int serializeLength() { return 4; }
@Override
public String toString() {
return "State{" +
"counter=" + counter +
'}';
}
}

public State create() {
return new State();
}

public void destroy(State state) {

}

public void update(State state, Integer val) {
if (val != null) {
state.counter+=val;
}
}

public void serialize(State state, java.nio.ByteBuffer buff) {
buff.putInt(state.counter);
}

public void merge(State state, java.nio.ByteBuffer buffer) {
int val = buffer.getInt();
state.counter += val;
}

public Integer finalize(State state) {
return state.counter;
}

public void reset(State state) {
state.counter = 0;
}

public void windowUpdate(State state,
int peer_group_start, int peer_group_end,
int frame_start, int frame_end,
Integer[] inputs) {
for (int i = (int)frame_start; i < (int)frame_end; ++i) {
state.counter += inputs[i];
}
}
}

用户定义的类必须实现 UDAF 所需的方法(因为 UDWF 是一种特殊的聚合函数)和下表中描述的 windowUpdate() 方法。

注意

方法中的请求参数和返回参数的数据类型必须与步骤 6中要执行的 CREATE FUNCTION 语句中声明的数据类型相同,并且符合本主题的“SQL 数据类型和 Java 数据类型之间的映射”部分中提供的映射。

方法描述
void windowUpdate(State state, int, int, int , int, ...)更新窗口的数据。 有关 UDWF 的更多信息,请参阅窗口函数。 每次将一行作为输入输入时,此方法都会获取窗口信息并相应地更新中间结果。
  • peer_group_start:当前分区的起始位置。 PARTITION BY 在 OVER 子句中用于指定分区列。 分区列中具有相同值的行被认为位于同一分区中。
  • peer_group_end:当前分区的结束位置。
  • frame_start:当前窗口框架的起始位置。 窗口框架子句指定了一个计算范围,该范围涵盖当前行和与当前行在指定距离内的行。 例如,ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING 指定一个计算范围,该范围涵盖当前行、当前行之前的上一行和当前行之后的下一行。
  • frame_end:当前窗口框架的结束位置。
  • inputs:作为输入输入到窗口的数据。 数据是一个数组包,仅支持特定的数据类型。 在此示例中,INT 值作为输入输入,数组包为 Integer[]。

编译 UDTF

UDTF 读取一行数据并返回多个可以视为表的值。 表值函数通常用于将行转换为列。

注意

StarRocks 允许 UDTF 返回一个由多行和一列组成的表。

假设您要编译一个名为 MY_UDF_SPLIT 的 UDTF。 MY_UDF_SPLIT 函数允许您使用空格作为分隔符,并支持 STRING 数据类型的请求参数和返回参数。

package com.starrocks.udf.sample;

public class UDFSplit{
public String[] process(String in) {
if (in == null) return null;
return in.split(" ");
}
}

用户定义的类定义的方法必须满足以下要求

注意

方法中的请求参数和返回参数的数据类型必须与步骤 6中要执行的 CREATE FUNCTION 语句中声明的数据类型相同,并且符合本主题的“SQL 数据类型和 Java 数据类型之间的映射”部分中提供的映射。

方法描述
TYPE[] process()运行 UDTF 并返回一个数组。

步骤 4:打包 Java 项目

运行以下命令以打包 Java 项目

mvn package

以下 JAR 文件在 target 文件夹中生成:udf-1.0-SNAPSHOT.jarudf-1.0-SNAPSHOT-jar-with-dependencies.jar

步骤 5:上传 Java 项目

将 JAR 文件 udf-1.0-SNAPSHOT-jar-with-dependencies.jar 上传到持续运行并且 StarRocks 集群中所有 FE 和 BE 都可以访问的 HTTP 服务器。 然后,运行以下命令以部署该文件

mvn deploy 

您可以使用 Python 设置一个简单的 HTTP 服务器,并将 JAR 文件上传到该 HTTP 服务器。

注意

步骤 6中,FE 将检查包含 UDF 代码的 JAR 文件并计算校验和,BE 将下载并执行 JAR 文件。

步骤 6:在 StarRocks 中创建 UDF

StarRocks 允许您在两种类型的命名空间中创建 UDF:数据库命名空间和全局命名空间。

  • 如果您对 UDF 没有可见性或隔离要求,您可以将其创建为全局 UDF。 然后,您可以通过使用函数名称来引用全局 UDF,而无需在函数名称中包含目录和数据库名称作为前缀。
  • 如果您对 UDF 有可见性或隔离要求,或者如果您需要在不同的数据库中创建相同的 UDF,则可以在每个单独的数据库中创建它。 这样,如果您的会话连接到目标数据库,则可以使用函数名称来引用 UDF。 如果您的会话连接到目标数据库以外的其他目录或数据库,则需要通过在函数名称中包含目录和数据库名称作为前缀来引用 UDF,例如,catalog.database.function

注意

在创建和使用全局 UDF 之前,您必须联系系统管理员授予您所需的权限。 有关更多信息,请参见GRANT

上传 JAR 包后,您可以在 StarRocks 中创建 UDF。 对于全局 UDF,您必须在创建语句中包含 GLOBAL 关键字。

语法

CREATE [GLOBAL][AGGREGATE | TABLE] FUNCTION function_name
(arg_type [, ...])
RETURNS return_type
PROPERTIES ("key" = "value" [, ...])

参数

参数必需描述
GLOBAL是否创建全局 UDF,从 v3.0 开始支持。
AGGREGATE是否创建 UDAF 或 UDWF。
TABLE是否创建 UDTF。 如果未指定 AGGREGATETABLE,则会创建一个标量函数。
function_name您要创建的函数的名称。 您可以在此参数中包含数据库的名称,例如db1.my_func。 如果 function_name 包含数据库名称,则 UDF 在该数据库中创建。 否则,UDF 在当前数据库中创建。 新函数的名称及其参数不能与目标数据库中已存在的名称相同。 否则,无法创建该函数。 如果函数名称相同但参数不同,则创建成功。
arg_type函数的参数类型。 添加的参数可以用 , ... 表示。 对于支持的数据类型,请参见SQL 数据类型和 Java 数据类型之间的映射
return_type函数的返回类型。 对于支持的数据类型,请参见Java UDF
PROPERTIES函数的属性,这些属性因要创建的 UDF 的类型而异。

创建标量 UDF

运行以下命令以创建您在前面的示例中编译的标量 UDF

CREATE [GLOBAL] FUNCTION MY_UDF_JSON_GET(string, string) 
RETURNS string
PROPERTIES (
"symbol" = "com.starrocks.udf.sample.UDFJsonGet",
"type" = "StarrocksJar",
"file" = "http://http_host:http_port/udf-1.0-SNAPSHOT-jar-with-dependencies.jar"
);
参数描述
symbolUDF 所属的 Maven 项目的类的名称。 此参数的值采用 <package_name>.<class_name> 格式。
typeUDF 的类型。 将该值设置为 StarrocksJar,该值指定 UDF 是基于 Java 的函数。
file您可以从中下载包含 UDF 代码的 JAR 文件的 HTTP URL。 此参数的值采用 http://<http_server_ip>:<http_server_port>/<jar_package_name> 格式。
isolation(可选)要在 UDF 执行中共享函数实例并支持静态变量,请将其设置为“shared”。

创建 UDAF

运行以下命令以创建您在前面的示例中编译的 UDAF

CREATE [GLOBAL] AGGREGATE FUNCTION MY_SUM_INT(INT) 
RETURNS INT
PROPERTIES
(
"symbol" = "com.starrocks.udf.sample.SumInt",
"type" = "StarrocksJar",
"file" = "http://http_host:http_port/udf-1.0-SNAPSHOT-jar-with-dependencies.jar"
);

PROPERTIES 中参数的描述与创建标量 UDF中的描述相同。

创建 UDWF

运行以下命令以创建您在前面的示例中编译的 UDWF

CREATE [GLOBAL] AGGREGATE FUNCTION MY_WINDOW_SUM_INT(Int)
RETURNS Int
properties
(
"analytic" = "true",
"symbol" = "com.starrocks.udf.sample.WindowSumInt",
"type" = "StarrocksJar",
"file" = "http://http_host:http_port/udf-1.0-SNAPSHOT-jar-with-dependencies.jar"
);

analytic:UDF 是否为窗口函数。 将该值设置为 true。 其他属性的描述与创建标量 UDF中的描述相同。

创建 UDTF

运行以下命令以创建您在前面的示例中编译的 UDTF

CREATE [GLOBAL] TABLE FUNCTION MY_UDF_SPLIT(string)
RETURNS string
properties
(
"symbol" = "com.starrocks.udf.sample.UDFSplit",
"type" = "StarrocksJar",
"file" = "http://http_host:http_port/udf-1.0-SNAPSHOT-jar-with-dependencies.jar"
);

PROPERTIES 中参数的描述与创建标量 UDF中的描述相同。

步骤 7:使用 UDF

创建 UDF 后,您可以根据您的业务需求对其进行测试和使用。

使用标量 UDF

运行以下命令以使用您在前面的示例中创建的标量 UDF

SELECT MY_UDF_JSON_GET('{"key":"{\\"in\\":2}"}', '$.key.in');

使用 UDAF

运行以下命令以使用您在前面的示例中创建的 UDAF

SELECT MY_SUM_INT(col1);

使用 UDWF

运行以下命令以使用您在前面的示例中创建的 UDWF

SELECT MY_WINDOW_SUM_INT(intcol) 
OVER (PARTITION BY intcol2
ORDER BY intcol3
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)
FROM test_basic;

使用 UDTF

运行以下命令以使用您在前面的示例中创建的 UDTF

-- Suppose that you have a table named t1, and the information about its columns a, b, and c1 is as follows:
SELECT t1.a,t1.b,t1.c1 FROM t1;
> output:
1,2.1,"hello world"
2,2.2,"hello UDTF."

-- Run the MY_UDF_SPLIT() function.
SELECT t1.a,t1.b, MY_UDF_SPLIT FROM t1, MY_UDF_SPLIT(t1.c1);
> output:
1,2.1,"hello"
1,2.1,"world"
2,2.2,"hello"
2,2.2,"UDTF."

注意

  • 前面的代码段中的第一个 MY_UDF_SPLIT 是第二个 MY_UDF_SPLIT(一个函数)返回的列的别名。
  • 您不能使用 AS t2(f1) 来指定要返回的表及其列的别名。

查看 UDF

运行以下命令以查询 UDF

SHOW [GLOBAL] FUNCTIONS;

有关更多信息,请参见SHOW FUNCTIONS

删除 UDF

运行以下命令以删除 UDF

DROP [GLOBAL] FUNCTION <function_name>(arg_type [, ...]);

有关更多信息,请参见DROP FUNCTION

SQL 数据类型和 Java 数据类型之间的映射

注意

目前,Scalar UDF 仅支持非嵌套的 ARRAY 和 MAP 参数/返回类型。

SQL TYPEJava TYPE
BOOLEANjava.lang.Boolean
TINYINTjava.lang.Byte
SMALLINTjava.lang.Short
INTjava.lang.Integer
BIGINTjava.lang.Long
FLOATjava.lang.Float
DOUBLEjava.lang.Double
STRING/VARCHARjava.lang.String
ARRAYjava.util.List
Mapjava.util.Map

参数设置

在 StarRocks 集群中每个 Java 虚拟机 (JVM) 的 be/conf/be.conf 文件中配置以下环境变量,以控制内存使用情况。

JAVA_OPTS="-Xmx12G"

常见问题

创建 UDF 时是否可以使用静态变量? 不同 UDF 的静态变量是否会相互影响?

是的,您可以在编译 UDF 时使用静态变量。 不同 UDF 的静态变量是相互隔离的,即使 UDF 具有名称相同的类,也不会相互影响。