跳转至

4.4.2 Export

本文档将介绍如何使用 EXPORT 命令导出 Doris 中存储的数据。

有关 EXPORT 命令的详细介绍,请参考: EXPORT

1 概述

ExportDoris 提供的一种将数据异步导出的功能。该功能可以将用户指定的表或分区的数据,以指定的文件格式,导出到目标存储系统中,包括对象存储、 HDFS 或本地文件系统。

Export 是一个异步执行的命令,命令执行成功后,立即返回结果,用户可以通过 Show Export 命令查看该 Export 任务的详细信息。

关于如何选择 SELECT INTO OUTFILEEXPORT ,请参阅导出综述。

EXPORT 当前支持导出以下类型的表或视图

  • Doris 内表

  • Doris 逻辑视图

  • Doris Catalog

EXPORT 目前支持以下导出格式

  • Parquet

  • ORC

  • csv

  • csv_with_names

  • csv_with_names_and_types

不支持压缩格式的导出。

示例:

SQL
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
mysql> EXPORT TABLE tpch1.lineitem TO "s3://my_bucket/path/to/exp_"
    -> PROPERTIES(
    ->     "format" = "csv",
    ->     "max_file_size" = "2048MB"
    -> )
    -> WITH s3 (
    ->   "s3.endpoint" = "${endpoint}",
    ->   "s3.region" = "${region}",
    ->   "s3.secret_key"="${sk}",
    ->   "s3.access_key" = "${ak}"
    -> );

提交作业后,可以通过 SHOW EXPORT 命令查询导出作业状态,结果举例如下:

SQL
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
mysql> show export\G
*************************** 1. row ***************************
      JobId: 143265
      Label: export_0aa6c944-5a09-4d0b-80e1-cb09ea223f65
      State: FINISHED
   Progress: 100%
   TaskInfo: {"partitions":[],"parallelism":5,"data_consistency":"partition","format":"csv","broker":"S3","column_separator":"\t","line_delimiter":"\n","max_file_size":"2048MB","delete_existing_files":"","with_bom":"false","db":"tpch1","tbl":"lineitem"}
       Path: s3://ftw-datalake-test-1308700295/test_ycs_activeDefense_v10/test_csv/exp_
 CreateTime: 2024-06-11 18:01:18
  StartTime: 2024-06-11 18:01:18
 FinishTime: 2024-06-11 18:01:31
    Timeout: 7200
   ErrorMsg: NULL
OutfileInfo: [
  [
    {
      "fileNumber": "1",
      "totalRows": "6001215",
      "fileSize": "747503989bytes",
      "url": "s3://my_bucket/path/to/exp_6555cd33e7447c1-baa9568b5c4eb0ac_*"
    }
  ]
]
1 row in set (0.00 sec)

show export 命令返回的结果各个列的含义如下:

  • JobId :作业的唯一 ID

  • Label :该导出作业的标签,如果 Export 没有指定,则系统会默认生成一个。

  • State :作业状态:

    • PENDING :作业待调度

    • EXPORTING :数据导出中

    • FINISHED :作业成功

    • CANCELLED :作业失败

    • Progress :作业进度。该进度以查询计划为单位。假设一共 10 个线程,当前已完成 3 个,则进度为 30%

  • TaskInfo :以 Json 格式展示的作业信息:

    • db :数据库名

    • tbl :表名

    • partitions :指定导出的分区。空列表表示所有分区。

    • column_separator :导出文件的列分隔符。

    • line_delimiter :导出文件的行分隔符。

    • tablet num :涉及的总 Tablet 数量。

    • broker :使用的 broker 的名称。

    • coord num :查询计划的个数。

    • max_file_size :一个导出文件的最大大小。

    • delete_existing_files :是否删除导出目录下已存在的文件及目录。

    • columns :指定需要导出的列名,空值代表导出所有列。

    • format :导出的文件格式

  • Path :远端存储上的导出路径。

  • CreateTime/StartTime/FinishTime :作业的创建时间、开始调度时间和结束时间。

  • Timeout :作业超时时间。单位是秒。该时间从 CreateTime 开始计算。

  • ErrorMsg :如果作业出现错误,这里会显示错误原因。

  • OutfileInfo :如果作业导出成功,这里会显示具体的 SELECT INTO OUTFILE 结果信息。

