跳转至

4.2.5 主键模型的更新事务

1 Update 并发控制

默认情况下,并不允许同一时间对同一张表并发进行多个 Update 操作。

主要原因是, Doris 目前支持的是行更新,这意味着,即使用户声明的是 SET v2 = 1 ,实际上,其他所有的 Value 列也会被覆盖一遍(尽管值没有变化)。

这就会存在一个问题,如果同时有两个 Update 操作对同一行进行更新,那么其行为可能是不确定的,也就是可能存在脏数据。

但在实际应用中,如果用户自己可以保证即使并发更新,也不会同时对同一行进行操作的话,就可以手动打开并发限制。通过修改 FE 配置 enable_concurrent_update ,当该配置值设置为 true 时,更新命令将不再提供事务保证。

Warning

注意:开启 enable_concurrent_update 配置后,会有一定的性能风险

2 Sequence 列

Unique 模型主要针对需要唯一主键的场景,可以保证主键唯一性约束,在同一批次中导入或者不同批次中导入的数据,替换顺序不做保证。替换顺序无法保证则无法确定最终导入到表中的具体数据,存在了不确定性。

为了解决这个问题, Doris 支持了 sequence 列,通过用户在导入时指定 sequence 列,相同 key 列下,按照 sequence 列的值进行替换,较大值可以替换较小值,反之则无法替换。该方法将顺序的确定交给了用户,由用户控制替换顺序。

Tip

sequence 列目前只支持 Unique 模型。

2.1 基本原理

通过增加一个隐藏列 DORIS_SEQUENCE_COL 实现,该列的类型由用户在建表时指定,在导入时确定该列具体值,并依据该值决定相同 Key 列下,哪一行生效。

2.1.1 建表

创建 Unique 表时,将按照用户指定类型自动添加一个隐藏列 DORIS_SEQUENCE_COL

2.1.2 导入

导入时, fe 在解析的过程中将隐藏列的值设置成 order by 表达式的值( broker loadroutine load ),或者 function_column.sequence_col 表达式的值( stream load ), value 列将按照该值进行替换。隐藏列 __DORIS_SEQUENCE_COL__ 的值既可以设置为数据源中一列,也可以是表结构中的一列。

2.2 使用语法

Sequence 列建表时有两种方式,一种是建表时设置 sequence_col 属性,一种是建表时设置 sequence_type 属性。

  1. 设置 sequence_col (推荐)

    创建 Unique 表时,指定 sequence 列到表中其他 column 的映射

    Bash
    1
    2
    3
    PROPERTIES (
        "function_column.sequence_col" = 'column_name',
    );
    

    sequence_col 用来指定 sequence 列到表中某一列的映射,该列可以为整型和时间类型( DATEDATETIME ),创建后不能更改该列的类型。

    导入方式和没有 sequence 列时一样,使用相对比较简单,推荐使用。

  2. 设置 sequence_type

    创建 Unique 表时,指定 sequence 列类型

    Bash
    1
    2
    3
    PROPERTIES (
        "function_column.sequence_type" = 'Date',
    );
    

    sequence_type 用来指定 sequence 列的类型,可以为整型和时间类型( DATEDATETIME )。

导入时需要指定 sequence 列到其他列的映射

  1. Stream Load

    stream load 的写法是在 header 中的 function_column.sequence_col 字段添加隐藏列对应的 source_sequence 的映射,示例

    Bash
    1
    curl --location-trusted -u root -H "columns: k1,k2,source_sequence,v1,v2" -H "function_column.sequence_col: source_sequence" -T testData <http://host:port/api/testDb/testTbl/_stream_load>
    
  2. Broker Load

    ORDER BY 处设置隐藏列映射的 source_sequence 字段

    SQL
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    LOAD LABEL db1.label1
    (
        DATA INFILE("hdfs://host:port/user/data/*/test.txt")
        INTO TABLE `tbl1`
        COLUMNS TERMINATED BY ","
        (k1,k2,source_sequence,v1,v2)
        ORDER BY source_sequence
    )
    WITH BROKER 'broker'
    (
        "username"="user",
        "password"="pass"
    )
    PROPERTIES
    (
        "timeout" = "3600"
    );
    
  3. Routine Load

    映射方式同上,示例如下

    SQL
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    CREATE ROUTINE LOAD example_db.test1 ON example_tbl
        [WITH MERGE|APPEND|DELETE]
        COLUMNS(k1, k2, source_sequence, v1, v2),
        WHERE k1  100 and k2 like "%doris%"
        [ORDER BY source_sequence]
        PROPERTIES
        (
            "desired_concurrent_number"="3",
            "max_batch_interval" = "20",
            "max_batch_rows" = "300000",
            "max_batch_size" = "209715200",
            "strict_mode" = "false"
        )
        FROM KAFKA
        (
            "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
            "kafka_topic" = "my_topic",
            "kafka_partitions" = "0,1,2,3",
            "kafka_offsets" = "101,0,0,200"
        );
    

2.3 启用 sequence column 支持

在新建表时如果设置了 function_column.sequence_col 或者 function_column.sequence_type ,则新建表将支持 sequence column

对于一个不支持 sequence column 的表,如果想要使用该功能,可以使用如下语句: ALTER TABLE example_db.my_table ENABLE FEATURE "SEQUENCE_LOAD" WITH PROPERTIES ("function_column.sequence_type" = "Date") 来启用。

