diff --git a/Examples/base_entity/Program.cs b/Examples/base_entity/Program.cs index 17a9e949..f231a775 100644 --- a/Examples/base_entity/Program.cs +++ b/Examples/base_entity/Program.cs @@ -545,15 +545,15 @@ namespace base_entity .UseConnectionString(FreeSql.DataType.SqlServer, "Data Source=.;Integrated Security=True;Initial Catalog=freesqlTest;Pooling=true;Max Pool Size=3;TrustServerCertificate=true") - //.UseConnectionString(FreeSql.DataType.PostgreSQL, "Host=192.168.164.10;Port=5432;Username=postgres;Password=123456;Database=tedb;Pooling=true;Maximum Pool Size=2") + .UseConnectionString(FreeSql.DataType.PostgreSQL, "Host=192.168.164.10;Port=5432;Username=postgres;Password=123456;Database=tedb;Pooling=true;Maximum Pool Size=2") //.UseConnectionString(FreeSql.DataType.PostgreSQL, "Host=192.168.164.10;Port=5432;Username=postgres;Password=123456;Database=toc;Pooling=true;Maximum Pool Size=2") - //.UseNameConvert(FreeSql.Internal.NameConvertType.ToLower) + .UseNameConvert(FreeSql.Internal.NameConvertType.ToLower) //.UseConnectionString(FreeSql.DataType.Oracle, "user id=user1;password=123456;data source=//127.0.0.1:1521/XE;Pooling=true;Max Pool Size=2") //.UseNameConvert(FreeSql.Internal.NameConvertType.ToUpper) - .UseConnectionString(FreeSql.DataType.Dameng, "server=127.0.0.1;port=5236;user id=2user;password=123456789;database=2user;poolsize=5;min pool size=1") - .UseNameConvert(FreeSql.Internal.NameConvertType.ToUpper) + //.UseConnectionString(FreeSql.DataType.Dameng, "server=127.0.0.1;port=5236;user id=2user;password=123456789;database=2user;poolsize=5;min pool size=1") + //.UseNameConvert(FreeSql.Internal.NameConvertType.ToUpper) //.UseConnectionString(FreeSql.DataType.OdbcMySql, "Driver={MySQL ODBC 8.0 Unicode Driver};Server=127.0.0.1;Persist Security Info=False;Trusted_Connection=Yes;UID=root;PWD=root;DATABASE=cccddd_odbc;Charset=utf8;SslMode=none;Max pool size=2") @@ -578,15 +578,15 @@ namespace base_entity BaseEntity.Initialization(fsql, () => _asyncUow.Value); #endregion - fsql.CodeFirst.GetTableByEntity(typeof(User1)).Columns.Values.ToList().ForEach(col => - { - col.Comment = ""; - }); + //fsql.CodeFirst.GetTableByEntity(typeof(User1)).Columns.Values.ToList().ForEach(col => + //{ + // col.Comment = ""; + //}); fsql.Insert(Enumerable.Range(0, 100).Select(a => new User1 { Id = Guid.NewGuid(), Nickname = $"nickname{a}", Username = $"username{a}", Description = $"desc{a}" }).ToArray()).ExecuteAffrows(); fsql.InsertOrUpdate() .SetSource(fsql.Select().ToList()) - .ExecuteDmBulkCopy(); + .ExecutePgCopy(); var updatejoin01 = fsql.Update() .Join(fsql.Select(), (a, b) => a.GroupId == b.Id) diff --git a/Providers/FreeSql.Provider.PostgreSQL/PostgreSQLExtensions.cs b/Providers/FreeSql.Provider.PostgreSQL/PostgreSQLExtensions.cs index a85c7166..1605cd97 100644 --- a/Providers/FreeSql.Provider.PostgreSQL/PostgreSQLExtensions.cs +++ b/Providers/FreeSql.Provider.PostgreSQL/PostgreSQLExtensions.cs @@ -37,6 +37,51 @@ public static partial class FreeSqlPostgreSQLGlobalExtensions #region ExecutePgCopy /// + /// 批量插入或更新(操作的字段数量超过 2000 时收益大) + /// 实现原理:使用 PgCopy 插入临时表,再执行 INSERT INTO t1 select * from #temp ON CONFLICT(""id"") DO UPDATE SET ... + /// + /// + /// + /// + public static int ExecutePgCopy(this IInsertOrUpdate that) where T : class + { + var upsert = that as InsertOrUpdateProvider; + if (upsert._source.Any() != true || upsert._tempPrimarys.Any() == false) return 0; + var state = ExecutePgCopyState(upsert); + return UpdateProvider.ExecuteBulkUpsert(upsert, state, insert => insert.ExecutePgCopy()); + } + static NativeTuple ExecutePgCopyState(InsertOrUpdateProvider upsert) where T : class + { + if (upsert._source.Any() != true) return null; + var _table = upsert._table; + var _commonUtils = upsert._commonUtils; + var updateTableName = upsert._tableRule?.Invoke(_table.DbName) ?? _table.DbName; + var tempTableName = $"Temp_{Guid.NewGuid().ToString("N")}"; + if (upsert._orm.CodeFirst.IsSyncStructureToLower) tempTableName = tempTableName.ToLower(); + if (upsert._orm.CodeFirst.IsSyncStructureToUpper) tempTableName = tempTableName.ToUpper(); + if (upsert._connection == null && upsert._orm.Ado.TransactionCurrentThread != null) + upsert.WithTransaction(upsert._orm.Ado.TransactionCurrentThread); + var sb = new StringBuilder().Append("CREATE TEMP TABLE ").Append(_commonUtils.QuoteSqlName(tempTableName)).Append(" ( "); + foreach (var col in _table.Columns.Values) + { + sb.Append(" \r\n ").Append(_commonUtils.QuoteSqlName(col.Attribute.Name)).Append(" ").Append(col.Attribute.DbType.Replace("NOT NULL", "")); + sb.Append(","); + } + var sql1 = sb.Remove(sb.Length - 1, 1).Append("\r\n) WITH (OIDS=FALSE);").ToString(); + sb.Clear(); + try + { + upsert._sourceSql = $"select * from {tempTableName}"; + var sql2 = upsert.ToSql(); + var sql3 = $"DROP TABLE {_commonUtils.QuoteSqlName(tempTableName)}"; + return NativeTuple.Create(sql1, sql2, sql3, tempTableName, _table.Columns.Values.Select(a => a.Attribute.Name).ToArray()); + } + finally + { + upsert._sourceSql = null; + } + } + /// /// 批量更新(更新字段数量超过 2000 时收益大) /// 实现原理:使用 PgCopy 插入临时表,再使用 UPDATE INNER JOIN 联表更新 /// @@ -173,6 +218,13 @@ public static partial class FreeSqlPostgreSQLGlobalExtensions #if net45 #else + public static Task ExecutePgCopyAsync(this IInsertOrUpdate that, CancellationToken cancellationToken = default) where T : class + { + var upsert = that as UpdateProvider; + if (upsert._source.Any() != true || upsert._tempPrimarys.Any() == false) return Task.FromResult(0); + var state = ExecutePgCopyState(upsert); + return UpdateProvider.ExecuteBulkUpdateAsync(upsert, state, insert => insert.ExecutePgCopyAsync(cancellationToken)); + } public static Task ExecutePgCopyAsync(this IUpdate that, CancellationToken cancellationToken = default) where T : class { var update = that as UpdateProvider;