提交 Export 作业后,在 Export 任务成功或失败之前可以通过 CANCEL EXPORT 命令取消导出作业。取消命令举例如下:

SQL
1
CANCEL EXPORT FROM tpch1 WHERE LABEL like "%export_%";

2 导出文件列类型映射

Export 支持导出数据为 ParquetORC 文件格式。 ParquetORC 文件格式拥有自己的数据类型, Doris 的导出功能能够自动将 Doris 的数据类型导出为 ParquetORC 文件格式的对应数据类型,具体映射关系请参阅导出综述文档的“导出文件列类型映射”部分。

3 示例

3.1 导出到 HDFS

db1.tbl1 表的 p1p2 分区中的 col1 列和 col2 列数据导出到 HDFS 上,设置导出作业的 labelmylabel 。导出文件格式为 csv (默认格式),列分割符为 , ,导出作业单个文件大小限制为 512MB

SQL
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
EXPORT TABLE db1.tbl1
PARTITION (p1,p2)
TO "hdfs://host/path/to/export/"
PROPERTIES
(
    "label" = "mylabel",
    "column_separator"=",",
    "max_file_size" = "512MB",
    "columns" = "col1,col2"
)
with HDFS (
    "fs.defaultFS"="hdfs://hdfs_host:port",
    "hadoop.username" = "hadoop"
);

如果 HDFS 开启了高可用,则需要提供 HA 信息,如:

SQL
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
EXPORT TABLE db1.tbl1
PARTITION (p1,p2)
TO "hdfs://HDFS8000871/path/to/export/"
PROPERTIES
(
    "label" = "mylabel",
    "column_separator"=",",
    "max_file_size" = "512MB",
    "columns" = "col1,col2"
)
with HDFS (
    "fs.defaultFS" = "hdfs://HDFS8000871",
    "hadoop.username" = "hadoop",
    "dfs.nameservices" = "your-nameservices",
    "dfs.ha.namenodes.your-nameservices" = "nn1,nn2",
    "dfs.namenode.rpc-address.HDFS8000871.nn1" = "ip:port",
    "dfs.namenode.rpc-address.HDFS8000871.nn2" = "ip:port",
    "dfs.client.failover.proxy.provider.HDFS8000871" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
);

如果 Hadoop 集群开启了高可用并且启用了 Kerberos 认证,可以参考如下 SQL 语句:

SQL
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
EXPORT TABLE db1.tbl1
PARTITION (p1,p2)
TO "hdfs://HDFS8000871/path/to/export/"
PROPERTIES
(
    "label" = "mylabel",
    "column_separator"=",",
    "max_file_size" = "512MB",
    "columns" = "col1,col2"
)
with HDFS (
    "fs.defaultFS"="hdfs://hacluster/",
    "hadoop.username" = "hadoop",
    "dfs.nameservices"="hacluster",
    "dfs.ha.namenodes.hacluster"="n1,n2",
    "dfs.namenode.rpc-address.hacluster.n1"="192.168.0.1:8020",
    "dfs.namenode.rpc-address.hacluster.n2"="192.168.0.2:8020",
    "dfs.client.failover.proxy.provider.hacluster"="org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
    "dfs.namenode.kerberos.principal"="hadoop/_HOST@REALM.COM"
    "hadoop.security.authentication"="kerberos",
    "hadoop.kerberos.principal"="<doris_test@REALM.COM>",
    "hadoop.kerberos.keytab"="/path/to/doris_test.keytab"
);

3.2 导出到 S3

s3_test 表中的所有数据导出到 s3 上,导出格式为 csv ,以不可见字符 \x07 作为行分隔符。

SQL
1
2
3
4
5
6
7
8
9
EXPORT TABLE s3_test TO "s3://bucket/a/b/c"
PROPERTIES (
    "line_delimiter" = "\\x07"
) WITH s3 (
    "s3.endpoint" = "xxxxx",
    "s3.region" = "xxxxx",
    "s3.secret_key"="xxxx",
    "s3.access_key" = "xxxxx"
)

