C# Using MySqlBulkCopy for High-Performance Bulk Data Insertion

2023年12月21日 4052点热度 0人点赞 0条评论
内容目录

背景:最近使用 Debezium 作为数据库数据监听工具,并利用 C# 开发管理工具对接 Kafka 来获取增量同步的数据。这个 C# 管理工具需要支持自定义下游数据库映射,因此在新增映射关系时,需要批量将上游数据全量同步到下游,才能从 Kafka 中开始使用增量数据同步,以保持上下游的数据一致性。

最开始,笔者使用 mysqldump 工具导出表数据,然后通过 C# 读取 SQL 文件,接着导入到下游表。然而,这种做法性能较差、速度很慢,而且不能修改表名称。因为上游表名称为 A,下游表名称为 v_A,在内部需要加上前缀以便区分名称。因此,使用 mysqldump 进行此操作后,需要后续写代码来修改表名称。

但后来笔者发现 MySqlBulkCopy 可以用更简单的代码完成这些操作,并且具有高性能的特点。

首先定义四个变量,用于存储上下游数据库和表的名称,数据库可以属于不同的数据源,实现跨数据库同步表数据。

			var databaseName = "a";
			var table = "mytable";
			var targetDatabaseName = "b";
			var targetTable = "v_mytable";

然后创建两个数据库连接:

			var connection = new MySqlConnection("连接字符串");
			await connection.OpenAsync();
			var targetConnection = new MySqlConnection("连接字符串");
			await targetConnection.OpenAsync();

连接字符串需要打开 AllowLoadLocalInfile 配置。

				// 支持批量导入 See https://fl.vu/mysql-load-data
				AllowLoadLocalInfile = true

第一步,查询上游数据库的表,获取列结构。

string columnSQL = $"SELECT * FROM `{databaseName}`.`{table}` where 1=0;";
var list = await connection.GetTableColumnsAsync(columnSQL);
// 获取列名
List<string> columnNames = list.Select(x => x.ColumnName).ToList();

这个 SQL 的目的是,查不到数据,进而只需读取表的列字段即可。上述代码使用了 ORM 框架的 GetTableColumnsAsync 方法,可以快速读取列的名称和类型。

如果直接使用原生的 IDbConnection,可以参考这个方法获取表的结构:Stack Overflow

最后,从源表读取数据并同步到下游:

			string querySQL = $"SELECT * FROM `{databaseName}`.`{table}`;";
			using (var reader = await connection.ExecuteReaderAsync(querySQL))
			{
				bool hasData = true;
				while (hasData)
				{
					int count = 0;
					var bulkCopy = new MySqlBulkCopy(targetConnection)
					{
						DestinationTableName = targetTable
					};

					var dataTable = new DataTable();
					foreach (var item in list)
					{
						dataTable.Columns.Add(item.ColumnName, item.DataType);
					}
					while (hasData)
					{
						hasData = await reader.ReadAsync();
						if (hasData)
						{
							List<object?> values = new List<object?>();
							// 执行批量插入操作
							foreach (string columnName in columnNames)
							{
								values.Add(reader[columnName]);
							}
							dataTable.Rows.Add(values.ToArray());
							count++;
						}
						else if (count != 0)
						{
							await bulkCopy.WriteToServerAsync(dataTable);
							break;
						}

						if (count == 5000)
						{
							await bulkCopy.WriteToServerAsync(dataTable);
							break;
						}
					}
				}
			}

如果每次批量 count=10_0000 时,对于 13 万条数据,笔者测试只需约 6 秒。

痴者工良

高级程序员劝退师

文章评论