diff --git a/Examples/orm_vs/Program.cs b/Examples/orm_vs/Program.cs index 10facd16..54f365a8 100644 --- a/Examples/orm_vs/Program.cs +++ b/Examples/orm_vs/Program.cs @@ -15,15 +15,16 @@ using System.Text; using System.Threading; using System.Threading.Tasks; using FreeSql; +using FreeSql.Internal.CommonProvider; namespace orm_vs { class Program { static IFreeSql fsql = new FreeSql.FreeSqlBuilder() - .UseConnectionString(FreeSql.DataType.SqlServer, "Data Source=.;Integrated Security=True;Initial Catalog=tedb;Pooling=true;Max Pool Size=20") - //.UseConnectionString(FreeSql.DataType.MySql, "Data Source=127.0.0.1;Port=3306;User ID=root;Password=root;Initial Catalog=cccddd;Charset=utf8;SslMode=none;Max pool size=20") - //.UseConnectionString(FreeSql.DataType.PostgreSQL, "Host=192.168.164.10;Port=5432;Username=postgres;Password=123456;Database=tedb;Pooling=true;Maximum Pool Size=20") + .UseConnectionString(FreeSql.DataType.SqlServer, "Data Source=.;Integrated Security=True;Initial Catalog=tedb1;Pooling=true;Max Pool Size=21;TrustServerCertificate=true") + .UseConnectionString(FreeSql.DataType.MySql, "Data Source=127.0.0.1;Port=3306;User ID=root;Password=root;Initial Catalog=cccddd;Charset=utf8;SslMode=none;Max pool size=21;AllowLoadLocalInfile=true;") + .UseConnectionString(FreeSql.DataType.PostgreSQL, "Host=192.168.164.10;Port=5432;Username=postgres;Password=123456;Database=tedb;Pooling=true;Maximum Pool Size=21") .UseAutoSyncStructure(false) .UseNoneCommandParameter(true) //.UseConfigEntityFromDbFirst(true) @@ -35,12 +36,12 @@ namespace orm_vs { var db = new SqlSugarClient(new ConnectionConfig() { - ConnectionString = "Data Source=.;Integrated Security=True;Initial Catalog=tedb;Pooling=true;Min Pool Size=20;Max Pool Size=20", - DbType = DbType.SqlServer, - //ConnectionString = "Data Source=127.0.0.1;Port=3306;User ID=root;Password=root;Initial Catalog=cccddd;Charset=utf8;SslMode=none;Min Pool Size=20;Max Pool Size=20", + //ConnectionString = "Data Source=.;Integrated Security=True;Initial Catalog=tedb1;Pooling=true;Min Pool Size=20;Max Pool Size=20;TrustServerCertificate=true", + //DbType = DbType.SqlServer, + //ConnectionString = "Data Source=127.0.0.1;Port=3306;User ID=root;Password=root;Initial Catalog=cccddd;Charset=utf8;SslMode=none;Min Pool Size=20;Max Pool Size=20;AllowLoadLocalInfile=true;", //DbType = DbType.MySql, - //ConnectionString = "Host=192.168.164.10;Port=5432;Username=postgres;Password=123456;Database=tedb;Pooling=true;Maximum Pool Size=21", - //DbType = DbType.PostgreSQL, + ConnectionString = "Host=192.168.164.10;Port=5432;Username=postgres;Password=123456;Database=tedb;Pooling=true;Maximum Pool Size=20", + DbType = DbType.PostgreSQL, IsAutoCloseConnection = true, InitKeyType = InitKeyType.Attribute }); @@ -58,18 +59,15 @@ namespace orm_vs public DbSet PatientExamination_2022s { get; set; } protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder) { - optionsBuilder.UseSqlServer(@"Data Source=.;Integrated Security=True;Initial Catalog=tedb;Pooling=true;Min Pool Size=21;Max Pool Size=21"); - //optionsBuilder.UseMySql("Data Source=127.0.0.1;Port=3306;User ID=root;Password=root;Initial Catalog=cccddd;Charset=utf8;SslMode=none;Min Pool Size=21;Max Pool Size=21"); - //optionsBuilder.UseNpgsql("Host=192.168.164.10;Port=5432;Username=postgres;Password=123456;Database=tedb;Pooling=true;Maximum Pool Size=21"); + //optionsBuilder.UseSqlServer(@"Data Source=.;Integrated Security=True;Initial Catalog=tedb1;Pooling=true;Min Pool Size=19;Max Pool Size=19;TrustServerCertificate=true"); + //var connectionString = "Data Source=127.0.0.1;Port=3306;User ID=root;Password=root;Initial Catalog=cccddd;Charset=utf8;SslMode=none;Min Pool Size=19;Max Pool Size=19"; + //optionsBuilder.UseMySql(connectionString, ServerVersion.AutoDetect(connectionString)); + optionsBuilder.UseNpgsql("Host=192.168.164.10;Port=5432;Username=postgres;Password=123456;Database=tedb;Pooling=true;Maximum Pool Size=19"); } protected override void OnModelCreating(ModelBuilder modelBuilder) { base.OnModelCreating(modelBuilder); - - modelBuilder.Entity() - .Property(a => a.create_time) - .HasConversion(a => int.Parse(a.ToString()), a => new DateTime(a)); } } @@ -136,7 +134,7 @@ namespace orm_vs } static void TestDapperSelectPatientExamination_2022() { - using (var conn = new SqlConnection("Data Source=.;Integrated Security=True;Initial Catalog=tedb;Pooling=true;Min Pool Size=21;Max Pool Size=22")) + using (var conn = new SqlConnection("Data Source=.;Integrated Security=True;Initial Catalog=tedb1;Pooling=true;Min Pool Size=21;Max Pool Size=22")) { var list = conn.Query("select top 40000 * from PatientExamination_2022"); } @@ -145,28 +143,28 @@ namespace orm_vs static DbConnection fsqlConn = null; static void Main(string[] args) { - var count = 0; - var sws = new List(); - Console.WriteLine("观察查询4万条记录内存,按 Enter 进入下一次,按任易键即出程序。。。"); - //while(Console.ReadKey().Key == ConsoleKey.Enter) - //using (var fcon = fsql.Ado.MasterPool.Get()) - //{ - //fsqlConn = fcon.Value; - for (var a = 0; a < 80; a++) - { - Stopwatch sw = Stopwatch.StartNew(); - TestFreeSqlSelectPatientExamination_2022(); - //TestEfSelectPatientExamination_2022(); - //TestSqlSugarSelectPatientExamination_2022(); - //TestDapperSelectPatientExamination_2022(); - sw.Stop(); - sws.Add(sw.ElapsedMilliseconds); - Console.WriteLine($"第 {++count} 次,查询4万条记录, {sw.ElapsedMilliseconds}ms,平均 {(long)sws.Average()}ms"); - } - //} - Console.ReadKey(); - fsql.Dispose(); - return; + //var count = 0; + //var sws = new List(); + //Console.WriteLine("观察查询4万条记录内存,按 Enter 进入下一次,按任易键即出程序。。。"); + ////while(Console.ReadKey().Key == ConsoleKey.Enter) + ////using (var fcon = fsql.Ado.MasterPool.Get()) + ////{ + // //fsqlConn = fcon.Value; + // for (var a = 0; a < 80; a++) + // { + // Stopwatch sw = Stopwatch.StartNew(); + // TestFreeSqlSelectPatientExamination_2022(); + // //TestEfSelectPatientExamination_2022(); + // //TestSqlSugarSelectPatientExamination_2022(); + // //TestDapperSelectPatientExamination_2022(); + // sw.Stop(); + // sws.Add(sw.ElapsedMilliseconds); + // Console.WriteLine($"第 {++count} 次,查询4万条记录, {sw.ElapsedMilliseconds}ms,平均 {(long)sws.Average()}ms"); + // } + ////} + //Console.ReadKey(); + //fsql.Dispose(); + //return; //fsql.CodeFirst.SyncStructure(typeof(Song), typeof(Song_tag), typeof(Tag)); //sugar.CodeFirst.InitTables(typeof(Song), typeof(Song_tag), typeof(Tag)); @@ -186,13 +184,6 @@ namespace orm_vs var sb = new StringBuilder(); var time = new Stopwatch(); - var sql222 = fsql.Select().Where(a => DateTime.Now.Subtract(a.create_time.Value).TotalHours > 0).ToSql(); - - var conModels = new List(); - conModels.Add(new ConditionalModel { FieldName = "`id` = 1 or 1=1; delete from song_tag; -- ", ConditionalType = ConditionalType.Equal, FieldValue = "1" }); - - var student = sugar.Queryable().Where(conModels).ToList(); - #region ET test ////var t31 = fsql.Select().ToList(); @@ -483,23 +474,10 @@ namespace orm_vs #endregion - var testlist1 = fsql.Select().OrderBy(a => a.id).ToList(); - var testlist2 = new List(); - fsql.Select().OrderBy(a => a.id).ToChunk(2, fetch => + sugar.Aop.OnLogExecuted = (s, e) => { - testlist2.AddRange(fetch.Object); - }); - - var testlist22 = new List(); - fsql.Select().LeftJoin((a, b) => a.id == b.song_id).ToChunk((a, b) => new { a.title, a.create_time, b.tag_id }, 2, fetch => - { - testlist22.AddRange(fetch.Object); - }); - - //sugar.Aop.OnLogExecuted = (s, e) => - //{ - // Trace.WriteLine(s); - //}; + Trace.WriteLine(s); + }; //测试前清空数据 fsql.Delete().Where(a => a.id > 0).ExecuteAffrows(); sugar.Deleteable().Where(a => a.id > 0).ExecuteCommand(); @@ -548,10 +526,10 @@ namespace orm_vs sb.Clear(); Console.WriteLine("更新:"); - Update(sb, 100, 1); + Update(sb, 10, 1); Console.Write(sb.ToString()); sb.Clear(); - Update(sb, 100, 10); + Update(sb, 10, 10); Console.Write(sb.ToString()); sb.Clear(); @@ -602,7 +580,7 @@ namespace orm_vs using (var conn = fsql.Ado.MasterPool.Get()) { for (var a = 0; a < forTime; a++) - Dapper.SqlMapper.Query(conn.Value, $"select top {size} * from freesql_song").ToList(); + Dapper.SqlMapper.Query(conn.Value, $"select * from freesql_song limit {size}").ToList(); } sw.Stop(); sb.AppendLine($"Dapper Select {size}条数据,循环{forTime}次,耗时{sw.ElapsedMilliseconds}ms\r\n"); @@ -612,7 +590,7 @@ namespace orm_vs { var songs = Enumerable.Range(0, size).Select(a => new Song { - create_time = DateTime.Now, + create_time = DateTime.Now.ToString(), is_deleted = false, title = $"Insert_{a}", url = $"Url_{a}" @@ -647,6 +625,7 @@ namespace orm_vs try { for (var a = 0; a < forTime; a++) + //sugar.Fastest().BulkCopy(songs.ToList()); sugar.Insertable(songs.ToArray()).ExecuteCommand(); } catch (Exception ex) @@ -682,7 +661,61 @@ namespace orm_vs fsql.Update().SetSource(songs).ExecuteAffrows(); } sw.Stop(); - sb.AppendLine($"FreeSql Update {size}条数据,循环{forTime}次,耗时{sw.ElapsedMilliseconds}ms"); + sb.AppendLine($"FreeSql Update1 {size}条数据,循环{forTime}次,耗时{sw.ElapsedMilliseconds}ms"); + + + songs = fsql.Select().Limit(size).ToList(); + sw.Restart(); + for (var a = 0; a < forTime; a++) + { + fsql.Update().SetSource(songs).ExecutePgCopy(); + } + sw.Stop(); + sb.AppendLine($"FreeSql BulkCopyUpdate {size}条数据,循环{forTime}次,耗时{sw.ElapsedMilliseconds}ms"); + + // songs = fsql.Select().Limit(size).ToList(); + // sw.Restart(); + // for (var a = 0; a < forTime; a++) + // { + // //fsql.Update().SetSource(songs).ExecuteAffrows(); + // var iou = fsql.InsertOrUpdate() as InsertOrUpdateProvider; + // var dbsql = new StringBuilder(); + // var dbparms = new List(); + // iou.WriteSourceSelectUnionAll(songs, dbsql, dbparms); + + // var sql = $@"update freesql_song a + //inner join ( {dbsql} ) b on b.id = a.id + //set a.create_time = b.create_time, a.is_deleted = b.is_deleted, a.title = b.title, a.url = b.url"; + // fsql.Ado.ExecuteNonQuery(System.Data.CommandType.Text, sql, dbparms.ToArray()); + // } + // sw.Stop(); + // sb.AppendLine($"FreeSql Update2(update inner join) {size}条数据,循环{forTime}次,耗时{sw.ElapsedMilliseconds}ms"); + + // songs = fsql.Select().Limit(size).ToList(); + // sw.Restart(); + // for (var a = 0; a < forTime; a++) + // { + // var isdroped = false; + // var tempTableName = $"#Temp_freesql_song"; + // fsql.Ado.ExecuteNonQuery($"select * into {tempTableName} from [freesql_song] where 1=2"); + // try + // { + // fsql.Insert(songs).AsTable(tempTableName).ExecuteMySqlBulkCopy(); + // var sql = $@"update freesql_song a + //inner join {tempTableName} b on b.id = a.id; + //set a.create_time = b.create_time, a.is_deleted = b.is_deleted, a.title = b.title, a.url = b.url + //; drop table {tempTableName}; "; + // fsql.Ado.ExecuteNonQuery(System.Data.CommandType.Text, sql); + // isdroped = true; + // } + // finally + // { + // if (isdroped == false) + // fsql.Ado.ExecuteNonQuery($"drop table {tempTableName}"); + // } + // } + // sw.Stop(); + // sb.AppendLine($"FreeSql Update3(update inner join #temp) {size}条数据,循环{forTime}次,耗时{sw.ElapsedMilliseconds}ms"); songs = sugar.Queryable().Take(size).ToList(); sw.Restart(); @@ -690,7 +723,7 @@ namespace orm_vs try { for (var a = 0; a < forTime; a++) - sugar.Updateable(songs).ExecuteCommand(); + sugar.Fastest().BulkUpdate(songs); } catch (Exception ex) { @@ -699,21 +732,21 @@ namespace orm_vs sw.Stop(); sb.AppendLine($"SqlSugar Update {size}条数据,循环{forTime}次,耗时{sw.ElapsedMilliseconds}ms" + (sugarEx != null ? $"成绩无效,错误:{sugarEx.Message}" : "")); - using (var db = new SongContext()) - { - songs = db.Songs.Take(size).AsNoTracking().ToList(); - } - sw.Restart(); - for (var a = 0; a < forTime; a++) - { + //using (var db = new SongContext()) + //{ + // songs = db.Songs.Take(size).AsNoTracking().ToList(); + //} + //sw.Restart(); + //for (var a = 0; a < forTime; a++) + //{ - using (var db = new SongContext()) - { - //db.Configuration.AutoDetectChangesEnabled = false; - //db.Songs.UpdateRange(songs.ToArray()); - //db.SaveChanges(); - } - } + // using (var db = new SongContext()) + // { + // //db.Configuration.AutoDetectChangesEnabled = false; + // //db.Songs.UpdateRange(songs.ToArray()); + // //db.SaveChanges(); + // } + //} sw.Stop(); sb.AppendLine($"EFCore Update {size}条数据,循环{forTime}次,耗时{sw.ElapsedMilliseconds}ms .net5.0无效\r\n"); } @@ -729,7 +762,7 @@ namespace orm_vs [Key] [DatabaseGenerated(DatabaseGeneratedOption.Identity)] public int id { get; set; } - public DateTime? create_time { get; set; } + public string create_time { get; set; } public bool? is_deleted { get; set; } public string title { get; set; } public string url { get; set; } diff --git a/Examples/orm_vs/orm_vs.csproj b/Examples/orm_vs/orm_vs.csproj index 08ab5687..e3ef045b 100644 --- a/Examples/orm_vs/orm_vs.csproj +++ b/Examples/orm_vs/orm_vs.csproj @@ -2,20 +2,20 @@ Exe - net5.0 + net7.0 - - - - - + + + + + - + diff --git a/FreeSql-lite.sln b/FreeSql-lite.sln index 989e5166..75f1ac43 100644 --- a/FreeSql-lite.sln +++ b/FreeSql-lite.sln @@ -83,6 +83,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "aspnetcore_transaction", "E EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "FreeSql.Extensions.AggregateRoot", "Extensions\FreeSql.Extensions.AggregateRoot\FreeSql.Extensions.AggregateRoot.csproj", "{5C78C4CE-3CDC-49C3-AF34-556567B95F2A}" EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "orm_vs", "Examples\orm_vs\orm_vs.csproj", "{9D7EA01A-110A-4A0C-A46B-9A0FBC88DD3D}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -453,6 +455,18 @@ Global {5C78C4CE-3CDC-49C3-AF34-556567B95F2A}.Release|x64.Build.0 = Release|Any CPU {5C78C4CE-3CDC-49C3-AF34-556567B95F2A}.Release|x86.ActiveCfg = Release|Any CPU {5C78C4CE-3CDC-49C3-AF34-556567B95F2A}.Release|x86.Build.0 = Release|Any CPU + {9D7EA01A-110A-4A0C-A46B-9A0FBC88DD3D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {9D7EA01A-110A-4A0C-A46B-9A0FBC88DD3D}.Debug|Any CPU.Build.0 = Debug|Any CPU + {9D7EA01A-110A-4A0C-A46B-9A0FBC88DD3D}.Debug|x64.ActiveCfg = Debug|Any CPU + {9D7EA01A-110A-4A0C-A46B-9A0FBC88DD3D}.Debug|x64.Build.0 = Debug|Any CPU + {9D7EA01A-110A-4A0C-A46B-9A0FBC88DD3D}.Debug|x86.ActiveCfg = Debug|Any CPU + {9D7EA01A-110A-4A0C-A46B-9A0FBC88DD3D}.Debug|x86.Build.0 = Debug|Any CPU + {9D7EA01A-110A-4A0C-A46B-9A0FBC88DD3D}.Release|Any CPU.ActiveCfg = Release|Any CPU + {9D7EA01A-110A-4A0C-A46B-9A0FBC88DD3D}.Release|Any CPU.Build.0 = Release|Any CPU + {9D7EA01A-110A-4A0C-A46B-9A0FBC88DD3D}.Release|x64.ActiveCfg = Release|Any CPU + {9D7EA01A-110A-4A0C-A46B-9A0FBC88DD3D}.Release|x64.Build.0 = Release|Any CPU + {9D7EA01A-110A-4A0C-A46B-9A0FBC88DD3D}.Release|x86.ActiveCfg = Release|Any CPU + {9D7EA01A-110A-4A0C-A46B-9A0FBC88DD3D}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -484,10 +498,11 @@ Global {19D2E22A-B000-46B6-AFC8-60BF01A51C9A} = {2A381C57-2697-427B-9F10-55DA11FD02E4} {28163C3B-B2E6-432D-AAC3-F5F19374BE31} = {94C8A78D-AA15-47B2-A348-530CD86BFC1B} {5C78C4CE-3CDC-49C3-AF34-556567B95F2A} = {4A92E8A6-9A6D-41A1-9CDA-DE10899648AA} + {9D7EA01A-110A-4A0C-A46B-9A0FBC88DD3D} = {94C8A78D-AA15-47B2-A348-530CD86BFC1B} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution - RESX_NeutralResourcesLanguage = en-US - RESX_PrefixTranslations = True SolutionGuid = {089687FA-5D21-40AC-BA8A-AA0D1E1H7F98} + RESX_PrefixTranslations = True + RESX_NeutralResourcesLanguage = en-US EndGlobalSection EndGlobal diff --git a/FreeSql/Internal/CommonProvider/DeleteProvider.cs b/FreeSql/Internal/CommonProvider/DeleteProvider.cs index 38cef112..d77837d4 100644 --- a/FreeSql/Internal/CommonProvider/DeleteProvider.cs +++ b/FreeSql/Internal/CommonProvider/DeleteProvider.cs @@ -51,7 +51,7 @@ namespace FreeSql.Internal.CommonProvider public IDelete WithTransaction(DbTransaction transaction) { _transaction = transaction; - _connection = _transaction?.Connection; + if (transaction != null) _connection = transaction.Connection; return this; } public IDelete WithConnection(DbConnection connection) diff --git a/FreeSql/Internal/CommonProvider/InsertOrUpdateProvider.cs b/FreeSql/Internal/CommonProvider/InsertOrUpdateProvider.cs index 2c6fefdf..557fcdf2 100644 --- a/FreeSql/Internal/CommonProvider/InsertOrUpdateProvider.cs +++ b/FreeSql/Internal/CommonProvider/InsertOrUpdateProvider.cs @@ -61,7 +61,7 @@ namespace FreeSql.Internal.CommonProvider public IInsertOrUpdate WithTransaction(DbTransaction transaction) { _transaction = transaction; - _connection = _transaction?.Connection; + if (transaction != null) _connection = transaction.Connection; return this; } public IInsertOrUpdate WithConnection(DbConnection connection) diff --git a/FreeSql/Internal/CommonProvider/InsertProvider.cs b/FreeSql/Internal/CommonProvider/InsertProvider.cs index d7ea18b3..20562e3c 100644 --- a/FreeSql/Internal/CommonProvider/InsertProvider.cs +++ b/FreeSql/Internal/CommonProvider/InsertProvider.cs @@ -78,7 +78,7 @@ namespace FreeSql.Internal.CommonProvider public IInsert WithTransaction(DbTransaction transaction) { _transaction = transaction; - _connection = _transaction?.Connection; + if (transaction != null) _connection = transaction.Connection; return this; } public IInsert WithConnection(DbConnection connection) diff --git a/FreeSql/Internal/CommonProvider/SelectProvider/Select0Provider.cs b/FreeSql/Internal/CommonProvider/SelectProvider/Select0Provider.cs index a59076bf..ecf4b817 100644 --- a/FreeSql/Internal/CommonProvider/SelectProvider/Select0Provider.cs +++ b/FreeSql/Internal/CommonProvider/SelectProvider/Select0Provider.cs @@ -557,7 +557,7 @@ namespace FreeSql.Internal.CommonProvider public TSelect WithTransaction(DbTransaction transaction) { _transaction = transaction; - _connection = _transaction?.Connection; + if (transaction != null) _connection = transaction.Connection; return this as TSelect; } public TSelect WithConnection(DbConnection connection) diff --git a/FreeSql/Internal/CommonProvider/UpdateProvider.cs b/FreeSql/Internal/CommonProvider/UpdateProvider.cs index 5448e93c..617c36af 100644 --- a/FreeSql/Internal/CommonProvider/UpdateProvider.cs +++ b/FreeSql/Internal/CommonProvider/UpdateProvider.cs @@ -1,5 +1,6 @@ using FreeSql.Extensions.EntityUtil; using FreeSql.Internal.Model; +using FreeSql.Internal.ObjectPool; using System; using System.Collections.Generic; using System.Data; @@ -37,6 +38,87 @@ namespace FreeSql.Internal.CommonProvider public int _commandTimeout = 0; public Action _interceptSql; public object _updateVersionValue; + + + public static int ExecuteBulkUpdate(UpdateProvider update, NativeTuple state, Action> funcBulkCopy) where T1 : class + { + if (update._source.Any() != true || update._tempPrimarys.Any() == false) return 0; + var fsql = update._orm; + var connection = update._connection; + var transaction = update._transaction; + + Object poolConn = null; + if (connection == null) + { + poolConn = fsql.Ado.MasterPool.Get(); + connection = poolConn.Value; + } + try + { + var droped = false; + fsql.Ado.CommandFluent(state.Item1).WithConnection(connection).WithTransaction(transaction).ExecuteNonQuery(); + try + { + funcBulkCopy(fsql.Insert(update._source) + .AsType(update._table.Type) + .WithConnection(connection) + .WithTransaction(transaction) + .InsertIdentity().AsTable(state.Item4)); + var affrows = fsql.Ado.CommandFluent(state.Item2 + ";" + state.Item3).WithConnection(connection).WithTransaction(transaction).ExecuteNonQuery(); + droped = true; + return affrows; + } + finally + { + if (droped == false) fsql.Ado.CommandFluent(state.Item3).WithConnection(connection).WithTransaction(transaction).ExecuteNonQuery(); + } + } + finally + { + poolConn?.Dispose(); + } + } +#if net40 +#else + async public static Task ExecuteBulkUpdateAsync(UpdateProvider update, NativeTuple state, Func, Task> funcBulkCopy) where T1 : class + { + if (update._source.Any() != true || update._tempPrimarys.Any() == false) return 0; + var fsql = update._orm; + var connection = update._connection; + var transaction = update._transaction; + + Object poolConn = null; + if (connection == null) + { + poolConn = await fsql.Ado.MasterPool.GetAsync(); + connection = poolConn.Value; + } + try + { + var droped = false; + await fsql.Ado.CommandFluent(state.Item1).WithConnection(connection).WithTransaction(transaction).ExecuteNonQueryAsync(); + try + { + await funcBulkCopy(fsql.Insert(update._source) + .AsType(update._table.Type) + .WithConnection(connection) + .WithTransaction(transaction) + .InsertIdentity().AsTable(state.Item4)); + var affrows = await fsql.Ado.CommandFluent(state.Item2 + ";" + state.Item3).WithConnection(connection).WithTransaction(transaction).ExecuteNonQueryAsync(); + droped = true; + return affrows; + } + finally + { + if (droped == false) await fsql.Ado.CommandFluent(state.Item3).WithConnection(connection).WithTransaction(transaction).ExecuteNonQueryAsync(); + } + } + finally + { + poolConn?.Dispose(); + } + } +#endif } public abstract partial class UpdateProvider : UpdateProvider, IUpdate @@ -95,7 +177,7 @@ namespace FreeSql.Internal.CommonProvider public IUpdate WithTransaction(DbTransaction transaction) { _transaction = transaction; - _connection = _transaction?.Connection; + if (transaction != null) _connection = transaction.Connection; return this; } public IUpdate WithConnection(DbConnection connection) diff --git a/Providers/FreeSql.Provider.MySqlConnector/FreeSqlMySqlConnectorGlobalExtensions.cs b/Providers/FreeSql.Provider.MySqlConnector/FreeSqlMySqlConnectorGlobalExtensions.cs index ccb83ad1..efde19d2 100644 --- a/Providers/FreeSql.Provider.MySqlConnector/FreeSqlMySqlConnectorGlobalExtensions.cs +++ b/Providers/FreeSql.Provider.MySqlConnector/FreeSqlMySqlConnectorGlobalExtensions.cs @@ -4,6 +4,11 @@ using System.Collections.Generic; using System.Text; using System.Threading.Tasks; using System.Threading; +using FreeSql.Internal.Model; +using FreeSql.Internal.CommonProvider; +using FreeSql.Internal.ObjectPool; +using System.Linq; +using System.Data.Common; #if MySqlConnector using MySqlConnector; #else @@ -13,6 +18,78 @@ using MySql.Data.MySqlClient; public static class FreeSqlMySqlConnectorGlobalExtensions { #region ExecuteMySqlBulkCopy + + /// + /// 批量更新(更新字段数量超过 2000 时收益大) + /// 实现原理:使用 MySqlBulkCopy 插入临时表,再使用 UPDATE INNER JOIN 联表更新 + /// + /// + /// + /// + /// + public static int ExecuteMySqlBulkCopy(this IUpdate that, int? bulkCopyTimeout = null) where T : class + { + var update = that as UpdateProvider; + if (update._source.Any() != true || update._tempPrimarys.Any() == false) return 0; + var state = ExecuteMySqlBulkCopyState(update); + return UpdateProvider.ExecuteBulkUpdate(update, state, insert => insert.ExecuteMySqlBulkCopy(bulkCopyTimeout)); + } + static NativeTuple ExecuteMySqlBulkCopyState(UpdateProvider update) where T : class + { + if (update._source.Any() != true) return null; + var _table = update._table; + var _tempPrimarys = update._tempPrimarys; + var _commonUtils = update._commonUtils; + var _ignore = update._ignore; + var updateTableName = update._tableRule?.Invoke(_table.DbName) ?? _table.DbName; + var tempTableName = $"Temp_{Guid.NewGuid().ToString("N")}"; + if (update._connection == null) + { + if (update._orm.Ado.TransactionCurrentThread != null) + update.WithTransaction(update._orm.Ado.TransactionCurrentThread); + } + var sb = new StringBuilder(); + sb.Append("CREATE TEMPORARY TABLE ").Append(_commonUtils.QuoteSqlName(tempTableName)).Append(" ( "); + foreach (var tbcol in _table.ColumnsByPosition) + { + sb.Append(" \r\n ").Append(_commonUtils.QuoteSqlName(tbcol.Attribute.Name)).Append(" ").Append(tbcol.Attribute.DbType); + sb.Append(","); + } + //if (_tempPrimarys.Any()) + //{ + // sb.Append(" \r\n PRIMARY KEY ("); + // foreach (var tbcol in _tempPrimarys) sb.Append(_commonUtils.QuoteSqlName(tbcol.Attribute.Name)).Append(", "); + // sb.Remove(sb.Length - 2, 2).Append("),"); + //} + var sql1 = sb.Remove(sb.Length - 1, 1).Append(" \r\n) Engine=InnoDB;").ToString(); + + sb.Clear().Append("UPDATE ").Append(_commonUtils.QuoteSqlName(updateTableName)).Append(" a ") + .Append(" \r\nINNER JOIN ").Append(_commonUtils.QuoteSqlName(tempTableName)).Append(" b ON "); + for (var a = 0; a < _tempPrimarys.Length; a++) + { + var pkname = _commonUtils.QuoteSqlName(_tempPrimarys[a].Attribute.Name); + if (a > 0) sb.Append(" AND "); + sb.Append("b.").Append(pkname).Append(" = a.").Append(pkname); + } + sb.Append("\r\n SET "); + var colidx = 0; + foreach (var col in _table.Columns.Values) + { + if (col.Attribute.IsPrimary) continue; + if (_tempPrimarys.Any(a => a.CsName == col.CsName)) continue; + if (col.Attribute.IsIdentity == false && col.Attribute.IsVersion == false && _ignore.ContainsKey(col.Attribute.Name) == false) + { + if (colidx > 0) sb.Append(","); + sb.Append(" \r\n a.").Append(_commonUtils.QuoteSqlName(col.Attribute.Name)).Append(" = ").Append("b.").Append(_commonUtils.QuoteSqlName(col.Attribute.Name)); + ++colidx; + } + } + var sql2 = sb.ToString(); + sb.Clear(); + var sql3 = $"DROP TABLE {_commonUtils.QuoteSqlName(tempTableName)}"; + return NativeTuple.Create(sql1, sql2, sql3, tempTableName); + } + /// /// MySql MySqlCopyBulk 批量插入功能 /// 使用 IgnoreColumns/InsertColumns 设置忽略/指定导入的列 @@ -93,6 +170,13 @@ public static class FreeSqlMySqlConnectorGlobalExtensions } #if net40 #else + public static Task ExecuteMySqlBulkCopyAsync(this IUpdate that, int? bulkCopyTimeout = null, CancellationToken cancellationToken = default) where T : class + { + var update = that as UpdateProvider; + if (update._source.Any() != true || update._tempPrimarys.Any() == false) return Task.FromResult(0); + var state = ExecuteMySqlBulkCopyState(update); + return UpdateProvider.ExecuteBulkUpdateAsync(update, state, insert => insert.ExecuteMySqlBulkCopyAsync(bulkCopyTimeout, cancellationToken)); + } async public static Task ExecuteMySqlBulkCopyAsync(this IInsert that, int? bulkCopyTimeout = null, CancellationToken cancellationToken = default) where T : class { var insert = that as FreeSql.MySql.Curd.MySqlInsert; diff --git a/Providers/FreeSql.Provider.PostgreSQL/PostgreSQLExtensions.cs b/Providers/FreeSql.Provider.PostgreSQL/PostgreSQLExtensions.cs index 8f3b3179..313ce549 100644 --- a/Providers/FreeSql.Provider.PostgreSQL/PostgreSQLExtensions.cs +++ b/Providers/FreeSql.Provider.PostgreSQL/PostgreSQLExtensions.cs @@ -1,9 +1,12 @@ using FreeSql; +using FreeSql.Internal.CommonProvider; +using FreeSql.Internal.Model; using FreeSql.PostgreSQL.Curd; using Npgsql; using System; using System.Data; using System.Diagnostics; +using System.Linq; using System.Linq.Expressions; using System.Text; using System.Threading; @@ -32,6 +35,69 @@ public static partial class FreeSqlPostgreSQLGlobalExtensions public static OnConflictDoUpdate OnConflictDoUpdate(this IInsert that, Expression> columns = null) where T1 : class => new FreeSql.PostgreSQL.Curd.OnConflictDoUpdate(that.InsertIdentity(), columns); #region ExecutePgCopy + /// + /// 批量更新(更新字段数量超过 2000 时收益大) + /// 实现原理:使用 PgCopy 插入临时表,再使用 UPDATE INNER JOIN 联表更新 + /// + /// + /// + /// + public static int ExecutePgCopy(this IUpdate that) where T : class + { + var update = that as UpdateProvider; + if (update._source.Any() != true || update._tempPrimarys.Any() == false) return 0; + var state = ExecutePgCopyState(update); + return UpdateProvider.ExecuteBulkUpdate(update, state, insert => insert.ExecutePgCopy()); + } + static NativeTuple ExecutePgCopyState(UpdateProvider update) where T : class + { + if (update._source.Any() != true) return null; + var _table = update._table; + var _tempPrimarys = update._tempPrimarys; + var _commonUtils = update._commonUtils; + var _ignore = update._ignore; + var updateTableName = update._tableRule?.Invoke(_table.DbName) ?? _table.DbName; + var tempTableName = $"Temp_{Guid.NewGuid().ToString("N")}"; + if (update._connection == null) + { + if (update._orm.Ado.TransactionCurrentThread != null) + update.WithTransaction(update._orm.Ado.TransactionCurrentThread); + } + var sb = new StringBuilder(); + sb.Append("CREATE TEMP TABLE ").Append(_commonUtils.QuoteSqlName(tempTableName)).Append(" ( "); + foreach (var tbcol in _table.ColumnsByPosition) + { + sb.Append(" \r\n ").Append(_commonUtils.QuoteSqlName(tbcol.Attribute.Name)).Append(" ").Append(tbcol.Attribute.DbType); + sb.Append(","); + } + var sql1 = sb.Remove(sb.Length - 1, 1).Append("\r\n) WITH (OIDS=FALSE);").ToString(); + + sb.Clear().Append("UPDATE ").Append(_commonUtils.QuoteSqlName(updateTableName)).Append(" a \r\nSET "); + var colidx = 0; + foreach (var col in _table.Columns.Values) + { + if (col.Attribute.IsPrimary) continue; + if (_tempPrimarys.Any(a => a.CsName == col.CsName)) continue; + if (col.Attribute.IsIdentity == false && col.Attribute.IsVersion == false && _ignore.ContainsKey(col.Attribute.Name) == false) + { + if (colidx > 0) sb.Append(","); + sb.Append(" \r\n ").Append(_commonUtils.QuoteSqlName(col.Attribute.Name)).Append(" = ").Append("b.").Append(_commonUtils.QuoteSqlName(col.Attribute.Name)); + ++colidx; + } + } + sb.Append(" \r\nFROM ").Append(_commonUtils.QuoteSqlName(tempTableName)).Append(" b \r\nWHERE "); + for (var a = 0; a < _tempPrimarys.Length; a++) + { + var pkname = _commonUtils.QuoteSqlName(_tempPrimarys[a].Attribute.Name); + if (a > 0) sb.Append(" AND "); + sb.Append("b.").Append(pkname).Append(" = a.").Append(pkname); + } + var sql2 = sb.ToString(); + sb.Clear(); + var sql3 = $"DROP TABLE {_commonUtils.QuoteSqlName(tempTableName)}"; + return NativeTuple.Create(sql1, sql2, sql3, tempTableName); + } + /// /// PostgreSQL COPY 批量导入功能,封装了 NpgsqlConnection.BeginBinaryImport 方法 /// 使用 IgnoreColumns/InsertColumns 设置忽略/指定导入的列 @@ -121,6 +187,13 @@ public static partial class FreeSqlPostgreSQLGlobalExtensions #if net45 #else + public static Task ExecutePgCopyAsync(this IUpdate that, CancellationToken cancellationToken = default) where T : class + { + var update = that as UpdateProvider; + if (update._source.Any() != true || update._tempPrimarys.Any() == false) return Task.FromResult(0); + var state = ExecutePgCopyState(update); + return UpdateProvider.ExecuteBulkUpdateAsync(update, state, insert => insert.ExecutePgCopyAsync(cancellationToken)); + } async public static Task ExecutePgCopyAsync(this IInsert that, CancellationToken cancellationToken = default) where T : class { var insert = that as FreeSql.PostgreSQL.Curd.PostgreSQLInsert; diff --git a/Providers/FreeSql.Provider.SqlServer/SqlServerExtensions.cs b/Providers/FreeSql.Provider.SqlServer/SqlServerExtensions.cs index 315ae260..01cf9e24 100644 --- a/Providers/FreeSql.Provider.SqlServer/SqlServerExtensions.cs +++ b/Providers/FreeSql.Provider.SqlServer/SqlServerExtensions.cs @@ -5,6 +5,10 @@ using System.Collections.Concurrent; using System.Collections.Generic; using System.Threading; using FreeSql.Internal.CommonProvider; +using System.Linq; +using System.Text; +using System.Data.Common; +using FreeSql.Internal.ObjectPool; #if microsoft using Microsoft.Data.SqlClient; #else @@ -115,6 +119,65 @@ public static partial class FreeSqlSqlServerGlobalExtensions internal static ConcurrentDictionary>> _dicSetGlobalSelectWithLock = new ConcurrentDictionary>>(); #region ExecuteSqlBulkCopy + /// + /// 批量更新(更新字段数量超过 2000 时收益大) + /// 实现原理:使用 SqlBulkCopy 插入临时表,再使用 UPDATE INNER JOIN 联表更新 + /// + /// + /// + /// + /// + /// + /// + public static int ExecuteSqlBulkCopy(this IUpdate that, SqlBulkCopyOptions copyOptions = SqlBulkCopyOptions.Default, int? batchSize = null, int? bulkCopyTimeout = null) where T : class + { + var update = that as UpdateProvider; + if (update._source.Any() != true || update._tempPrimarys.Any() == false) return 0; + var state = ExecuteSqlBulkCopyState(update); + return UpdateProvider.ExecuteBulkUpdate(update, state, insert => insert.ExecuteSqlBulkCopy(copyOptions, batchSize, bulkCopyTimeout)); + } + static NativeTuple ExecuteSqlBulkCopyState(UpdateProvider update) where T : class + { + if (update._source.Any() != true) return null; + var _table = update._table; + var _tempPrimarys = update._tempPrimarys; + var _commonUtils = update._commonUtils; + var _ignore = update._ignore; + var updateTableName = update._tableRule?.Invoke(_table.DbName) ?? _table.DbName; + var tempTableName = $"#Temp_{updateTableName}"; + if (update._connection == null) + { + if (update._orm.Ado.TransactionCurrentThread != null) + update.WithTransaction(update._orm.Ado.TransactionCurrentThread); + } + var sql1 = $"SELECT * INTO {tempTableName} FROM {_commonUtils.QuoteSqlName(updateTableName)} WHERE 1=2"; + var sb = new StringBuilder().Append("UPDATE ").Append(" a SET \r\n "); + var colidx = 0; + foreach (var col in _table.Columns.Values) + { + if (col.Attribute.IsPrimary) continue; + if (_tempPrimarys.Any(a => a.CsName == col.CsName)) continue; + if (col.Attribute.IsIdentity == false && col.Attribute.IsVersion == false && _ignore.ContainsKey(col.Attribute.Name) == false) + { + if (colidx > 0) sb.Append(", \r\n "); + sb.Append(" a.").Append(_commonUtils.QuoteSqlName(col.Attribute.Name)).Append(" = ").Append("b.").Append(_commonUtils.QuoteSqlName(col.Attribute.Name)); + ++colidx; + } + } + sb.Append(" \r\nFROM ").Append(_commonUtils.QuoteSqlName(updateTableName)).Append(" a ") + .Append(" \r\nINNER JOIN ").Append(tempTableName).Append(" b ON "); + for (var a = 0; a < _tempPrimarys.Length; a++) + { + var pkname = _commonUtils.QuoteSqlName(_tempPrimarys[a].Attribute.Name); + if (a > 0) sb.Append(" AND "); + sb.Append("b.").Append(pkname).Append(" = a.").Append(pkname); + } + var sql2 = sb.ToString(); + sb.Clear(); + var sql3 = $"DROP TABLE {tempTableName}"; + return NativeTuple.Create(sql1, sql2, sql3, tempTableName); + } + /// /// SqlServer SqlCopyBulk 批量插入功能 /// 使用 IgnoreColumns/InsertColumns 设置忽略/指定导入的列 @@ -214,6 +277,13 @@ public static partial class FreeSqlSqlServerGlobalExtensions } #if net40 #else + public static Task ExecuteSqlBulkCopyAsync(this IUpdate that, SqlBulkCopyOptions copyOptions = SqlBulkCopyOptions.Default, int? batchSize = null, int? bulkCopyTimeout = null, CancellationToken cancellationToken = default) where T : class + { + var update = that as UpdateProvider; + if (update._source.Any() != true || update._tempPrimarys.Any() == false) return Task.FromResult(0); + var state = ExecuteSqlBulkCopyState(update); + return UpdateProvider.ExecuteBulkUpdateAsync(update, state, insert => insert.ExecuteSqlBulkCopyAsync(copyOptions, batchSize, bulkCopyTimeout, cancellationToken)); + } async public static Task ExecuteSqlBulkCopyAsync(this IInsert that, SqlBulkCopyOptions copyOptions = SqlBulkCopyOptions.Default, int? batchSize = null, int? bulkCopyTimeout = null, CancellationToken cancellationToken = default) where T : class { var insert = that as FreeSql.SqlServer.Curd.SqlServerInsert;