Author: Minghao Guo (Database Engineer at PingCAP)
Editors: Calvin Weng, Tom Dewan

TiDB, an open-source NewSQL database, features horizontal scalability, which shelters users from the complexity of cross-shard queries and operations. However, when users migrate from a sharding solution to TiDB, the migration itself is still complex. The TiDB Data Migration (DM) tool supports full data migration and incremental data replication from sharded databases to TiDB.

This article explains how Sync, DM’s core component, processes Data Manipulation Language (DML) statements through the binlog replication workflow. It covers how Sync reads, filters, routes, and converts binlog events, and how it optimizes their execution.

The Sync binlog replication workflow

The following figure illustrates the binlog replication workflow.

[Figure: Sync workflow]

In this workflow, Sync:

  1. Reads binlog events from MySQL/MariaDB or relay logs.
  2. Filters and converts binlog events.
    • The binlog Filter module filters binlog events according to binlog expressions. You can configure it using filter parameters.
    • The Routing module uses routing rules to convert database or table names. You can configure this module using route parameters.
    • The Expression Filter module uses SQL expressions to filter binlog events. You can configure this module using expression-filter parameters.
  3. Optimizes DML statements.
    • The Compactor module merges multiple operations on the same row (with the same primary key) into one. You can enable this feature with the syncer.compact parameter.
    • The Causality module first detects conflicts between operations on different rows (with different primary keys) and then groups nonconflicting operations for concurrent processing.
    • The Merger module merges multiple binlog events into one DML statement. You can enable this feature with the syncer.multiple-rows parameter.
  4. Executes the DML statements in the downstream database.
  5. Periodically saves the binlog position or global transaction ID (GTID) as the checkpoint.
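
To make the routing step more concrete, here is a minimal Python sketch of how wildcard routing rules could map sharded upstream tables to one downstream table. The rule layout, the `ROUTE_RULES` name, and the `route` function are hypothetical and only illustrate the idea; they are not DM's configuration format or implementation.

```python
from fnmatch import fnmatchcase

# Hypothetical routing rules:
# (schema pattern, table pattern, target schema, target table).
ROUTE_RULES = [
    ("shard_*", "orders_*", "merged", "orders"),
]


def route(schema, table, rules=ROUTE_RULES):
    """Return the downstream (schema, table) for an upstream table."""
    for schema_pat, table_pat, target_schema, target_table in rules:
        if fnmatchcase(schema, schema_pat) and fnmatchcase(table, table_pat):
            return target_schema, target_table
    # No rule matched: keep the upstream names unchanged.
    return schema, table
```

With these assumed rules, every `shard_*.orders_*` table is written to `merged.orders`, while tables that match no rule keep their original names.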

Optimization logic

Compactor

DM reads the upstream binlog file and replicates changes to TiDB. If the Compactor module captures multiple changes to one row within a short period of time, it compresses these changes into one. This reduces pressure on the downstream and improves throughput. The following examples show how the changes are compacted:

INSERT + UPDATE => INSERT
INSERT + DELETE => DELETE
UPDATE + UPDATE => UPDATE
UPDATE + DELETE => DELETE
DELETE + INSERT => UPDATE
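
The rules above can be expressed as a small lookup table. The following Python sketch is only an illustration of the idea, not DM's implementation. It assumes each change is a hypothetical `(op, primary_key, row)` tuple, where `row` is the row image after the change (`None` for a DELETE), and that no UPDATE changes the primary key.

```python
# Result of merging an earlier and a later operation on the same primary key.
COMPACT_RULES = {
    ("INSERT", "UPDATE"): "INSERT",
    ("INSERT", "DELETE"): "DELETE",
    ("UPDATE", "UPDATE"): "UPDATE",
    ("UPDATE", "DELETE"): "DELETE",
    ("DELETE", "INSERT"): "UPDATE",
}


def compact(ops):
    """Compact (op, primary_key, row) tuples so that one entry per key remains.

    Combinations that are not listed in COMPACT_RULES (for example
    INSERT + INSERT) raise KeyError in this sketch.
    """
    merged = {}  # primary_key -> (op, row); dicts keep first-insertion order
    for op, pk, row in ops:
        if pk in merged:
            prev_op, _ = merged[pk]
            op = COMPACT_RULES[(prev_op, op)]
        # Keep the latest row image together with the compacted operation.
        merged[pk] = (op, row)
    return [(op, pk, row) for pk, (op, row) in merged.items()]
```

For example, an INSERT followed by an UPDATE on the same key leaves a single INSERT that carries the updated row image.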

Causality

In general, MySQL binlog events are replicated in order. However, strictly sequential replication can’t achieve low latency under high queries per second (QPS), and not all binlog events conflict with each other. DM therefore uses a conflict detection mechanism called Causality. The Causality module uses an algorithm similar to a disjoint-set (union-find) structure to group binlog events: events that conflict with each other are placed in the same group. DML statements within a group are executed sequentially, while DML statements in different groups are executed concurrently to improve throughput.
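
As an illustration of disjoint-set grouping, the Python sketch below groups DML statements that share a key. It is a simplified model rather than DM's actual algorithm. It assumes each DML is represented by a non-empty set of hypothetical key strings (for example, its primary key and unique key values) and that two DMLs conflict when they share a key, directly or transitively.

```python
def group_by_conflict(dmls):
    """Group DML indices so that conflicting DMLs share a group.

    dmls: list of non-empty sets of key strings, in binlog order.
    Returns a list of groups; each group lists DML indices in binlog order.
    """
    parent = {}

    def find(key):
        parent.setdefault(key, key)
        while parent[key] != key:
            parent[key] = parent[parent[key]]  # path halving
            key = parent[key]
        return key

    def union(a, b):
        root_a, root_b = find(a), find(b)
        if root_a != root_b:
            parent[root_b] = root_a

    # All keys touched by one DML belong to the same set.
    for keys in dmls:
        ordered = sorted(keys)
        for key in ordered[1:]:
            union(ordered[0], key)

    groups = {}
    for index, keys in enumerate(dmls):
        groups.setdefault(find(min(keys)), []).append(index)
    return list(groups.values())
```

Each returned group would be executed sequentially in binlog order, and different groups could be handed to different workers.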

Merger

The MySQL replication protocol specifies that one binlog event corresponds to one modifying operation. With the Merger module, DM can merge multiple binlog events into one DML statement and then execute the merged statement in TiDB. This helps reduce overhead from network interactions. The following examples show how binlog events are merged:

INSERT INTO tb(a,b) VALUES(1,1);
+ INSERT INTO tb(a,b) VALUES(2,2);
= INSERT INTO tb(a,b) VALUES(1,1),(2,2);

UPDATE tb SET a=1, b=1 WHERE a=1;
+ UPDATE tb SET a=2, b=2 WHERE a=2;
= INSERT INTO tb(a,b) VALUES(1,1),(2,2) ON DUPLICATE KEY UPDATE a=VALUES(a), b=VALUES(b);

DELETE FROM tb WHERE a=1;
+ DELETE FROM tb WHERE a=2;
= DELETE FROM tb WHERE (a) IN ((1),(2));
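
The merging shown above can be sketched as three small Python helpers that build one multi-row statement from several row changes. The function names are hypothetical, the statements use `?` placeholders instead of inline values, and the sketch assumes all merged rows target the same table and columns. It is meant as an illustration, not as DM's code.

```python
def _placeholders(n_rows, n_cols):
    """Return '(?, ?), (?, ?)' style placeholders for n_rows rows."""
    row = "(" + ", ".join(["?"] * n_cols) + ")"
    return ", ".join([row] * n_rows)


def merge_inserts(table, columns, rows):
    """Merge several single-row INSERTs into one multi-row INSERT."""
    sql = (f"INSERT INTO {table} ({', '.join(columns)}) "
           f"VALUES {_placeholders(len(rows), len(columns))}")
    return sql, [value for row in rows for value in row]


def merge_updates(table, columns, rows):
    """Merge several UPDATEs into one INSERT ... ON DUPLICATE KEY UPDATE."""
    sql, args = merge_inserts(table, columns, rows)
    assignments = ", ".join(f"{c}=VALUES({c})" for c in columns)
    return f"{sql} ON DUPLICATE KEY UPDATE {assignments}", args


def merge_deletes(table, key_columns, keys):
    """Merge several DELETEs into one DELETE ... WHERE (keys) IN (...)."""
    sql = (f"DELETE FROM {table} WHERE ({', '.join(key_columns)}) "
           f"IN ({_placeholders(len(keys), len(key_columns))})")
    return sql, [value for key in keys for value in key]
```

Each helper returns the SQL text and a flat argument list, so one network round trip can carry several row changes.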

Execution logic

DML generation

DM has a built-in schema tracker to record schemas of both upstream and downstream databases. When receiving a Data Definition Language (DDL) statement, DM updates the table structure in the schema tracker. When DM receives a DML statement, it generates the corresponding DML according to the table structure in the schema tracker. The processing logic is:

  • If both full and incremental replication tasks are in progress, the Sync module will use the table structure dumped from the upstream full replication as its initial upstream table structure.
  • If only the incremental replication task is started, there will be no table structure in the MySQL binlog file. The Sync module will use the corresponding downstream table structure as its initial upstream table structure.
  • The table structure may differ between upstream and downstream. For example, the downstream table may have extra columns, or downstream and upstream tables may have different primary keys. To ensure consistency among data replications, DM records the primary key and unique key of the corresponding downstream table.
  • To form a DML statement, DM:
    1. Generates the DML columns with the upstream table structure recorded in the schema tracker.
    2. Generates the column values of the DML based on the column values recorded in the binlog file.
    3. Generates the WHERE conditions of the DML based on the downstream primary key and unique key recorded in the schema tracker. If the downstream table has no unique key, DM will use all column values in the binlog file as WHERE conditions.
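
The three steps above can be sketched in Python for an UPDATE. The `build_update` function and its parameters are hypothetical: `columns` stands for the upstream column list from the schema tracker, `old_row` and `new_row` for the before and after images in the binlog, and `downstream_keys` for the columns of the downstream primary or unique key. The sketch ignores NULL values, which would need `IS NULL` instead of `= ?`.

```python
def build_update(table, columns, old_row, new_row, downstream_keys):
    """Build an UPDATE statement and its arguments for one row change."""
    # Fall back to all columns when the downstream table has no usable key.
    where_columns = list(downstream_keys) or list(columns)
    old = dict(zip(columns, old_row))

    set_clause = ", ".join(f"{c} = ?" for c in columns)
    where_clause = " AND ".join(f"{c} = ?" for c in where_columns)
    sql = f"UPDATE {table} SET {set_clause} WHERE {where_clause}"

    # SET values come from the after image, WHERE values from the before image.
    args = list(new_row) + [old[c] for c in where_columns]
    return sql, args
```

When `downstream_keys` is empty, the WHERE clause lists every column, which matches the fallback described in step 3.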

Worker count

As mentioned earlier, the Causality module uses a conflict detection algorithm to divide the binlog events into multiple groups for concurrent execution in the downstream database. DM’s concurrency is controlled by the syncer.worker-count parameter. If CPU usage of the downstream TiDB nodes is not high, increasing concurrency can improve the throughput of data replication.

Batch processing

DM accumulates multiple DML statements into one transaction for execution in the downstream database. When a DML worker receives a DML statement, it adds the statement to its cache. If the number of cached statements reaches the threshold, or if no DML statement is received for a prolonged period of time, the DML worker executes the cached DML statements in the downstream database. You can configure the number of cached statements via the syncer.batch parameter.
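
The flush-on-threshold-or-idle behavior can be sketched as follows. The `Batcher` class, its parameter names, and the explicit `now` timestamps are assumptions made for this illustration; the current time is passed in so that the logic is easy to follow and test. This is not DM's worker implementation.

```python
class Batcher:
    """Cache DML statements and flush them by size or after an idle period."""

    def __init__(self, batch_size, max_idle_seconds, execute):
        self.batch_size = batch_size            # plays the role of syncer.batch
        self.max_idle_seconds = max_idle_seconds
        self.execute = execute                  # callback that runs one batch
        self.cache = []
        self.last_received = None

    def add(self, dml, now):
        """Cache one DML; flush when the cache reaches the threshold."""
        self.cache.append(dml)
        self.last_received = now
        if len(self.cache) >= self.batch_size:
            self._flush()

    def tick(self, now):
        """Call periodically; flush if no DML arrived for too long."""
        if self.cache and now - self.last_received >= self.max_idle_seconds:
            self._flush()

    def _flush(self):
        batch, self.cache = self.cache, []
        self.execute(batch)
```

A caller would invoke `add` for each incoming DML and `tick` from a timer, with `execute` wrapping the batch in one downstream transaction.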

Operators for consistent and safe replication 

Checkpoint

By default, DM updates its checkpoint every 30 seconds. Because multiple DML workers run concurrently, the checkpoint process has to find the latest binlog position before which every binlog event has been successfully replicated to TiDB. That position is used as the current checkpoint.
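
One way such a position could be computed is sketched below. This is an assumption made for illustration and is not taken from DM's source code. Binlog positions are modeled as hypothetical `(file_name, offset)` tuples, and each worker exposes the positions of the DML statements it has not executed yet.

```python
def safe_checkpoint(latest_dispatched, pending_by_worker):
    """Return a position before which every event has been executed.

    latest_dispatched: position of the newest event handed to any worker.
    pending_by_worker: for each worker, the positions of its DMLs that are
        not executed yet, in ascending order.
    """
    earliest_pending = [pending[0] for pending in pending_by_worker if pending]
    if not earliest_pending:
        # Nothing is in flight, so everything dispatched so far is replicated.
        return latest_dispatched
    # The oldest unexecuted event bounds what can safely be checkpointed.
    return min(earliest_pending)
```

Tuples compare file name first and offset second, so the minimum is the oldest pending event across all workers.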

Transaction consistency

DM replicates data row by row. DM splits an upstream transaction into multiple rows, which are then distributed to DML workers for concurrent execution. If a DM replication task is suspended, either by an error or intentionally, the downstream database may be left in an intermediate state in which the DML statements of an upstream transaction are only partly replicated. When this happens, the downstream database is inconsistent with the upstream. To reduce this risk, DM 5.3 introduced a 10-second wait time that allows in-flight transactions to finish replicating before the task is suspended. However, inconsistency is still possible if an upstream transaction can’t be replicated within the wait time.

Safe mode

As indicated in the execution logic section, DML execution and checkpoint refreshing are not synchronous, so the checkpoint refresh and the downstream binlog replication are not guaranteed to be atomic. If DM aborts due to an error, the checkpoint may record a recovery point earlier than the point at which DM aborted. As a result, when the replication task restarts, some events might be replicated again. In other words, DM provides at-least-once processing: data may be processed more than once. If you restart DM after an exception, DM enters safe mode so that it can import duplicate data. The logic is as follows:

  • If a DM task is suspended gracefully, DM executes all cached DML statements in the downstream database and refreshes the checkpoint. In this case, DM doesn’t enter safe mode after it restarts. The data before the checkpoint is already replicated, and the data after the checkpoint is not replicated yet. There will be no duplicate replications.
  • If a DM task is suspended with an error, DM first tries to execute all cached DML statements in the downstream database. This attempt may fail due to possible data conflicts. DM then gets the latest upstream binlog position from memory and records it as safemode_exit_point, which is the exit point for safe mode. This exit point and the latest checkpoint are saved in the downstream database for comparison. When the task resumes, the possible conditions are as follows:
    • If the checkpoint position is the same as the exit point, all binlog events had been replicated to the downstream database when DM was suspended. Therefore, the task is handled the same way as a gracefully suspended task, and DM doesn’t enter safe mode.
    • If the checkpoint position is earlier than the exit point, some cached DML statements failed in the downstream database. In this case, DM enables safe mode to replicate the binlog events between the checkpoint and the safemode_exit_point. These events might be replicated more than once.
    • If there is no value for safemode_exit_point, either the operation to update safemode_exit_point failed or DM was terminated forcibly. In this case, DM can’t identify the exit point of the replication process. Therefore, when the task resumes, DM enables safe mode for two checkpoint intervals (one minute by default). After that, DM exits safe mode, and the normal replication process resumes.
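
The three resume conditions above can be summarized as a small decision function. The Python sketch below is only a model of that logic: the `safe_mode_plan` name, the return values, and the use of comparable position values are assumptions, and the 60-second default mirrors the one-minute period mentioned above.

```python
def safe_mode_plan(checkpoint, safemode_exit_point, default_duration_seconds=60):
    """Decide how safe mode is used when a task resumes.

    Positions can be any comparable values, for example (file, offset) tuples.
    Returns one of:
      ("off", None)               no safe mode is needed
      ("until", exit_point)       safe mode up to the recorded exit point
      ("timed", seconds)          safe mode for a fixed period
    """
    if safemode_exit_point is None:
        # The exit point was never recorded, so fall back to a time window.
        return ("timed", default_duration_seconds)
    if checkpoint >= safemode_exit_point:
        # Everything up to the exit point is already replicated.
        return ("off", None)
    # Some cached DMLs failed: replay [checkpoint, exit point) in safe mode.
    return ("until", safemode_exit_point)
```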

In safe mode, DM converts DML statements as follows so that binlog events can be applied more than once without causing errors:

  • The upstream INSERT statements are converted into REPLACE statements.
  • The upstream UPDATE statements are converted into DELETE + REPLACE statements.
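
The conversions listed above can be sketched in Python as follows. The `safe_mode_statements` function and its parameters are hypothetical, and leaving DELETE statements unchanged is an assumption of this sketch, because the list above covers only INSERT and UPDATE.

```python
def safe_mode_statements(op, table, columns, key_columns,
                         old_row=None, new_row=None):
    """Return the (sql, args) statements to run for one row change in safe mode."""

    def replace_stmt():
        cols = ", ".join(columns)
        marks = ", ".join(["?"] * len(columns))
        return (f"REPLACE INTO {table} ({cols}) VALUES ({marks})", list(new_row))

    def delete_stmt():
        old = dict(zip(columns, old_row))
        where = " AND ".join(f"{c} = ?" for c in key_columns)
        return (f"DELETE FROM {table} WHERE {where}",
                [old[c] for c in key_columns])

    if op == "INSERT":
        return [replace_stmt()]                  # INSERT -> REPLACE
    if op == "UPDATE":
        return [delete_stmt(), replace_stmt()]   # UPDATE -> DELETE + REPLACE
    if op == "DELETE":
        return [delete_stmt()]                   # assumed to stay a DELETE
    raise ValueError(f"unsupported operation: {op}")
```

Because REPLACE overwrites an existing row and DELETE of a missing row affects nothing, running these statements a second time leaves the downstream table in the same state.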

Summary 

While Sync works fine in many scenarios, the processing logic of splitting transactions before concurrent execution may still cause some problems. For example, the replication to the downstream may be suspended due to an inconsistent status with the upstream. Duplicated data replications might be performed in safe mode, which is not acceptable for change data capture from the downstream database. 

The short-term plan is to implement exactly-once processing logic for binlog event replication with DM. If you are interested or would like to contribute to the TiDB DM project, join us in TiDB Internals for further discussion.
