- 添加 InsertOrUpdate 高性能方法 ExecuteSqlBulkCopy;

This commit is contained in:
2881099 2023-03-24 22:34:32 +08:00
parent b2f484dbc4
commit 32ea064c4b
4 changed files with 105 additions and 15 deletions

View File

@ -578,6 +578,10 @@ namespace base_entity
BaseEntity.Initialization(fsql, () => _asyncUow.Value); BaseEntity.Initialization(fsql, () => _asyncUow.Value);
#endregion #endregion
fsql.InsertOrUpdate<User1>()
.SetSource(fsql.Select<User1>().ToList())
.ExecuteSqlBulkCopy();
var updatejoin01 = fsql.Update<User1>() var updatejoin01 = fsql.Update<User1>()
.Join(fsql.Select<UserGroup>(), (a, b) => a.GroupId == b.Id) .Join(fsql.Select<UserGroup>(), (a, b) => a.GroupId == b.Id)
.Set((a, b) => a.Nickname == b.GroupName) .Set((a, b) => a.Nickname == b.GroupName)

View File

@ -800,5 +800,14 @@
<param name="that"></param> <param name="that"></param>
<returns></returns> <returns></returns>
</member> </member>
<member name="M:Microsoft.Extensions.DependencyInjection.FreeSqlRepositoryDependencyInjection.AddFreeRepository(Microsoft.Extensions.DependencyInjection.IServiceCollection,System.Action{FreeSql.FluentDataFilter},System.Reflection.Assembly[])">
<summary>
批量注入 Repository可以参考代码自行调整
</summary>
<param name="services"></param>
<param name="globalDataFilter"></param>
<param name="assemblies"></param>
<returns></returns>
</member>
</members> </members>
</doc> </doc>

View File

