修改异步插入或修改时由于无法获取执行状态导致的无限循环,添加decimal数据类型(最后一位小数点精度会丢失)

This commit is contained in:
chenbo
2021-12-02 16:33:48 +08:00
parent ae0f2e9d0d
commit 0212577933
9 changed files with 257 additions and 23 deletions

View File

@ -35,6 +35,7 @@ namespace FreeSql.ClickHouse
{ typeof(double).FullName, CsToDb.New(DbType.Double, "Float64", "Float64", false, false, 0) },{ typeof(double?).FullName, CsToDb.New(DbType.Double, "Float64", "Nullable(Float64)", false, true, null) },
{ typeof(float).FullName, CsToDb.New(DbType.Single, "Float32","Float32", false, false, 0) },{ typeof(float?).FullName, CsToDb.New(DbType.Single, "Float32","Nullable(Float32)", false, true, null) },
{ typeof(decimal).FullName, CsToDb.New(DbType.Decimal, "Decimal128(19)","Decimal128(19)", false, false, 0) },{ typeof(decimal?).FullName, CsToDb.New(DbType.Decimal, "Nullable(Decimal128(19))","Nullable(Decimal128(19))", false, true, null) },
{ typeof(DateTime).FullName, CsToDb.New(DbType.DateTime, "DateTime('Asia/Shanghai')", "DateTime('Asia/Shanghai')", false, false, new DateTime(1970,1,1)) },{ typeof(DateTime?).FullName, CsToDb.New(DbType.DateTime, "DateTime('Asia/Shanghai')", "Nullable(DateTime('Asia/Shanghai'))", false, true, null) },
@ -114,13 +115,14 @@ namespace FreeSql.ClickHouse
foreach (var uk in tb.Indexes)
{
sb.Append(" \r\n ");
sb.Append("INDEX ").Append(_commonUtils.QuoteSqlName(ReplaceIndexName(uk.Name, tbname[1]))).Append("(");
sb.Append("INDEX ").Append(_commonUtils.QuoteSqlName(ReplaceIndexName(uk.Name, tbname[1])));
foreach (var tbcol in uk.Columns)
{
sb.Append(" ");
sb.Append(_commonUtils.QuoteSqlName(tbcol.Column.Attribute.Name));
sb.Append("TYPE set(8192) GRANULARITY 5, ");
sb.Append("TYPE set(8192) GRANULARITY 5, ");
}
sb.Remove(sb.Length - 2, 2).Append("),");
sb.Remove(sb.Length - 2, 2);
}
sb.Remove(sb.Length - 1, 1);
sb.Append("\r\n) ");

View File

@ -67,6 +67,9 @@ namespace FreeSql.ClickHouse
case "Float32":
case "float":
case "nullable(float32)": return DbType.Single;
case "decimal":
case "decimal128":
case "nullable(decimal128)": return DbType.Decimal;
case "date":
case "nullable(date)": return DbType.Date;
case "datetime":
@ -103,6 +106,7 @@ namespace FreeSql.ClickHouse
{ (int)DbType.Double, new DbToCs("(double?)", "double.Parse({0})", "{0}.ToString()", "double?", typeof(double), typeof(double?), "{0}.Value", "GetDouble") },
{ (int)DbType.Single, new DbToCs("(float?)", "float.Parse({0})", "{0}.ToString()", "float?", typeof(float), typeof(float?), "{0}.Value", "GetFloat") },
{ (int)DbType.Decimal, new DbToCs("(decimal?)", "decimal.Parse({0})", "{0}.ToString()", "decimal?", typeof(decimal), typeof(decimal?), "{0}.Value", "GetDecimal") },
{ (int)DbType.Date, new DbToCs("(DateTime?)", "new DateTime(long.Parse({0}))", "{0}.Ticks.ToString()", "DateTime?", typeof(DateTime), typeof(DateTime?), "{0}.Value", "GetDateTime") },
{ (int)DbType.Date, new DbToCs("(DateTime?)", "new DateTime(long.Parse({0}))", "{0}.Ticks.ToString()", "DateTime?", typeof(DateTime), typeof(DateTime?), "{0}.Value", "GetDateTime") },

View File

