From 32ea064c4b13482e4bbec09a42b50f2cf8f5460f Mon Sep 17 00:00:00 2001 From: 2881099 <2881099@qq.com> Date: Fri, 24 Mar 2023 22:34:32 +0800 Subject: [PATCH] =?UTF-8?q?-=20=E6=B7=BB=E5=8A=A0=20InsertOrUpdate=20?= =?UTF-8?q?=E9=AB=98=E6=80=A7=E8=83=BD=E6=96=B9=E6=B3=95=20ExecuteSqlBulkC?= =?UTF-8?q?opy=EF=BC=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Examples/base_entity/Program.cs | 4 ++ FreeSql.DbContext/FreeSql.DbContext.xml | 9 +++ .../Internal/CommonProvider/UpdateProvider.cs | 49 +++++++++++----- .../SqlServerExtensions.cs | 58 ++++++++++++++++++- 4 files changed, 105 insertions(+), 15 deletions(-) diff --git a/Examples/base_entity/Program.cs b/Examples/base_entity/Program.cs index 1461666a..b2701460 100644 --- a/Examples/base_entity/Program.cs +++ b/Examples/base_entity/Program.cs @@ -578,6 +578,10 @@ namespace base_entity BaseEntity.Initialization(fsql, () => _asyncUow.Value); #endregion + fsql.InsertOrUpdate() + .SetSource(fsql.Select().ToList()) + .ExecuteSqlBulkCopy(); + var updatejoin01 = fsql.Update() .Join(fsql.Select(), (a, b) => a.GroupId == b.Id) .Set((a, b) => a.Nickname == b.GroupName) diff --git a/FreeSql.DbContext/FreeSql.DbContext.xml b/FreeSql.DbContext/FreeSql.DbContext.xml index 537315e2..26522f10 100644 --- a/FreeSql.DbContext/FreeSql.DbContext.xml +++ b/FreeSql.DbContext/FreeSql.DbContext.xml @@ -800,5 +800,14 @@ + + + 批量注入 Repository,可以参考代码自行调整 + + + + + + diff --git a/FreeSql/Internal/CommonProvider/UpdateProvider.cs b/FreeSql/Internal/CommonProvider/UpdateProvider.cs index 467bd1f5..ddf65a6a 100644 --- a/FreeSql/Internal/CommonProvider/UpdateProvider.cs +++ b/FreeSql/Internal/CommonProvider/UpdateProvider.cs @@ -41,12 +41,18 @@ namespace FreeSql.Internal.CommonProvider public bool _isAutoSyncStructure; - public static int ExecuteBulkUpdate(UpdateProvider update, NativeTuple state, Action> funcBulkCopy) where T1 : class + public static int ExecuteBulkUpdate(UpdateProvider update, NativeTuple state, Action> funcBulkCopy) where T1 : class => + ExecuteBulkCommand(update._source, update._tempPrimarys, update._orm, update._connection, update._transaction, update._table, state, funcBulkCopy); + public static int ExecuteBulkUpsert(InsertOrUpdateProvider upsert, NativeTuple state, Action> funcBulkCopy) where T1 : class => + ExecuteBulkCommand(upsert._source, upsert._tempPrimarys, upsert._orm, upsert._connection, upsert._transaction, upsert._table, state, funcBulkCopy); + + public static int ExecuteBulkCommand(List _source, ColumnInfo[] _tempPrimarys, IFreeSql _orm, DbConnection _connection, DbTransaction _transaction, TableInfo _table, + NativeTuple state, Action> funcBulkCopy) where T1 : class { - if (update._source.Any() != true || update._tempPrimarys.Any() == false) return 0; - var fsql = update._orm; - var connection = update._connection; - var transaction = update._transaction; + if (_source.Any() != true || _tempPrimarys.Any() == false) return 0; + var fsql = _orm; + var connection = _connection; + var transaction = _transaction; Object poolConn = null; if (connection == null) @@ -61,9 +67,9 @@ namespace FreeSql.Internal.CommonProvider try { var insert = fsql.Insert(); - (insert as InsertProvider)._source.AddRange(update._source); //不能直接 AppendData,防止触发 Aop.AuditValue + (insert as InsertProvider)._source.AddRange(_source); //不能直接 AppendData,防止触发 Aop.AuditValue insert - .AsType(update._table.Type) + .AsType(_table.Type) .WithConnection(connection) .WithTransaction(transaction) .InsertIdentity() @@ -96,12 +102,18 @@ namespace FreeSql.Internal.CommonProvider } #if net40 #else - async public static Task ExecuteBulkUpdateAsync(UpdateProvider update, NativeTuple state, Func, Task> funcBulkCopy) where T1 : class + public static Task ExecuteBulkUpdateAsync(UpdateProvider update, NativeTuple state, Func, Task> funcBulkCopy) where T1 : class => + ExecuteBulkCommandAsync(update._source, update._tempPrimarys, update._orm, update._connection, update._transaction, update._table, state, funcBulkCopy); + public static Task ExecuteBulkUpsertAsync(InsertOrUpdateProvider upsert, NativeTuple state, Func, Task> funcBulkCopy) where T1 : class => + ExecuteBulkCommandAsync(upsert._source, upsert._tempPrimarys, upsert._orm, upsert._connection, upsert._transaction, upsert._table, state, funcBulkCopy); + + async public static Task ExecuteBulkCommandAsync(List _source, ColumnInfo[] _tempPrimarys, IFreeSql _orm, DbConnection _connection, DbTransaction _transaction, TableInfo _table, + NativeTuple state, Func, Task> funcBulkCopy) where T1 : class { - if (update._source.Any() != true || update._tempPrimarys.Any() == false) return 0; - var fsql = update._orm; - var connection = update._connection; - var transaction = update._transaction; + if (_source.Any() != true || _tempPrimarys.Any() == false) return 0; + var fsql = _orm; + var connection = _connection; + var transaction = _transaction; Object poolConn = null; if (connection == null) @@ -116,9 +128,9 @@ namespace FreeSql.Internal.CommonProvider try { var insert = fsql.Insert(); - (insert as InsertProvider)._source.AddRange(update._source); //不能直接 AppendData,防止触发 Aop.AuditValue + (insert as InsertProvider)._source.AddRange(_source); //不能直接 AppendData,防止触发 Aop.AuditValue insert - .AsType(update._table.Type) + .AsType(_table.Type) .WithConnection(connection) .WithTransaction(transaction) .InsertIdentity() @@ -126,6 +138,15 @@ namespace FreeSql.Internal.CommonProvider .AsTable(state.Item4); (insert as InsertProvider)._isAutoSyncStructure = false; await funcBulkCopy(insert); + switch (fsql.Ado.DataType) + { + case DataType.Oracle: + case DataType.OdbcOracle: + case DataType.CustomOracle: + case DataType.Dameng: + case DataType.OdbcDameng: + return await fsql.Ado.CommandFluent(state.Item2).WithConnection(connection).WithTransaction(transaction).ExecuteNonQueryAsync(); + } var affrows = await fsql.Ado.CommandFluent(state.Item2 + ";\r\n" + state.Item3).WithConnection(connection).WithTransaction(transaction).ExecuteNonQueryAsync(); droped = true; return affrows; diff --git a/Providers/FreeSql.Provider.SqlServer/SqlServerExtensions.cs b/Providers/FreeSql.Provider.SqlServer/SqlServerExtensions.cs index 91880a60..5b135927 100644 --- a/Providers/FreeSql.Provider.SqlServer/SqlServerExtensions.cs +++ b/Providers/FreeSql.Provider.SqlServer/SqlServerExtensions.cs @@ -120,6 +120,55 @@ public static partial class FreeSqlSqlServerGlobalExtensions #region ExecuteSqlBulkCopy /// + /// 批量插入或更新(操作的字段数量超过 2000 时收益大) + /// 实现原理:使用 SqlBulkCopy 插入临时表,再执行 MERGE INTO t1 using (select * from #temp) ... + /// + /// + /// + /// + /// + /// + /// + public static int ExecuteSqlBulkCopy(this IInsertOrUpdate that, SqlBulkCopyOptions copyOptions = SqlBulkCopyOptions.Default, int? batchSize = null, int? bulkCopyTimeout = null) where T : class + { + var upsert = that as InsertOrUpdateProvider; + if (upsert._source.Any() != true || upsert._tempPrimarys.Any() == false) return 0; + var state = ExecuteSqlBulkCopyState(upsert); + return UpdateProvider.ExecuteBulkUpsert(upsert, state, insert => insert.ExecuteSqlBulkCopy(copyOptions, batchSize, bulkCopyTimeout)); + + } + static NativeTuple ExecuteSqlBulkCopyState(InsertOrUpdateProvider upsert) where T : class + { + if (upsert._source.Any() != true) return null; + var _table = upsert._table; + var _commonUtils = upsert._commonUtils; + var updateTableName = upsert._tableRule?.Invoke(_table.DbName) ?? _table.DbName; + var tempTableName = $"#Temp_{updateTableName}"; + if (upsert._orm.CodeFirst.IsSyncStructureToLower) tempTableName = tempTableName.ToLower(); + if (upsert._orm.CodeFirst.IsSyncStructureToUpper) tempTableName = tempTableName.ToUpper(); + if (upsert._connection == null && upsert._orm.Ado.TransactionCurrentThread != null) + upsert.WithTransaction(upsert._orm.Ado.TransactionCurrentThread); + var setColumns = new List(); + var pkColumns = new List(); + foreach (var col in _table.Columns.Values) + { + if (upsert._tempPrimarys.Any(a => a.CsName == col.CsName)) pkColumns.Add(col.Attribute.Name); + else if (col.Attribute.IsIdentity == false && col.Attribute.IsVersion == false && upsert._updateIgnore.ContainsKey(col.Attribute.Name) == false) setColumns.Add(col.Attribute.Name); + } + var sql1 = $"SELECT {string.Join(", ", pkColumns.Select(a => _commonUtils.QuoteSqlName(a)))}, {string.Join(", ", setColumns.Select(a => _commonUtils.QuoteSqlName(a)))} INTO {tempTableName} FROM {_commonUtils.QuoteSqlName(updateTableName)} WHERE 1=2"; + try + { + upsert._sourceSql = $"select * from {tempTableName}"; + var sql2 = upsert.ToSql(); + var sql3 = $"DROP TABLE {tempTableName}"; + return NativeTuple.Create(sql1, sql2, sql3, tempTableName, pkColumns.Concat(setColumns).ToArray()); + } + finally + { + upsert._sourceSql = null; + } + } + /// /// 批量更新(更新字段数量超过 2000 时收益大) /// 实现原理:使用 SqlBulkCopy 插入临时表,再使用 UPDATE INNER JOIN 联表更新 /// @@ -154,7 +203,7 @@ public static partial class FreeSqlSqlServerGlobalExtensions if (update._tempPrimarys.Any(a => a.CsName == col.CsName)) pkColumns.Add(col.Attribute.Name); else if (col.Attribute.IsIdentity == false && col.Attribute.IsVersion == false && update._ignore.ContainsKey(col.Attribute.Name) == false) setColumns.Add(col.Attribute.Name); } - var sql1 = $"SELECT {string.Join(", ", pkColumns)}, {string.Join(", ", setColumns)} INTO {tempTableName} FROM {_commonUtils.QuoteSqlName(updateTableName)} WHERE 1=2"; + var sql1 = $"SELECT {string.Join(", ", pkColumns.Select(a => _commonUtils.QuoteSqlName(a)))}, {string.Join(", ", setColumns.Select(a => _commonUtils.QuoteSqlName(a)))} INTO {tempTableName} FROM {_commonUtils.QuoteSqlName(updateTableName)} WHERE 1=2"; var sb = new StringBuilder().Append("UPDATE ").Append(" a SET \r\n ").Append(string.Join(", \r\n ", setColumns.Select(col => $"a.{_commonUtils.QuoteSqlName(col)} = b.{_commonUtils.QuoteSqlName(col)}"))); sb.Append(" \r\nFROM ").Append(_commonUtils.QuoteSqlName(updateTableName)).Append(" a ") .Append(" \r\nINNER JOIN ").Append(tempTableName).Append(" b ON ").Append(string.Join(" AND ", pkColumns.Select(col => $"a.{_commonUtils.QuoteSqlName(col)} = b.{_commonUtils.QuoteSqlName(col)}"))); @@ -263,6 +312,13 @@ public static partial class FreeSqlSqlServerGlobalExtensions } #if net40 #else + public static Task ExecuteSqlBulkCopyAsync(this IInsertOrUpdate that, SqlBulkCopyOptions copyOptions = SqlBulkCopyOptions.Default, int? batchSize = null, int? bulkCopyTimeout = null, CancellationToken cancellationToken = default) where T : class + { + var upsert = that as InsertOrUpdateProvider; + if (upsert._source.Any() != true || upsert._tempPrimarys.Any() == false) return Task.FromResult(0); + var state = ExecuteSqlBulkCopyState(upsert); + return UpdateProvider.ExecuteBulkUpsertAsync(upsert, state, insert => insert.ExecuteSqlBulkCopyAsync(copyOptions, batchSize, bulkCopyTimeout, cancellationToken)); + } public static Task ExecuteSqlBulkCopyAsync(this IUpdate that, SqlBulkCopyOptions copyOptions = SqlBulkCopyOptions.Default, int? batchSize = null, int? bulkCopyTimeout = null, CancellationToken cancellationToken = default) where T : class { var update = that as UpdateProvider;