@ -41,12 +41,18 @@ namespace FreeSql.Internal.CommonProvider
public bool _isAutoSyncStructure; public bool _isAutoSyncStructure;
public static int ExecuteBulkUpdate<T1>(UpdateProvider<T1> update, NativeTuple<string, string, string, string, string[]> state, Action<IInsert<T1>> funcBulkCopy) where T1 : class public static int ExecuteBulkUpdate<T1>(UpdateProvider<T1> update, NativeTuple<string, string, string, string, string[]> state, Action<IInsert<T1>> funcBulkCopy) where T1 : class =>
ExecuteBulkCommand(update._source, update._tempPrimarys, update._orm, update._connection, update._transaction, update._table, state, funcBulkCopy);
public static int ExecuteBulkUpsert<T1>(InsertOrUpdateProvider<T1> upsert, NativeTuple<string, string, string, string, string[]> state, Action<IInsert<T1>> funcBulkCopy) where T1 : class =>
ExecuteBulkCommand(upsert._source, upsert._tempPrimarys, upsert._orm, upsert._connection, upsert._transaction, upsert._table, state, funcBulkCopy);
public static int ExecuteBulkCommand<T1>(List<T1> _source, ColumnInfo[] _tempPrimarys, IFreeSql _orm, DbConnection _connection, DbTransaction _transaction, TableInfo _table,
NativeTuple<string, string, string, string, string[]> state, Action<IInsert<T1>> funcBulkCopy) where T1 : class
{ {
if (update._source.Any() != true || update._tempPrimarys.Any() == false) return 0; if (_source.Any() != true || _tempPrimarys.Any() == false) return 0;
var fsql = update._orm; var fsql = _orm;
var connection = update._connection; var connection = _connection;
var transaction = update._transaction; var transaction = _transaction;
Object<DbConnection> poolConn = null; Object<DbConnection> poolConn = null;
if (connection == null) if (connection == null)
@ -61,9 +67,9 @@ namespace FreeSql.Internal.CommonProvider
try try
{ {
var insert = fsql.Insert<T1>(); var insert = fsql.Insert<T1>();
(insert as InsertProvider<T1>)._source.AddRange(update._source); //不能直接 AppendData防止触发 Aop.AuditValue (insert as InsertProvider<T1>)._source.AddRange(_source); //不能直接 AppendData防止触发 Aop.AuditValue
insert insert
.AsType(update._table.Type) .AsType(_table.Type)
.WithConnection(connection) .WithConnection(connection)
.WithTransaction(transaction) .WithTransaction(transaction)
.InsertIdentity() .InsertIdentity()
@ -96,12 +102,18 @@ namespace FreeSql.Internal.CommonProvider
} }
#if net40 #if net40
#else #else
async public static Task<int> ExecuteBulkUpdateAsync<T1>(UpdateProvider<T1> update, NativeTuple<string, string, string, string, string[]> state, Func<IInsert<T1>, Task> funcBulkCopy) where T1 : class public static Task<int> ExecuteBulkUpdateAsync<T1>(UpdateProvider<T1> update, NativeTuple<string, string, string, string, string[]> state, Func<IInsert<T1>, Task> funcBulkCopy) where T1 : class =>
ExecuteBulkCommandAsync(update._source, update._tempPrimarys, update._orm, update._connection, update._transaction, update._table, state, funcBulkCopy);
public static Task<int> ExecuteBulkUpsertAsync<T1>(InsertOrUpdateProvider<T1> upsert, NativeTuple<string, string, string, string, string[]> state, Func<IInsert<T1>, Task> funcBulkCopy) where T1 : class =>
ExecuteBulkCommandAsync(upsert._source, upsert._tempPrimarys, upsert._orm, upsert._connection, upsert._transaction, upsert._table, state, funcBulkCopy);
async public static Task<int> ExecuteBulkCommandAsync<T1>(List<T1> _source, ColumnInfo[] _tempPrimarys, IFreeSql _orm, DbConnection _connection, DbTransaction _transaction, TableInfo _table,
NativeTuple<string, string, string, string, string[]> state, Func<IInsert<T1>, Task> funcBulkCopy) where T1 : class
{ {
if (update._source.Any() != true || update._tempPrimarys.Any() == false) return 0; if (_source.Any() != true || _tempPrimarys.Any() == false) return 0;
var fsql = update._orm; var fsql = _orm;
var connection = update._connection; var connection = _connection;
var transaction = update._transaction; var transaction = _transaction;
Object<DbConnection> poolConn = null; Object<DbConnection> poolConn = null;
if (connection == null) if (connection == null)
@ -116,9 +128,9 @@ namespace FreeSql.Internal.CommonProvider
try try
{ {
var insert = fsql.Insert<T1>(); var insert = fsql.Insert<T1>();
(insert as InsertProvider<T1>)._source.AddRange(update._source); //不能直接 AppendData防止触发 Aop.AuditValue (insert as InsertProvider<T1>)._source.AddRange(_source); //不能直接 AppendData防止触发 Aop.AuditValue
insert insert
.AsType(update._table.Type) .AsType(_table.Type)
.WithConnection(connection) .WithConnection(connection)
.WithTransaction(transaction) .WithTransaction(transaction)
.InsertIdentity() .InsertIdentity()
@ -126,6 +138,15 @@ namespace FreeSql.Internal.CommonProvider
.AsTable(state.Item4); .AsTable(state.Item4);
(insert as InsertProvider)._isAutoSyncStructure = false; (insert as InsertProvider)._isAutoSyncStructure = false;
await funcBulkCopy(insert); await funcBulkCopy(insert);
switch (fsql.Ado.DataType)
{
case DataType.Oracle:
case DataType.OdbcOracle:
case DataType.CustomOracle:
case DataType.Dameng:
case DataType.OdbcDameng:
return await fsql.Ado.CommandFluent(state.Item2).WithConnection(connection).WithTransaction(transaction).ExecuteNonQueryAsync();
}
var affrows = await fsql.Ado.CommandFluent(state.Item2 + ";\r\n" + state.Item3).WithConnection(connection).WithTransaction(transaction).ExecuteNonQueryAsync(); var affrows = await fsql.Ado.CommandFluent(state.Item2 + ";\r\n" + state.Item3).WithConnection(connection).WithTransaction(transaction).ExecuteNonQueryAsync();
droped = true; droped = true;
return affrows; return affrows;

View File

@ -120,6 +120,55 @@ public static partial class FreeSqlSqlServerGlobalExtensions
#region ExecuteSqlBulkCopy #region ExecuteSqlBulkCopy
/// <summary> /// <summary>
/// 批量插入或更新(操作的字段数量超过 2000 时收益大)<para></para>
/// 实现原理:使用 SqlBulkCopy 插入临时表,再执行 MERGE INTO t1 using (select * from #temp) ...
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="that"></param>
/// <param name="copyOptions"></param>
/// <param name="batchSize"></param>
/// <param name="bulkCopyTimeout"></param>
/// <returns></returns>
public static int ExecuteSqlBulkCopy<T>(this IInsertOrUpdate<T> that, SqlBulkCopyOptions copyOptions = SqlBulkCopyOptions.Default, int? batchSize = null, int? bulkCopyTimeout = null) where T : class
{
var upsert = that as InsertOrUpdateProvider<T>;
if (upsert._source.Any() != true || upsert._tempPrimarys.Any() == false) return 0;
var state = ExecuteSqlBulkCopyState(upsert);
return UpdateProvider.ExecuteBulkUpsert(upsert, state, insert => insert.ExecuteSqlBulkCopy(copyOptions, batchSize, bulkCopyTimeout));
}
static NativeTuple<string, string, string, string, string[]> ExecuteSqlBulkCopyState<T>(InsertOrUpdateProvider<T> upsert) where T : class
{
if (upsert._source.Any() != true) return null;
var _table = upsert._table;
var _commonUtils = upsert._commonUtils;
var updateTableName = upsert._tableRule?.Invoke(_table.DbName) ?? _table.DbName;
var tempTableName = $"#Temp_{updateTableName}";
if (upsert._orm.CodeFirst.IsSyncStructureToLower) tempTableName = tempTableName.ToLower();
if (upsert._orm.CodeFirst.IsSyncStructureToUpper) tempTableName = tempTableName.ToUpper();
if (upsert._connection == null && upsert._orm.Ado.TransactionCurrentThread != null)
upsert.WithTransaction(upsert._orm.Ado.TransactionCurrentThread);
var setColumns = new List<string>();
var pkColumns = new List<string>();
foreach (var col in _table.Columns.Values)
{
if (upsert._tempPrimarys.Any(a => a.CsName == col.CsName)) pkColumns.Add(col.Attribute.Name);
else if (col.Attribute.IsIdentity == false && col.Attribute.IsVersion == false && upsert._updateIgnore.ContainsKey(col.Attribute.Name) == false) setColumns.Add(col.Attribute.Name);
}
var sql1 = $"SELECT {string.Join(", ", pkColumns.Select(a => _commonUtils.QuoteSqlName(a)))}, {string.Join(", ", setColumns.Select(a => _commonUtils.QuoteSqlName(a)))} INTO {tempTableName} FROM {_commonUtils.QuoteSqlName(updateTableName)} WHERE 1=2";
try
{
upsert._sourceSql = $"select * from {tempTableName}";
var sql2 = upsert.ToSql();
var sql3 = $"DROP TABLE {tempTableName}";
return NativeTuple.Create(sql1, sql2, sql3, tempTableName, pkColumns.Concat(setColumns).ToArray());
}
finally
{
upsert._sourceSql = null;
}
}
/// <summary>
/// 批量更新(更新字段数量超过 2000 时收益大)<para></para> /// 批量更新(更新字段数量超过 2000 时收益大)<para></para>
/// 实现原理:使用 SqlBulkCopy 插入临时表,再使用 UPDATE INNER JOIN 联表更新 /// 实现原理:使用 SqlBulkCopy 插入临时表,再使用 UPDATE INNER JOIN 联表更新
/// </summary> /// </summary>
@ -154,7 +203,7 @@ public static partial class FreeSqlSqlServerGlobalExtensions
if (update._tempPrimarys.Any(a => a.CsName == col.CsName)) pkColumns.Add(col.Attribute.Name); if (update._tempPrimarys.Any(a => a.CsName == col.CsName)) pkColumns.Add(col.Attribute.Name);
else if (col.Attribute.IsIdentity == false && col.Attribute.IsVersion == false && update._ignore.ContainsKey(col.Attribute.Name) == false) setColumns.Add(col.Attribute.Name); else if (col.Attribute.IsIdentity == false && col.Attribute.IsVersion == false && update._ignore.ContainsKey(col.Attribute.Name) == false) setColumns.Add(col.Attribute.Name);
} }
var sql1 = $"SELECT {string.Join(", ", pkColumns)}, {string.Join(", ", setColumns)} INTO {tempTableName} FROM {_commonUtils.QuoteSqlName(updateTableName)} WHERE 1=2"; var sql1 = $"SELECT {string.Join(", ", pkColumns.Select(a => _commonUtils.QuoteSqlName(a)))}, {string.Join(", ", setColumns.Select(a => _commonUtils.QuoteSqlName(a)))} INTO {tempTableName} FROM {_commonUtils.QuoteSqlName(updateTableName)} WHERE 1=2";
var sb = new StringBuilder().Append("UPDATE ").Append(" a SET \r\n ").Append(string.Join(", \r\n ", setColumns.Select(col => $"a.{_commonUtils.QuoteSqlName(col)} = b.{_commonUtils.QuoteSqlName(col)}"))); var sb = new StringBuilder().Append("UPDATE ").Append(" a SET \r\n ").Append(string.Join(", \r\n ", setColumns.Select(col => $"a.{_commonUtils.QuoteSqlName(col)} = b.{_commonUtils.QuoteSqlName(col)}")));
sb.Append(" \r\nFROM ").Append(_commonUtils.QuoteSqlName(updateTableName)).Append(" a ") sb.Append(" \r\nFROM ").Append(_commonUtils.QuoteSqlName(updateTableName)).Append(" a ")
.Append(" \r\nINNER JOIN ").Append(tempTableName).Append(" b ON ").Append(string.Join(" AND ", pkColumns.Select(col => $"a.{_commonUtils.QuoteSqlName(col)} = b.{_commonUtils.QuoteSqlName(col)}"))); .Append(" \r\nINNER JOIN ").Append(tempTableName).Append(" b ON ").Append(string.Join(" AND ", pkColumns.Select(col => $"a.{_commonUtils.QuoteSqlName(col)} = b.{_commonUtils.QuoteSqlName(col)}")));
@ -263,6 +312,13 @@ public static partial class FreeSqlSqlServerGlobalExtensions
} }
#if net40 #if net40
#else #else
public static Task<int> ExecuteSqlBulkCopyAsync<T>(this IInsertOrUpdate<T> that, SqlBulkCopyOptions copyOptions = SqlBulkCopyOptions.Default, int? batchSize = null, int? bulkCopyTimeout = null, CancellationToken cancellationToken = default) where T : class
{
var upsert = that as InsertOrUpdateProvider<T>;
if (upsert._source.Any() != true || upsert._tempPrimarys.Any() == false) return Task.FromResult(0);
var state = ExecuteSqlBulkCopyState(upsert);
return UpdateProvider.ExecuteBulkUpsertAsync(upsert, state, insert => insert.ExecuteSqlBulkCopyAsync(copyOptions, batchSize, bulkCopyTimeout, cancellationToken));
}
public static Task<int> ExecuteSqlBulkCopyAsync<T>(this IUpdate<T> that, SqlBulkCopyOptions copyOptions = SqlBulkCopyOptions.Default, int? batchSize = null, int? bulkCopyTimeout = null, CancellationToken cancellationToken = default) where T : class public static Task<int> ExecuteSqlBulkCopyAsync<T>(this IUpdate<T> that, SqlBulkCopyOptions copyOptions = SqlBulkCopyOptions.Default, int? batchSize = null, int? bulkCopyTimeout = null, CancellationToken cancellationToken = default) where T : class
{ {
var update = that as UpdateProvider<T>; var update = that as UpdateProvider<T>;