@ -35,7 +35,7 @@ namespace FreeSql.ClickHouse
case "System.Byte": return $"cast({getExp(operandExp)} as Int8)";
case "System.Char": return $"substr(cast({getExp(operandExp)} as String), 1, 1)";
case "System.DateTime": return $"cast({getExp(operandExp)} as DateTime)";
case "System.Decimal": return $"cast({getExp(operandExp)} as Float64)";
case "System.Decimal": return $"cast({getExp(operandExp)} as Decimal128(19))";
case "System.Double": return $"cast({getExp(operandExp)} as Float64)";
case "System.Int16": return $"cast({getExp(operandExp)} as Int16)";
case "System.Int32": return $"cast({getExp(operandExp)} as Int32)";
@ -63,7 +63,7 @@ namespace FreeSql.ClickHouse
case "System.Byte": return $"cast({getExp(callExp.Arguments[0])} as Int8)";
case "System.Char": return $"substr(cast({getExp(callExp.Arguments[0])} as String), 1, 1)";
case "System.DateTime": return $"cast({getExp(callExp.Arguments[0])} as DateTime)";
case "System.Decimal": return $"cast({getExp(callExp.Arguments[0])} as Float64)";
case "System.Decimal": return $"cast({getExp(callExp.Arguments[0])} as Decimal128(19))";
case "System.Double": return $"cast({getExp(callExp.Arguments[0])} as Float64)";
case "System.Int16": return $"cast({getExp(callExp.Arguments[0])} as Int16)";
case "System.Int32": return $"cast({getExp(callExp.Arguments[0])} as Int32)";
@ -556,7 +556,7 @@ namespace FreeSql.ClickHouse
case "ToByte": return $"cast({getExp(exp.Arguments[0])} as Int8)";
case "ToChar": return $"substr(cast({getExp(exp.Arguments[0])} as String), 1, 1)";
case "ToDateTime": return $"cast({getExp(exp.Arguments[0])} as DateTime)";
case "ToDecimal": return $"cast({getExp(exp.Arguments[0])} as Float64)";
case "ToDecimal": return $"cast({getExp(exp.Arguments[0])} as Decimal128(19))";
case "ToDouble": return $"cast({getExp(exp.Arguments[0])} as Float64)";
case "ToInt16":
case "ToInt32":

View File

