
SPARK LOAD

SPARK LOAD preprocesses the data to be imported through external Spark resources, which improves import performance for large volumes of StarRocks data and saves compute resources of the StarRocks cluster. It is mainly used for initial migration and for scenarios where large amounts of data are imported into StarRocks.

Spark load is an asynchronous import method. Users need to create a Spark-type load job through the MySQL protocol and check the import result with SHOW LOAD.
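
After the job is submitted, a minimal sketch of checking its progress, assuming a job labeled label1 in the database example_db (both names are hypothetical):

SHOW LOAD FROM example_db WHERE LABEL = "label1";

The State column of the result shows the current stage of the job, for example PENDING, ETL, LOADING, FINISHED, or CANCELLED.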

Note

  • Only users with the INSERT privilege on a StarRocks table can load data into that table. If you do not have the INSERT privilege, follow the instructions in GRANT to grant the INSERT privilege to the user that you use to connect to the StarRocks cluster; a minimal sketch is shown after this note.
  • When Spark Load is used to load data into a StarRocks table, the bucketing columns of the StarRocks table cannot be of the DATE, DATETIME, or DECIMAL type.
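
A minimal sketch of the GRANT statement for this, assuming a destination table example_db.my_table and a user 'spark_user'@'%' (both hypothetical); see GRANT for the full syntax:

GRANT INSERT ON TABLE example_db.my_table TO USER 'spark_user'@'%';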

Syntax

LOAD LABEL load_label
(
data_desc1[, data_desc2, ...]
)
WITH RESOURCE resource_name
[resource_properties]
[opt_properties]

1. load_label

The label of the current import batch. It must be unique within the database.

Syntax

[database_name.]your_label

2. data_desc

Used to describe a batch of data to be imported.

Syntax

DATA INFILE
(
"file_path1"[, file_path2, ...]
)
[NEGATIVE]
INTO TABLE `table_name`
[PARTITION (p1, p2)]
[COLUMNS TERMINATED BY "column_separator"]
[FORMAT AS "file_type"]
[(column_list)]
[COLUMNS FROM PATH AS (col2, ...)]
[SET (k1 = func(k2))]
[WHERE predicate]

DATA FROM TABLE hive_external_tbl
[NEGATIVE]
INTO TABLE tbl_name
[PARTITION (p1, p2)]
[SET (k1=f1(xx), k2=f2(xx))]
[WHERE predicate]

Note

file_path:

The file path can point to a single file, or the * wildcard can be used to match all files in a directory. Wildcards must match files, not directories.

hive_external_tbl:

The name of the Hive external table.
The columns of the destination StarRocks table must exist in the Hive external table.
Each load job supports loading from only one Hive external table.
This mode cannot be used together with the file_path mode.

PARTITION:

If this parameter is specified, only the specified partitions are imported, and data outside the specified partitions is filtered out.
If not specified, all partitions of the table are imported by default.

NEGATIVE:

If this parameter is specified, it is equivalent to loading a batch of "negative" data, which is used to offset the same batch of data imported previously.
This parameter is applicable only when value columns exist and their aggregation type is SUM.

column_separator:

Specifies the column separator in the import file. The default is \t.
If the separator is an invisible character, prefix it with \\x and represent it in hexadecimal.
For example, the Hive file separator \x01 is specified as "\\x01".

file_type:

Used to specify the type of imported file. Currently, supported file types are csv, orc, and parquet.

column_list:

Used to specify the mapping between the columns in the import file and the columns in the table.
When you need to skip a column in the import file, specify that column with a column name that does not exist in the table.

Syntax:
(col_name1, col_name2, ...)

SET:

If this parameter is specified, a column of the source file can be transformed according to a function, and the transformed result is then imported into the table. The syntax is column_name = expression.
Only Spark SQL built-in functions are supported. Please refer to https://spark.apache.ac.cn/docs/2.4.6/api/sql/index.html.
A few examples to help understand:
Example 1: the table has three columns (c1, c2, c3), the first two columns of the source file map to (c1, c2), and the sum of the last two columns maps to c3; then columns (c1, c2, tmp_c3, tmp_c4) SET (c3 = tmp_c3 + tmp_c4) needs to be specified.
Example 2: the table has three columns (year, month, day), and the source file contains only one time column in the format "2018-06-01 01:02:03".
Then you can specify columns (tmp_time) SET (year = year(tmp_time), month = month(tmp_time), day = day(tmp_time)) to complete the import; a sketch of this case is shown after this note.

WHERE:

Filters the transformed data; only rows that satisfy the WHERE condition can be imported. Only column names of the table can be referenced in the WHERE clause.
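
As referenced in the SET description above, a minimal sketch of the second SET example (splitting a single time column into year, month, and day); the label, file path, and table name are hypothetical and follow the examples at the end of this page:

LOAD LABEL example_db.label_time
(
DATA INFILE("hdfs://hdfs_host:hdfs_port/user/starRocks/data/input/file")
INTO TABLE `my_table`
COLUMNS TERMINATED BY ","
(tmp_time)
SET (
year = year(tmp_time),
month = month(tmp_time),
day = day(tmp_time)
)
)
WITH RESOURCE 'my_spark';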

3. resource_name

The name of the Spark resource to be used, which can be viewed with the SHOW RESOURCES command.
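
For example, to list the available Spark resources and confirm the resource name before submitting the job:

SHOW RESOURCES;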

4. resource_properties

When you have a temporary need, such as modifying the Spark or HDFS configuration, you can set the parameters here. They take effect only in this specific Spark load job and do not affect the existing configuration of the StarRocks cluster.
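
A minimal sketch of where these temporary parameters are placed in the LOAD statement, using the same keys as Example 2 below:

WITH RESOURCE 'my_spark'
(
"spark.executor.memory" = "3g",
"broker.username" = "hdfs_user",
"broker.password" = "hdfs_passwd"
)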

5. opt_properties

Used to specify some special parameters.

Syntax

[PROPERTIES ("key"="value", ...)]

The following parameters can be specified:
timeout: specifies the timeout of the import operation, in seconds. The default timeout is 4 hours.
max_filter_ratio: the maximum ratio of data that is allowed to be filtered out (for reasons such as non-standard data). The default is zero tolerance.
strict_mode: whether to strictly restrict the data. The default is false.
timezone: specifies the time zone for functions affected by the time zone, such as strftime/alignment_timestamp/from_unixtime. See the [time zone] documentation for details. If not specified, the "Asia/Shanghai" time zone is used.
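
A minimal sketch of a PROPERTIES clause combining these parameters (the values are illustrative, and the underscore spelling strict_mode is assumed here; Example 1 below shows timeout and max_filter_ratio in a complete job):

PROPERTIES
(
"timeout" = "3600",
"max_filter_ratio" = "0.1",
"strict_mode" = "false",
"timezone" = "Asia/Shanghai"
)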

6. Examples of import data formats

int (TINYINT/SMALLINT/INT/BIGINT/LARGEINT): 1, 1000, 1234
float (FLOAT/DOUBLE/DECIMAL): 1.1, 0.23, .356
date (DATE/DATETIME): 2017-10-03, 2017-06-13 12:34:03 (Note: for other date formats, you can use the strftime or time_format functions to convert them in the import command)
string class (CHAR/VARCHAR): "I am a student", "a"

NULL value: \N

Examples

  1. Import a batch of data from HDFS, specify the timeout and the filter ratio, and use the Spark resource named my_spark.

    LOAD LABEL example_db.label1
    (
    DATA INFILE("hdfs://hdfs_host:hdfs_port/user/starRocks/data/input/file")
    INTO TABLE `my_table`
    )
    WITH RESOURCE 'my_spark'
    PROPERTIES
    (
    "timeout" = "3600",
    "max_filter_ratio" = "0.1"
    );

    Here hdfs_host is the host of the NameNode, and hdfs_port is the fs.defaultFS port (9000 by default).

  2. Import a batch of "negative" data from HDFS, specify the comma as the separator, use the * wildcard to specify all files in the directory, and specify temporary parameters of the Spark resource.

    LOAD LABEL example_db.label3
    (
    DATA INFILE("hdfs://hdfs_host:hdfs_port/user/starRocks/data/input/*")
    NEGATIVE
    INTO TABLE `my_table`
    COLUMNS TERMINATED BY ","
    )
    WITH RESOURCE 'my_spark'
    (
    "spark.executor.memory" = "3g",
    "broker.username" = "hdfs_user",
    "broker.password" = "hdfs_passwd"
    );
  3. Import a batch of data from HDFS, specify the partitions, and perform some transformations on the columns of the import file, as follows:

    The table structure is:
    k1 varchar(20)
    k2 int

    Assume that the data file has only one line of data:

    Adele,1,1

    Each column in the data file corresponds to each column specified in the import statement:
    k1,tmp_k2,tmp_k3

    The conversion is as follows:

    1. k1: no conversion
    2. k2: the sum of tmp_k2 and tmp_k3

    LOAD LABEL example_db.label6
    (
    DATA INFILE("hdfs://hdfs_host:hdfs_port/user/starRocks/data/input/file")
    INTO TABLE `my_table`
    PARTITION (p1, p2)
    COLUMNS TERMINATED BY ","
    (k1, tmp_k2, tmp_k3)
    SET (
    k2 = tmp_k2 + tmp_k3
    )
    )
    WITH RESOURCE 'my_spark';
  4. Extract partition fields from the file path

    If necessary, the partition fields in the file path are parsed according to the field types defined in the table, similar to the Partition Discovery feature in Spark.

    LOAD LABEL example_db.label10
    (
    DATA INFILE("hdfs://hdfs_host:hdfs_port/user/starRocks/data/input/dir/city=beijing/*/*")
    INTO TABLE `my_table`
    (k1, k2, k3)
    COLUMNS FROM PATH AS (city, utc_date)
    SET (uniq_id = md5sum(k1, city))
    )
    WITH RESOURCE 'my_spark';

    The directory hdfs://hdfs_host:hdfs_port/user/starRocks/data/input/dir/city=beijing contains the following files:

    [hdfs://hdfs_host:hdfs_port/user/starRocks/data/input/dir/city=beijing/utc_date=2019-06-26/0000.csv, hdfs://hdfs_host:hdfs_port/user/starRocks/data/input/dir/city=beijing/utc_date=2019-06-26/0001.csv, ...]

    The city and utc_date fields are extracted from the file path.

  5. Filter the data to be imported. Only rows whose k1 value is greater than 10 can be imported.

    LOAD LABEL example_db.label10
    (
    DATA INFILE("hdfs://hdfs_host:hdfs_port/user/starRocks/data/input/file")
    INTO TABLE `my_table`
    WHERE k1 > 10
    )
    WITH RESOURCE 'my_spark';
  6. Import from a Hive external table, and convert the uuid column in the source table to the BITMAP type through a global dictionary.

    LOAD LABEL db1.label1
    (
    DATA FROM TABLE hive_t1
    INTO TABLE tbl1
    SET
    (
    uuid=bitmap_dict(uuid)
    )
    )
    WITH RESOURCE 'my_spark';