- 增加 IUpdate<T> BulkCopy 批量更新扩展方法(暂时支持 SqlServer/MySql/PostgreSQL);

This commit is contained in:
2881099 2022-12-06 18:03:16 +08:00
parent 953323e335
commit 6c00bd825e
11 changed files with 453 additions and 96 deletions

View File

@ -15,15 +15,16 @@ using System.Text;
using System.Threading;
using System.Threading.Tasks;
using FreeSql;
using FreeSql.Internal.CommonProvider;
namespace orm_vs
{
class Program
{
static IFreeSql fsql = new FreeSql.FreeSqlBuilder()
.UseConnectionString(FreeSql.DataType.SqlServer, "Data Source=.;Integrated Security=True;Initial Catalog=tedb;Pooling=true;Max Pool Size=20")
//.UseConnectionString(FreeSql.DataType.MySql, "Data Source=127.0.0.1;Port=3306;User ID=root;Password=root;Initial Catalog=cccddd;Charset=utf8;SslMode=none;Max pool size=20")
//.UseConnectionString(FreeSql.DataType.PostgreSQL, "Host=192.168.164.10;Port=5432;Username=postgres;Password=123456;Database=tedb;Pooling=true;Maximum Pool Size=20")
.UseConnectionString(FreeSql.DataType.SqlServer, "Data Source=.;Integrated Security=True;Initial Catalog=tedb1;Pooling=true;Max Pool Size=21;TrustServerCertificate=true")
.UseConnectionString(FreeSql.DataType.MySql, "Data Source=127.0.0.1;Port=3306;User ID=root;Password=root;Initial Catalog=cccddd;Charset=utf8;SslMode=none;Max pool size=21;AllowLoadLocalInfile=true;")
.UseConnectionString(FreeSql.DataType.PostgreSQL, "Host=192.168.164.10;Port=5432;Username=postgres;Password=123456;Database=tedb;Pooling=true;Maximum Pool Size=21")
.UseAutoSyncStructure(false)
.UseNoneCommandParameter(true)
//.UseConfigEntityFromDbFirst(true)
@ -35,12 +36,12 @@ namespace orm_vs
{
var db = new SqlSugarClient(new ConnectionConfig()
{
ConnectionString = "Data Source=.;Integrated Security=True;Initial Catalog=tedb;Pooling=true;Min Pool Size=20;Max Pool Size=20",
DbType = DbType.SqlServer,
//ConnectionString = "Data Source=127.0.0.1;Port=3306;User ID=root;Password=root;Initial Catalog=cccddd;Charset=utf8;SslMode=none;Min Pool Size=20;Max Pool Size=20",
//ConnectionString = "Data Source=.;Integrated Security=True;Initial Catalog=tedb1;Pooling=true;Min Pool Size=20;Max Pool Size=20;TrustServerCertificate=true",
//DbType = DbType.SqlServer,
//ConnectionString = "Data Source=127.0.0.1;Port=3306;User ID=root;Password=root;Initial Catalog=cccddd;Charset=utf8;SslMode=none;Min Pool Size=20;Max Pool Size=20;AllowLoadLocalInfile=true;",
//DbType = DbType.MySql,
//ConnectionString = "Host=192.168.164.10;Port=5432;Username=postgres;Password=123456;Database=tedb;Pooling=true;Maximum Pool Size=21",
//DbType = DbType.PostgreSQL,
ConnectionString = "Host=192.168.164.10;Port=5432;Username=postgres;Password=123456;Database=tedb;Pooling=true;Maximum Pool Size=20",
DbType = DbType.PostgreSQL,
IsAutoCloseConnection = true,
InitKeyType = InitKeyType.Attribute
});
@ -58,18 +59,15 @@ namespace orm_vs
public DbSet<PatientExamination_2022> PatientExamination_2022s { get; set; }
protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
{
optionsBuilder.UseSqlServer(@"Data Source=.;Integrated Security=True;Initial Catalog=tedb;Pooling=true;Min Pool Size=21;Max Pool Size=21");
//optionsBuilder.UseMySql("Data Source=127.0.0.1;Port=3306;User ID=root;Password=root;Initial Catalog=cccddd;Charset=utf8;SslMode=none;Min Pool Size=21;Max Pool Size=21");
//optionsBuilder.UseNpgsql("Host=192.168.164.10;Port=5432;Username=postgres;Password=123456;Database=tedb;Pooling=true;Maximum Pool Size=21");
//optionsBuilder.UseSqlServer(@"Data Source=.;Integrated Security=True;Initial Catalog=tedb1;Pooling=true;Min Pool Size=19;Max Pool Size=19;TrustServerCertificate=true");
//var connectionString = "Data Source=127.0.0.1;Port=3306;User ID=root;Password=root;Initial Catalog=cccddd;Charset=utf8;SslMode=none;Min Pool Size=19;Max Pool Size=19";
//optionsBuilder.UseMySql(connectionString, ServerVersion.AutoDetect(connectionString));
optionsBuilder.UseNpgsql("Host=192.168.164.10;Port=5432;Username=postgres;Password=123456;Database=tedb;Pooling=true;Maximum Pool Size=19");
}
protected override void OnModelCreating(ModelBuilder modelBuilder)
{
base.OnModelCreating(modelBuilder);
modelBuilder.Entity<Song>()
.Property(a => a.create_time)
.HasConversion(a => int.Parse(a.ToString()), a => new DateTime(a));
}
}
@ -136,7 +134,7 @@ namespace orm_vs
}
static void TestDapperSelectPatientExamination_2022()
{
using (var conn = new SqlConnection("Data Source=.;Integrated Security=True;Initial Catalog=tedb;Pooling=true;Min Pool Size=21;Max Pool Size=22"))
using (var conn = new SqlConnection("Data Source=.;Integrated Security=True;Initial Catalog=tedb1;Pooling=true;Min Pool Size=21;Max Pool Size=22"))
{
var list = conn.Query<PatientExamination_2022>("select top 40000 * from PatientExamination_2022");
}
@ -145,28 +143,28 @@ namespace orm_vs
static DbConnection fsqlConn = null;
static void Main(string[] args)
{
var count = 0;
var sws = new List<long>();
Console.WriteLine("观察查询4万条记录内存按 Enter 进入下一次,按任易键即出程序。。。");
//while(Console.ReadKey().Key == ConsoleKey.Enter)
//using (var fcon = fsql.Ado.MasterPool.Get())
//{
//fsqlConn = fcon.Value;
for (var a = 0; a < 80; a++)
{
Stopwatch sw = Stopwatch.StartNew();
TestFreeSqlSelectPatientExamination_2022();
//TestEfSelectPatientExamination_2022();
//TestSqlSugarSelectPatientExamination_2022();
//TestDapperSelectPatientExamination_2022();
sw.Stop();
sws.Add(sw.ElapsedMilliseconds);
Console.WriteLine($"第 {++count} 次查询4万条记录, {sw.ElapsedMilliseconds}ms平均 {(long)sws.Average()}ms");
}
//}
Console.ReadKey();
fsql.Dispose();
return;
//var count = 0;
//var sws = new List<long>();
//Console.WriteLine("观察查询4万条记录内存按 Enter 进入下一次,按任易键即出程序。。。");
////while(Console.ReadKey().Key == ConsoleKey.Enter)
////using (var fcon = fsql.Ado.MasterPool.Get())
////{
// //fsqlConn = fcon.Value;
// for (var a = 0; a < 80; a++)
// {
// Stopwatch sw = Stopwatch.StartNew();
// TestFreeSqlSelectPatientExamination_2022();
// //TestEfSelectPatientExamination_2022();
// //TestSqlSugarSelectPatientExamination_2022();
// //TestDapperSelectPatientExamination_2022();
// sw.Stop();
// sws.Add(sw.ElapsedMilliseconds);
// Console.WriteLine($"第 {++count} 次查询4万条记录, {sw.ElapsedMilliseconds}ms平均 {(long)sws.Average()}ms");
// }
////}
//Console.ReadKey();
//fsql.Dispose();
//return;
//fsql.CodeFirst.SyncStructure(typeof(Song), typeof(Song_tag), typeof(Tag));
//sugar.CodeFirst.InitTables(typeof(Song), typeof(Song_tag), typeof(Tag));
@ -186,13 +184,6 @@ namespace orm_vs
var sb = new StringBuilder();
var time = new Stopwatch();
var sql222 = fsql.Select<Song>().Where(a => DateTime.Now.Subtract(a.create_time.Value).TotalHours > 0).ToSql();
var conModels = new List<IConditionalModel>();
conModels.Add(new ConditionalModel { FieldName = "`id` = 1 or 1=1; delete from song_tag; -- ", ConditionalType = ConditionalType.Equal, FieldValue = "1" });
var student = sugar.Queryable<Song>().Where(conModels).ToList();
#region ET test
////var t31 = fsql.Select<xxx>().ToList();
@ -483,23 +474,10 @@ namespace orm_vs
#endregion
var testlist1 = fsql.Select<Song>().OrderBy(a => a.id).ToList();
var testlist2 = new List<Song>();
fsql.Select<Song>().OrderBy(a => a.id).ToChunk(2, fetch =>
sugar.Aop.OnLogExecuted = (s, e) =>
{
testlist2.AddRange(fetch.Object);
});
var testlist22 = new List<object>();
fsql.Select<Song, Song_tag>().LeftJoin((a, b) => a.id == b.song_id).ToChunk((a, b) => new { a.title, a.create_time, b.tag_id }, 2, fetch =>
{
testlist22.AddRange(fetch.Object);
});
//sugar.Aop.OnLogExecuted = (s, e) =>
//{
// Trace.WriteLine(s);
//};
Trace.WriteLine(s);
};
//测试前清空数据
fsql.Delete<Song>().Where(a => a.id > 0).ExecuteAffrows();
sugar.Deleteable<Song>().Where(a => a.id > 0).ExecuteCommand();
@ -548,10 +526,10 @@ namespace orm_vs
sb.Clear();
Console.WriteLine("更新:");
Update(sb, 100, 1);
Update(sb, 10, 1);
Console.Write(sb.ToString());
sb.Clear();
Update(sb, 100, 10);
Update(sb, 10, 10);
Console.Write(sb.ToString());
sb.Clear();
@ -602,7 +580,7 @@ namespace orm_vs
using (var conn = fsql.Ado.MasterPool.Get())
{
for (var a = 0; a < forTime; a++)
Dapper.SqlMapper.Query<Song>(conn.Value, $"select top {size} * from freesql_song").ToList();
Dapper.SqlMapper.Query<Song>(conn.Value, $"select * from freesql_song limit {size}").ToList();
}
sw.Stop();
sb.AppendLine($"Dapper Select {size}条数据,循环{forTime}次,耗时{sw.ElapsedMilliseconds}ms\r\n");
@ -612,7 +590,7 @@ namespace orm_vs
{
var songs = Enumerable.Range(0, size).Select(a => new Song
{
create_time = DateTime.Now,
create_time = DateTime.Now.ToString(),
is_deleted = false,
title = $"Insert_{a}",
url = $"Url_{a}"
@ -647,6 +625,7 @@ namespace orm_vs
try
{
for (var a = 0; a < forTime; a++)
//sugar.Fastest<Song>().BulkCopy(songs.ToList());
sugar.Insertable(songs.ToArray()).ExecuteCommand();
}
catch (Exception ex)
@ -682,7 +661,61 @@ namespace orm_vs
fsql.Update<Song>().SetSource(songs).ExecuteAffrows();
}
sw.Stop();
sb.AppendLine($"FreeSql Update {size}条数据,循环{forTime}次,耗时{sw.ElapsedMilliseconds}ms");
sb.AppendLine($"FreeSql Update1 {size}条数据,循环{forTime}次,耗时{sw.ElapsedMilliseconds}ms");
songs = fsql.Select<Song>().Limit(size).ToList();
sw.Restart();
for (var a = 0; a < forTime; a++)
{
fsql.Update<Song>().SetSource(songs).ExecutePgCopy();
}
sw.Stop();
sb.AppendLine($"FreeSql BulkCopyUpdate {size}条数据,循环{forTime}次,耗时{sw.ElapsedMilliseconds}ms");
// songs = fsql.Select<Song>().Limit(size).ToList();
// sw.Restart();
// for (var a = 0; a < forTime; a++)
// {
// //fsql.Update<Song>().SetSource(songs).ExecuteAffrows();
// var iou = fsql.InsertOrUpdate<Song>() as InsertOrUpdateProvider<Song>;
// var dbsql = new StringBuilder();
// var dbparms = new List<DbParameter>();
// iou.WriteSourceSelectUnionAll(songs, dbsql, dbparms);
// var sql = $@"update freesql_song a
//inner join ( {dbsql} ) b on b.id = a.id
//set a.create_time = b.create_time, a.is_deleted = b.is_deleted, a.title = b.title, a.url = b.url";
// fsql.Ado.ExecuteNonQuery(System.Data.CommandType.Text, sql, dbparms.ToArray());
// }
// sw.Stop();
// sb.AppendLine($"FreeSql Update2(update inner join) {size}条数据,循环{forTime}次,耗时{sw.ElapsedMilliseconds}ms");
// songs = fsql.Select<Song>().Limit(size).ToList();
// sw.Restart();
// for (var a = 0; a < forTime; a++)
// {
// var isdroped = false;
// var tempTableName = $"#Temp_freesql_song";
// fsql.Ado.ExecuteNonQuery($"select * into {tempTableName} from [freesql_song] where 1=2");
// try
// {
// fsql.Insert(songs).AsTable(tempTableName).ExecuteMySqlBulkCopy();
// var sql = $@"update freesql_song a
//inner join {tempTableName} b on b.id = a.id;
//set a.create_time = b.create_time, a.is_deleted = b.is_deleted, a.title = b.title, a.url = b.url
//; drop table {tempTableName}; ";
// fsql.Ado.ExecuteNonQuery(System.Data.CommandType.Text, sql);
// isdroped = true;
// }
// finally
// {
// if (isdroped == false)
// fsql.Ado.ExecuteNonQuery($"drop table {tempTableName}");
// }
// }
// sw.Stop();
// sb.AppendLine($"FreeSql Update3(update inner join #temp) {size}条数据,循环{forTime}次,耗时{sw.ElapsedMilliseconds}ms");
songs = sugar.Queryable<Song>().Take(size).ToList();
sw.Restart();
@ -690,7 +723,7 @@ namespace orm_vs
try
{
for (var a = 0; a < forTime; a++)
sugar.Updateable(songs).ExecuteCommand();
sugar.Fastest<Song>().BulkUpdate(songs);
}
catch (Exception ex)
{
@ -699,21 +732,21 @@ namespace orm_vs
sw.Stop();
sb.AppendLine($"SqlSugar Update {size}条数据,循环{forTime}次,耗时{sw.ElapsedMilliseconds}ms" + (sugarEx != null ? $"成绩无效,错误:{sugarEx.Message}" : ""));
using (var db = new SongContext())
{
songs = db.Songs.Take(size).AsNoTracking().ToList();
}
sw.Restart();
for (var a = 0; a < forTime; a++)
{
//using (var db = new SongContext())
//{
// songs = db.Songs.Take(size).AsNoTracking().ToList();
//}
//sw.Restart();
//for (var a = 0; a < forTime; a++)
//{
using (var db = new SongContext())
{
//db.Configuration.AutoDetectChangesEnabled = false;
//db.Songs.UpdateRange(songs.ToArray());
//db.SaveChanges();
}
}
// using (var db = new SongContext())
// {
// //db.Configuration.AutoDetectChangesEnabled = false;
// //db.Songs.UpdateRange(songs.ToArray());
// //db.SaveChanges();
// }
//}
sw.Stop();
sb.AppendLine($"EFCore Update {size}条数据,循环{forTime}次,耗时{sw.ElapsedMilliseconds}ms .net5.0无效\r\n");
}
@ -729,7 +762,7 @@ namespace orm_vs
[Key]
[DatabaseGenerated(DatabaseGeneratedOption.Identity)]
public int id { get; set; }
public DateTime? create_time { get; set; }
public string create_time { get; set; }
public bool? is_deleted { get; set; }
public string title { get; set; }
public string url { get; set; }

View File

@ -2,20 +2,20 @@
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net5.0</TargetFramework>
<TargetFramework>net7.0</TargetFramework>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Dapper" Version="2.0.35" />
<PackageReference Include="Microsoft.EntityFrameworkCore.SqlServer" Version="5.0.0" />
<PackageReference Include="Npgsql.EntityFrameworkCore.PostgreSQL" Version="3.1.4" />
<PackageReference Include="Pomelo.EntityFrameworkCore.MySql" Version="3.1.0" />
<PackageReference Include="sqlSugarCore" Version="5.0.8" />
<PackageReference Include="Dapper" Version="2.0.123" />
<PackageReference Include="Microsoft.EntityFrameworkCore.SqlServer" Version="7.0.0" />
<PackageReference Include="Npgsql.EntityFrameworkCore.PostgreSQL" Version="7.0.0" />
<PackageReference Include="Pomelo.EntityFrameworkCore.MySql" Version="7.0.0-alpha.1" />
<PackageReference Include="sqlSugarCore" Version="5.1.3.38" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\FreeSql\FreeSql.csproj" />
<ProjectReference Include="..\..\Providers\FreeSql.Provider.MySql\FreeSql.Provider.MySql.csproj" />
<ProjectReference Include="..\..\Providers\FreeSql.Provider.MySqlConnector\FreeSql.Provider.MySqlConnector.csproj" />
<ProjectReference Include="..\..\Providers\FreeSql.Provider.PostgreSQL\FreeSql.Provider.PostgreSQL.csproj" />
<ProjectReference Include="..\..\Providers\FreeSql.Provider.SqlServer\FreeSql.Provider.SqlServer.csproj" />
</ItemGroup>

View File

@ -83,6 +83,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "aspnetcore_transaction", "E
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "FreeSql.Extensions.AggregateRoot", "Extensions\FreeSql.Extensions.AggregateRoot\FreeSql.Extensions.AggregateRoot.csproj", "{5C78C4CE-3CDC-49C3-AF34-556567B95F2A}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "orm_vs", "Examples\orm_vs\orm_vs.csproj", "{9D7EA01A-110A-4A0C-A46B-9A0FBC88DD3D}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@ -453,6 +455,18 @@ Global
{5C78C4CE-3CDC-49C3-AF34-556567B95F2A}.Release|x64.Build.0 = Release|Any CPU
{5C78C4CE-3CDC-49C3-AF34-556567B95F2A}.Release|x86.ActiveCfg = Release|Any CPU
{5C78C4CE-3CDC-49C3-AF34-556567B95F2A}.Release|x86.Build.0 = Release|Any CPU
{9D7EA01A-110A-4A0C-A46B-9A0FBC88DD3D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{9D7EA01A-110A-4A0C-A46B-9A0FBC88DD3D}.Debug|Any CPU.Build.0 = Debug|Any CPU
{9D7EA01A-110A-4A0C-A46B-9A0FBC88DD3D}.Debug|x64.ActiveCfg = Debug|Any CPU
{9D7EA01A-110A-4A0C-A46B-9A0FBC88DD3D}.Debug|x64.Build.0 = Debug|Any CPU
{9D7EA01A-110A-4A0C-A46B-9A0FBC88DD3D}.Debug|x86.ActiveCfg = Debug|Any CPU
{9D7EA01A-110A-4A0C-A46B-9A0FBC88DD3D}.Debug|x86.Build.0 = Debug|Any CPU
{9D7EA01A-110A-4A0C-A46B-9A0FBC88DD3D}.Release|Any CPU.ActiveCfg = Release|Any CPU
{9D7EA01A-110A-4A0C-A46B-9A0FBC88DD3D}.Release|Any CPU.Build.0 = Release|Any CPU
{9D7EA01A-110A-4A0C-A46B-9A0FBC88DD3D}.Release|x64.ActiveCfg = Release|Any CPU
{9D7EA01A-110A-4A0C-A46B-9A0FBC88DD3D}.Release|x64.Build.0 = Release|Any CPU
{9D7EA01A-110A-4A0C-A46B-9A0FBC88DD3D}.Release|x86.ActiveCfg = Release|Any CPU
{9D7EA01A-110A-4A0C-A46B-9A0FBC88DD3D}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@ -484,10 +498,11 @@ Global
{19D2E22A-B000-46B6-AFC8-60BF01A51C9A} = {2A381C57-2697-427B-9F10-55DA11FD02E4}
{28163C3B-B2E6-432D-AAC3-F5F19374BE31} = {94C8A78D-AA15-47B2-A348-530CD86BFC1B}
{5C78C4CE-3CDC-49C3-AF34-556567B95F2A} = {4A92E8A6-9A6D-41A1-9CDA-DE10899648AA}
{9D7EA01A-110A-4A0C-A46B-9A0FBC88DD3D} = {94C8A78D-AA15-47B2-A348-530CD86BFC1B}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
RESX_NeutralResourcesLanguage = en-US
RESX_PrefixTranslations = True
SolutionGuid = {089687FA-5D21-40AC-BA8A-AA0D1E1H7F98}
RESX_PrefixTranslations = True
RESX_NeutralResourcesLanguage = en-US
EndGlobalSection
EndGlobal

View File

@ -51,7 +51,7 @@ namespace FreeSql.Internal.CommonProvider
public IDelete<T1> WithTransaction(DbTransaction transaction)
{
_transaction = transaction;
_connection = _transaction?.Connection;
if (transaction != null) _connection = transaction.Connection;
return this;
}
public IDelete<T1> WithConnection(DbConnection connection)

View File

@ -61,7 +61,7 @@ namespace FreeSql.Internal.CommonProvider
public IInsertOrUpdate<T1> WithTransaction(DbTransaction transaction)
{
_transaction = transaction;
_connection = _transaction?.Connection;
if (transaction != null) _connection = transaction.Connection;
return this;
}
public IInsertOrUpdate<T1> WithConnection(DbConnection connection)

View File

@ -78,7 +78,7 @@ namespace FreeSql.Internal.CommonProvider
public IInsert<T1> WithTransaction(DbTransaction transaction)
{
_transaction = transaction;
_connection = _transaction?.Connection;
if (transaction != null) _connection = transaction.Connection;
return this;
}
public IInsert<T1> WithConnection(DbConnection connection)

View File

@ -557,7 +557,7 @@ namespace FreeSql.Internal.CommonProvider
public TSelect WithTransaction(DbTransaction transaction)
{
_transaction = transaction;
_connection = _transaction?.Connection;
if (transaction != null) _connection = transaction.Connection;
return this as TSelect;
}
public TSelect WithConnection(DbConnection connection)

View File

@ -1,5 +1,6 @@
using FreeSql.Extensions.EntityUtil;
using FreeSql.Internal.Model;
using FreeSql.Internal.ObjectPool;
using System;
using System.Collections.Generic;
using System.Data;
@ -37,6 +38,87 @@ namespace FreeSql.Internal.CommonProvider
public int _commandTimeout = 0;
public Action<StringBuilder> _interceptSql;
public object _updateVersionValue;
public static int ExecuteBulkUpdate<T1>(UpdateProvider<T1> update, NativeTuple<string, string, string, string> state, Action<IInsert<T1>> funcBulkCopy) where T1 : class
{
if (update._source.Any() != true || update._tempPrimarys.Any() == false) return 0;
var fsql = update._orm;
var connection = update._connection;
var transaction = update._transaction;
Object<DbConnection> poolConn = null;
if (connection == null)
{
poolConn = fsql.Ado.MasterPool.Get();
connection = poolConn.Value;
}
try
{
var droped = false;
fsql.Ado.CommandFluent(state.Item1).WithConnection(connection).WithTransaction(transaction).ExecuteNonQuery();
try
{
funcBulkCopy(fsql.Insert<T1>(update._source)
.AsType(update._table.Type)
.WithConnection(connection)
.WithTransaction(transaction)
.InsertIdentity().AsTable(state.Item4));
var affrows = fsql.Ado.CommandFluent(state.Item2 + ";" + state.Item3).WithConnection(connection).WithTransaction(transaction).ExecuteNonQuery();
droped = true;
return affrows;
}
finally
{
if (droped == false) fsql.Ado.CommandFluent(state.Item3).WithConnection(connection).WithTransaction(transaction).ExecuteNonQuery();
}
}
finally
{
poolConn?.Dispose();
}
}
#if net40
#else
async public static Task<int> ExecuteBulkUpdateAsync<T1>(UpdateProvider<T1> update, NativeTuple<string, string, string, string> state, Func<IInsert<T1>, Task> funcBulkCopy) where T1 : class
{
if (update._source.Any() != true || update._tempPrimarys.Any() == false) return 0;
var fsql = update._orm;
var connection = update._connection;
var transaction = update._transaction;
Object<DbConnection> poolConn = null;
if (connection == null)
{
poolConn = await fsql.Ado.MasterPool.GetAsync();
connection = poolConn.Value;
}
try
{
var droped = false;
await fsql.Ado.CommandFluent(state.Item1).WithConnection(connection).WithTransaction(transaction).ExecuteNonQueryAsync();
try
{
await funcBulkCopy(fsql.Insert<T1>(update._source)
.AsType(update._table.Type)
.WithConnection(connection)
.WithTransaction(transaction)
.InsertIdentity().AsTable(state.Item4));
var affrows = await fsql.Ado.CommandFluent(state.Item2 + ";" + state.Item3).WithConnection(connection).WithTransaction(transaction).ExecuteNonQueryAsync();
droped = true;
return affrows;
}
finally
{
if (droped == false) await fsql.Ado.CommandFluent(state.Item3).WithConnection(connection).WithTransaction(transaction).ExecuteNonQueryAsync();
}
}
finally
{
poolConn?.Dispose();
}
}
#endif
}
public abstract partial class UpdateProvider<T1> : UpdateProvider, IUpdate<T1>
@ -95,7 +177,7 @@ namespace FreeSql.Internal.CommonProvider
public IUpdate<T1> WithTransaction(DbTransaction transaction)
{
_transaction = transaction;
_connection = _transaction?.Connection;
if (transaction != null) _connection = transaction.Connection;
return this;
}
public IUpdate<T1> WithConnection(DbConnection connection)

View File

@ -4,6 +4,11 @@ using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using System.Threading;
using FreeSql.Internal.Model;
using FreeSql.Internal.CommonProvider;
using FreeSql.Internal.ObjectPool;
using System.Linq;
using System.Data.Common;
#if MySqlConnector
using MySqlConnector;
#else
@ -13,6 +18,78 @@ using MySql.Data.MySqlClient;
public static class FreeSqlMySqlConnectorGlobalExtensions
{
#region ExecuteMySqlBulkCopy
/// <summary>
/// 批量更新(更新字段数量超过 2000 时收益大)<para></para>
/// 实现原理:使用 MySqlBulkCopy 插入临时表,再使用 UPDATE INNER JOIN 联表更新
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="that"></param>
/// <param name="bulkCopyTimeout"></param>
/// <returns></returns>
public static int ExecuteMySqlBulkCopy<T>(this IUpdate<T> that, int? bulkCopyTimeout = null) where T : class
{
var update = that as UpdateProvider<T>;
if (update._source.Any() != true || update._tempPrimarys.Any() == false) return 0;
var state = ExecuteMySqlBulkCopyState(update);
return UpdateProvider.ExecuteBulkUpdate(update, state, insert => insert.ExecuteMySqlBulkCopy(bulkCopyTimeout));
}
static NativeTuple<string, string, string, string> ExecuteMySqlBulkCopyState<T>(UpdateProvider<T> update) where T : class
{
if (update._source.Any() != true) return null;
var _table = update._table;
var _tempPrimarys = update._tempPrimarys;
var _commonUtils = update._commonUtils;
var _ignore = update._ignore;
var updateTableName = update._tableRule?.Invoke(_table.DbName) ?? _table.DbName;
var tempTableName = $"Temp_{Guid.NewGuid().ToString("N")}";
if (update._connection == null)
{
if (update._orm.Ado.TransactionCurrentThread != null)
update.WithTransaction(update._orm.Ado.TransactionCurrentThread);
}
var sb = new StringBuilder();
sb.Append("CREATE TEMPORARY TABLE ").Append(_commonUtils.QuoteSqlName(tempTableName)).Append(" ( ");
foreach (var tbcol in _table.ColumnsByPosition)
{
sb.Append(" \r\n ").Append(_commonUtils.QuoteSqlName(tbcol.Attribute.Name)).Append(" ").Append(tbcol.Attribute.DbType);
sb.Append(",");
}
//if (_tempPrimarys.Any())
//{
// sb.Append(" \r\n PRIMARY KEY (");
// foreach (var tbcol in _tempPrimarys) sb.Append(_commonUtils.QuoteSqlName(tbcol.Attribute.Name)).Append(", ");
// sb.Remove(sb.Length - 2, 2).Append("),");
//}
var sql1 = sb.Remove(sb.Length - 1, 1).Append(" \r\n) Engine=InnoDB;").ToString();
sb.Clear().Append("UPDATE ").Append(_commonUtils.QuoteSqlName(updateTableName)).Append(" a ")
.Append(" \r\nINNER JOIN ").Append(_commonUtils.QuoteSqlName(tempTableName)).Append(" b ON ");
for (var a = 0; a < _tempPrimarys.Length; a++)
{
var pkname = _commonUtils.QuoteSqlName(_tempPrimarys[a].Attribute.Name);
if (a > 0) sb.Append(" AND ");
sb.Append("b.").Append(pkname).Append(" = a.").Append(pkname);
}
sb.Append("\r\n SET ");
var colidx = 0;
foreach (var col in _table.Columns.Values)
{
if (col.Attribute.IsPrimary) continue;
if (_tempPrimarys.Any(a => a.CsName == col.CsName)) continue;
if (col.Attribute.IsIdentity == false && col.Attribute.IsVersion == false && _ignore.ContainsKey(col.Attribute.Name) == false)
{
if (colidx > 0) sb.Append(",");
sb.Append(" \r\n a.").Append(_commonUtils.QuoteSqlName(col.Attribute.Name)).Append(" = ").Append("b.").Append(_commonUtils.QuoteSqlName(col.Attribute.Name));
++colidx;
}
}
var sql2 = sb.ToString();
sb.Clear();
var sql3 = $"DROP TABLE {_commonUtils.QuoteSqlName(tempTableName)}";
return NativeTuple.Create(sql1, sql2, sql3, tempTableName);
}
/// <summary>
/// MySql MySqlCopyBulk 批量插入功能<para></para>
/// 使用 IgnoreColumns/InsertColumns 设置忽略/指定导入的列<para></para>
@ -93,6 +170,13 @@ public static class FreeSqlMySqlConnectorGlobalExtensions
}
#if net40
#else
public static Task<int> ExecuteMySqlBulkCopyAsync<T>(this IUpdate<T> that, int? bulkCopyTimeout = null, CancellationToken cancellationToken = default) where T : class
{
var update = that as UpdateProvider<T>;
if (update._source.Any() != true || update._tempPrimarys.Any() == false) return Task.FromResult(0);
var state = ExecuteMySqlBulkCopyState(update);
return UpdateProvider.ExecuteBulkUpdateAsync(update, state, insert => insert.ExecuteMySqlBulkCopyAsync(bulkCopyTimeout, cancellationToken));
}
async public static Task ExecuteMySqlBulkCopyAsync<T>(this IInsert<T> that, int? bulkCopyTimeout = null, CancellationToken cancellationToken = default) where T : class
{
var insert = that as FreeSql.MySql.Curd.MySqlInsert<T>;

View File

@ -1,9 +1,12 @@
using FreeSql;
using FreeSql.Internal.CommonProvider;
using FreeSql.Internal.Model;
using FreeSql.PostgreSQL.Curd;
using Npgsql;
using System;
using System.Data;
using System.Diagnostics;
using System.Linq;
using System.Linq.Expressions;
using System.Text;
using System.Threading;
@ -32,6 +35,69 @@ public static partial class FreeSqlPostgreSQLGlobalExtensions
public static OnConflictDoUpdate<T1> OnConflictDoUpdate<T1>(this IInsert<T1> that, Expression<Func<T1, object>> columns = null) where T1 : class => new FreeSql.PostgreSQL.Curd.OnConflictDoUpdate<T1>(that.InsertIdentity(), columns);
#region ExecutePgCopy
/// <summary>
/// 批量更新(更新字段数量超过 2000 时收益大)<para></para>
/// 实现原理:使用 PgCopy 插入临时表,再使用 UPDATE INNER JOIN 联表更新
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="that"></param>
/// <returns></returns>
public static int ExecutePgCopy<T>(this IUpdate<T> that) where T : class
{
var update = that as UpdateProvider<T>;
if (update._source.Any() != true || update._tempPrimarys.Any() == false) return 0;
var state = ExecutePgCopyState(update);
return UpdateProvider.ExecuteBulkUpdate(update, state, insert => insert.ExecutePgCopy());
}
static NativeTuple<string, string, string, string> ExecutePgCopyState<T>(UpdateProvider<T> update) where T : class
{
if (update._source.Any() != true) return null;
var _table = update._table;
var _tempPrimarys = update._tempPrimarys;
var _commonUtils = update._commonUtils;
var _ignore = update._ignore;
var updateTableName = update._tableRule?.Invoke(_table.DbName) ?? _table.DbName;
var tempTableName = $"Temp_{Guid.NewGuid().ToString("N")}";
if (update._connection == null)
{
if (update._orm.Ado.TransactionCurrentThread != null)
update.WithTransaction(update._orm.Ado.TransactionCurrentThread);
}
var sb = new StringBuilder();
sb.Append("CREATE TEMP TABLE ").Append(_commonUtils.QuoteSqlName(tempTableName)).Append(" ( ");
foreach (var tbcol in _table.ColumnsByPosition)
{
sb.Append(" \r\n ").Append(_commonUtils.QuoteSqlName(tbcol.Attribute.Name)).Append(" ").Append(tbcol.Attribute.DbType);
sb.Append(",");
}
var sql1 = sb.Remove(sb.Length - 1, 1).Append("\r\n) WITH (OIDS=FALSE);").ToString();
sb.Clear().Append("UPDATE ").Append(_commonUtils.QuoteSqlName(updateTableName)).Append(" a \r\nSET ");
var colidx = 0;
foreach (var col in _table.Columns.Values)
{
if (col.Attribute.IsPrimary) continue;
if (_tempPrimarys.Any(a => a.CsName == col.CsName)) continue;
if (col.Attribute.IsIdentity == false && col.Attribute.IsVersion == false && _ignore.ContainsKey(col.Attribute.Name) == false)
{
if (colidx > 0) sb.Append(",");
sb.Append(" \r\n ").Append(_commonUtils.QuoteSqlName(col.Attribute.Name)).Append(" = ").Append("b.").Append(_commonUtils.QuoteSqlName(col.Attribute.Name));
++colidx;
}
}
sb.Append(" \r\nFROM ").Append(_commonUtils.QuoteSqlName(tempTableName)).Append(" b \r\nWHERE ");
for (var a = 0; a < _tempPrimarys.Length; a++)
{
var pkname = _commonUtils.QuoteSqlName(_tempPrimarys[a].Attribute.Name);
if (a > 0) sb.Append(" AND ");
sb.Append("b.").Append(pkname).Append(" = a.").Append(pkname);
}
var sql2 = sb.ToString();
sb.Clear();
var sql3 = $"DROP TABLE {_commonUtils.QuoteSqlName(tempTableName)}";
return NativeTuple.Create(sql1, sql2, sql3, tempTableName);
}
/// <summary>
/// PostgreSQL COPY 批量导入功能,封装了 NpgsqlConnection.BeginBinaryImport 方法<para></para>
/// 使用 IgnoreColumns/InsertColumns 设置忽略/指定导入的列<para></para>
@ -121,6 +187,13 @@ public static partial class FreeSqlPostgreSQLGlobalExtensions
#if net45
#else
public static Task<int> ExecutePgCopyAsync<T>(this IUpdate<T> that, CancellationToken cancellationToken = default) where T : class
{
var update = that as UpdateProvider<T>;
if (update._source.Any() != true || update._tempPrimarys.Any() == false) return Task.FromResult(0);
var state = ExecutePgCopyState(update);
return UpdateProvider.ExecuteBulkUpdateAsync(update, state, insert => insert.ExecutePgCopyAsync(cancellationToken));
}
async public static Task ExecutePgCopyAsync<T>(this IInsert<T> that, CancellationToken cancellationToken = default) where T : class
{
var insert = that as FreeSql.PostgreSQL.Curd.PostgreSQLInsert<T>;

View File

@ -5,6 +5,10 @@ using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using FreeSql.Internal.CommonProvider;
using System.Linq;
using System.Text;
using System.Data.Common;
using FreeSql.Internal.ObjectPool;
#if microsoft
using Microsoft.Data.SqlClient;
#else
@ -115,6 +119,65 @@ public static partial class FreeSqlSqlServerGlobalExtensions
internal static ConcurrentDictionary<Guid, NativeTuple<SqlServerLock, Dictionary<Type, bool>>> _dicSetGlobalSelectWithLock = new ConcurrentDictionary<Guid, NativeTuple<SqlServerLock, Dictionary<Type, bool>>>();
#region ExecuteSqlBulkCopy
/// <summary>
/// 批量更新(更新字段数量超过 2000 时收益大)<para></para>
/// 实现原理:使用 SqlBulkCopy 插入临时表,再使用 UPDATE INNER JOIN 联表更新
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="that"></param>
/// <param name="copyOptions"></param>
/// <param name="batchSize"></param>
/// <param name="bulkCopyTimeout"></param>
/// <returns></returns>
public static int ExecuteSqlBulkCopy<T>(this IUpdate<T> that, SqlBulkCopyOptions copyOptions = SqlBulkCopyOptions.Default, int? batchSize = null, int? bulkCopyTimeout = null) where T : class
{
var update = that as UpdateProvider<T>;
if (update._source.Any() != true || update._tempPrimarys.Any() == false) return 0;
var state = ExecuteSqlBulkCopyState(update);
return UpdateProvider.ExecuteBulkUpdate(update, state, insert => insert.ExecuteSqlBulkCopy(copyOptions, batchSize, bulkCopyTimeout));
}
static NativeTuple<string, string, string, string> ExecuteSqlBulkCopyState<T>(UpdateProvider<T> update) where T : class
{
if (update._source.Any() != true) return null;
var _table = update._table;
var _tempPrimarys = update._tempPrimarys;
var _commonUtils = update._commonUtils;
var _ignore = update._ignore;
var updateTableName = update._tableRule?.Invoke(_table.DbName) ?? _table.DbName;
var tempTableName = $"#Temp_{updateTableName}";
if (update._connection == null)
{
if (update._orm.Ado.TransactionCurrentThread != null)
update.WithTransaction(update._orm.Ado.TransactionCurrentThread);
}
var sql1 = $"SELECT * INTO {tempTableName} FROM {_commonUtils.QuoteSqlName(updateTableName)} WHERE 1=2";
var sb = new StringBuilder().Append("UPDATE ").Append(" a SET \r\n ");
var colidx = 0;
foreach (var col in _table.Columns.Values)
{
if (col.Attribute.IsPrimary) continue;
if (_tempPrimarys.Any(a => a.CsName == col.CsName)) continue;
if (col.Attribute.IsIdentity == false && col.Attribute.IsVersion == false && _ignore.ContainsKey(col.Attribute.Name) == false)
{
if (colidx > 0) sb.Append(", \r\n ");
sb.Append(" a.").Append(_commonUtils.QuoteSqlName(col.Attribute.Name)).Append(" = ").Append("b.").Append(_commonUtils.QuoteSqlName(col.Attribute.Name));
++colidx;
}
}
sb.Append(" \r\nFROM ").Append(_commonUtils.QuoteSqlName(updateTableName)).Append(" a ")
.Append(" \r\nINNER JOIN ").Append(tempTableName).Append(" b ON ");
for (var a = 0; a < _tempPrimarys.Length; a++)
{
var pkname = _commonUtils.QuoteSqlName(_tempPrimarys[a].Attribute.Name);
if (a > 0) sb.Append(" AND ");
sb.Append("b.").Append(pkname).Append(" = a.").Append(pkname);
}
var sql2 = sb.ToString();
sb.Clear();
var sql3 = $"DROP TABLE {tempTableName}";
return NativeTuple.Create(sql1, sql2, sql3, tempTableName);
}
/// <summary>
/// SqlServer SqlCopyBulk 批量插入功能<para></para>
/// 使用 IgnoreColumns/InsertColumns 设置忽略/指定导入的列<para></para>
@ -214,6 +277,13 @@ public static partial class FreeSqlSqlServerGlobalExtensions
}
#if net40
#else
public static Task<int> ExecuteSqlBulkCopyAsync<T>(this IUpdate<T> that, SqlBulkCopyOptions copyOptions = SqlBulkCopyOptions.Default, int? batchSize = null, int? bulkCopyTimeout = null, CancellationToken cancellationToken = default) where T : class
{
var update = that as UpdateProvider<T>;
if (update._source.Any() != true || update._tempPrimarys.Any() == false) return Task.FromResult(0);
var state = ExecuteSqlBulkCopyState(update);
return UpdateProvider.ExecuteBulkUpdateAsync(update, state, insert => insert.ExecuteSqlBulkCopyAsync(copyOptions, batchSize, bulkCopyTimeout, cancellationToken));
}
async public static Task ExecuteSqlBulkCopyAsync<T>(this IInsert<T> that, SqlBulkCopyOptions copyOptions = SqlBulkCopyOptions.Default, int? batchSize = null, int? bulkCopyTimeout = null, CancellationToken cancellationToken = default) where T : class
{
var insert = that as FreeSql.SqlServer.Curd.SqlServerInsert<T>;