From 1e55cf4e25c039cc0893af9ddec1c1b0712b60fe Mon Sep 17 00:00:00 2001 From: 2881099 <2881099@qq.com> Date: Wed, 23 Feb 2022 11:55:39 +0800 Subject: [PATCH] =?UTF-8?q?-=20=E4=BF=AE=E5=A4=8D=20Clickhouse=20=E8=BF=9E?= =?UTF-8?q?=E6=8E=A5=E6=B1=A0=E4=BD=BF=E7=94=A8=E9=97=AE=E9=A2=98=EF=BC=9B?= =?UTF-8?q?#646=20#968=20#969=20#943?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- FreeSql.DbContext/FreeSql.DbContext.xml | 9 --- .../Curd/ClickHouseInsert.cs | 76 +++++++++++++++++-- 2 files changed, 68 insertions(+), 17 deletions(-) diff --git a/FreeSql.DbContext/FreeSql.DbContext.xml b/FreeSql.DbContext/FreeSql.DbContext.xml index bdd16ff9..da7ace6b 100644 --- a/FreeSql.DbContext/FreeSql.DbContext.xml +++ b/FreeSql.DbContext/FreeSql.DbContext.xml @@ -538,14 +538,5 @@ - - - 批量注入 Repository,可以参考代码自行调整 - - - - - - diff --git a/Providers/FreeSql.Provider.ClickHouse/Curd/ClickHouseInsert.cs b/Providers/FreeSql.Provider.ClickHouse/Curd/ClickHouseInsert.cs index ce6b2f9c..db63dad4 100644 --- a/Providers/FreeSql.Provider.ClickHouse/Curd/ClickHouseInsert.cs +++ b/Providers/FreeSql.Provider.ClickHouse/Curd/ClickHouseInsert.cs @@ -50,20 +50,23 @@ namespace FreeSql.ClickHouse.Curd { var affrows = 0; Exception exception = null; - Aop.CurdBeforeEventArgs before=null; - if (_source.Count>1) + Aop.CurdBeforeEventArgs before = null; + if (_source.Count > 1) { try { before = new Aop.CurdBeforeEventArgs(_table.Type, _table, Aop.CurdType.Insert, null, _params); _orm.Aop.CurdBeforeHandler?.Invoke(this, before); - using var bulkCopyInterface = new ClickHouseBulkCopy(_orm.Ado.MasterPool.Get().Value as ClickHouseConnection) + using (var conn = _orm.Ado.MasterPool.Get()) { - DestinationTableName = _table.DbName, - BatchSize = _source.Count - }; - var data=ToDataTable(); - bulkCopyInterface.WriteToServerAsync(data, default).Wait(); + using var bulkCopyInterface = new ClickHouseBulkCopy(conn.Value as ClickHouseConnection) + { + DestinationTableName = _table.DbName, + BatchSize = _source.Count + }; + var data = ToDataTable(); + bulkCopyInterface.WriteToServerAsync(data, default).Wait(); + } return affrows; } catch (Exception ex) @@ -173,6 +176,63 @@ namespace FreeSql.ClickHouse.Curd public override Task ExecuteIdentityAsync(CancellationToken cancellationToken = default) => SplitExecuteIdentityAsync(_batchValuesLimit > 0 ? _batchValuesLimit : int.MaxValue, _batchValuesLimit > 0 ? _batchValuesLimit : int.MaxValue, cancellationToken); public override Task> ExecuteInsertedAsync(CancellationToken cancellationToken = default) => SplitExecuteInsertedAsync(_batchValuesLimit > 0 ? _batchValuesLimit : int.MaxValue, _batchValuesLimit > 0 ? _batchValuesLimit : int.MaxValue, cancellationToken); + async protected override Task RawExecuteAffrowsAsync(CancellationToken cancellationToken = default) + { + var affrows = 0; + Exception exception = null; + Aop.CurdBeforeEventArgs before = null; + if (_source.Count > 1) + { + try + { + before = new Aop.CurdBeforeEventArgs(_table.Type, _table, Aop.CurdType.Insert, null, _params); + _orm.Aop.CurdBeforeHandler?.Invoke(this, before); + using (var conn = await _orm.Ado.MasterPool.GetAsync()) + { + using var bulkCopyInterface = new ClickHouseBulkCopy(conn.Value as ClickHouseConnection) + { + DestinationTableName = _table.DbName, + BatchSize = _source.Count + }; + var data = ToDataTable(); + await bulkCopyInterface.WriteToServerAsync(data, default); + } + return affrows; + } + catch (Exception ex) + { + exception = ex; + throw ex; + } + finally + { + var after = new Aop.CurdAfterEventArgs(before, exception, affrows); + _orm.Aop.CurdAfterHandler?.Invoke(this, after); + } + } + else + { + var sql = this.ToSql(); + before = new Aop.CurdBeforeEventArgs(_table.Type, _table, Aop.CurdType.Insert, sql, _params); + _orm.Aop.CurdBeforeHandler?.Invoke(this, before); + try + { + affrows = await _orm.Ado.ExecuteNonQueryAsync(_connection, null, CommandType.Text, sql, _commandTimeout, _params); + } + catch (Exception ex) + { + exception = ex; + throw ex; + } + finally + { + var after = new Aop.CurdAfterEventArgs(before, exception, affrows); + _orm.Aop.CurdAfterHandler?.Invoke(this, after); + } + return affrows; + } + } + async protected override Task RawExecuteIdentityAsync(CancellationToken cancellationToken = default) { //var sql = this.ToSql();