FreeSql/Providers/FreeSql.Provider.MySqlConnector/FreeSqlMySqlConnectorGlobalExtensions.cs
2020-11-18 09:42:09 +08:00

153 lines
5.9 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using FreeSql;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using System.Threading;
#if MySqlConnector
using MySqlConnector;
#else
using MySql.Data.MySqlClient;
#endif
/// <summary>
/// Extension methods that run a FreeSql insert command through MySqlConnector's MySqlBulkCopy.
/// </summary>
public static class FreeSqlMySqlConnectorGlobalExtensions
{
    #region ExecuteMySqlBulkCopy
    /// <summary>
    /// Applies the optional timeout, the destination table name and one explicit column mapping per
    /// DataTable column to <paramref name="bulkCopy"/>, then returns it.
    /// </summary>
    /// <param name="bulkCopy">The bulk copy instance to configure.</param>
    /// <param name="dt">The table about to be written; supplies TableName and the column list.</param>
    /// <param name="bulkCopyTimeout">Assigned to BulkCopyTimeout when it has a value; otherwise the default is kept.</param>
    /// <returns>The same <paramref name="bulkCopy"/> instance.</returns>
    private static MySqlBulkCopy PrepareMySqlBulkCopy(MySqlBulkCopy bulkCopy, System.Data.DataTable dt, int? bulkCopyTimeout)
    {
        if (bulkCopyTimeout.HasValue) bulkCopy.BulkCopyTimeout = bulkCopyTimeout.Value;
        bulkCopy.DestinationTableName = dt.TableName;
        // Without explicit mappings MySqlBulkCopy pairs source and destination columns by ordinal position,
        // which writes values into the wrong columns as soon as the DataTable holds a subset of the table's
        // columns (IgnoreColumns/InsertColumns) or lists them in a different order than the table.
        // NOTE(review): assumes the DataTable column names are the destination column names - confirm against ToDataTable.
        for (var ordinal = 0; ordinal < dt.Columns.Count; ordinal++)
            bulkCopy.ColumnMappings.Add(new MySqlBulkCopyColumnMapping(ordinal, dt.Columns[ordinal].ColumnName));
        return bulkCopy;
    }

