7.3.2 Iceberg¶
自 2.1.6
版本开始, Apache Doris
支持对 Iceberg
的 DDL
和 DML
操作。用户可以直接通过 Apache Doris
在 Iceberg
中创建库表,并将数据写入到 Iceberg
表中。通过该功能,用户可以通过 Apache Doris
对 Iceberg
进行完整的数据查询和写入操作,进一步帮助用户简化湖仓一体架构。
本文介绍在 Apache Doris
中支持的 Iceberg
操作,语法和使用须知。
Tip
这是一个实验功能。
Tip
使用前,请先设置:
set global enable_nereids_planner = true;
set global enable_fallback_to_original_planner = false;
1 元数据创建与删除¶
1.1 Catalog¶
-
创建
SQL 1 2 3 4 5 6 7 8
CREATE CATALOG [IF NOT EXISTS] iceberg PROPERTIES ( "type" = "iceberg", "iceberg.catalog.type" = "hms", "hive.metastore.uris" = "thrift://172.21.16.47:7004", "warehouse" = "hdfs://172.21.16.47:4007/user/hive/warehouse/", "hadoop.username" = "hadoop", "fs.defaultFS" = "hdfs://172.21.16.47:4007" );
上面主要演示了如何在
Apache Doris
中创建HMS Iceberg Catalog
。Apache Doris
目前支持多种类型的Iceberg Catalog
。更多配置,请参阅Iceberg Catalog
注意:
-
如果需要通过
Apache Doris
的Hive Catalog
创建Iceberg
表或写入数据,需要在Catalog
属性中显式增加fs.defaultFS
属性以及warehouse
属性。如果创建Catalog
仅用于查询,则这两个参数可以省略。 -
Hive Catalog
可以查询Iceberg
表,但是不能在Hive Catalog
中创建Iceberg
表。
-
-
删除
SQL 1
DROP CATALOG [IF EXISTS] iceberg;
删除
Catalog
并不会删除Iceberg
中的任何库表信息。仅仅是在Apache Doris
中移除了对这个Iceberg Catalog
的映射。
1.2 Database¶
-
创建
可以通过
SWITCH
语句切换到对应的Catalog
下,执行CREATE DATABASE
语句:SQL 1 2
SWITCH iceberg; CREATE DATABASE [IF NOT EXISTS] iceberg_db;
也可以使用全限定名创建,或指定
location
,如:SQL 1
CREATE DATABASE [IF NOT EXISTS] iceberg.iceberg_db;
之后可以通过
SHOW CREATE DATABASE
命令可以查看Database
的相关信息:SQL 1 2 3 4 5 6
mysql> SHOW CREATE DATABASE iceberg_db; +------------+------------------------------+ | Database | Create Database | +------------+------------------------------+ | iceberg_db | CREATE DATABASE `iceberg_db` | +------------+------------------------------+
-
删除
SQL 1
DROP DATABASE [IF EXISTS] iceberg.iceberg_db;
Warning
对于
Iceberg Database
,必须先删除这个Database
下的所有表后,才能删除Database
,否则会报错。这个操作会同步删除Iceberg
中对应的Database
。
1.3 Table¶
-
创建
Apache Doris
支持在Iceberg
中创建分区或非分区表。SQL 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
-- Create unpartitioned iceberg table CREATE TABLE unpartitioned_table ( `col1` BOOLEAN COMMENT 'col1', `col2` INT COMMENT 'col2', `col3` BIGINT COMMENT 'col3', `col4` FLOAT COMMENT 'col4', `col5` DOUBLE COMMENT 'col5', `col6` DECIMAL(9,4) COMMENT 'col6', `col7` STRING COMMENT 'col7', `col8` DATE COMMENT 'col8', `col9` DATETIME COMMENT 'col9' ) ENGINE=iceberg PROPERTIES ( 'write-format'='parquet' ); -- Create partitioned iceberg table -- The partition columns must be in table's column definition list CREATE TABLE partition_table ( `ts` DATETIME COMMENT 'ts', `col1` BOOLEAN COMMENT 'col1', `col2` INT COMMENT 'col2', `col3` BIGINT COMMENT 'col3', `col4` FLOAT COMMENT 'col4', `col5` DOUBLE COMMENT 'col5', `col6` DECIMAL(9,4) COMMENT 'col6', `col7` STRING COMMENT 'col7', `col8` DATE COMMENT 'col8', `col9` DATETIME COMMENT 'col9', `pt1` STRING COMMENT 'pt1', `pt2` STRING COMMENT 'pt2' ) ENGINE=iceberg PARTITION BY LIST (DAY(ts), pt1, pt2) () PROPERTIES ( 'write-format'='orc', 'compression-codec'='zlib' );
创建后,可以通过
SHOW CREATE TABLE
命令查看Iceberg
的建表语句。关于分区表的分区函数,可以参阅后面的【分区】小节。 -
列类型
在
Apache Doris
中创建Iceberg
表所使用的列类型,和Iceberg
中的列类型对应关系如下Apache Doris Iceberg BOOLEAN BOOLEAN INT INT BIGINT BIGINT FLOAT FLOAT DOUBLE DOUBLE DECIMAL DECIMAL STRING STRING DATE DATE DATETIME TIMESTAMP ARRAY ARRAY MAP MAP STRUCT STRUCT -
注意:目前只支持这些数据类型,其它数据类型会报错。
-
列类型暂时只能为默认的
Nullable
,不支持NOT NULL
。 -
插入数据后,如果类型不能够兼容,例如
'abc'
插入到数值类型,则会转为null
值后插入。
-
-
删除
可以通过
DROP TABLE
语句删除一个Iceberg
表。当前删除表后,会同时删除数据,包括分区数据。 -
分区
Iceberg
中的分区类型对应Apache Doris
中的List
分区。因此,在Apache Doris
中创建Iceberg
分区表,需使用List
分区的建表语句,但无需显式的枚举各个分区。在写入数据时,Apache Doris
会根据数据的值,自动创建对应的Iceberg
分区。-
支持创建单列或多列分区表。
-
支持分区转换函数来支持
Iceberg
隐式分区以及分区演进的功能。具体Iceberg
分区转换函数可以查看Iceberg partition transforms
-
year(ts)
或者years(ts)
-
month(ts)
或者months(ts)
-
day(ts)
或者days(ts)
或者date(ts)
-
hour(ts)
或者hours(ts)
或者date_hour(ts)
-
bucket(N, col)
-
truncate(L, col)
-
-
-
文件格式
-
Parquet
(默认) -
ORC
-
-
压缩格式
-
Parquet
:snappy
,zstd
(默认),plain
。(plain
就是不采用压缩) -
ORC
:snappy
,zlib
(默认),zstd
,plain
。(plain
就是不采用压缩)
-
-
存储介质
-
HDFS
-
对象存储
-
2 数据操作¶
可以通过 INSERT
语句将数据写入到 Iceberg
表中。
支持写入到由 Apache Doris
创建的 Iceberg
表,或者 Iceberg
中已存在的且格式支持的表。
对于分区表,会根据数据,自动写入到对应分区,或者创建新的分区。
目前不支持指定分区写入。
2.1 INSERT¶
INSERT
操作会数据以追加的方式写入到目标表中。
SQL | |
---|---|
1 2 3 4 5 |
|
2.2 INSERT OVERWRITE¶
INSERT OVERWRITE
会使用新的数据完全覆盖原有表中的数据。
SQL | |
---|---|
1 2 |
|
2.3 CTAS(CREATE TABLE AS SELECT)¶
可以通过 CTAS
语句创建 Iceberg
表并写入数据:
SQL | |
---|---|
1 |
|
CTAS
支持指定文件格式、分区方式等信息,如:
SQL | |
---|---|
1 2 3 4 5 6 7 8 9 10 11 |
|
3 异常数据和数据转换¶
TODO
3.1 HDFS 文件操作¶
在 HDFS
上的 Iceberg
表数据会写入到最终目录,提交 Iceberg
元数据进行管理。
写入的数据文件名称格式为: <query-id>_<uuid>-<index>.<compress-type>.<file-type>
3.2 对象存储文件操作¶
TODO
4 相关参数¶
4.1 FE¶
TODO
4.2 BE¶
参数名称 | 默认值 | 描述 |
---|---|---|
iceberg_sink_max_file_size | 最大的数据文件大小。当写入数据量超过该大小后会关闭当前文件,滚动产生一个新文件继续写入。 | 1GB |
table_sink_partition_write_max_partition_nums_per_writer | BE 节点上每个 Instance 最大写入的分区数目。 | 128 |
table_sink_non_partition_write_scaling_data_processed_threshold | 非分区表开始 scaling-write 的数据量阈值。每增加table_sink_non_partition_write_scaling_data_processed_threshold 数据就会发送给一个新的 writer(instance) 进行写入。scaling-write 机制主要是为了根据数据量来使用不同数目的 writer(instance) 来进行写入,会随着数据量的增加而增大写入的 writer(instance) 数目,从而提高并发写入的吞吐。当数据量比较少的时候也会节省资源,并且尽可能地减少产生的文件数目。 | 25MB |
table_sink_partition_write_min_data_processed_rebalance_threshold | 分区表开始触发重平衡的最少数据量阈值。如果 当前累积的数据量 - 自从上次触发重平衡或者最开始累积的数据量 >= table_sink_partition_write_min_data_processed_rebalance_threshold,就开始触发重平衡机制。如果发现最终生成的文件大小差异过大,可以调小改阈值来增加均衡度。当然过小的阈值会导致重平衡的成本增加,可能会影响性能。 | 25MB |
table_sink_partition_write_min_partition_data_processed_rebalance_threshold | 分区表开始进行重平衡时的最少的分区数据量阈值。如果 当前分区的数据量 >= 阈值 * 当前分区已经分配的 task 数目,就开始对该分区进行重平衡。如果发现最终生成的文件大小差异过大,可以调小改阈值来增加均衡度。当然过小的阈值会导致重平衡的成本增加,可能会影响性能。 | 15MB |