@ -55,7 +55,6 @@ namespace FreeSql.ClickHouse.Curd
{
try
{
Debug.WriteLine($"开始执行时间:{DateTime.Now}");
before = new Aop.CurdBeforeEventArgs(_table.Type, _table, Aop.CurdType.Insert, null, _params);
_orm.Aop.CurdBeforeHandler?.Invoke(this, before);
using var bulkCopyInterface = new ClickHouseBulkCopy(_orm.Ado.MasterPool.Get().Value as ClickHouseConnection)
@ -170,9 +169,9 @@ namespace FreeSql.ClickHouse.Curd
#if net40
#else
public override Task<int> ExecuteAffrowsAsync(CancellationToken cancellationToken = default) => base.SplitExecuteAffrowsAsync(_batchValuesLimit > 0 ? _batchValuesLimit : int.MaxValue, _batchValuesLimit > 0 ? _batchValuesLimit : int.MaxValue, cancellationToken);
public override Task<long> ExecuteIdentityAsync(CancellationToken cancellationToken = default) => base.SplitExecuteIdentityAsync(_batchValuesLimit > 0 ? _batchValuesLimit : int.MaxValue, _batchValuesLimit > 0 ? _batchValuesLimit : int.MaxValue, cancellationToken);
public override Task<List<T1>> ExecuteInsertedAsync(CancellationToken cancellationToken = default) => base.SplitExecuteInsertedAsync(_batchValuesLimit > 0 ? _batchValuesLimit : int.MaxValue, _batchValuesLimit > 0 ? _batchValuesLimit : int.MaxValue, cancellationToken);
public override Task<int> ExecuteAffrowsAsync(CancellationToken cancellationToken = default) => SplitExecuteAffrowsAsync(_batchValuesLimit > 0 ? _batchValuesLimit : int.MaxValue, _batchValuesLimit > 0 ? _batchValuesLimit : int.MaxValue, cancellationToken);
public override Task<long> ExecuteIdentityAsync(CancellationToken cancellationToken = default) => SplitExecuteIdentityAsync(_batchValuesLimit > 0 ? _batchValuesLimit : int.MaxValue, _batchValuesLimit > 0 ? _batchValuesLimit : int.MaxValue, cancellationToken);
public override Task<List<T1>> ExecuteInsertedAsync(CancellationToken cancellationToken = default) => SplitExecuteInsertedAsync(_batchValuesLimit > 0 ? _batchValuesLimit : int.MaxValue, _batchValuesLimit > 0 ? _batchValuesLimit : int.MaxValue, cancellationToken);
async protected override Task<long> RawExecuteIdentityAsync(CancellationToken cancellationToken = default)
{
@ -255,7 +254,8 @@ namespace FreeSql.ClickHouse.Curd
if (ss.Length == 1)
{
_batchProgress?.Invoke(new BatchProgressStatus<T1>(_source, 1, 1));
ret = await this.RawExecuteAffrowsAsync(cancellationToken);
await this.RawExecuteAffrowsAsync(cancellationToken);
ret = _source.Count;
ClearData();
return ret;
}
@ -269,7 +269,8 @@ namespace FreeSql.ClickHouse.Curd
{
_source = ss[a];
_batchProgress?.Invoke(new BatchProgressStatus<T1>(_source, a + 1, ss.Length));
ret += await this.RawExecuteAffrowsAsync(cancellationToken);
await this.RawExecuteAffrowsAsync(cancellationToken);
ret += _source.Count;
}
}
catch (Exception ex)

View File

@ -293,18 +293,20 @@ namespace FreeSql.ClickHouse.Curd
#if net40
#else
public override Task<int> ExecuteAffrowsAsync(CancellationToken cancellationToken = default) => base.SplitExecuteAffrowsAsync(_batchRowsLimit > 0 ? _batchRowsLimit : 500, _batchParameterLimit > 0 ? _batchParameterLimit : 3000, cancellationToken);
public override Task<List<T1>> ExecuteUpdatedAsync(CancellationToken cancellationToken = default) => base.SplitExecuteUpdatedAsync(_batchRowsLimit > 0 ? _batchRowsLimit : 500, _batchParameterLimit > 0 ? _batchParameterLimit : 3000, cancellationToken);
public override Task<int> ExecuteAffrowsAsync(CancellationToken cancellationToken = default) => SplitExecuteAffrowsAsync(_batchRowsLimit > 0 ? _batchRowsLimit : 500, _batchParameterLimit > 0 ? _batchParameterLimit : 3000, cancellationToken);
public override Task<List<T1>> ExecuteUpdatedAsync(CancellationToken cancellationToken = default) => SplitExecuteUpdatedAsync(_batchRowsLimit > 0 ? _batchRowsLimit : 500, _batchParameterLimit > 0 ? _batchParameterLimit : 3000, cancellationToken);
protected override Task<List<T1>> RawExecuteUpdatedAsync(CancellationToken cancellationToken = default) => throw new NotImplementedException("FreeSql.ClickHouse.Custom 未实现该功能 未实现该功能");
async protected override Task<int> SplitExecuteAffrowsAsync(int valuesLimit, int parameterLimit, CancellationToken cancellationToken = default)
{
var ss = SplitSource(valuesLimit, parameterLimit);
var ret = 0;
if (ss.Length <= 1)
{
if (_source?.Any() == true) _batchProgress?.Invoke(new BatchProgressStatus<T1>(_source, 1, 1));
ret = await this.RawExecuteAffrowsAsync(cancellationToken);
await this.RawExecuteAffrowsAsync(cancellationToken);
ret = _source.Count;
ClearData();
return ret;
}
@ -319,7 +321,8 @@ namespace FreeSql.ClickHouse.Curd
{
_source = ss[a];
_batchProgress?.Invoke(new BatchProgressStatus<T1>(_source, a + 1, ss.Length));
ret += await this.RawExecuteAffrowsAsync(cancellationToken);
await this.RawExecuteAffrowsAsync(cancellationToken);
ret += _source.Count;
}
}
catch (Exception ex)