diff --git a/FreeSql.DbContext/FreeSql.DbContext.xml b/FreeSql.DbContext/FreeSql.DbContext.xml index d9f91124..dc0203b8 100644 --- a/FreeSql.DbContext/FreeSql.DbContext.xml +++ b/FreeSql.DbContext/FreeSql.DbContext.xml @@ -110,13 +110,6 @@ 清空状态数据 - - - 根据 lambda 条件删除数据 - - - - 添加 diff --git a/Providers/FreeSql.Provider.PostgreSQL/FreeSql.Provider.PostgreSQL.csproj b/Providers/FreeSql.Provider.PostgreSQL/FreeSql.Provider.PostgreSQL.csproj index 3938cc17..9c8fd3ae 100644 --- a/Providers/FreeSql.Provider.PostgreSQL/FreeSql.Provider.PostgreSQL.csproj +++ b/Providers/FreeSql.Provider.PostgreSQL/FreeSql.Provider.PostgreSQL.csproj @@ -33,6 +33,10 @@ + + + net45 + diff --git a/Providers/FreeSql.Provider.PostgreSQL/PostgreSQLExtensions.cs b/Providers/FreeSql.Provider.PostgreSQL/PostgreSQLExtensions.cs index dd5aa832..b2c75baf 100644 --- a/Providers/FreeSql.Provider.PostgreSQL/PostgreSQLExtensions.cs +++ b/Providers/FreeSql.Provider.PostgreSQL/PostgreSQLExtensions.cs @@ -5,6 +5,7 @@ using System; using System.Data; using System.Linq.Expressions; using System.Text; +using System.Threading.Tasks; public static partial class FreeSqlPostgreSQLGlobalExtensions { @@ -28,6 +29,7 @@ public static partial class FreeSqlPostgreSQLGlobalExtensions /// public static OnConflictDoUpdate OnConflictDoUpdate(this IInsert that, Expression> columns = null) where T1 : class => new FreeSql.PostgreSQL.Curd.OnConflictDoUpdate(that.InsertIdentity(), columns); + #region ExecutePgCopy /// /// PostgreSQL COPY 批量导入功能,封装了 NpgsqlConnection.BeginBinaryImport 方法 /// 使用 IgnoreColumns/InsertColumns 设置忽略/指定导入的列 @@ -48,7 +50,7 @@ public static partial class FreeSqlPostgreSQLGlobalExtensions public static void ExecutePgCopy(this IInsert that) where T : class { var insert = that as FreeSql.PostgreSQL.Curd.PostgreSQLInsert; - if (insert == null) throw new Exception("ExecuteSqlBulkCopy 是 FreeSql.Provider.PostgreSQL 特有的功能"); + if (insert == null) throw new Exception("ExecutePgCopy 是 FreeSql.Provider.PostgreSQL 特有的功能"); var dt = that.ToDataTable(); if (dt.Rows.Count == 0) return; @@ -106,7 +108,7 @@ public static partial class FreeSqlPostgreSQLGlobalExtensions } else { - throw new NotImplementedException("ExecuteCopy 未实现错误,请反馈给作者"); + throw new NotImplementedException("ExecutePgCopy 未实现错误,请反馈给作者"); } } finally @@ -114,4 +116,77 @@ public static partial class FreeSqlPostgreSQLGlobalExtensions dt.Clear(); } } + +#if net45 +#else + async public static Task ExecutePgCopyAsync(this IInsert that) where T : class + { + var insert = that as FreeSql.PostgreSQL.Curd.PostgreSQLInsert; + if (insert == null) throw new Exception("ExecutePgCopyAsync 是 FreeSql.Provider.PostgreSQL 特有的功能"); + + var dt = that.ToDataTable(); + if (dt.Rows.Count == 0) return; + Func binaryImportAsync = async conn => + { + var copyFromCommand = new StringBuilder().Append("COPY ").Append(insert.InternalCommonUtils.QuoteSqlName(dt.TableName)).Append("("); + var colIndex = 0; + foreach (DataColumn col in dt.Columns) + { + if (colIndex++ > 0) copyFromCommand.Append(", "); + copyFromCommand.Append(insert.InternalCommonUtils.QuoteSqlName(col.ColumnName)); + } + copyFromCommand.Append(") FROM STDIN BINARY"); + using (var writer = conn.BeginBinaryImport(copyFromCommand.ToString())) + { + foreach (DataRow item in dt.Rows) + await writer.WriteRowAsync(System.Threading.CancellationToken.None, item.ItemArray); + writer.Complete(); + } + copyFromCommand.Clear(); + }; + + try + { + if (insert.InternalConnection == null && insert.InternalTransaction == null) + { + using (var conn = await insert.InternalOrm.Ado.MasterPool.GetAsync()) + { + await binaryImportAsync(conn.Value as NpgsqlConnection); + } + } + else if (insert.InternalTransaction != null) + { + await binaryImportAsync(insert.InternalTransaction.Connection as NpgsqlConnection); + } + else if (insert.InternalConnection != null) + { + var conn = insert.InternalConnection as NpgsqlConnection; + var isNotOpen = false; + if (conn.State != System.Data.ConnectionState.Open) + { + isNotOpen = true; + await conn.OpenAsync(); + } + try + { + await binaryImportAsync(conn); + } + finally + { + if (isNotOpen) + await conn.CloseAsync(); + } + } + else + { + throw new NotImplementedException("ExecutePgCopyAsync 未实现错误,请反馈给作者"); + } + } + finally + { + dt.Clear(); + } + } +#endif + #endregion } diff --git a/Providers/FreeSql.Provider.SqlServer/SqlServerExtensions.cs b/Providers/FreeSql.Provider.SqlServer/SqlServerExtensions.cs index cea0b092..22ac8e2e 100644 --- a/Providers/FreeSql.Provider.SqlServer/SqlServerExtensions.cs +++ b/Providers/FreeSql.Provider.SqlServer/SqlServerExtensions.cs @@ -3,6 +3,7 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Data.SqlClient; +using System.Threading.Tasks; public static partial class FreeSqlSqlServerGlobalExtensions { @@ -42,6 +43,7 @@ public static partial class FreeSqlSqlServerGlobalExtensions } internal static ConcurrentDictionary)> _dicSetGlobalSelectWithLock = new ConcurrentDictionary)>(); + #region ExecuteSqlBulkCopy /// /// SqlServer SqlCopyBulk 批量插入功能 /// 使用 IgnoreColumns/InsertColumns 设置忽略/指定导入的列 @@ -135,6 +137,83 @@ public static partial class FreeSqlSqlServerGlobalExtensions dt.Clear(); } } +#if net40 +#else + async public static Task ExecuteSqlBulkCopyAsync(this IInsert that, SqlBulkCopyOptions copyOptions = SqlBulkCopyOptions.Default, int? batchSize = null, int? bulkCopyTimeout = null) where T : class + { + var insert = that as FreeSql.SqlServer.Curd.SqlServerInsert; + if (insert == null) throw new Exception("ExecuteSqlBulkCopyAsync 是 FreeSql.Provider.SqlServer 特有的功能"); + + var dt = that.ToDataTable(); + if (dt.Rows.Count == 0) return; + + Func writeToServerAsync = bulkCopy => + { + if (batchSize.HasValue) bulkCopy.BatchSize = batchSize.Value; + if (bulkCopyTimeout.HasValue) bulkCopy.BulkCopyTimeout = bulkCopyTimeout.Value; + bulkCopy.DestinationTableName = dt.TableName; + return bulkCopy.WriteToServerAsync(dt); + }; + + try + { + if (insert.InternalConnection == null && insert.InternalTransaction == null) + { + using (var conn = await insert.InternalOrm.Ado.MasterPool.GetAsync()) + { + using (var bulkCopy = copyOptions == SqlBulkCopyOptions.Default ? + new SqlBulkCopy(conn.Value as SqlConnection) : + new SqlBulkCopy(conn.Value as SqlConnection, copyOptions, null)) + { + await writeToServerAsync(bulkCopy); + } + } + } + else if (insert.InternalTransaction != null) + { + using (var bulkCopy = copyOptions == SqlBulkCopyOptions.Default ? + new SqlBulkCopy(insert.InternalTransaction.Connection as SqlConnection) : + new SqlBulkCopy(insert.InternalTransaction.Connection as SqlConnection, copyOptions, insert.InternalTransaction as SqlTransaction)) + { + await writeToServerAsync(bulkCopy); + } + } + else if (insert.InternalConnection != null) + { + var conn = insert.InternalConnection as SqlConnection; + var isNotOpen = false; + if (conn.State != System.Data.ConnectionState.Open) + { + isNotOpen = true; + await conn.OpenAsync(); + } + try + { + using (var bulkCopy = copyOptions == SqlBulkCopyOptions.Default ? + new SqlBulkCopy(conn) : + new SqlBulkCopy(conn, copyOptions, null)) + { + await writeToServerAsync(bulkCopy); + } + } + finally + { + if (isNotOpen) + conn.Close(); + } + } + else + { + throw new NotImplementedException("ExecuteSqlBulkCopyAsync 未实现错误,请反馈给作者"); + } + } + finally + { + dt.Clear(); + } + } +#endif + #endregion } [Flags]