3.3 导出到本地文件系统

export 数据导出到本地文件系统,需要在 fe.conf 中添加 enable_outfile_to_local=true 并且重启 FE

test 表中的所有数据导出到本地存储:

SQL
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
-- parquet 格式
EXPORT TABLE test TO "file:///home/user/tmp/"
PROPERTIES (
  "columns" = "k1,k2",
  "format" = "parquet"
);

-- orc 格式
EXPORT TABLE test TO "file:///home/user/tmp/"
PROPERTIES (
  "columns" = "k1,k2",
  "format" = "orc"
);

-- csv_with_names 格式,以‘AA’为列分割符,‘zz’为行分割符
EXPORT TABLE test TO "file:///home/user/tmp/"
PROPERTIES (
  "format" = "csv_with_names",
  "column_separator"="AA",
  "line_delimiter" = "zz"
);

-- csv_with_names_and_types 格式
EXPORT TABLE test TO "file:///home/user/tmp/"
PROPERTIES (
  "format" = "csv_with_names_and_types"
);

Tip

注意:导出到本地文件系统的功能不适用于公有云用户,仅适用于私有化部署的用户。并且默认用户对集群节点有完全的控制权限。 Doris 对于用户填写的导出路径不会做合法性检查。如果 Doris 的进程用户对该路径无写权限,或路径不存在,则会报错。同时处于安全性考虑,如果该路径已存在同名的文件,则也会导出失败。 Doris 不会管理导出到本地的文件,也不会检查磁盘空间等。这些文件需要用户自行管理,如清理等。

3.4 指定分区导出

导出作业支持仅导出 Doris 内表的部分分区,如仅导出 test 表的 p1p2 分区

SQL
1
2
3
4
5
6
EXPORT TABLE test
PARTITION (p1,p2)
TO "file:///home/user/tmp/"
PROPERTIES (
    "columns" = "k1,k2"
);

3.5 导出时过滤数据

导出作业支持导出时根据谓词条件过滤数据,仅导出符合条件的数据,如仅导出满足 k1 < 50 条件的数据

SQL
1
2
3
4
5
6
7
EXPORT TABLE test
WHERE k1 < 50
TO "file:///home/user/tmp/"
PROPERTIES (
    "columns" = "k1,k2",
    "column_separator"=","
);

3.6 导出外表数据

导出作业支持 Doris Catalog 外表数据:

SQL
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
-- 创建一个 catalog
CREATE CATALOG `tpch` PROPERTIES (
    "type" = "trino-connector",
    "trino.connector.name" = "tpch",
    "trino.tpch.column-naming" = "STANDARD",
    "trino.tpch.splits-per-node" = "32"
);

-- 导出 Catalog 外表数据
EXPORT TABLE tpch.sf1.lineitem TO "file:///path/to/exp_"
PROPERTIES(
    "parallelism" = "5",
    "format" = "csv",
    "max_file_size" = "1024MB"
);

Tip

当前 Export 导出 Catalog 外表数据不支持并发导出,即使指定 parallelism 大于 1 ,仍然是单线程导出。

4 最佳实践

4.1 导出一致性

Export 导出支持 partition / tablets 两种粒度。 data_consistency 参数用来指定以何种粒度切分希望导出的表, none 代表 Tablets 级别, partition 代表 Partition 级别。

SQL
1
2
3
4
5
6
EXPORT TABLE test TO "file:///home/user/tmp"
PROPERTIES (
    "format" = "parquet",
    "data_consistency" = "partition",
    "max_file_size" = "512MB"
);

若设置 "data_consistency" = "partition"Export 任务底层构造的多个 SELECT INTO OUTFILE 语句都会导出不同的 partition

若设置 "data_consistency" = "none"Export 任务底层构造的多个 SELECT INTO OUTFILE 语句都会导出不同的 tablets ,但是这些不同的 tablets 有可能属于相同的 partition

