diff --git a/FreeSql.Tests/FreeSql.Tests/QuestDb/Crud/QuestDbTestInsertAndUpdate.cs b/FreeSql.Tests/FreeSql.Tests/QuestDb/Crud/QuestDbTestInsertAndUpdate.cs index f0b5883b..cde03601 100644 --- a/FreeSql.Tests/FreeSql.Tests/QuestDb/Crud/QuestDbTestInsertAndUpdate.cs +++ b/FreeSql.Tests/FreeSql.Tests/QuestDb/Crud/QuestDbTestInsertAndUpdate.cs @@ -110,6 +110,124 @@ namespace FreeSql.Tests.QuestDb.Crud Assert.True(result > 0); } + [Fact] + public async Task TestRestInsertAsync() + { + var result = await restFsql.Insert(new QuestDb_Model_Test01() + { + Primarys = Guid.NewGuid().ToString(), + CreateTime = DateTime.Now, + Activos = 100.21, + Id = "IdAsync", + IsCompra = true, + NameInsert = "NameInsert", + NameUpdate = "NameUpdate" + }).ExecuteAffrowsAsync(); + Assert.True(result > 0); + } + + [Fact] + public async Task TestRestInsertBatchAsync() + { + var list = new List() + { + new QuestDb_Model_Test01() + { + Primarys = Guid.NewGuid().ToString(), + CreateTime = DateTime.Now, + Activos = 100.21, + Id = "1", + IsCompra = true, + NameInsert = "NameInsertAsync", + NameUpdate = "NameUpdate" + }, + new QuestDb_Model_Test01() + { + Primarys = Guid.NewGuid().ToString(), + CreateTime = DateTime.Now, + Activos = 100.21, + Id = "2", + IsCompra = true, + NameInsert = "NameInsertAsync", + NameUpdate = "NameUpdate" + }, + new QuestDb_Model_Test01() + { + Primarys = Guid.NewGuid().ToString(), + CreateTime = DateTime.Now, + Activos = 100.21, + Id = "3", + IsCompra = true, + NameInsert = "NameInsertAsync", + NameUpdate = "NameUpdate" + }, + }; + var result = await restFsql.Insert(list).ExecuteAffrowsAsync(); + Assert.True(result > 0); + } + + [Fact] + public async Task TestRestInsertColumnsAsync() + { + var list = new List() + { + new QuestDb_Model_Test01() + { + Primarys = Guid.NewGuid().ToString(), + CreateTime = DateTime.Now, + Activos = 100.21, + Id = "1", + IsCompra = true, + NameInsert = "NameInsert", + NameUpdate = "NameUpdate" + }, + new QuestDb_Model_Test01() + { + Primarys = Guid.NewGuid().ToString(), + CreateTime = DateTime.Now, + Activos = 100.21, + Id = "2", + IsCompra = true, + NameInsert = "NameInsert", + NameUpdate = "NameUpdate" + }, + new QuestDb_Model_Test01() + { + Primarys = Guid.NewGuid().ToString(), + CreateTime = DateTime.Now, + Activos = 100.21, + Id = "3", + IsCompra = true, + NameInsert = "NameInsert", + NameUpdate = "NameUpdate" + }, + }; + var result = await restFsql.Insert(list).IgnoreColumns(q => q.NameInsert).ExecuteAffrowsAsync(); + Assert.True(result > 0); + } + + [Fact] + public async Task TestSqlBulkCopy() + { + var list = new List(); + for (int i = 0; i < 10; i++) + { + list.Add(new QuestDb_Model_Test01() + { + Primarys = Guid.NewGuid().ToString(), + CreateTime = DateTime.Now, + Activos = 100 + i, + Id = "1", + IsCompra = true, + NameInsert = "NameInsertAsync", + NameUpdate = "NameUpdate" + }); + } + var result = await restFsql.Insert(list).ExecuteBulkCopyAsync(); + Assert.True(result > 0); + } + + [Fact, Order(4)] public void TestNormalUpdate() { @@ -209,5 +327,7 @@ WHERE (""Id"" = '{primary}')", sql); .Set(q => q.UpdateTime, DateTime.Now) .ExecuteAffrowsAsync(); } + + } } \ No newline at end of file diff --git a/FreeSql.Tests/FreeSql.Tests/QuestDb/Crud/QuestDbTestUpdate.cs b/FreeSql.Tests/FreeSql.Tests/QuestDb/Crud/QuestDbTestUpdate.cs index c70c30e2..edb56d55 100644 --- a/FreeSql.Tests/FreeSql.Tests/QuestDb/Crud/QuestDbTestUpdate.cs +++ b/FreeSql.Tests/FreeSql.Tests/QuestDb/Crud/QuestDbTestUpdate.cs @@ -14,6 +14,104 @@ namespace FreeSql.Tests.QuestDb.Crud [TestCaseOrderer("FreeSql.Tests.QuestDb.Utils.TestOrders", "FreeSql.Tests")] public class QuestDbTestUpdate { - //多线程以及questdb问题转移至 insert中测试 + [Fact] + public void TestNormalRestUpdate() + { + var updateTime = DateTime.Now; + var updateObj = restFsql.Update() + .Set(q => q.NameUpdate, "UpdateNow") + // .Set(q => q.CreateTime, DateTime.Now) 分表的时间不可以随便改 + .Where(q => q.Id == "1"); + var updateSql = updateObj.ToSql(); + Debug.WriteLine(updateSql); + var sql = + $@"UPDATE ""QuestDb_Model_Test01"" SET ""NameUpdate"" = 'UpdateNow' +WHERE (""Id"" = '1')"; + Debug.WriteLine(sql); + Assert.Equal(updateSql, sql); + var result = updateObj.ExecuteAffrows(); + Assert.True(result > 0); + } + + [Fact] + public void TestRestUpdateByModel() + { + var primary = Guid.NewGuid().ToString(); + //先插入 + restFsql.Insert(new QuestDb_Model_Test01() + { + Primarys = primary, + CreateTime = DateTime.Now, + Activos = 100.21, + Id = primary, + IsCompra = true, + NameInsert = "NameInsert", + NameUpdate = "NameUpdate" + }).ExecuteAffrows(); + var updateModel = new QuestDb_Model_Test01 + { + Primarys = primary, + Id = primary, + Activos = 12.65, + }; + var updateObj = restFsql.Update().SetSourceIgnore(updateModel, o => o == null); + var sql = updateObj.ToSql(); + Debug.WriteLine(sql); + var result = updateObj.ExecuteAffrows(); + var resultAsync = restFsql.Update().SetSourceIgnore(updateModel, o => o == null) + .ExecuteAffrows(); + Assert.True(result > 0); + Assert.True(resultAsync > 0); + Assert.Equal( + @$"UPDATE ""QuestDb_Model_Test01"" SET ""Primarys"" = '{primary}', ""NameInsert"" = 'NameDefault', ""Activos"" = 12.65 +WHERE (""Id"" = '{primary}')", sql); + } + + [Fact] + public async Task TestRestUpdateIgnoreColumnsAsync() + { + var primary = Guid.NewGuid().ToString(); + var updateTime = DateTime.Now; + //先插入 + restFsql.Insert(new QuestDb_Model_Test01() + { + Primarys = primary, + CreateTime = DateTime.Now, + Activos = 100.21, + Id = primary, + IsCompra = true, + NameInsert = "NameInsert", + NameUpdate = "NameUpdate" + }).ExecuteAffrows(); + var updateModel = new QuestDb_Model_Test01 + { + Id = primary, + Activos = 12.65, + IsCompra = true, + CreateTime = DateTime.Now + }; + var updateObj = restFsql.Update().SetSource(updateModel) + .IgnoreColumns(q => new { q.Id, q.CreateTime }); + var sql = updateObj.ToSql(); + Debug.WriteLine(sql); + var result = updateObj.ExecuteAffrows(); + var resultAsync = await restFsql.Update().SetSource(updateModel) + .IgnoreColumns(q => new { q.Id, q.CreateTime }).ExecuteAffrowsAsync(); + Assert.True(result > 0); + Assert.True(resultAsync > 0); + Assert.Equal( + $@"UPDATE ""QuestDb_Model_Test01"" SET ""Primarys"" = NULL, ""NameUpdate"" = NULL, ""NameInsert"" = 'NameDefault', ""Activos"" = 12.65, ""UpdateTime"" = NULL, ""IsCompra"" = True +WHERE (""Id"" = '{primary}')", sql); + } + + [Fact] + public async Task TestUpdateToUpdateAsync() + { + //官网demo有问题,暂时放弃此功能 + var result = await restFsql.Select().Where(q => q.Id == "IdAsync" && q.NameInsert == null) + .ToUpdate() + .Set(q => q.UpdateTime, DateTime.Now) + .ExecuteAffrowsAsync(); + } } } \ No newline at end of file diff --git a/FreeSql.Tests/FreeSql.Tests/QuestDb/QuestDbTest.cs b/FreeSql.Tests/FreeSql.Tests/QuestDb/QuestDbTest.cs index 59441ad2..69072f41 100644 --- a/FreeSql.Tests/FreeSql.Tests/QuestDb/QuestDbTest.cs +++ b/FreeSql.Tests/FreeSql.Tests/QuestDb/QuestDbTest.cs @@ -15,5 +15,12 @@ namespace FreeSql.Tests.QuestDb .UseMonitorCommand(cmd => Debug.WriteLine($"Sql:{cmd.CommandText}")) //监听SQL语句 .UseNoneCommandParameter(true) .Build(); + + public static IFreeSql restFsql = new FreeSql.FreeSqlBuilder() + .UseConnectionString(FreeSql.DataType.QuestDb, + @"host=192.168.0.36;port=8812;username=admin;password=quest;database=qdb;ServerCompatibilityMode=NoTypeLoading;") + .UseMonitorCommand(cmd => Debug.WriteLine($"Sql:{cmd.CommandText}")) //监听SQL语句 + .UseQuestDbRestAPI("192.168.0.36:9001", "admin", "ushahL(aer2r") + .Build(); } } \ No newline at end of file diff --git a/Providers/FreeSql.Provider.QuestDb/Curd/QuestDbInsert.cs b/Providers/FreeSql.Provider.QuestDb/Curd/QuestDbInsert.cs index e95508c7..d47224c0 100644 --- a/Providers/FreeSql.Provider.QuestDb/Curd/QuestDbInsert.cs +++ b/Providers/FreeSql.Provider.QuestDb/Curd/QuestDbInsert.cs @@ -1,6 +1,9 @@ using FreeSql.Internal; using FreeSql.Internal.Model; +using NetTopologySuite.Mathematics; +using Newtonsoft.Json; using System; +using System.Collections; using System.Collections.Generic; using System.Data; using System.Data.Common; @@ -11,7 +14,6 @@ using System.Threading.Tasks; namespace FreeSql.QuestDb.Curd { - class QuestDbInsert : Internal.CommonProvider.InsertProvider where T1 : class { public QuestDbInsert(IFreeSql orm, CommonUtils commonUtils, CommonExpression commonExpression) @@ -30,9 +32,42 @@ namespace FreeSql.QuestDb.Curd internal Dictionary InternalIgnore => _ignore; internal void InternalClearData() => ClearData(); - public override int ExecuteAffrows() => base.SplitExecuteAffrows(_batchValuesLimit > 0 ? _batchValuesLimit : 5000, _batchParameterLimit > 0 ? _batchParameterLimit : 3000); - public override long ExecuteIdentity() => base.SplitExecuteIdentity(_batchValuesLimit > 0 ? _batchValuesLimit : 5000, _batchParameterLimit > 0 ? _batchParameterLimit : 3000); - public override List ExecuteInserted() => base.SplitExecuteInserted(_batchValuesLimit > 0 ? _batchValuesLimit : 5000, _batchParameterLimit > 0 ? _batchParameterLimit : 3000); + private int InternelExecuteAffrows() + { + //如果设置了RestAPI的Url则走HTTP + var sql = ToSql(); + var execAsync = RestAPIExtension.ExecAsync(sql).GetAwaiter().GetResult(); + var resultHash = new Hashtable(); + try + { + resultHash = JsonConvert.DeserializeObject(execAsync); + } + catch + { + if (execAsync.Contains("401")) + { + throw new Exception("请确认QuestDb设置的RestAPI账号是否正确."); + } + } + var ddl = resultHash["ddl"]?.ToString(); + return ddl?.ToLower() == "ok" ? 1 : 0; + } + + public override int ExecuteAffrows() + { + if (string.IsNullOrWhiteSpace(RestAPIExtension.BaseUrl)) + { + return base.SplitExecuteAffrows(_batchValuesLimit > 0 ? _batchValuesLimit : 5000, + _batchParameterLimit > 0 ? _batchParameterLimit : 3000); + } + return InternelExecuteAffrows(); + } + + public override long ExecuteIdentity() => base.SplitExecuteIdentity( + _batchValuesLimit > 0 ? _batchValuesLimit : 5000, _batchParameterLimit > 0 ? _batchParameterLimit : 3000); + + public override List ExecuteInserted() => base.SplitExecuteInserted( + _batchValuesLimit > 0 ? _batchValuesLimit : 5000, _batchParameterLimit > 0 ? _batchParameterLimit : 3000); protected override long RawExecuteIdentity() { @@ -50,7 +85,8 @@ namespace FreeSql.QuestDb.Curd _orm.Aop.CurdBeforeHandler?.Invoke(this, before); try { - ret = _orm.Ado.ExecuteNonQuery(_connection, _transaction, CommandType.Text, sql, _commandTimeout, _params); + ret = _orm.Ado.ExecuteNonQuery(_connection, _transaction, CommandType.Text, sql, _commandTimeout, + _params); } catch (Exception ex) { @@ -62,14 +98,18 @@ namespace FreeSql.QuestDb.Curd var after = new Aop.CurdAfterEventArgs(before, exception, ret); _orm.Aop.CurdAfterHandler?.Invoke(this, after); } + return 0; } + sql = string.Concat(sql, " RETURNING ", _commonUtils.QuoteSqlName(identCols.First().Value.Attribute.Name)); before = new Aop.CurdBeforeEventArgs(_table.Type, _table, Aop.CurdType.Insert, sql, _params); _orm.Aop.CurdBeforeHandler?.Invoke(this, before); try { - long.TryParse(string.Concat(_orm.Ado.ExecuteScalar(_connection, _transaction, CommandType.Text, sql, _commandTimeout, _params)), out ret); + long.TryParse( + string.Concat(_orm.Ado.ExecuteScalar(_connection, _transaction, CommandType.Text, sql, + _commandTimeout, _params)), out ret); } catch (Exception ex) { @@ -81,6 +121,7 @@ namespace FreeSql.QuestDb.Curd var after = new Aop.CurdAfterEventArgs(before, exception, ret); _orm.Aop.CurdAfterHandler?.Invoke(this, after); } + return ret; } @@ -96,9 +137,11 @@ namespace FreeSql.QuestDb.Curd foreach (var col in _table.Columns.Values) { if (colidx > 0) sb.Append(", "); - sb.Append(_commonUtils.RereadColumn(col, _commonUtils.QuoteSqlName(col.Attribute.Name))).Append(" as ").Append(_commonUtils.QuoteSqlName(col.CsName)); + sb.Append(_commonUtils.RereadColumn(col, _commonUtils.QuoteSqlName(col.Attribute.Name))).Append(" as ") + .Append(_commonUtils.QuoteSqlName(col.CsName)); ++colidx; } + sql = sb.ToString(); var before = new Aop.CurdBeforeEventArgs(_table.Type, _table, Aop.CurdType.Insert, sql, _params); _orm.Aop.CurdBeforeHandler?.Invoke(this, before); @@ -106,7 +149,8 @@ namespace FreeSql.QuestDb.Curd Exception exception = null; try { - ret = _orm.Ado.Query(_table.TypeLazy ?? _table.Type, _connection, _transaction, CommandType.Text, sql, _commandTimeout, _params); + ret = _orm.Ado.Query(_table.TypeLazy ?? _table.Type, _connection, _transaction, CommandType.Text, + sql, _commandTimeout, _params); } catch (Exception ex) { @@ -118,15 +162,31 @@ namespace FreeSql.QuestDb.Curd var after = new Aop.CurdAfterEventArgs(before, exception, ret); _orm.Aop.CurdAfterHandler?.Invoke(this, after); } + return ret; } #if net40 #else - public override Task ExecuteAffrowsAsync(CancellationToken cancellationToken = default) => base.SplitExecuteAffrowsAsync(_batchValuesLimit > 0 ? _batchValuesLimit : 5000, _batchParameterLimit > 0 ? _batchParameterLimit : 3000, cancellationToken); - public override Task ExecuteIdentityAsync(CancellationToken cancellationToken = default) => base.SplitExecuteIdentityAsync(_batchValuesLimit > 0 ? _batchValuesLimit : 5000, _batchParameterLimit > 0 ? _batchParameterLimit : 3000, cancellationToken); - public override Task> ExecuteInsertedAsync(CancellationToken cancellationToken = default) => base.SplitExecuteInsertedAsync(_batchValuesLimit > 0 ? _batchValuesLimit : 5000, _batchParameterLimit > 0 ? _batchParameterLimit : 3000, cancellationToken); + public override Task ExecuteAffrowsAsync(CancellationToken cancellationToken = default) + { + if (string.IsNullOrWhiteSpace(RestAPIExtension.BaseUrl)) + { + return base.SplitExecuteAffrowsAsync(_batchValuesLimit > 0 ? _batchValuesLimit : 5000, + _batchParameterLimit > 0 ? _batchParameterLimit : 3000, cancellationToken); + } + return Task.FromResult(InternelExecuteAffrows()); + } + + public override Task ExecuteIdentityAsync(CancellationToken cancellationToken = default) => + base.SplitExecuteIdentityAsync(_batchValuesLimit > 0 ? _batchValuesLimit : 5000, + _batchParameterLimit > 0 ? _batchParameterLimit : 3000, cancellationToken); + + public override Task> ExecuteInsertedAsync(CancellationToken cancellationToken = default) => + base.SplitExecuteInsertedAsync(_batchValuesLimit > 0 ? _batchValuesLimit : 5000, + _batchParameterLimit > 0 ? _batchParameterLimit : 3000, cancellationToken); + async protected override Task RawExecuteIdentityAsync(CancellationToken cancellationToken = default) { var sql = this.ToSql(); @@ -143,7 +203,8 @@ namespace FreeSql.QuestDb.Curd _orm.Aop.CurdBeforeHandler?.Invoke(this, before); try { - ret = await _orm.Ado.ExecuteNonQueryAsync(_connection, _transaction, CommandType.Text, sql, _commandTimeout, _params, cancellationToken); + ret = await _orm.Ado.ExecuteNonQueryAsync(_connection, _transaction, CommandType.Text, sql, + _commandTimeout, _params, cancellationToken); } catch (Exception ex) { @@ -155,14 +216,18 @@ namespace FreeSql.QuestDb.Curd var after = new Aop.CurdAfterEventArgs(before, exception, ret); _orm.Aop.CurdAfterHandler?.Invoke(this, after); } + return 0; } + sql = string.Concat(sql, " RETURNING ", _commonUtils.QuoteSqlName(identCols.First().Value.Attribute.Name)); before = new Aop.CurdBeforeEventArgs(_table.Type, _table, Aop.CurdType.Insert, sql, _params); _orm.Aop.CurdBeforeHandler?.Invoke(this, before); try { - long.TryParse(string.Concat(await _orm.Ado.ExecuteScalarAsync(_connection, _transaction, CommandType.Text, sql, _commandTimeout, _params, cancellationToken)), out ret); + long.TryParse( + string.Concat(await _orm.Ado.ExecuteScalarAsync(_connection, _transaction, CommandType.Text, sql, + _commandTimeout, _params, cancellationToken)), out ret); } catch (Exception ex) { @@ -174,8 +239,10 @@ namespace FreeSql.QuestDb.Curd var after = new Aop.CurdAfterEventArgs(before, exception, ret); _orm.Aop.CurdAfterHandler?.Invoke(this, after); } + return ret; } + async protected override Task> RawExecuteInsertedAsync(CancellationToken cancellationToken = default) { var sql = this.ToSql(); @@ -188,9 +255,11 @@ namespace FreeSql.QuestDb.Curd foreach (var col in _table.Columns.Values) { if (colidx > 0) sb.Append(", "); - sb.Append(_commonUtils.RereadColumn(col, _commonUtils.QuoteSqlName(col.Attribute.Name))).Append(" as ").Append(_commonUtils.QuoteSqlName(col.CsName)); + sb.Append(_commonUtils.RereadColumn(col, _commonUtils.QuoteSqlName(col.Attribute.Name))).Append(" as ") + .Append(_commonUtils.QuoteSqlName(col.CsName)); ++colidx; } + sql = sb.ToString(); var before = new Aop.CurdBeforeEventArgs(_table.Type, _table, Aop.CurdType.Insert, sql, _params); _orm.Aop.CurdBeforeHandler?.Invoke(this, before); @@ -198,7 +267,8 @@ namespace FreeSql.QuestDb.Curd Exception exception = null; try { - ret = await _orm.Ado.QueryAsync(_table.TypeLazy ?? _table.Type, _connection, _transaction, CommandType.Text, sql, _commandTimeout, _params, cancellationToken); + ret = await _orm.Ado.QueryAsync(_table.TypeLazy ?? _table.Type, _connection, _transaction, + CommandType.Text, sql, _commandTimeout, _params, cancellationToken); } catch (Exception ex) { @@ -210,8 +280,9 @@ namespace FreeSql.QuestDb.Curd var after = new Aop.CurdAfterEventArgs(before, exception, ret); _orm.Aop.CurdAfterHandler?.Invoke(this, after); } + return ret; } #endif } -} +} \ No newline at end of file diff --git a/Providers/FreeSql.Provider.QuestDb/Curd/QuestDbUpdate.cs b/Providers/FreeSql.Provider.QuestDb/Curd/QuestDbUpdate.cs index 262ee7a5..66a56a6b 100644 --- a/Providers/FreeSql.Provider.QuestDb/Curd/QuestDbUpdate.cs +++ b/Providers/FreeSql.Provider.QuestDb/Curd/QuestDbUpdate.cs @@ -1,6 +1,8 @@ using FreeSql.Internal; using FreeSql.Internal.Model; +using Newtonsoft.Json; using System; +using System.Collections; using System.Collections.Generic; using System.Data; using System.Data.Common; @@ -11,10 +13,8 @@ using System.Threading.Tasks; namespace FreeSql.QuestDb.Curd { - class QuestDbUpdate : Internal.CommonProvider.UpdateProvider { - public QuestDbUpdate(IFreeSql orm, CommonUtils commonUtils, CommonExpression commonExpression, object dywhere) : base(orm, commonUtils, commonExpression, dywhere) { @@ -25,11 +25,46 @@ namespace FreeSql.QuestDb.Curd internal StringBuilder InternalSbSetIncr => _setIncr; internal Dictionary InternalIgnore => _ignore; internal void InternalResetSource(List source) => _source = source; - internal string InternalWhereCaseSource(string CsName, Func thenValue) => WhereCaseSource(CsName, thenValue); + + internal string InternalWhereCaseSource(string CsName, Func thenValue) => + WhereCaseSource(CsName, thenValue); + internal void InternalToSqlCaseWhenEnd(StringBuilder sb, ColumnInfo col) => ToSqlCaseWhenEnd(sb, col); - public override int ExecuteAffrows() => base.SplitExecuteAffrows(_batchRowsLimit > 0 ? _batchRowsLimit : 500, _batchParameterLimit > 0 ? _batchParameterLimit : 3000); - public override List ExecuteUpdated() => base.SplitExecuteUpdated(_batchRowsLimit > 0 ? _batchRowsLimit : 500, _batchParameterLimit > 0 ? _batchParameterLimit : 3000); + private int InternelExecuteAffrows() + { + //如果设置了RestAPI的Url则走HTTP + var sql = ToSql(); + var execAsync = RestAPIExtension.ExecAsync(sql).GetAwaiter().GetResult(); + var resultHash = new Hashtable(); + try + { + resultHash = JsonConvert.DeserializeObject(execAsync); + } + catch + { + if (execAsync.Contains("401")) + { + throw new Exception("请确认QuestDb设置的RestAPI账号是否正确."); + } + } + var ddl = resultHash["ddl"]?.ToString(); + var updated = Convert.ToInt32(resultHash["updated"]); + return ddl?.ToLower() == "ok" ? updated : 0; + } + + public override int ExecuteAffrows() + { + if (string.IsNullOrWhiteSpace(RestAPIExtension.BaseUrl)) + { + return base.SplitExecuteAffrows(_batchRowsLimit > 0 ? _batchRowsLimit : 500, + _batchParameterLimit > 0 ? _batchParameterLimit : 3000); + } + return InternelExecuteAffrows(); + } + + public override List ExecuteUpdated() => base.SplitExecuteUpdated( + _batchRowsLimit > 0 ? _batchRowsLimit : 500, _batchParameterLimit > 0 ? _batchParameterLimit : 3000); protected override List RawExecuteUpdated() { @@ -48,10 +83,12 @@ namespace FreeSql.QuestDb.Curd foreach (var col in _table.Columns.Values) { if (colidx > 0) sbret.Append(", "); - sbret.Append(_commonUtils.RereadColumn(col, _commonUtils.QuoteSqlName(col.Attribute.Name))).Append(" as ").Append(_commonUtils.QuoteSqlName(col.CsName)); + sbret.Append(_commonUtils.RereadColumn(col, _commonUtils.QuoteSqlName(col.Attribute.Name))) + .Append(" as ").Append(_commonUtils.QuoteSqlName(col.CsName)); ++colidx; } } + var sql = sb.Append(sbret).ToString(); var before = new Aop.CurdBeforeEventArgs(_table.Type, _table, Aop.CurdType.Update, sql, dbParms); _orm.Aop.CurdBeforeHandler?.Invoke(this, before); @@ -59,7 +96,8 @@ namespace FreeSql.QuestDb.Curd Exception exception = null; try { - var rettmp = _orm.Ado.Query(_table.TypeLazy ?? _table.Type, _connection, _transaction, CommandType.Text, sql, _commandTimeout, dbParms); + var rettmp = _orm.Ado.Query(_table.TypeLazy ?? _table.Type, _connection, _transaction, + CommandType.Text, sql, _commandTimeout, dbParms); ValidateVersionAndThrow(rettmp.Count, sql, dbParms); ret.AddRange(rettmp); } @@ -87,15 +125,18 @@ namespace FreeSql.QuestDb.Curd caseWhen.Append(_commonUtils.RereadColumn(pk, _commonUtils.QuoteSqlName(pk.Attribute.Name))); return; } + caseWhen.Append("("); var pkidx = 0; foreach (var pk in primarys) { if (pkidx > 0) caseWhen.Append(" || '+' || "); if (string.IsNullOrEmpty(InternalTableAlias) == false) caseWhen.Append(InternalTableAlias).Append("."); - caseWhen.Append(_commonUtils.RereadColumn(pk, _commonUtils.QuoteSqlName(pk.Attribute.Name))).Append("::text"); + caseWhen.Append(_commonUtils.RereadColumn(pk, _commonUtils.QuoteSqlName(pk.Attribute.Name))) + .Append("::text"); ++pkidx; } + caseWhen.Append(")"); } @@ -106,6 +147,7 @@ namespace FreeSql.QuestDb.Curd sb.Append(_commonUtils.FormatSql("{0}", primarys[0].GetDbValue(d))); return; } + sb.Append("("); var pkidx = 0; foreach (var pk in primarys) @@ -114,6 +156,7 @@ namespace FreeSql.QuestDb.Curd sb.Append(_commonUtils.FormatSql("{0}", pk.GetDbValue(d))).Append("::text"); ++pkidx; } + sb.Append(")"); } @@ -125,6 +168,7 @@ namespace FreeSql.QuestDb.Curd sb.Append("::text"); return; } + var dbtype = _commonUtils.CodeFirst.GetDbInfo(col.Attribute.MapType)?.dbtype; if (dbtype == null) return; @@ -133,9 +177,22 @@ namespace FreeSql.QuestDb.Curd #if net40 #else - public override Task ExecuteAffrowsAsync(CancellationToken cancellationToken = default) => base.SplitExecuteAffrowsAsync(_batchRowsLimit > 0 ? _batchRowsLimit : 500, _batchParameterLimit > 0 ? _batchParameterLimit : 3000, cancellationToken); - public override Task> ExecuteUpdatedAsync(CancellationToken cancellationToken = default) => base.SplitExecuteUpdatedAsync(_batchRowsLimit > 0 ? _batchRowsLimit : 500, _batchParameterLimit > 0 ? _batchParameterLimit : 3000, cancellationToken); - + public override Task ExecuteAffrowsAsync(CancellationToken cancellationToken = default) + { + if (string.IsNullOrWhiteSpace(RestAPIExtension.BaseUrl)) + { + return base.SplitExecuteAffrowsAsync(_batchRowsLimit > 0 ? _batchRowsLimit : 500, + _batchParameterLimit > 0 ? _batchParameterLimit : 3000, cancellationToken); + } + + return Task.FromResult(InternelExecuteAffrows()); + } + + + public override Task> ExecuteUpdatedAsync(CancellationToken cancellationToken = default) => + base.SplitExecuteUpdatedAsync(_batchRowsLimit > 0 ? _batchRowsLimit : 500, + _batchParameterLimit > 0 ? _batchParameterLimit : 3000, cancellationToken); + async protected override Task> RawExecuteUpdatedAsync(CancellationToken cancellationToken = default) { var ret = new List(); @@ -153,10 +210,12 @@ namespace FreeSql.QuestDb.Curd foreach (var col in _table.Columns.Values) { if (colidx > 0) sbret.Append(", "); - sbret.Append(_commonUtils.RereadColumn(col, _commonUtils.QuoteSqlName(col.Attribute.Name))).Append(" as ").Append(_commonUtils.QuoteSqlName(col.CsName)); + sbret.Append(_commonUtils.RereadColumn(col, _commonUtils.QuoteSqlName(col.Attribute.Name))) + .Append(" as ").Append(_commonUtils.QuoteSqlName(col.CsName)); ++colidx; } } + var sql = sb.Append(sbret).ToString(); var before = new Aop.CurdBeforeEventArgs(_table.Type, _table, Aop.CurdType.Update, sql, dbParms); _orm.Aop.CurdBeforeHandler?.Invoke(this, before); @@ -164,7 +223,8 @@ namespace FreeSql.QuestDb.Curd Exception exception = null; try { - var rettmp = await _orm.Ado.QueryAsync(_table.TypeLazy ?? _table.Type, _connection, _transaction, CommandType.Text, sql, _commandTimeout, dbParms, cancellationToken); + var rettmp = await _orm.Ado.QueryAsync(_table.TypeLazy ?? _table.Type, _connection, + _transaction, CommandType.Text, sql, _commandTimeout, dbParms, cancellationToken); ValidateVersionAndThrow(rettmp.Count, sql, dbParms); ret.AddRange(rettmp); } @@ -184,4 +244,4 @@ namespace FreeSql.QuestDb.Curd } #endif } -} +} \ No newline at end of file diff --git a/Providers/FreeSql.Provider.QuestDb/FreeSql.Provider.QuestDb.csproj b/Providers/FreeSql.Provider.QuestDb/FreeSql.Provider.QuestDb.csproj index bdad827e..4ac9f525 100644 --- a/Providers/FreeSql.Provider.QuestDb/FreeSql.Provider.QuestDb.csproj +++ b/Providers/FreeSql.Provider.QuestDb/FreeSql.Provider.QuestDb.csproj @@ -4,29 +4,31 @@ netstandard2.0; true YeXiangQin;Daily - FreeSql适配QuestDb时序数据库访问 + FreeSql实现QuestDb时序数据库访问 https://github.com/2881099/FreeSql https://github.com/2881099/FreeSql git MIT FreeSql;ORM;QuestDb $(AssemblyName) - logo.png + $(AssemblyName) true true true key.snk false - 3.2.688-preview20230216 + 4.0.0.1 - + + + @@ -41,7 +43,7 @@ - + diff --git a/Providers/FreeSql.Provider.QuestDb/QuestDbContainer.cs b/Providers/FreeSql.Provider.QuestDb/QuestDbContainer.cs new file mode 100644 index 00000000..003c0b3b --- /dev/null +++ b/Providers/FreeSql.Provider.QuestDb/QuestDbContainer.cs @@ -0,0 +1,25 @@ +using System; +using System.Collections.Generic; +using System.Text; +using Microsoft.Extensions.DependencyInjection; + +namespace FreeSql.Provider.QuestDb +{ + internal class QuestDbContainer + { + private static IServiceCollection Services; + public static IServiceProvider ServiceProvider { get; private set; } + + internal static void Initialize(Action service) + { + Services = new ServiceCollection(); + service?.Invoke(Services); + ServiceProvider = Services.BuildServiceProvider(); + } + + internal static T GetService() + { + return ServiceProvider.GetService(); + } + } +} \ No newline at end of file diff --git a/Providers/FreeSql.Provider.QuestDb/QuestDbDbFirst.cs b/Providers/FreeSql.Provider.QuestDb/QuestDbDbFirst.cs index 8c558462..425c8f09 100644 --- a/Providers/FreeSql.Provider.QuestDb/QuestDbDbFirst.cs +++ b/Providers/FreeSql.Provider.QuestDb/QuestDbDbFirst.cs @@ -717,10 +717,33 @@ namespace FreeSql.QuestDb return res; } - public DbTableInfo GetTableByName(string name, bool ignoreCase = true) => - GetTables(null, name, ignoreCase)?.FirstOrDefault(); + public DbTableInfo GetTableByName(string name, bool ignoreCase = true) + { + var tableColumns = _orm.Ado.ExecuteDataTable($"SHOW COLUMNS FROM '{name}'"); + List dbColumnInfos = new List(); + var dbTableInfo = new DbTableInfo() + { + Name = name, + Columns = new List() + }; + foreach (DataRow tableColumnsRow in tableColumns.Rows) + { + dbColumnInfos.Add(new DbColumnInfo() + { + Name = tableColumnsRow["column"].ToString(), + DbTypeText = tableColumnsRow["type"].ToString(), + Table = dbTableInfo, + }); + } - public List GetTablesByDatabase(params string[] database) => GetTables(database, null, false); + dbTableInfo.Columns = dbColumnInfos; + return dbTableInfo; + } + + public List GetTablesByDatabase(params string[] database) + { + return GetTables(database, null, false); + } public List GetTables(string[] database, string tablename, bool ignoreCase) { diff --git a/Providers/FreeSql.Provider.QuestDb/QuestDbGlobalExtensions.cs b/Providers/FreeSql.Provider.QuestDb/QuestDbGlobalExtensions.cs index 8330c4f8..f53eb886 100644 --- a/Providers/FreeSql.Provider.QuestDb/QuestDbGlobalExtensions.cs +++ b/Providers/FreeSql.Provider.QuestDb/QuestDbGlobalExtensions.cs @@ -1,253 +1,340 @@ -using FreeSql; +using CsvHelper; +using FreeSql; using FreeSql.Internal.CommonProvider; using FreeSql.Internal.Model; using FreeSql.QuestDb; using FreeSql.QuestDb.Curd; +using Newtonsoft.Json; using Npgsql; using System; +using System.Collections; using System.Collections.Generic; using System.Data; +using System.Globalization; +using System.IO; using System.Linq; using System.Linq.Expressions; +using System.Net.Http; using System.Text; using System.Threading; using System.Threading.Tasks; +using System.Web; +using FreeSql.Provider.QuestDb; +using System.Net; public static partial class QuestDbGlobalExtensions { - /// /// 特殊处理类似 string.Format 的使用方法,防止注入,以及 IS NULL 转换 /// /// /// /// - public static string FormatPostgreSQL(this string that, params object[] args) => _postgresqlAdo.Addslashes(that, args); + public static string FormatPostgreSQL(this string that, params object[] args) => + _postgresqlAdo.Addslashes(that, args); + static QuestDbAdo _postgresqlAdo = new QuestDbAdo(); + /// - /// PostgreSQL9.5+ 特有的功能,On Conflict Do Update - /// 注意:此功能会开启插入【自增列】 + /// 逐行读取,包含空行 + /// + /// + /// + private static List SplitByLine(string text) + { + List lines = new List(); + byte[] array = Encoding.UTF8.GetBytes(text); + using (MemoryStream stream = new MemoryStream(array)) + { + using (var sr = new StreamReader(stream)) + { + string line = sr.ReadLine(); + while (line != null) + { + lines.Add(line); + line = sr.ReadLine(); + } + + ; + } + } + + return lines; + } + + /// + /// 批量快速插入 + /// + /// + /// + /// + public static async Task ExecuteBulkCopyAsync(this IInsert that) where T : class + { + var result = 0; + var fileName = $"{Guid.NewGuid()}.csv"; + var filePath = Path.Combine(AppContext.BaseDirectory, fileName); + try + { + var client = QuestDbContainer.GetService().CreateClient(); + var boundary = "---------------" + DateTime.Now.Ticks.ToString("x"); + var name = typeof(T).Name; + var list = new List(); + var insert = that as QuestDbInsert; + insert.InternalOrm.DbFirst.GetTableByName(name).Columns.ForEach(d => + { + if (d.DbTypeText == "TIMESTAMP") + { + list.Add(new Hashtable() + { + { "name", d.Name }, + { "type", d.DbTypeText }, + { "pattern", "yyyy/M/dd HH:mm:ss" } + }); + } + else + { + list.Add(new Hashtable() + { + { "name", d.Name }, + { "type", d.DbTypeText } + }); + } + }); + var schema = JsonConvert.SerializeObject(list); + using (var writer = new StreamWriter(filePath)) + using (var csv = new CsvWriter(writer, CultureInfo.CurrentCulture)) + { + csv.WriteRecords(insert._source); + } + + var httpContent = new MultipartFormDataContent(boundary); + if (!string.IsNullOrWhiteSpace(RestAPIExtension.authorization)) + client.DefaultRequestHeaders.Add("Authorization", RestAPIExtension.authorization); + httpContent.Add(new StringContent(schema), "schema"); + httpContent.Add(new ByteArrayContent(File.ReadAllBytes(filePath)), "data"); + //boundary带双引号 可能导致服务器错误情况 + httpContent.Headers.Remove("Content-Type"); + httpContent.Headers.TryAddWithoutValidation("Content-Type", + "multipart/form-data; boundary=" + boundary); + var httpResponseMessage = + await client.PostAsync($"{RestAPIExtension.BaseUrl}/imp?name={name}", httpContent); + var readAsStringAsync = await httpResponseMessage.Content.ReadAsStringAsync(); + var splitByLine = SplitByLine(readAsStringAsync); + Console.WriteLine(readAsStringAsync); + foreach (var s in splitByLine) + { + if (s.Contains("Rows")) + { + var strings = s.Split('|'); + if (strings[1].Trim() == "Rows imported") + { + result = Convert.ToInt32(strings[2].Trim()); + } + } + } + } + catch (Exception e) + { + Console.WriteLine(e); + throw; + } + finally + { + try + { + File.Delete(filePath); + } + catch + { + } + } + + return result; + } + + /// + /// 批量快速插入 + /// + /// + /// + /// + public static int ExecuteBulkCopy(this IInsert insert) where T : class + { + return ExecuteBulkCopyAsync(insert).GetAwaiter().GetResult(); + } +} + +public static class SampleByExtension +{ + //是否使用该方法 + internal static AsyncLocal IsExistence = new AsyncLocal() + { + Value = false + }; + + internal static AsyncLocal SamoleByString = new AsyncLocal() + { + Value = string.Empty + }; + + internal static void Initialize() + { + IsExistence.Value = false; + SamoleByString.Value = string.Empty; + } + + /// + /// SAMPLE BY用于时间序列数据,将大型数据集汇总为同质时间块的聚合,作为SELECT语句的一部分。对缺少数据的数据集执行查询的用户可以使用FILL关键字指定填充行为 + /// + /// + /// + /// 时长 + /// 单位 + /// + public static ISelect SampleBy(this ISelect select, double time, SampleUnits unit) + { + var _unit = Enum.GetName(typeof(SampleUnits), unit); + IsExistence.Value = true; + var samoleByTemple = $"{Environment.NewLine}SAMPLE BY {{0}}{{1}}{Environment.NewLine}"; + SamoleByString.Value = string.Format(samoleByTemple, time.ToString(), _unit); + return select; + } +} + +public static class LatestOnExtension +{ + //是否使用该方法 + internal static AsyncLocal IsExistence = new AsyncLocal() + { + Value = false + }; + + internal static AsyncLocal LatestOnString = new AsyncLocal() + { + Value = string.Empty + }; + + internal static void Initialize() + { + IsExistence.Value = false; + LatestOnString.Value = string.Empty; + } + + /// + /// 对于多个时间序列存储在同一个表中的场景,根据时间戳检索给定键或键组合的最新项。 /// /// - /// - /// 默认是以主键作为重复判断,也可以指定其他列:a => a.Name | a => new{a.Name,a.Time} | a => new[]{"name","time"} + /// + /// + /// 时间标识 + /// 最新项的列 /// - public static OnConflictDoUpdate OnConflictDoUpdate(this IInsert that, Expression> columns = null) where T1 : class => new OnConflictDoUpdate(that.InsertIdentity(), columns); - - #region ExecutePgCopy - /// - /// 批量更新(更新字段数量超过 2000 时收益大) - /// 实现原理:使用 PgCopy 插入临时表,再使用 UPDATE INNER JOIN 联表更新 - /// - /// - /// - /// - public static int ExecutePgCopy(this IUpdate that) where T : class + public static ISelect LatestOn(this ISelect select, Expression> timestamp, + Expression> partition) { - var update = that as UpdateProvider; - if (update._source.Any() != true || update._tempPrimarys.Any() == false) return 0; - var state = ExecutePgCopyState(update); - return UpdateProvider.ExecuteBulkUpdate(update, state, insert => insert.ExecutePgCopy()); + Provider(timestamp, partition); + return select; } - static NativeTuple ExecutePgCopyState(UpdateProvider update) where T : class - { - if (update._source.Any() != true) return null; - var _table = update._table; - var _commonUtils = update._commonUtils; - var updateTableName = update._tableRule?.Invoke(_table.DbName) ?? _table.DbName; - var tempTableName = $"Temp_{Guid.NewGuid().ToString("N")}"; - if (update._orm.CodeFirst.IsSyncStructureToLower) tempTableName = tempTableName.ToLower(); - if (update._orm.CodeFirst.IsSyncStructureToUpper) tempTableName = tempTableName.ToUpper(); - if (update._connection == null && update._orm.Ado.TransactionCurrentThread != null) - update.WithTransaction(update._orm.Ado.TransactionCurrentThread); - var sb = new StringBuilder().Append("CREATE TEMP TABLE ").Append(_commonUtils.QuoteSqlName(tempTableName)).Append(" ( "); - var setColumns = new List(); - var pkColumns = new List(); - foreach (var col in _table.Columns.Values) - { - if (update._tempPrimarys.Any(a => a.CsName == col.CsName)) pkColumns.Add(col.Attribute.Name); - else if (col.Attribute.IsIdentity == false && col.Attribute.IsVersion == false && update._ignore.ContainsKey(col.Attribute.Name) == false) setColumns.Add(col.Attribute.Name); - else continue; - sb.Append(" \r\n ").Append(_commonUtils.QuoteSqlName(col.Attribute.Name)).Append(" ").Append(col.Attribute.DbType.Replace("NOT NULL", "")); - sb.Append(","); - } - var sql1 = sb.Remove(sb.Length - 1, 1).Append("\r\n) WITH (OIDS=FALSE);").ToString(); - sb.Clear().Append("UPDATE ").Append(_commonUtils.QuoteSqlName(updateTableName)).Append(" a ") - .Append("\r\nSET \r\n ").Append(string.Join(", \r\n ", setColumns.Select(col => $"{_commonUtils.QuoteSqlName(col)} = b.{_commonUtils.QuoteSqlName(col)}"))) - .Append("\r\nFROM ").Append(_commonUtils.QuoteSqlName(tempTableName)).Append(" b ") - .Append("\r\nWHERE ").Append(string.Join(" AND ", pkColumns.Select(col => $"a.{_commonUtils.QuoteSqlName(col)} = b.{_commonUtils.QuoteSqlName(col)}"))); - var sql2 = sb.ToString(); - sb.Clear(); - var sql3 = $"DROP TABLE {_commonUtils.QuoteSqlName(tempTableName)}"; - return NativeTuple.Create(sql1, sql2, sql3, tempTableName, pkColumns.Concat(setColumns).ToArray()); + private static void Provider(Expression> timestamp, + Expression> partition) + { + IsExistence.Value = true; + var latestOnTemple = $"{Environment.NewLine}LATEST ON {{0}} PARTITION BY {{1}} "; + var expressionVisitor = new QuestDbExpressionVisitor(); + expressionVisitor.Visit(timestamp); + var _timestamp = expressionVisitor.Fields(); + expressionVisitor.Visit(partition); + var _partition = expressionVisitor.Fields(); + LatestOnString.Value = string.Format(latestOnTemple, _timestamp, _partition); } /// - /// PostgreSQL COPY 批量导入功能,封装了 NpgsqlConnection.BeginBinaryImport 方法 - /// 使用 IgnoreColumns/InsertColumns 设置忽略/指定导入的列 - /// 使用 WithConnection/WithTransaction 传入连接/事务对象 - /// 提示:若本方法不能满足,请使用 IInsert<T>.ToDataTable 方法得到 DataTable 对象后,自行处理。 - /// COPY 与 insert into t values(..),(..),(..) 性能测试参考: - /// 插入180000行,52列:10,090ms 与 46,756ms,10列:4,081ms 与 9,786ms - /// 插入10000行,52列:583ms 与 3,294ms,10列:167ms 与 568ms - /// 插入5000行,52列:337ms 与 2,269ms,10列:93ms 与 366ms - /// 插入2000行,52列:136ms 与 1,019ms,10列:39ms 与 157ms - /// 插入1000行,52列:88ms 与 374ms,10列:21ms 与 102ms - /// 插入500行,52列:61ms 与 209ms,10列:12ms 与 34ms - /// 插入100行,52列:30ms 与 51ms,10列:4ms 与 9ms - /// 插入50行,52列:25ms 与 37ms,10列:2ms 与 6ms + /// 对于多个时间序列存储在同一个表中的场景,根据时间戳检索给定键或键组合的最新项。 /// /// - /// - public static void ExecutePgCopy(this IInsert that) where T : class + /// + /// + /// 时间标识 + /// 最新项的列 + /// + public static ISelect LatestOn(this ISelect select, + Expression> timestamp, + Expression> partition) where T2 : class { - var insert = that as QuestDbInsert; - if (insert == null) throw new Exception(CoreStrings.S_Features_Unique("ExecutePgCopy", "PostgreSQL")); - - var dt = that.ToDataTable(); - if (dt.Rows.Count == 0) return; - - Action binaryImport = conn => - { - var copyFromCommand = new StringBuilder().Append("COPY ").Append(insert.InternalCommonUtils.QuoteSqlName(dt.TableName)).Append("("); - var colIndex = 0; - foreach (DataColumn col in dt.Columns) - { - if (colIndex++ > 0) copyFromCommand.Append(", "); - copyFromCommand.Append(insert.InternalCommonUtils.QuoteSqlName(col.ColumnName)); - } - copyFromCommand.Append(") FROM STDIN BINARY"); - using (var writer = conn.BeginBinaryImport(copyFromCommand.ToString())) - { - foreach (DataRow item in dt.Rows) - writer.WriteRow(item.ItemArray); - writer.Complete(); - } - copyFromCommand.Clear(); - }; - - try - { - if (insert.InternalConnection == null && insert.InternalTransaction == null) - { - using (var conn = insert.InternalOrm.Ado.MasterPool.Get()) - { - binaryImport(conn.Value as NpgsqlConnection); - } - } - else if (insert.InternalTransaction != null) - { - binaryImport(insert.InternalTransaction.Connection as NpgsqlConnection); - } - else if (insert.InternalConnection != null) - { - var conn = insert.InternalConnection as NpgsqlConnection; - var isNotOpen = false; - if (conn.State != System.Data.ConnectionState.Open) - { - isNotOpen = true; - conn.Open(); - } - try - { - binaryImport(conn); - } - finally - { - if (isNotOpen) - conn.Close(); - } - } - else - { - throw new NotImplementedException($"ExecutePgCopy {CoreStrings.S_Not_Implemented_FeedBack}"); - } - } - finally - { - dt.Clear(); - } + Provider(timestamp, partition); + return select; } -#if net45 -#else - public static Task ExecutePgCopyAsync(this IUpdate that, CancellationToken cancellationToken = default) where T : class + /// + /// 对于多个时间序列存储在同一个表中的场景,根据时间戳检索给定键或键组合的最新项。 + /// + /// + /// + /// + /// 时间标识 + /// 最新项的列 + /// + public static ISelect LatestOn(this ISelect select, + Expression> timestamp, + Expression> partition) where T2 : class where T3 : class { - var update = that as UpdateProvider; - if (update._source.Any() != true || update._tempPrimarys.Any() == false) return Task.FromResult(0); - var state = ExecutePgCopyState(update); - return UpdateProvider.ExecuteBulkUpdateAsync(update, state, insert => insert.ExecutePgCopyAsync(cancellationToken)); + Provider(timestamp, partition); + return select; } - async public static Task ExecutePgCopyAsync(this IInsert that, CancellationToken cancellationToken = default) where T : class + + /// + /// 对于多个时间序列存储在同一个表中的场景,根据时间戳检索给定键或键组合的最新项。 + /// + /// + /// + /// + /// 时间标识 + /// 最新项的列 + /// + public static ISelect LatestOn(this ISelect select, + Expression> timestamp, + Expression> partition) where T2 : class where T3 : class where T4 : class { - var insert = that as QuestDbInsert; - if (insert == null) throw new Exception(CoreStrings.S_Features_Unique("ExecutePgCopyAsync", "PostgreSQL")); - - var dt = that.ToDataTable(); - if (dt.Rows.Count == 0) return; - Func binaryImportAsync = async conn => - { - var copyFromCommand = new StringBuilder().Append("COPY ").Append(insert.InternalCommonUtils.QuoteSqlName(dt.TableName)).Append("("); - var colIndex = 0; - foreach (DataColumn col in dt.Columns) - { - if (colIndex++ > 0) copyFromCommand.Append(", "); - copyFromCommand.Append(insert.InternalCommonUtils.QuoteSqlName(col.ColumnName)); - } - copyFromCommand.Append(") FROM STDIN BINARY"); - using (var writer = conn.BeginBinaryImport(copyFromCommand.ToString())) - { - foreach (DataRow item in dt.Rows) - await writer.WriteRowAsync(cancellationToken, item.ItemArray); - writer.Complete(); - } - copyFromCommand.Clear(); - }; - - try - { - if (insert.InternalConnection == null && insert.InternalTransaction == null) - { - using (var conn = await insert.InternalOrm.Ado.MasterPool.GetAsync()) - { - await binaryImportAsync(conn.Value as NpgsqlConnection); - } - } - else if (insert.InternalTransaction != null) - { - await binaryImportAsync(insert.InternalTransaction.Connection as NpgsqlConnection); - } - else if (insert.InternalConnection != null) - { - var conn = insert.InternalConnection as NpgsqlConnection; - var isNotOpen = false; - if (conn.State != System.Data.ConnectionState.Open) - { - isNotOpen = true; - await conn.OpenAsync(cancellationToken); - } - try - { - await binaryImportAsync(conn); - } - finally - { - if (isNotOpen) - await conn.CloseAsync(); - } - } - else - { - throw new NotImplementedException($"ExecutePgCopyAsync {CoreStrings.S_Not_Implemented_FeedBack}"); - } - } - finally - { - dt.Clear(); - } + Provider(timestamp, partition); + return select; } -#endif - #endregion } + +public static class RestAPIExtension +{ + internal static string BaseUrl = string.Empty; + internal static string authorization = string.Empty; + + internal static async Task ExecAsync(string sql) + { + var result = string.Empty; + var client = QuestDbContainer.GetService().CreateClient(); + var url = $"{BaseUrl}/exec?query={HttpUtility.UrlEncode(sql)}"; + if (!string.IsNullOrWhiteSpace(authorization)) + client.DefaultRequestHeaders.Add("Authorization", authorization); + var httpResponseMessage = await client.GetAsync(url); + result = await httpResponseMessage.Content.ReadAsStringAsync(); + + return result; + } + + public static FreeSqlBuilder UseQuestDbRestAPI(this FreeSqlBuilder buider, string host, string username = "", + string password = "") + { + BaseUrl = host; + if (BaseUrl.EndsWith("/")) + BaseUrl = BaseUrl.Remove(BaseUrl.Length - 1); + + if (!BaseUrl.ToLower().StartsWith("http")) + BaseUrl = $"http://{BaseUrl}"; + if (!string.IsNullOrWhiteSpace(username) && !string.IsNullOrWhiteSpace(password)) + { + var base64 = Convert.ToBase64String(Encoding.UTF8.GetBytes($"{username}:{password}")); + authorization = $"Basic {base64}"; + } + buider.UseNoneCommandParameter(true); + return buider; + } +} \ No newline at end of file diff --git a/Providers/FreeSql.Provider.QuestDb/QuestDbProvider.cs b/Providers/FreeSql.Provider.QuestDb/QuestDbProvider.cs index e31caf4a..50f1c9a6 100644 --- a/Providers/FreeSql.Provider.QuestDb/QuestDbProvider.cs +++ b/Providers/FreeSql.Provider.QuestDb/QuestDbProvider.cs @@ -1,6 +1,7 @@ using FreeSql.Internal; using FreeSql.Internal.CommonProvider; using FreeSql.PostgreSQL.Curd; +using FreeSql.Provider.QuestDb; using FreeSql.QuestDb.Curd; using Newtonsoft.Json; using Newtonsoft.Json.Linq; @@ -16,6 +17,7 @@ using System.Net.NetworkInformation; using System.Numerics; using System.Reflection; using System.Threading; +using Microsoft.Extensions.DependencyInjection; namespace FreeSql.QuestDb { @@ -134,6 +136,11 @@ namespace FreeSql.QuestDb Select0Provider._dicMethodDataReaderGetValue[typeof(Guid)] = typeof(DbDataReader).GetMethod("GetGuid", new Type[] { typeof(int) }); + + QuestDbContainer.Initialize(service => + { + service.AddHttpClient(); + }); } public override ISelect CreateSelectProvider(object dywhere) => diff --git a/Providers/FreeSql.Provider.QuestDb/QuestDbUtils.cs b/Providers/FreeSql.Provider.QuestDb/QuestDbUtils.cs index f898bf64..d957a940 100644 --- a/Providers/FreeSql.Provider.QuestDb/QuestDbUtils.cs +++ b/Providers/FreeSql.Provider.QuestDb/QuestDbUtils.cs @@ -14,10 +14,16 @@ using System.Globalization; using System.Linq; using System.Linq.Expressions; using System.Net; +using System.Net.Http; using System.Numerics; using System.Text; using System.Threading; using FreeSql.QuestDb; +using System.Web; +using System.Threading.Tasks; +using Newtonsoft.Json; +using System.IO; +using CsvHelper; namespace FreeSql.QuestDb { @@ -382,141 +388,4 @@ namespace FreeSql /// M } - - public static class SampleByExtension - { - //是否使用该方法 - internal static AsyncLocal IsExistence = new AsyncLocal() - { - Value = false - }; - - internal static AsyncLocal SamoleByString = new AsyncLocal() - { - Value = string.Empty - }; - - public static void Initialize() - { - IsExistence.Value = false; - SamoleByString.Value = string.Empty; - } - - /// - /// SAMPLE BY用于时间序列数据,将大型数据集汇总为同质时间块的聚合,作为SELECT语句的一部分。对缺少数据的数据集执行查询的用户可以使用FILL关键字指定填充行为 - /// - /// - /// - /// 时长 - /// 单位 - /// - public static ISelect SampleBy(this ISelect select, double time, SampleUnits unit) - { - var _unit = Enum.GetName(typeof(SampleUnits), unit); - IsExistence.Value = true; - var samoleByTemple = $"{Environment.NewLine}SAMPLE BY {{0}}{{1}}{Environment.NewLine}"; - SamoleByString.Value = string.Format(samoleByTemple, time.ToString(), _unit); - return select; - } - } - - public static class LatestOnExtension - { - //是否使用该方法 - internal static AsyncLocal IsExistence = new AsyncLocal() - { - Value = false - }; - - internal static AsyncLocal LatestOnString = new AsyncLocal() - { - Value = string.Empty - }; - - public static void Initialize() - { - IsExistence.Value = false; - LatestOnString.Value = string.Empty; - } - - /// - /// 对于多个时间序列存储在同一个表中的场景,根据时间戳检索给定键或键组合的最新项。 - /// - /// - /// - /// - /// 时间标识 - /// 最新项的列 - /// - public static ISelect LatestOn(this ISelect select, Expression> timestamp, - Expression> partition) - { - Provider(timestamp, partition); - return select; - } - - private static void Provider(Expression> timestamp, - Expression> partition) - { - IsExistence.Value = true; - var latestOnTemple = $"{Environment.NewLine}LATEST ON {{0}} PARTITION BY {{1}} "; - var expressionVisitor = new QuestDbExpressionVisitor(); - expressionVisitor.Visit(timestamp); - var _timestamp = expressionVisitor.Fields(); - expressionVisitor.Visit(partition); - var _partition = expressionVisitor.Fields(); - LatestOnString.Value = string.Format(latestOnTemple, _timestamp, _partition); - } - - /// - /// 对于多个时间序列存储在同一个表中的场景,根据时间戳检索给定键或键组合的最新项。 - /// - /// - /// - /// - /// 时间标识 - /// 最新项的列 - /// - public static ISelect LatestOn(this ISelect select, - Expression> timestamp, - Expression> partition) where T2 : class - { - Provider(timestamp, partition); - return select; - } - - /// - /// 对于多个时间序列存储在同一个表中的场景,根据时间戳检索给定键或键组合的最新项。 - /// - /// - /// - /// - /// 时间标识 - /// 最新项的列 - /// - public static ISelect LatestOn(this ISelect select, - Expression> timestamp, - Expression> partition) where T2 : class where T3 : class - { - Provider(timestamp, partition); - return select; - } - - /// - /// 对于多个时间序列存储在同一个表中的场景,根据时间戳检索给定键或键组合的最新项。 - /// - /// - /// - /// - /// 时间标识 - /// 最新项的列 - /// - public static ISelect LatestOn(this ISelect select, - Expression> timestamp, - Expression> partition) where T2 : class where T3 : class where T4 : class - { - Provider(timestamp, partition); - return select; - } - } } \ No newline at end of file