From d3fd02200098a01ce51b0776d6be22989b2ca53a Mon Sep 17 00:00:00 2001 From: 2881099 <2881099@qq.com> Date: Wed, 18 Nov 2020 09:42:09 +0800 Subject: [PATCH] =?UTF-8?q?-=20=E5=A2=9E=E5=8A=A0=20Oracle/=E8=BE=BE?= =?UTF-8?q?=E6=A2=A6=20BulkCopy=20=E6=94=AF=E6=8C=81=EF=BC=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- FreeSql.DbContext/FreeSql.DbContext.xml | 9 ++ .../Dameng/Curd/DamengInsertTest.cs | 10 ++ .../Oracle/Curd/OracleInsertTest.cs | 20 ++++ .../Curd/DamengInsert.cs | 4 + .../DamengExtensions.cs | 93 +++++++++++++++++- .../FreeSqlMySqlConnectorGlobalExtensions.cs | 4 +- .../Curd/OracleInsert.cs | 4 + .../FreeSql.Provider.Oracle.csproj | 4 +- .../OracleExtensions.cs | 95 ++++++++++++++++++- .../SqlServerExtensions.cs | 8 +- 10 files changed, 239 insertions(+), 12 deletions(-) diff --git a/FreeSql.DbContext/FreeSql.DbContext.xml b/FreeSql.DbContext/FreeSql.DbContext.xml index b3c14870..e4208f1f 100644 --- a/FreeSql.DbContext/FreeSql.DbContext.xml +++ b/FreeSql.DbContext/FreeSql.DbContext.xml @@ -502,5 +502,14 @@ + + + 批量注入 Repository,可以参考代码自行调整 + + + + + + diff --git a/FreeSql.Tests/FreeSql.Tests/Dameng/Curd/DamengInsertTest.cs b/FreeSql.Tests/FreeSql.Tests/Dameng/Curd/DamengInsertTest.cs index fbbf4931..3929e55d 100644 --- a/FreeSql.Tests/FreeSql.Tests/Dameng/Curd/DamengInsertTest.cs +++ b/FreeSql.Tests/FreeSql.Tests/Dameng/Curd/DamengInsertTest.cs @@ -200,6 +200,16 @@ INTO ""TB_TOPIC_INSERT""(""CLICKS"") VALUES(900) //var items2 = insert.AppendData(items).ExecuteInserted(); } + //[Fact] + //public void ExecuteDmBulkCopy() + //{ + // var items = new List(); + // for (var a = 0; a < 10; a++) items.Add(new Topic { Id = a + 1, Title = $"newtitle{a}", Clicks = a * 100, CreateTime = DateTime.Now }); + + // insert.AppendData(items).InsertIdentity().ExecuteDmBulkCopy(); + // //Dm.DmException:The fastloading dll not loading! + //} + [Fact] public void AsTable() { diff --git a/FreeSql.Tests/FreeSql.Tests/Oracle/Curd/OracleInsertTest.cs b/FreeSql.Tests/FreeSql.Tests/Oracle/Curd/OracleInsertTest.cs index 64315042..4c627ad7 100644 --- a/FreeSql.Tests/FreeSql.Tests/Oracle/Curd/OracleInsertTest.cs +++ b/FreeSql.Tests/FreeSql.Tests/Oracle/Curd/OracleInsertTest.cs @@ -200,6 +200,26 @@ INTO ""TB_TOPIC_INSERT""(""CLICKS"") VALUES(:Clicks_9) //var items2 = insert.AppendData(items).ExecuteInserted(); } + [Fact] + public void ExecuteOracleBulkCopy() + { + var items = new List(); + for (var a = 0; a < 10; a++) items.Add(new Topic_bulkcopy { Title = $"newtitle{a}", Clicks = a * 100, CreateTime = DateTime.Now }); + + g.oracle.Insert().AppendData(items).InsertIdentity().ExecuteOracleBulkCopy(); + //insert.AppendData(items).IgnoreColumns(a => new { a.CreateTime, a.Clicks }).ExecuteSqlBulkCopy(); + // System.NotSupportedException:“DataSet does not support System.Nullable<>.” + } + [Table(Name = "tb_topic_bulkcopy")] + class Topic_bulkcopy + { + public Guid Id { get; set; } + public int? Clicks { get; set; } + public TestTypeInfo Type { get; set; } + public string Title { get; set; } + public DateTime CreateTime { get; set; } + } + [Fact] public void AsTable() { diff --git a/Providers/FreeSql.Provider.Dameng/Curd/DamengInsert.cs b/Providers/FreeSql.Provider.Dameng/Curd/DamengInsert.cs index d07499a3..54d465ca 100644 --- a/Providers/FreeSql.Provider.Dameng/Curd/DamengInsert.cs +++ b/Providers/FreeSql.Provider.Dameng/Curd/DamengInsert.cs @@ -19,6 +19,10 @@ namespace FreeSql.Dameng.Curd { } + internal IFreeSql InternalOrm => _orm as IFreeSql; + internal DbConnection InternalConnection => _connection; + internal DbTransaction InternalTransaction => _transaction; + public override int ExecuteAffrows() => base.SplitExecuteAffrows(_batchValuesLimit > 0 ? _batchValuesLimit : 500, _batchParameterLimit > 0 ? _batchParameterLimit : 999); public override long ExecuteIdentity() => base.SplitExecuteIdentity(_batchValuesLimit > 0 ? _batchValuesLimit : 500, _batchParameterLimit > 0 ? _batchParameterLimit : 999); public override List ExecuteInserted() => base.SplitExecuteInserted(_batchValuesLimit > 0 ? _batchValuesLimit : 500, _batchParameterLimit > 0 ? _batchParameterLimit : 999); diff --git a/Providers/FreeSql.Provider.Dameng/DamengExtensions.cs b/Providers/FreeSql.Provider.Dameng/DamengExtensions.cs index a44e710b..7774195f 100644 --- a/Providers/FreeSql.Provider.Dameng/DamengExtensions.cs +++ b/Providers/FreeSql.Provider.Dameng/DamengExtensions.cs @@ -1,4 +1,8 @@ -public static partial class FreeSqlDamengGlobalExtensions +using Dm; +using FreeSql; +using System; + +public static partial class FreeSqlDamengGlobalExtensions { /// @@ -9,4 +13,91 @@ /// public static string FormatDameng(this string that, params object[] args) => _damengAdo.Addslashes(that, args); static FreeSql.Dameng.DamengAdo _damengAdo = new FreeSql.Dameng.DamengAdo(); + + #region ExecuteDmBulkCopy + /// + /// 达梦 CopyBulk 批量插入功能 + /// 使用 IgnoreColumns/InsertColumns 设置忽略/指定导入的列 + /// 使用 WithConnection/WithTransaction 传入连接/事务对象 + /// 提示:若本方法不能满足,请使用 IInsert<T>.ToDataTable 方法得到 DataTable 对象后,自行处理。 + /// + /// + /// + /// + /// + /// + public static void ExecuteDmBulkCopy(this IInsert that, DmBulkCopyOptions copyOptions = DmBulkCopyOptions.Default, int? batchSize = null, int? bulkCopyTimeout = null) where T : class + { + var insert = that as FreeSql.Dameng.Curd.DamengInsert; + if (insert == null) throw new Exception("ExecuteDmBulkCopy 是 FreeSql.Provider.Dameng 特有的功能"); + + var dt = that.ToDataTable(); + if (dt.Rows.Count == 0) return; + + Action writeToServer = bulkCopy => + { + if (batchSize.HasValue) bulkCopy.BatchSize = batchSize.Value; + if (bulkCopyTimeout.HasValue) bulkCopy.BulkCopyTimeout = bulkCopyTimeout.Value; + bulkCopy.DestinationTableName = dt.TableName; + for (int i = 0; i < dt.Columns.Count; i++) + bulkCopy.ColumnMappings.Add(dt.Columns[i].ColumnName, dt.Columns[i].ColumnName); + bulkCopy.WriteToServer(dt); + }; + + try + { + if (insert.InternalConnection == null && insert.InternalTransaction == null) + { + using (var conn = insert.InternalOrm.Ado.MasterPool.Get()) + { + using (var bulkCopy = copyOptions == DmBulkCopyOptions.Default ? + new DmBulkCopy(conn.Value as DmConnection) : + new DmBulkCopy(conn.Value as DmConnection, copyOptions, insert.InternalTransaction as DmTransaction)) + { + writeToServer(bulkCopy); + } + } + } + else if (insert.InternalTransaction != null) + { + using (var bulkCopy = new DmBulkCopy(insert.InternalTransaction.Connection as DmConnection, copyOptions, insert.InternalTransaction as DmTransaction)) + { + writeToServer(bulkCopy); + } + } + else if (insert.InternalConnection != null) + { + var conn = insert.InternalConnection as DmConnection; + var isNotOpen = false; + if (conn.State != System.Data.ConnectionState.Open) + { + isNotOpen = true; + conn.Open(); + } + try + { + using (var bulkCopy = copyOptions == DmBulkCopyOptions.Default ? + new DmBulkCopy(conn) : + new DmBulkCopy(conn, copyOptions, null)) + { + writeToServer(bulkCopy); + } + } + finally + { + if (isNotOpen) + conn.Close(); + } + } + else + { + throw new NotImplementedException("ExecuteDmBulkCopy 未实现错误,请反馈给作者"); + } + } + finally + { + dt.Clear(); + } + } + #endregion } diff --git a/Providers/FreeSql.Provider.MySqlConnector/FreeSqlMySqlConnectorGlobalExtensions.cs b/Providers/FreeSql.Provider.MySqlConnector/FreeSqlMySqlConnectorGlobalExtensions.cs index ed18ca2c..e24ecc00 100644 --- a/Providers/FreeSql.Provider.MySqlConnector/FreeSqlMySqlConnectorGlobalExtensions.cs +++ b/Providers/FreeSql.Provider.MySqlConnector/FreeSqlMySqlConnectorGlobalExtensions.cs @@ -57,7 +57,7 @@ public static class FreeSqlMySqlConnectorGlobalExtensions } else if (insert.InternalTransaction != null) { - writeToServer(new MySqlBulkCopy(insert.InternalTransaction.Connection as MySqlConnection)); + writeToServer(new MySqlBulkCopy(insert.InternalTransaction.Connection as MySqlConnection, insert.InternalTransaction as MySqlTransaction)); } else if (insert.InternalConnection != null) { @@ -116,7 +116,7 @@ public static class FreeSqlMySqlConnectorGlobalExtensions } else if (insert.InternalTransaction != null) { - await writeToServer(new MySqlBulkCopy(insert.InternalTransaction.Connection as MySqlConnection)); + await writeToServer(new MySqlBulkCopy(insert.InternalTransaction.Connection as MySqlConnection, insert.InternalTransaction as MySqlTransaction)); } else if (insert.InternalConnection != null) { diff --git a/Providers/FreeSql.Provider.Oracle/Curd/OracleInsert.cs b/Providers/FreeSql.Provider.Oracle/Curd/OracleInsert.cs index b4a33b6d..abd8a788 100644 --- a/Providers/FreeSql.Provider.Oracle/Curd/OracleInsert.cs +++ b/Providers/FreeSql.Provider.Oracle/Curd/OracleInsert.cs @@ -20,6 +20,10 @@ namespace FreeSql.Oracle.Curd { } + internal IFreeSql InternalOrm => _orm as IFreeSql; + internal DbConnection InternalConnection => _connection; + internal DbTransaction InternalTransaction => _transaction; + public override int ExecuteAffrows() => base.SplitExecuteAffrows(_batchValuesLimit > 0 ? _batchValuesLimit : 500, _batchParameterLimit > 0 ? _batchParameterLimit : 999); public override long ExecuteIdentity() => base.SplitExecuteIdentity(_batchValuesLimit > 0 ? _batchValuesLimit : 500, _batchParameterLimit > 0 ? _batchParameterLimit : 999); public override List ExecuteInserted() => base.SplitExecuteInserted(_batchValuesLimit > 0 ? _batchValuesLimit : 500, _batchParameterLimit > 0 ? _batchParameterLimit : 999); diff --git a/Providers/FreeSql.Provider.Oracle/FreeSql.Provider.Oracle.csproj b/Providers/FreeSql.Provider.Oracle/FreeSql.Provider.Oracle.csproj index 4f95f860..dbad8099 100644 --- a/Providers/FreeSql.Provider.Oracle/FreeSql.Provider.Oracle.csproj +++ b/Providers/FreeSql.Provider.Oracle/FreeSql.Provider.Oracle.csproj @@ -26,11 +26,11 @@ - + - + diff --git a/Providers/FreeSql.Provider.Oracle/OracleExtensions.cs b/Providers/FreeSql.Provider.Oracle/OracleExtensions.cs index 8913644b..dff0b1c9 100644 --- a/Providers/FreeSql.Provider.Oracle/OracleExtensions.cs +++ b/Providers/FreeSql.Provider.Oracle/OracleExtensions.cs @@ -1,4 +1,8 @@ -public static partial class FreeSqlOracleGlobalExtensions +using FreeSql; +using Oracle.ManagedDataAccess.Client; +using System; + +public static partial class FreeSqlOracleGlobalExtensions { /// @@ -9,4 +13,93 @@ /// public static string FormatOracle(this string that, params object[] args) => _oracleAdo.Addslashes(that, args); static FreeSql.Oracle.OracleAdo _oracleAdo = new FreeSql.Oracle.OracleAdo(); + + #region ExecuteOracleBulkCopy + /// + /// Oracle CopyBulk 批量插入功能 + /// 使用 IgnoreColumns/InsertColumns 设置忽略/指定导入的列 + /// 使用 WithConnection/WithTransaction 传入连接/事务对象 + /// 提示:若本方法不能满足,请使用 IInsert<T>.ToDataTable 方法得到 DataTable 对象后,自行处理。 + /// + /// + /// + /// + /// + /// + public static void ExecuteOracleBulkCopy(this IInsert that, OracleBulkCopyOptions copyOptions = OracleBulkCopyOptions.Default, int? batchSize = null, int? bulkCopyTimeout = null) where T : class + { + var insert = that as FreeSql.Oracle.Curd.OracleInsert; + if (insert == null) throw new Exception("ExecuteOracleBulkCopy 是 FreeSql.Provider.Oracle 特有的功能"); + + var dt = that.ToDataTable(); + if (dt.Rows.Count == 0) return; + + Action writeToServer = bulkCopy => + { + if (batchSize.HasValue) bulkCopy.BatchSize = batchSize.Value; + if (bulkCopyTimeout.HasValue) bulkCopy.BulkCopyTimeout = bulkCopyTimeout.Value; + bulkCopy.DestinationTableName = dt.TableName; + for (int i = 0; i < dt.Columns.Count; i++) + bulkCopy.ColumnMappings.Add(dt.Columns[i].ColumnName, dt.Columns[i].ColumnName); + bulkCopy.WriteToServer(dt); + }; + + try + { + if (insert.InternalConnection == null && insert.InternalTransaction == null) + { + using (var conn = insert.InternalOrm.Ado.MasterPool.Get()) + { + using (var bulkCopy = copyOptions == OracleBulkCopyOptions.Default ? + new OracleBulkCopy(conn.Value as OracleConnection) : + new OracleBulkCopy(conn.Value as OracleConnection, copyOptions)) + { + writeToServer(bulkCopy); + } + } + } + else if (insert.InternalTransaction != null) + { + using (var bulkCopy = copyOptions == OracleBulkCopyOptions.Default ? + new OracleBulkCopy(insert.InternalTransaction.Connection as OracleConnection) : + new OracleBulkCopy(insert.InternalTransaction.Connection as OracleConnection, copyOptions)) + { + writeToServer(bulkCopy); + } + } + else if (insert.InternalConnection != null) + { + var conn = insert.InternalConnection as OracleConnection; + var isNotOpen = false; + if (conn.State != System.Data.ConnectionState.Open) + { + isNotOpen = true; + conn.Open(); + } + try + { + using (var bulkCopy = copyOptions == OracleBulkCopyOptions.Default ? + new OracleBulkCopy(conn) : + new OracleBulkCopy(conn, copyOptions)) + { + writeToServer(bulkCopy); + } + } + finally + { + if (isNotOpen) + conn.Close(); + } + } + else + { + throw new NotImplementedException("ExecuteOracleBulkCopy 未实现错误,请反馈给作者"); + } + } + finally + { + dt.Clear(); + } + } + #endregion } diff --git a/Providers/FreeSql.Provider.SqlServer/SqlServerExtensions.cs b/Providers/FreeSql.Provider.SqlServer/SqlServerExtensions.cs index a3fe2972..b96e4b42 100644 --- a/Providers/FreeSql.Provider.SqlServer/SqlServerExtensions.cs +++ b/Providers/FreeSql.Provider.SqlServer/SqlServerExtensions.cs @@ -104,9 +104,7 @@ public static partial class FreeSqlSqlServerGlobalExtensions } else if (insert.InternalTransaction != null) { - using (var bulkCopy = copyOptions == SqlBulkCopyOptions.Default ? - new SqlBulkCopy(insert.InternalTransaction.Connection as SqlConnection) : - new SqlBulkCopy(insert.InternalTransaction.Connection as SqlConnection, copyOptions, insert.InternalTransaction as SqlTransaction)) + using (var bulkCopy = new SqlBulkCopy(insert.InternalTransaction.Connection as SqlConnection, copyOptions, insert.InternalTransaction as SqlTransaction)) { writeToServer(bulkCopy); } @@ -181,9 +179,7 @@ public static partial class FreeSqlSqlServerGlobalExtensions } else if (insert.InternalTransaction != null) { - using (var bulkCopy = copyOptions == SqlBulkCopyOptions.Default ? - new SqlBulkCopy(insert.InternalTransaction.Connection as SqlConnection) : - new SqlBulkCopy(insert.InternalTransaction.Connection as SqlConnection, copyOptions, insert.InternalTransaction as SqlTransaction)) + using (var bulkCopy = new SqlBulkCopy(insert.InternalTransaction.Connection as SqlConnection, copyOptions, insert.InternalTransaction as SqlTransaction)) { await writeToServerAsync(bulkCopy); }