关于 Export 底层构造 SELECT INTO OUTFILE 的逻辑,可参阅附录部分。

4.2 导出作业并发度

Export 可以设置不同的并发度来并发导出数据。指定并发度为 5

SQL
1
2
3
4
5
6
EXPORT TABLE test TO "file:///home/user/tmp/"
PROPERTIES (
  "format" = "parquet",
  "max_file_size" = "512MB",
  "parallelism" = "5"
);

关于 Export 并发导出的原理,可参阅附录部分。

4.3 导出前清空导出目录

SQL
1
2
3
4
5
6
EXPORT TABLE test TO "file:///home/user/tmp"
PROPERTIES (
    "format" = "parquet",
    "max_file_size" = "512MB",
    "delete_existing_files" = "true"
);

如果设置了 "delete_existing_files" = "true" ,导出作业会先将 /home/user/ 目录下所有文件及目录删除,然后导出数据到该目录下。

Warning

注意:若要使用 delete_existing_files 参数,还需要在 fe.conf 中添加配置 enable_delete_existing_files = true 并重启 fe ,此时 delete_existing_files 才会生效。 delete_existing_files = true 是一个危险的操作,建议只在测试环境中使用。

4.4 设置导出文件的大小

导出作业支持设置导出文件的大小,如果单个文件大小超过设定值,则会按照指定大小分成多个文件导出。

SQL
1
2
3
4
5
EXPORT TABLE test TO "file:///home/user/tmp/"
PROPERTIES (
    "format" = "parquet",
    "max_file_size" = "512MB"
);

通过设置 "max_file_size" = "512MB" ,则单个导出文件的最大大小为 512MB

5 注意事项

  • 内存限制

    通常一个 Export 作业的查询计划只有 扫描-导出 两部分,不涉及需要太多内存的计算逻辑。所以通常 2GB 的默认内存限制可以满足需求。

    但在某些场景下,比如一个查询计划,在同一个 BE 上需要扫描的 Tablet 过多,或者 Tablet 的数据版本过多时,可能会导致内存不足。可以调整 session 变量 exec_mem_limit 来调大内存使用限制。

  • 导出数据量

    不建议一次性导出大量数据。一个 Export 作业建议的导出数据量最大在几十 GB 。过大的导出会导致更多的垃圾文件和更高的重试成本。如果表数据量过大,建议按照分区导出。

    另外, Export 作业会扫描数据,占用 IO 资源,可能会影响系统的查询延迟。

  • 导出文件的管理

    如果 Export 作业运行失败,已经生成的文件不会被删除,需要用户手动删除。

  • 数据一致性

    目前在 export 时只是简单检查 tablets 版本是否一致,建议在执行 export 过程中不要对该表进行导入数据操作。

  • 导出超时

    若导出的数据量很大,超过导出的超时时间,则 Export 任务会失败。此时可以在 Export 命令中指定 timeout 参数来增加超时时间并重试 Export 命令。

  • 导出失败

    Export 作业运行过程中,如果 FE 发生重启或切主,则 Export 作业会失败,需要用户重新提交。可以通过 show export 命令查看 Export 任务状态。

  • 导出分区数量

    一个 Export Job 允许导出的分区数量最大为 2000 ,可以在 fe.conf 中添加参数 maximum_number_of_export_partitions 并重启 FE 来修改该设置。

  • 并发导出

    在并发导出时,请注意合理地配置线程数量和并行度,以充分利用系统资源并避免性能瓶颈。在导出过程中,可以实时监控进度和性能指标,以便及时发现问题并进行优化调整。

  • 数据完整性

    导出操作完成后,建议验证导出的数据是否完整和正确,以确保数据的质量和完整性。

6 附录

6.1 并发导出原理

Export 任务的底层是执行 SELECT INTO OUTFILE SQL 语句。用户发起一个 Export 任务后, Doris 会根据 Export 要导出的表构造出一个或多个 SELECT INTO OUTFILE 执行计划,随后将这些 SELECT INTO OUTFILE 执行计划提交给 DorisJob Schedule 任务调度器, Job Schedule 任务调度器会自动调度这些任务并执行。

