Change the way bulk data is inserted (tested: 1,000,000 rows in under 5 seconds); fix the issue where the string replacement applied during data updates could also replace content inside the data itself.

ChenBo
2021-11-28 21:37:10 +08:00
parent 4c7e04376f
commit 846c180191
4 changed files with 104 additions and 18 deletions
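The new bulk path is exercised through the ordinary FreeSql insert API; per the commit message, roughly 1,000,000 rows complete in under 5 seconds. Below is a minimal sketch, assuming the FreeSql and FreeSql.Provider.ClickHouse packages and a reachable ClickHouse server; the connection string, entity, and row count are placeholders, not part of this commit.

// Minimal sketch: drive the bulk-insert path added in this commit from user code.
// Connection string, entity and table are placeholders.
using System;
using System.Collections.Generic;
using FreeSql;

class Song
{
    public long Id { get; set; }
    public string Title { get; set; }
}

class Program
{
    static void Main()
    {
        IFreeSql fsql = new FreeSqlBuilder()
            .UseConnectionString(DataType.ClickHouse, "Host=127.0.0.1;Port=8123;Database=test")
            .UseAutoSyncStructure(true)   // create the table from the entity if it does not exist
            .Build();

        var rows = new List<Song>();
        for (var i = 0; i < 1_000_000; i++)
            rows.Add(new Song { Id = i, Title = "song" + i });

        // A multi-row source now goes through ClickHouseBulkCopy instead of batched INSERT statements.
        // Note: per the diff below, the bulk path returns 0 rather than the inserted row count.
        var affrows = fsql.Insert(rows).ExecuteAffrows();
        Console.WriteLine(affrows);
    }
}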


@@ -1,9 +1,12 @@
using ClickHouse.Client.ADO;
using ClickHouse.Client.Copy;
using FreeSql.Internal;
using FreeSql.Internal.Model;
using System;
using System.Collections.Generic;
using System.Data;
using System.Data.Common;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
@@ -30,9 +33,9 @@ namespace FreeSql.ClickHouse.Curd
internal Dictionary<string, bool> InternalIgnore => _ignore;
internal void InternalClearData() => ClearData();
public override int ExecuteAffrows() => base.SplitExecuteAffrows(_batchValuesLimit > 0 ? _batchValuesLimit : 5000, _batchParameterLimit > 0 ? _batchParameterLimit : 3000);
public override long ExecuteIdentity() => base.SplitExecuteIdentity(_batchValuesLimit > 0 ? _batchValuesLimit : 5000, _batchParameterLimit > 0 ? _batchParameterLimit : 3000);
public override List<T1> ExecuteInserted() => base.SplitExecuteInserted(_batchValuesLimit > 0 ? _batchValuesLimit : 5000, _batchParameterLimit > 0 ? _batchParameterLimit : 3000);
public override int ExecuteAffrows() => base.SplitExecuteAffrows(int.MaxValue, int.MaxValue);
public override long ExecuteIdentity() => base.SplitExecuteIdentity(int.MaxValue, int.MaxValue);
public override List<T1> ExecuteInserted() => base.SplitExecuteInserted(int.MaxValue, int.MaxValue);
public override string ToSql()
@@ -42,6 +45,81 @@ namespace FreeSql.ClickHouse.Curd
return $"INSERT IGNORE INTO {sql.Substring(12)}";
}
protected override int RawExecuteAffrows()
{
var affrows = 0;
Exception exception = null;
Aop.CurdBeforeEventArgs before = null;
if (_source.Count > 1)
{
try
{
// Bulk path: raise the CurdBefore AOP event (no SQL text, since bulk copy bypasses SQL generation).
before = new Aop.CurdBeforeEventArgs(_table.Type, _table, Aop.CurdType.Insert, null, _params);
_orm.Aop.CurdBeforeHandler?.Invoke(this, before);
// Stream all rows through ClickHouseBulkCopy in a single batch.
using var bulkCopyInterface = new ClickHouseBulkCopy(_orm.Ado.MasterPool.Get().Value as ClickHouseConnection)
{
DestinationTableName = _table.DbName,
BatchSize = _source.Count
};
bulkCopyInterface.WriteToServerAsync(ToDataTable(), default).Wait();
// Note: affrows is not updated on this path, so a successful bulk copy returns 0.
return affrows;
}
catch (Exception ex)
{
exception = ex;
throw; // rethrow without resetting the stack trace
}
finally
{
var after = new Aop.CurdAfterEventArgs(before, exception, affrows);
_orm.Aop.CurdAfterHandler?.Invoke(this, after);
}
}
else
{
// Single-row path: execute the generated INSERT statement as before.
var sql = this.ToSql();
before = new Aop.CurdBeforeEventArgs(_table.Type, _table, Aop.CurdType.Insert, sql, _params);
_orm.Aop.CurdBeforeHandler?.Invoke(this, before);
try
{
affrows = _orm.Ado.ExecuteNonQuery(_connection, _transaction, CommandType.Text, sql, _commandTimeout, _params);
}
catch (Exception ex)
{
exception = ex;
throw; // rethrow without resetting the stack trace
}
finally
{
var after = new Aop.CurdAfterEventArgs(before, exception, affrows);
_orm.Aop.CurdAfterHandler?.Invoke(this, after);
}
return affrows;
}
}
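For reference, the same ClickHouse.Client types used by the method above can be driven outside FreeSql. A minimal standalone sketch, assuming a reachable server and a pre-created test.song table; the connection string, table, and column names are placeholders, and the API shape is the one this commit already relies on (newer ClickHouse.Client versions may differ).

// Standalone sketch of ClickHouse.Client bulk copy, using the same members as the method above.
using System;
using System.Data;
using System.Threading.Tasks;
using ClickHouse.Client.ADO;
using ClickHouse.Client.Copy;

class BulkCopySketch
{
    static async Task Main()
    {
        using var connection = new ClickHouseConnection("Host=127.0.0.1;Port=8123;Database=test");
        await connection.OpenAsync();

        // Build an in-memory DataTable whose columns match the target table.
        var table = new DataTable();
        table.Columns.Add("Id", typeof(long));
        table.Columns.Add("Title", typeof(string));
        for (var i = 0; i < 100_000; i++)
            table.Rows.Add((long)i, "song" + i);

        using var bulkCopy = new ClickHouseBulkCopy(connection)
        {
            DestinationTableName = "test.song",
            BatchSize = table.Rows.Count
        };
        await bulkCopy.WriteToServerAsync(table, default);
    }
}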
// Builds a column-name/value dictionary for one entity via reflection.
private IDictionary<string, object> GetValue<T>(T u, System.Reflection.PropertyInfo[] columns)
{
var dic = new Dictionary<string, object>();
foreach (var item in columns)
{
object v = null;
if (u != null)
{
v = item.GetValue(u);
}
dic.TryAdd(item.Name, v);
}
return dic;
}
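The ToDataTable() called by the bulk path is not part of this hunk. As an illustration only, the hypothetical BuildDataTable helper below shows how per-entity values like those produced by GetValue could be poured into a DataTable for WriteToServerAsync; it is not the implementation this commit uses.

// Hypothetical helper (not the commit's ToDataTable): map public properties to DataTable columns.
using System;
using System.Collections.Generic;
using System.Data;
using System.Reflection;

static class DataTableSketch
{
    public static DataTable BuildDataTable<T>(IEnumerable<T> source)
    {
        var columns = typeof(T).GetProperties(BindingFlags.Public | BindingFlags.Instance);
        var table = new DataTable();
        foreach (var column in columns)
            table.Columns.Add(column.Name,
                Nullable.GetUnderlyingType(column.PropertyType) ?? column.PropertyType);

        foreach (var item in source)
        {
            var row = table.NewRow();
            foreach (var column in columns)
                row[column.Name] = column.GetValue(item) ?? DBNull.Value;
            table.Rows.Add(row);
        }
        return table;
    }
}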
protected override long RawExecuteIdentity()
{
var sql = this.ToSql();