diff --git a/FreeSql/Internal/CommonProvider/InsertProvider.cs b/FreeSql/Internal/CommonProvider/InsertProvider.cs index 296dac53..a501c9c1 100644 --- a/FreeSql/Internal/CommonProvider/InsertProvider.cs +++ b/FreeSql/Internal/CommonProvider/InsertProvider.cs @@ -488,6 +488,7 @@ namespace FreeSql.Internal.CommonProvider if (col.Attribute.IsIdentity == false && _ignore.ContainsKey(col.Attribute.Name)) continue; dt.Columns.Add(col.Attribute.Name, col.Attribute.MapType); } + if (dt.Columns.Count == 0) return dt; foreach (var d in _source) { var row = new object[dt.Columns.Count]; diff --git a/Providers/FreeSql.Provider.PostgreSQL/PostgreSQLExtensions.cs b/Providers/FreeSql.Provider.PostgreSQL/PostgreSQLExtensions.cs index 5528b09c..dd5aa832 100644 --- a/Providers/FreeSql.Provider.PostgreSQL/PostgreSQLExtensions.cs +++ b/Providers/FreeSql.Provider.PostgreSQL/PostgreSQLExtensions.cs @@ -1,7 +1,10 @@ using FreeSql; using FreeSql.PostgreSQL.Curd; +using Npgsql; using System; +using System.Data; using System.Linq.Expressions; +using System.Text; public static partial class FreeSqlPostgreSQLGlobalExtensions { @@ -24,4 +27,91 @@ public static partial class FreeSqlPostgreSQLGlobalExtensions /// 默认是以主键作为重复判断,也可以指定其他列:a => a.Name | a => new{a.Name,a.Time} | a => new[]{"name","time"} /// public static OnConflictDoUpdate OnConflictDoUpdate(this IInsert that, Expression> columns = null) where T1 : class => new FreeSql.PostgreSQL.Curd.OnConflictDoUpdate(that.InsertIdentity(), columns); + + /// + /// PostgreSQL COPY 批量导入功能,封装了 NpgsqlConnection.BeginBinaryImport 方法 + /// 使用 IgnoreColumns/InsertColumns 设置忽略/指定导入的列 + /// 使用 WithConnection/WithTransaction 传入连接/事务对象 + /// 提示:若本方法不能满足,请使用 IInsert<T>.ToDataTable 方法得到 DataTable 对象后,自行处理。 + /// COPY 与 insert into t values(..),(..),(..) 性能测试参考: + /// 插入180000行,52列:10,090ms 与 46,756ms,10列:4,081ms 与 9,786ms + /// 插入10000行,52列:583ms 与 3,294ms,10列:167ms 与 568ms + /// 插入5000行,52列:337ms 与 2,269ms,10列:93ms 与 366ms + /// 插入2000行,52列:136ms 与 1,019ms,10列:39ms 与 157ms + /// 插入1000行,52列:88ms 与 374ms,10列:21ms 与 102ms + /// 插入500行,52列:61ms 与 209ms,10列:12ms 与 34ms + /// 插入100行,52列:30ms 与 51ms,10列:4ms 与 9ms + /// 插入50行,52列:25ms 与 37ms,10列:2ms 与 6ms + /// + /// + /// + public static void ExecutePgCopy(this IInsert that) where T : class + { + var insert = that as FreeSql.PostgreSQL.Curd.PostgreSQLInsert; + if (insert == null) throw new Exception("ExecuteSqlBulkCopy 是 FreeSql.Provider.PostgreSQL 特有的功能"); + + var dt = that.ToDataTable(); + if (dt.Rows.Count == 0) return; + + Action binaryImport = conn => + { + var copyFromCommand = new StringBuilder().Append("COPY ").Append(insert.InternalCommonUtils.QuoteSqlName(dt.TableName)).Append("("); + var colIndex = 0; + foreach (DataColumn col in dt.Columns) + { + if (colIndex++ > 0) copyFromCommand.Append(", "); + copyFromCommand.Append(insert.InternalCommonUtils.QuoteSqlName(col.ColumnName)); + } + copyFromCommand.Append(") FROM STDIN BINARY"); + using (var writer = conn.BeginBinaryImport(copyFromCommand.ToString())) + { + foreach (DataRow item in dt.Rows) + writer.WriteRow(item.ItemArray); + writer.Complete(); + } + copyFromCommand.Clear(); + }; + + try + { + if (insert.InternalConnection == null && insert.InternalTransaction == null) + { + using (var conn = insert.InternalOrm.Ado.MasterPool.Get()) + { + binaryImport(conn.Value as NpgsqlConnection); + } + } + else if (insert.InternalTransaction != null) + { + binaryImport(insert.InternalTransaction.Connection as NpgsqlConnection); + } + else if (insert.InternalConnection != null) + { + var conn = insert.InternalConnection as NpgsqlConnection; + var isNotOpen = false; + if (conn.State != System.Data.ConnectionState.Open) + { + isNotOpen = true; + conn.Open(); + } + try + { + binaryImport(conn); + } + finally + { + if (isNotOpen) + conn.Close(); + } + } + else + { + throw new NotImplementedException("ExecuteCopy 未实现错误,请反馈给作者"); + } + } + finally + { + dt.Clear(); + } + } } diff --git a/Providers/FreeSql.Provider.SqlServer/Curd/SqlServerInsert.cs b/Providers/FreeSql.Provider.SqlServer/Curd/SqlServerInsert.cs index f70d791e..58c74bc1 100644 --- a/Providers/FreeSql.Provider.SqlServer/Curd/SqlServerInsert.cs +++ b/Providers/FreeSql.Provider.SqlServer/Curd/SqlServerInsert.cs @@ -4,7 +4,6 @@ using System; using System.Collections.Generic; using System.Data; using System.Data.Common; -using System.Data.SqlClient; using System.Linq; using System.Text; using System.Threading.Tasks; @@ -20,8 +19,8 @@ namespace FreeSql.SqlServer.Curd } internal IFreeSql InternalOrm => _orm as IFreeSql; - internal SqlConnection InternalConnection => _connection as SqlConnection; - internal SqlTransaction InternalTransaction => _transaction as SqlTransaction; + internal DbConnection InternalConnection => _connection; + internal DbTransaction InternalTransaction => _transaction; public override int ExecuteAffrows() => base.SplitExecuteAffrows(_batchValuesLimit > 0 ? _batchValuesLimit : 1000, _batchParameterLimit > 0 ? _batchParameterLimit : 2100); public override long ExecuteIdentity() => base.SplitExecuteIdentity(_batchValuesLimit > 0 ? _batchValuesLimit : 1000, _batchParameterLimit > 0 ? _batchParameterLimit : 2100); diff --git a/Providers/FreeSql.Provider.SqlServer/SqlServerExtensions.cs b/Providers/FreeSql.Provider.SqlServer/SqlServerExtensions.cs index 38cb29d5..cea0b092 100644 --- a/Providers/FreeSql.Provider.SqlServer/SqlServerExtensions.cs +++ b/Providers/FreeSql.Provider.SqlServer/SqlServerExtensions.cs @@ -78,50 +78,61 @@ public static partial class FreeSqlSqlServerGlobalExtensions bulkCopy.WriteToServer(dt); }; - if (insert.InternalConnection == null && insert.InternalTransaction == null) + try { - using (var conn = insert.InternalOrm.Ado.MasterPool.Get()) + if (insert.InternalConnection == null && insert.InternalTransaction == null) + { + using (var conn = insert.InternalOrm.Ado.MasterPool.Get()) + { + using (var bulkCopy = copyOptions == SqlBulkCopyOptions.Default ? + new SqlBulkCopy(conn.Value as SqlConnection) : + new SqlBulkCopy(conn.Value as SqlConnection, copyOptions, null)) + { + writeToServer(bulkCopy); + } + } + } + else if (insert.InternalTransaction != null) { using (var bulkCopy = copyOptions == SqlBulkCopyOptions.Default ? - new SqlBulkCopy(conn.Value as SqlConnection) : - new SqlBulkCopy(conn.Value as SqlConnection, copyOptions, null)) + new SqlBulkCopy(insert.InternalTransaction.Connection as SqlConnection) : + new SqlBulkCopy(insert.InternalTransaction.Connection as SqlConnection, copyOptions, insert.InternalTransaction as SqlTransaction)) { writeToServer(bulkCopy); } } - } - else if (insert.InternalTransaction != null) - { - using (var bulkCopy = copyOptions == SqlBulkCopyOptions.Default ? - new SqlBulkCopy(insert.InternalTransaction.Connection) : - new SqlBulkCopy(insert.InternalTransaction.Connection, copyOptions, insert.InternalTransaction)) + else if (insert.InternalConnection != null) { - writeToServer(bulkCopy); + var conn = insert.InternalConnection as SqlConnection; + var isNotOpen = false; + if (conn.State != System.Data.ConnectionState.Open) + { + isNotOpen = true; + conn.Open(); + } + try + { + using (var bulkCopy = copyOptions == SqlBulkCopyOptions.Default ? + new SqlBulkCopy(conn) : + new SqlBulkCopy(conn, copyOptions, null)) + { + writeToServer(bulkCopy); + } + } + finally + { + if (isNotOpen) + conn.Close(); + } + } + else + { + throw new NotImplementedException("ExecuteSqlBulkCopy 未实现错误,请反馈给作者"); } } - else if (insert.InternalConnection != null) + finally { - var conn = insert.InternalConnection; - var isNotOpen = false; - if (conn.State != System.Data.ConnectionState.Open) - { - isNotOpen = true; - conn.Open(); - } - using (var bulkCopy = copyOptions == SqlBulkCopyOptions.Default ? - new SqlBulkCopy(insert.InternalConnection) : - new SqlBulkCopy(insert.InternalConnection, copyOptions, null)) - { - writeToServer(bulkCopy); - } - if (isNotOpen) - { - conn.Close(); - } - } - else - { - throw new NotImplementedException("未实现错误,请反馈给作者"); + dt.Clear(); } } }