默认情况下, Export 任务是单线程执行的。为了提高导出的效率, Export 命令可以设置一个 parallelism 参数来并发导出数据。设置 parallelism 大于 1 后, Export 任务会使用多个线程并发的去执行 SELECT INTO OUTFILE 查询计划。 parallelism 参数实际就是指定执行 EXPORT 作业的线程数量。

一个 Export 任务构造一个或多个 SELECT INTO OUTFILE 执行计划的具体逻辑是:

  1. 选择导出的数据的一致性模型

    根据 data_consistency 参数来决定导出的一致性,这个只和语义有关,和并发度无关,用户要先根据自己的需求,选择一致性模型。

  2. 确定并发度

    根据 parallelism 参数确定由多少个线程来运行这些 SELECT INTO OUTFILE 执行计划。 parallelism 决定了最大可能的线程数。

    Tip

    注意:即使 Export 命令设置了 parallelism 参数,该 Export 任务的实际并发线程数量还与 Job Schedule 有关。 Export 任务设置多并发后,每一个并发线程都是 Job Schedule 提供的,所以如果此时 Doris 系统任务较繁忙, Job Schedule 的线程资源较紧张,那么有可能分给 Export 任务的实际线程数量达不到 parallelism 个数,影响 Export 的并发导出。此时可以通过减轻系统负载或调整 FE 配置 async_task_consumer_thread_num 增加 Job Schedule 的总线程数量来缓解这个问题。

  3. 确定每一个 outfile 语句的任务量

    每一个线程会根据 maximum_tablets_of_outfile_in_export 以及数据实际的分区数 / buckets 数来决定要拆分成多少个 outfile

    maximum_tablets_of_outfile_in_exportFE 的配置,默认值为 10 。该参数用于指定 Export 任务切分出来的单个 OutFile 语句中允许的最大 partitions / buckets 数量。修改该配置需要重启 FE

    举例:假设一张表共有 20partition ,每个 partition 都有 5buckets ,那么该表一共有 100buckets 。设置 data_consistency = none 以及 maximum_tablets_of_outfile_in_export = 10

    1. parallelism = 5 情况下

      Export 任务将把该表的 100buckets 分成 5 份,每个线程负责 20buckets 。每个线程负责的 20buckets 又将以 10 个为单位分成 2 组,每组 buckets 各由一个 outfile 查询计划负责。所以最终该 Export 任务有 5 个线程并发执行,每个线程负责 2outfile 语句,每个线程负责的 outfile 语句串行的被执行。

    2. parallelism = 3 情况下

      Export 任务将把该表的 100buckets 分成 3 份, 3 个线程分别负责 343333buckets 。每个线程负责的 buckets 又将以 10 个为单位分成 4 组(最后一组不足 10buckets ),每组 buckets 各由一个 outfile 查询计划负责。所以该 Export 任务最终有 3 个线程并发执行,每个线程负责 4outfile 语句,每个线程负责的 outfile 语句串行的被执行。

    3. parallelism = 120 情况下

      由于该表 buckets 只有 100 个,所以系统会将 parallelism 强制设为 100 ,并以 parallelism = 100 去执行。 Export 任务将把该表的 100buckets 分成 100 份,每个线程负责 1buckets 。每个线程负责的 1buckets 又将以 10 个为单位分成 1 组(该组实际就只有 1buckets ),每组 buckets 由一个 outfile 查询计划负责。所以最终该 Export 任务有 100 个线程并发执行,每个线程负责 1outfile 语句,每个 outfile 语句实际只导出 1buckets

当前版本若希望 Export 有一个较好的性能,建议设置以下参数:

  1. 打开 session 变量 enable_parallel_outfile

  2. 设置 Exportparallelism 参数为较大值,使得每一个线程只负责一个 SELECT INTO OUTFILE 查询计划。

  3. 设置 FE 配置 maximum_tablets_of_outfile_in_export 为较小值,使得每一个 SELECT INTO OUTFILE 查询计划导出的数据量较小。