如果不确定一个表是否支持 sequence column ,可以通过设置一个 session variable 来显示隐藏列 SET show_hidden_columns=true ,之后使用 desc tablename ,如果输出中有 __DORIS_SEQUENCE_COL__ 列则支持,如果没有则不支持。

2.4 使用示例

下面以 Stream Load 为例为示例来展示使用方式:

  1. 创建支持 sequence col 的表

    创建 unique 模型的 test_table 数据表,并指定 sequence 列映射到表中的 modify_date 列。

    SQL
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    CREATE TABLE test.test_table
    (
        user_id bigint,
        date date,
        group_id bigint,
        modify_date date,
        keyword VARCHAR(128)
    )
    UNIQUE KEY(user_id, date, group_id)
    DISTRIBUTED BY HASH (user_id) BUCKETS 32
    PROPERTIES(
        "function_column.sequence_col" = 'modify_date',
        "replication_num" = "1",
        "in_memory" = "false"
    );
    

    表结构如下:

    SQL
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    MySQL>  desc test_table;
    +-------------+--------------+------+-------+---------+---------+
    | Field       | Type         | Null | Key   | Default | Extra   |
    +-------------+--------------+------+-------+---------+---------+
    | user_id     | BIGINT       | No   | true  | NULL    |         |
    | date        | DATE         | No   | true  | NULL    |         |
    | group_id    | BIGINT       | No   | true  | NULL    |         |
    | modify_date | DATE         | No   | false | NULL    | REPLACE |
    | keyword     | VARCHAR(128) | No   | false | NULL    | REPLACE |
    +-------------+--------------+------+-------+---------+---------+
    
  2. 正常导入数据:

    导入如下数据

    Text Only
    1
    2
    3
    4
    5
    6
    1   2020-02-22  1   2020-02-21  a
    1   2020-02-22  1   2020-02-22  b
    1   2020-02-22  1   2020-03-05  c
    1   2020-02-22  1   2020-02-26  d
    1   2020-02-22  1   2020-02-23  e
    1   2020-02-22  1   2020-02-24  b
    

    此处以 stream load 为例

    Bash
    1
    curl --location-trusted -u root: -T testData <http://host:port/api/test/test_table/_stream_load>
    

    结果为

    SQL
    1
    2
    3
    4
    5
    6
    MySQL> select * from test_table;
    +---------+------------+----------+-------------+---------+
    | user_id | date       | group_id | modify_date | keyword |
    +---------+------------+----------+-------------+---------+
    |       1 | 2020-02-22 |        1 | 2020-03-05  | c       |
    +---------+------------+----------+-------------+---------+
    

    在这次导入中,因 sequence column 的值(也就是 modify_date 中的值)中 '2020-03-05' 为最大值,所以 keyword 列中最终保留了 c

  3. 替换顺序的保证

    上述步骤完成后,接着导入如下数据

    Text Only
    1
    2
    1   2020-02-22  1   2020-02-22  a
    1   2020-02-22  1   2020-02-23  b
    

    查询数据

    SQL
    1
    2
    3
    4
    5
    6
    MySQL [test]> select * from test_table;
    +---------+------------+----------+-------------+---------+
    | user_id | date       | group_id | modify_date | keyword |
    +---------+------------+----------+-------------+---------+
    |       1 | 2020-02-22 |        1 | 2020-03-05  | c       |
    +---------+------------+----------+-------------+---------+
    

    在这次导入的数据中,会比较所有已导入数据的 sequence column (也就是 modify_date ),其中 '2020-03-05' 为最大值,所以 keyword 列中最终保留了 c

  4. 再尝试导入如下数据

    Text Only
    1
    2
    1   2020-02-22  1   2020-02-22  a
    1   2020-02-22  1   2020-03-23  w
    

    查询数据

    SQL
    1
    2
    3
    4
    5
    6
    MySQL [test]> select * from test_table;
    +---------+------------+----------+-------------+---------+
    | user_id | date       | group_id | modify_date | keyword |
    +---------+------------+----------+-------------+---------+
    |       1 | 2020-02-22 |        1 | 2020-03-23  | w       |
    +---------+------------+----------+-------------+---------+
    

    此时就可以替换表中原有的数据。综上,在导入过程中,会比较所有批次的 sequence 列值,选择值最大的记录导入 Doris 表中。

2.5 注意

  1. 为防止误用,在 StreamLoad/BrokerLoad 等导入任务以及行更新 insert 语句中,用户必须显示指定 sequence 列(除非 sequence 列的默认值为 CURRENT_TIMESTAMP ),不然会收到以下报错信息:

    SQL
    1
    Table test_tbl has sequence column, need to specify the sequence column
    
  2. 自版本 2.0 起, DorisUnique Key 表的 Merge-on-Write 实现支持了部分列更新能力,在部分列更新导入中,用户每次可以只更新一部分列,因此并不是必须要包含 sequence 列。若用户提交的导入任务中,包含 sequence 列,则行为无影响;若用户提交的导入任务不包含 sequence 列, Doris 会使用匹配的历史数据中的 sequence 列作为更新后该行的 sequence 列的值。如果历史数据中不存在相同 key 的列,则会自动用 null 或默认值填充。

  3. 当出现并发导入时, Doris 会利用 MVCC 机制来保证数据的正确性。如果两批数据导入都更新了一个相同 key 的不同列,则其中系统版本较高的导入任务会在版本较低的导入任务成功后,使用版本较低的导入任务写入的相同 key 的数据行重新进行补齐。