From 41ba6c37bec206a0fda7aac10779ddb111e415a9 Mon Sep 17 00:00:00 2001 From: 2881099 <2881099@qq.com> Date: Thu, 16 Mar 2023 09:08:29 +0800 Subject: [PATCH] =?UTF-8?q?-=20=E5=A2=9E=E5=8A=A0=20Oracle/=E8=BE=BE?= =?UTF-8?q?=E6=A2=A6=20=E6=89=B9=E9=87=8F=E6=9B=B4=E6=96=B0=20BulkCopy=20?= =?UTF-8?q?=E6=89=A9=E5=B1=95=E6=96=B9=E6=B3=95=EF=BC=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Oracle/Curd/OracleInsertTest.cs | 9 ++- .../Internal/CommonProvider/UpdateProvider.cs | 9 +++ .../DamengExtensions.cs | 57 +++++++++++++++++++ .../OracleExtensions.cs | 56 ++++++++++++++++++ 4 files changed, 128 insertions(+), 3 deletions(-) diff --git a/FreeSql.Tests/FreeSql.Tests/Oracle/Curd/OracleInsertTest.cs b/FreeSql.Tests/FreeSql.Tests/Oracle/Curd/OracleInsertTest.cs index 6b75fa19..7780e20c 100644 --- a/FreeSql.Tests/FreeSql.Tests/Oracle/Curd/OracleInsertTest.cs +++ b/FreeSql.Tests/FreeSql.Tests/Oracle/Curd/OracleInsertTest.cs @@ -1,4 +1,4 @@ -using FreeSql.DataAnnotations; +using FreeSql.DataAnnotations; using System; using System.Collections.Generic; using System.Linq; @@ -235,13 +235,16 @@ INTO ""TB_TOPIC_INSERT""(""CLICKS"") VALUES(:Clicks_9) public void ExecuteOracleBulkCopy() { var items = new List(); - for (var a = 0; a < 10; a++) items.Add(new Topic_bulkcopy { Title = $"newtitle{a}", Clicks = a * 100, CreateTime = DateTime.Now }); + for (var a = 0; a < 100; a++) items.Add(new Topic_bulkcopy { Title = $"newtitle{a}", Clicks = a * 100, CreateTime = DateTime.Now }); + g.oracle.Delete().Where("1=1").ExecuteAffrows(); g.oracle.Insert().AppendData(items).InsertIdentity().ExecuteOracleBulkCopy(); //insert.AppendData(items).IgnoreColumns(a => new { a.CreateTime, a.Clicks }).ExecuteSqlBulkCopy(); // System.NotSupportedException:“DataSet does not support System.Nullable<>.” + + g.oracle.Update().SetSource(items).ExecuteOracleBulkCopy(); } - [Table(Name = "tb_topic_bulkcopy")] + [Table(Name = "tb_topic_bk1")] class Topic_bulkcopy { public Guid Id { get; set; } diff --git a/FreeSql/Internal/CommonProvider/UpdateProvider.cs b/FreeSql/Internal/CommonProvider/UpdateProvider.cs index 300459fe..467bd1f5 100644 --- a/FreeSql/Internal/CommonProvider/UpdateProvider.cs +++ b/FreeSql/Internal/CommonProvider/UpdateProvider.cs @@ -71,6 +71,15 @@ namespace FreeSql.Internal.CommonProvider .AsTable(state.Item4); (insert as InsertProvider)._isAutoSyncStructure = false; funcBulkCopy(insert); + switch (fsql.Ado.DataType) + { + case DataType.Oracle: + case DataType.OdbcOracle: + case DataType.CustomOracle: + case DataType.Dameng: + case DataType.OdbcDameng: + return fsql.Ado.CommandFluent(state.Item2).WithConnection(connection).WithTransaction(transaction).ExecuteNonQuery(); + } var affrows = fsql.Ado.CommandFluent(state.Item2 + ";\r\n" + state.Item3).WithConnection(connection).WithTransaction(transaction).ExecuteNonQuery(); droped = true; return affrows; diff --git a/Providers/FreeSql.Provider.Dameng/DamengExtensions.cs b/Providers/FreeSql.Provider.Dameng/DamengExtensions.cs index ba3573ab..cfb256cb 100644 --- a/Providers/FreeSql.Provider.Dameng/DamengExtensions.cs +++ b/Providers/FreeSql.Provider.Dameng/DamengExtensions.cs @@ -1,6 +1,11 @@ using Dm; using FreeSql; +using FreeSql.Internal.CommonProvider; +using FreeSql.Internal.Model; using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; public static partial class FreeSqlDamengGlobalExtensions { @@ -15,6 +20,58 @@ public static partial class FreeSqlDamengGlobalExtensions static FreeSql.Dameng.DamengAdo _damengAdo = new FreeSql.Dameng.DamengAdo(); #region ExecuteDmBulkCopy + + /// + /// 批量更新(更新字段数量超过 2000 时收益大) + /// 实现原理:使用 OracleBulkCopy 插入临时表,再使用 MERGE INTO 联表更新 + /// + /// + /// + /// + public static int ExecuteDmBulkCopy(this IUpdate that) where T : class + { + var update = that as UpdateProvider; + if (update._source.Any() != true || update._tempPrimarys.Any() == false) return 0; + var state = ExecuteDmBulkCopyState(update); + return UpdateProvider.ExecuteBulkUpdate(update, state, insert => insert.ExecuteDmBulkCopy()); + } + static NativeTuple ExecuteDmBulkCopyState(UpdateProvider update) where T : class + { + if (update._source.Any() != true) return null; + var _table = update._table; + var _commonUtils = update._commonUtils; + var updateTableName = update._tableRule?.Invoke(_table.DbName) ?? _table.DbName; + var tempTableName = $"Temp_{Guid.NewGuid().ToString("N").ToUpper().Substring(0, 24)}"; + if (update._orm.CodeFirst.IsSyncStructureToLower) tempTableName = tempTableName.ToLower(); + if (update._orm.CodeFirst.IsSyncStructureToUpper) tempTableName = tempTableName.ToUpper(); + if (update._connection == null && update._orm.Ado.TransactionCurrentThread != null) + update.WithTransaction(update._orm.Ado.TransactionCurrentThread); + var sb = new StringBuilder().Append("CREATE GLOBAL TEMPORARY TABLE ").Append(_commonUtils.QuoteSqlName(tempTableName)).Append(" ( "); + var setColumns = new List(); + var pkColumns = new List(); + foreach (var col in _table.Columns.Values) + { + if (update._tempPrimarys.Any(a => a.CsName == col.CsName)) pkColumns.Add(col.Attribute.Name); + else if (col.Attribute.IsIdentity == false && col.Attribute.IsVersion == false && update._ignore.ContainsKey(col.Attribute.Name) == false) setColumns.Add(col.Attribute.Name); + else continue; + sb.Append(" \r\n ").Append(_commonUtils.QuoteSqlName(col.Attribute.Name)).Append(" ").Append(col.Attribute.DbType.Replace("NOT NULL", "")); + sb.Append(","); + } + var sql1 = sb.Remove(sb.Length - 1, 1).Append("\r\n) ON COMMIT PRESERVE ROWS").ToString(); + + sb.Clear().Append("MERGE INTO ").Append(_commonUtils.QuoteSqlName(updateTableName)).Append(" a ") + .Append(" \r\nUSING ").Append(_commonUtils.QuoteSqlName(tempTableName)).Append(" b ON (").Append(string.Join(" AND ", pkColumns.Select(col => $"a.{_commonUtils.QuoteSqlName(col)} = b.{_commonUtils.QuoteSqlName(col)}"))) + .Append(") \r\nWHEN MATCHED THEN") + .Append(" \r\nUPDATE SET ").Append(string.Join(", \r\n ", setColumns.Select(col => $"{_commonUtils.QuoteSqlName(col)} = b.{_commonUtils.QuoteSqlName(col)}"))); + var sql2 = sb.ToString(); + sb.Clear(); + var sql3 = $"BEGIN \r\n" + + $"execute immediate 'TRUNCATE TABLE {_commonUtils.QuoteSqlName(tempTableName)}';\r\n" + + $"execute immediate 'DROP TABLE {_commonUtils.QuoteSqlName(tempTableName)}';\r\n" + + $"END;"; + return NativeTuple.Create(sql1, sql2, sql3, tempTableName, pkColumns.Concat(setColumns).ToArray()); + } + /// /// 达梦 CopyBulk 批量插入功能 /// 使用 IgnoreColumns/InsertColumns 设置忽略/指定导入的列 diff --git a/Providers/FreeSql.Provider.Oracle/OracleExtensions.cs b/Providers/FreeSql.Provider.Oracle/OracleExtensions.cs index c3409da5..13ff188f 100644 --- a/Providers/FreeSql.Provider.Oracle/OracleExtensions.cs +++ b/Providers/FreeSql.Provider.Oracle/OracleExtensions.cs @@ -1,10 +1,15 @@ using FreeSql; +using FreeSql.Internal.CommonProvider; +using FreeSql.Internal.Model; #if oledb using System.Data.OleDb; #else using Oracle.ManagedDataAccess.Client; #endif using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; public static partial class FreeSqlOracleGlobalExtensions { @@ -21,6 +26,57 @@ public static partial class FreeSqlOracleGlobalExtensions #if oledb #else #region ExecuteOracleBulkCopy + /// + /// 批量更新(更新字段数量超过 2000 时收益大) + /// 实现原理:使用 OracleBulkCopy 插入临时表,再使用 MERGE INTO 联表更新 + /// + /// + /// + /// + public static int ExecuteOracleBulkCopy(this IUpdate that) where T : class + { + var update = that as UpdateProvider; + if (update._source.Any() != true || update._tempPrimarys.Any() == false) return 0; + var state = ExecuteOracleBulkCopyState(update); + return UpdateProvider.ExecuteBulkUpdate(update, state, insert => insert.ExecuteOracleBulkCopy()); + } + static NativeTuple ExecuteOracleBulkCopyState(UpdateProvider update) where T : class + { + if (update._source.Any() != true) return null; + var _table = update._table; + var _commonUtils = update._commonUtils; + var updateTableName = update._tableRule?.Invoke(_table.DbName) ?? _table.DbName; + var tempTableName = $"Temp_{Guid.NewGuid().ToString("N").ToUpper().Substring(0, 24)}"; + if (update._orm.CodeFirst.IsSyncStructureToLower) tempTableName = tempTableName.ToLower(); + if (update._orm.CodeFirst.IsSyncStructureToUpper) tempTableName = tempTableName.ToUpper(); + if (update._connection == null && update._orm.Ado.TransactionCurrentThread != null) + update.WithTransaction(update._orm.Ado.TransactionCurrentThread); + var sb = new StringBuilder().Append("CREATE GLOBAL TEMPORARY TABLE ").Append(_commonUtils.QuoteSqlName(tempTableName)).Append(" ( "); + var setColumns = new List(); + var pkColumns = new List(); + foreach (var col in _table.Columns.Values) + { + if (update._tempPrimarys.Any(a => a.CsName == col.CsName)) pkColumns.Add(col.Attribute.Name); + else if (col.Attribute.IsIdentity == false && col.Attribute.IsVersion == false && update._ignore.ContainsKey(col.Attribute.Name) == false) setColumns.Add(col.Attribute.Name); + else continue; + sb.Append(" \r\n ").Append(_commonUtils.QuoteSqlName(col.Attribute.Name)).Append(" ").Append(col.Attribute.DbType.Replace("NOT NULL", "")); + sb.Append(","); + } + var sql1 = sb.Remove(sb.Length - 1, 1).Append("\r\n) ON COMMIT PRESERVE ROWS").ToString(); + + sb.Clear().Append("MERGE INTO ").Append(_commonUtils.QuoteSqlName(updateTableName)).Append(" a ") + .Append(" \r\nUSING ").Append(_commonUtils.QuoteSqlName(tempTableName)).Append(" b ON (").Append(string.Join(" AND ", pkColumns.Select(col => $"a.{_commonUtils.QuoteSqlName(col)} = b.{_commonUtils.QuoteSqlName(col)}"))) + .Append(") \r\nWHEN MATCHED THEN") + .Append(" \r\nUPDATE SET ").Append(string.Join(", \r\n ", setColumns.Select(col => $"{_commonUtils.QuoteSqlName(col)} = b.{_commonUtils.QuoteSqlName(col)}"))); + var sql2 = sb.ToString(); + sb.Clear(); + var sql3 = $"BEGIN \r\n" + + $"execute immediate 'TRUNCATE TABLE {_commonUtils.QuoteSqlName(tempTableName)}';\r\n" + + $"execute immediate 'DROP TABLE {_commonUtils.QuoteSqlName(tempTableName)}';\r\n" + + $"END;"; + return NativeTuple.Create(sql1, sql2, sql3, tempTableName, pkColumns.Concat(setColumns).ToArray()); + } + /// /// Oracle CopyBulk 批量插入功能 /// 使用 IgnoreColumns/InsertColumns 设置忽略/指定导入的列