    /// <summary>
    /// Bulk insert through MySqlConnector's MySqlBulkCopy.<para></para>
    /// Use IgnoreColumns/InsertColumns to exclude/select the imported columns.<para></para>
    /// Use WithConnection/WithTransaction to pass in a connection/transaction object.<para></para>
    /// Tip: if this method does not fit your needs, call IInsert&lt;T&gt;.ToDataTable to obtain the DataTable and process it yourself.<para></para>
    /// Timing reference, MySqlBulkCopy vs. insert into t values(..),(..),(..):<para></para>
    /// 180000 rows - 52 columns: 28,405ms vs. 38,481ms; 10 columns: 6,504ms vs. 11,171ms<para></para>
    /// 10000 rows - 52 columns: 1,142ms vs. 2,234ms; 10 columns: 339ms vs. 866ms<para></para>
    /// 5000 rows - 52 columns: 657ms vs. 1,136ms; 10 columns: 257ms vs. 366ms<para></para>
    /// 2000 rows - 52 columns: 451ms vs. 284ms; 10 columns: 116ms vs. 80ms<para></para>
    /// 1000 rows - 52 columns: 435ms vs. 239ms; 10 columns: 87ms vs. 83ms<para></para>
    /// 500 rows - 52 columns: 592ms vs. 167ms; 10 columns: 100ms vs. 50ms<para></para>
    /// 100 rows - 52 columns: 47ms vs. 66ms; 10 columns: 16ms vs. 24ms<para></para>
    /// 50 rows - 52 columns: 22ms vs. 30ms; 10 columns: 16ms vs. 34ms<para></para>
    /// </summary>
    /// <typeparam name="T">Entity type of the insert command.</typeparam>
    /// <param name="that">The insert command; it must be a FreeSql.MySql.Curd.MySqlInsert&lt;T&gt;.</param>
    /// <param name="bulkCopyTimeout">Value for MySqlBulkCopy.BulkCopyTimeout; null keeps the default.</param>
    /// <exception cref="Exception"><paramref name="that"/> is not a FreeSql.MySql.Curd.MySqlInsert&lt;T&gt;.</exception>
    public static void ExecuteMySqlBulkCopy<T>(this IInsert<T> that, int? bulkCopyTimeout = null) where T : class
    {
        var insert = that as FreeSql.MySql.Curd.MySqlInsert<T>;
        if (insert == null) throw new Exception("ExecuteMySqlBulkCopy 是 FreeSql.Provider.MySqlConnector 特有的功能");
        var dt = that.ToDataTable();
        if (dt.Rows.Count == 0) return;
        try
        {
            if (insert.InternalTransaction != null)
            {
                // A transaction takes precedence: copy on the transaction's own connection.
                var bulkCopy = new MySqlBulkCopy((MySqlConnection)insert.InternalTransaction.Connection, (MySqlTransaction)insert.InternalTransaction);
                PrepareMySqlBulkCopy(bulkCopy, dt, bulkCopyTimeout).WriteToServer(dt);
            }
            else if (insert.InternalConnection != null)
            {
                var conn = (MySqlConnection)insert.InternalConnection;
                // Only close the caller's connection afterwards if this method had to open it.
                var openedHere = conn.State != System.Data.ConnectionState.Open;
                if (openedHere) conn.Open();
                try
                {
                    PrepareMySqlBulkCopy(new MySqlBulkCopy(conn), dt, bulkCopyTimeout).WriteToServer(dt);
                }
                finally
                {
                    if (openedHere) conn.Close();
                }
            }
            else
            {
                // Neither a connection nor a transaction was supplied: take one from the master pool;
                // disposing the pooled item at the end of the using block hands it back.
                using (var pooled = insert.InternalOrm.Ado.MasterPool.Get())
                {
                    PrepareMySqlBulkCopy(new MySqlBulkCopy((MySqlConnection)pooled.Value), dt, bulkCopyTimeout).WriteToServer(dt);
                }
            }
        }
        finally
        {
            // Drop the buffered rows whether or not the copy succeeded.
            dt.Clear();
        }
    }
#if net40
#else
    /// <summary>
    /// Asynchronous counterpart of ExecuteMySqlBulkCopy: bulk insert through MySqlConnector's MySqlBulkCopy.<para></para>
    /// Use IgnoreColumns/InsertColumns to exclude/select the imported columns.<para></para>
    /// Use WithConnection/WithTransaction to pass in a connection/transaction object.<para></para>
    /// </summary>
    /// <typeparam name="T">Entity type of the insert command.</typeparam>
    /// <param name="that">The insert command; it must be a FreeSql.MySql.Curd.MySqlInsert&lt;T&gt;.</param>
    /// <param name="bulkCopyTimeout">Value for MySqlBulkCopy.BulkCopyTimeout; null keeps the default.</param>
    /// <param name="cancellationToken">Passed to OpenAsync and WriteToServerAsync.</param>
    /// <returns>A task that completes when the rows have been written.</returns>
    /// <exception cref="Exception"><paramref name="that"/> is not a FreeSql.MySql.Curd.MySqlInsert&lt;T&gt;.</exception>
    public static async Task ExecuteMySqlBulkCopyAsync<T>(this IInsert<T> that, int? bulkCopyTimeout = null, CancellationToken cancellationToken = default) where T : class
    {
        var insert = that as FreeSql.MySql.Curd.MySqlInsert<T>;
        if (insert == null) throw new Exception("ExecuteMySqlBulkCopyAsync 是 FreeSql.Provider.MySqlConnector 特有的功能");
        var dt = that.ToDataTable();
        if (dt.Rows.Count == 0) return;
        try
        {
            if (insert.InternalTransaction != null)
            {
                // A transaction takes precedence: copy on the transaction's own connection.
                var bulkCopy = new MySqlBulkCopy((MySqlConnection)insert.InternalTransaction.Connection, (MySqlTransaction)insert.InternalTransaction);
                // Awaited directly (not returned through a Task-typed delegate) so the call compiles
                // whether WriteToServerAsync returns Task or ValueTask.
                await PrepareMySqlBulkCopy(bulkCopy, dt, bulkCopyTimeout).WriteToServerAsync(dt, cancellationToken);
            }
            else if (insert.InternalConnection != null)
            {
                var conn = (MySqlConnection)insert.InternalConnection;
                // Only close the caller's connection afterwards if this method had to open it.
                var openedHere = conn.State != System.Data.ConnectionState.Open;
                if (openedHere) await conn.OpenAsync(cancellationToken);
                try
                {
                    await PrepareMySqlBulkCopy(new MySqlBulkCopy(conn), dt, bulkCopyTimeout).WriteToServerAsync(dt, cancellationToken);
                }
                finally
                {
                    if (openedHere) await conn.CloseAsync();
                }
            }
            else
            {
                // Neither a connection nor a transaction was supplied: take one from the master pool;
                // disposing the pooled item at the end of the using block hands it back.
                using (var pooled = insert.InternalOrm.Ado.MasterPool.Get())
                {
                    await PrepareMySqlBulkCopy(new MySqlBulkCopy((MySqlConnection)pooled.Value), dt, bulkCopyTimeout).WriteToServerAsync(dt, cancellationToken);
                }
            }
        }
        finally
        {
            // Drop the buffered rows whether or not the copy succeeded.
            dt.Clear();
        }
    }
#endif
    #endregion
}