背景:最近使用 Debezium 做数据库数据监听工具,然后使用 C# 开发管理工具对接 kafka 获取增量同步的数据,这个 C# 管理工具需要支持自定义下游数据库映射,因此再新增加映射关系时需要批量将上游数据全量同步到下游,然后才能从 Kafka 中开始使用增量数据同步,以便保持上下游的数据一致。
刚开始,笔者使用 mysqldump 工具导出表数据,然后使用 C# 读取 SQL 文件,再导入到下游表。可是这样做的性能比较差、速度比较慢,而且不能修改表名称。因为上游表名称是 A , 下游表名称是 v_A ,内部需要映射表时需要加前缀以便区分名称,因此 mysqldump 做这需求,导入数据到下游的 A 表后,还需要后续写一下代码修改表名称。
但是后来笔者发现 MySqlBulkCopy 可以做到很简单的代码完成这些操作,而且具有高性能的特点。
首先定义四个变量,用来存储上下游数据库和表的名称,数据库可以属于不同的数据源,实现跨数据库同步表数据。
var databaseName = "a";
var table = "mytable";
var targetDatabaseName = "b";
var targetTable = "v_mytable";
然后创建两个数据库连接:
var connection = new MySqlConnection("连接字符串");
await targetConnection.OpenAsync();
var targetConnection = new MySqlConnection("连接字符串");
await targetConnection.OpenAsync();
连接字符串需要打开 AllowLoadLocalInfile 配置。
// 支持批量导入 See https://fl.vu/mysql-load-data
AllowLoadLocalInfile = true
第一步,查询上游数据库的表,获取列结构。
string columnSQL = $"SELECT * FROM <code class="kb-btn">databaseName</code>.<code class="kb-btn">table</code> where 1=0;";
var list = await connection.GetTableColumnsAsync(columnSQL);
// 获取列名
List<string> columnNames = list.Select(x => x.ColumnName).ToList();
这个 SQL 的目的是,查不到数据,然后只需要读取表的列字段即可。而上面笔者是使用了 ORM 框架的缘故,有一个 GetTableColumnsAsync 方法可以快速读取列的名称以及类型。
如果你直接使用原生的 IDbConnection,那么可以参考这个方法获取表的结构:https://stackoverflow.com/questions/17353089/get-the-column-names-of-a-table-and-store-them-in-a-string-or-var-c-sharp-asp-ne
最后,从源表中读取数据同步到下游:
string querySQL = $"SELECT * FROM <code class="kb-btn">databaseName</code>.<code class="kb-btn">table</code>;";
using (var reader = await connection.ExecuteReaderAsync(querySQL))
{
bool hasData = true;
while (hasData)
{
int count = 0;
var bulkCopy = new MySqlBulkCopy(targetConnection)
{
DestinationTableName = targetTable
};
var dataTable = new DataTable();
foreach (var item in list)
{
dataTable.Columns.Add(item.ColumnName, item.DataType);
}
while (hasData)
{
hasData = await reader.ReadAsync();
if (hasData)
{
List<object?> values = new List<object?>();
// 执行批量插入操作
foreach (string columnName in columnNames)
{
values.Add(reader[columnName]);
}
dataTable.Rows.Add(values.ToArray());
count++;
}
else if (count != 0)
{
await bulkCopy.WriteToServerAsync(dataTable);
break;
}
if (count == 5000)
{
await bulkCopy.WriteToServerAsync(dataTable);
break;
}
}
}
}
如果每次批量 count=10_0000
时,13w 条数据,笔者测试只需要 6s 左右。
文章评论