// Providers/FreeSql.Provider.QuestDb/QuestDbGlobalExtensions.cs
//
// Post-image of commit 5fc7cba0d307b1a1636858462250f8266ba0b3ac
// (dailyccc, 2023-02-21, "Add RestAPI ExecuteBulkCopy adaptation"): the PostgreSQL-specific
// OnConflictDoUpdate / ExecutePgCopy helpers are removed and QuestDB REST bulk import,
// SAMPLE BY and LATEST ON helpers are added.
//
// NOTE(review): the patch text this file was rebuilt from had all generic argument lists stripped.
// They were reconstructed from usage (return values, constraints, called members). Reconstructions
// that the text alone cannot prove are marked with NOTE(review) below - confirm against the repository.

using CsvHelper;
using FreeSql;
using FreeSql.Internal.CommonProvider;
using FreeSql.Internal.Model;
using FreeSql.QuestDb;
using FreeSql.QuestDb.Curd;
using Newtonsoft.Json;
using Npgsql;
using System;
using System.Collections;
using System.Collections.Generic;
using System.Data;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Linq.Expressions;
using System.Net.Http;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Web;
using FreeSql.Provider.QuestDb;
using System.Net;

public static partial class QuestDbGlobalExtensions
{
    /// <summary>
    /// Timestamp pattern sent to QuestDB in the import schema for TIMESTAMP columns.
    /// </summary>
    private const string TimestampPattern = "yyyy/M/dd HH:mm:ss";

    /// <summary>
    /// Special handling similar to string.Format: prevents injection and converts null to IS NULL.
    /// </summary>
    /// <param name="that">Format string.</param>
    /// <param name="args">Format arguments.</param>
    /// <returns>The formatted SQL text produced by <c>QuestDbAdo.Addslashes</c>.</returns>
    public static string FormatPostgreSQL(this string that, params object[] args) =>
        _postgresqlAdo.Addslashes(that, args);

    static QuestDbAdo _postgresqlAdo = new QuestDbAdo();

    /// <summary>
    /// Splits text into lines, keeping empty lines.
    /// </summary>
    /// <param name="text">Text to split; null or empty yields an empty list.</param>
    /// <returns>One entry per line, without line terminators.</returns>
    private static List<string> SplitByLine(string text)
    {
        var lines = new List<string>();
        if (string.IsNullOrEmpty(text))
        {
            return lines;
        }

        // StringReader splits on \r, \n and \r\n without a detour through bytes and streams.
        using (var reader = new StringReader(text))
        {
            string line;
            while ((line = reader.ReadLine()) != null)
            {
                lines.Add(line);
            }
        }

        return lines;
    }

    /// <summary>
    /// Builds the JSON column schema posted to the import endpoint as the "schema" form part:
    /// one object per column with "name" and "type", plus "pattern" for TIMESTAMP columns.
    /// </summary>
    private static string BuildImportSchema<T>(QuestDbInsert<T> insert, string tableName) where T : class
    {
        var table = insert.InternalOrm.DbFirst.GetTableByName(tableName);
        if (table == null)
        {
            throw new InvalidOperationException($"Table '{tableName}' was not found; cannot build the import schema.");
        }

        var columns = new List<Dictionary<string, string>>();
        foreach (var column in table.Columns)
        {
            var entry = new Dictionary<string, string>
            {
                { "name", column.Name },
                { "type", column.DbTypeText }
            };
            if (column.DbTypeText == "TIMESTAMP")
            {
                entry.Add("pattern", TimestampPattern);
            }

            columns.Add(entry);
        }

        return JsonConvert.SerializeObject(columns);
    }

    /// <summary>
    /// Extracts the number from the "Rows imported" row of the pipe-delimited text table in the response.
    /// Returns 0 when no such row is present.
    /// </summary>
    private static int ParseRowsImported(string responseText)
    {
        var imported = 0;
        foreach (var line in SplitByLine(responseText))
        {
            if (!line.Contains("Rows"))
            {
                continue;
            }

            var cells = line.Split('|');
            // Guard the indexes: a line may mention "Rows" without being a table row.
            if (cells.Length > 2 && cells[1].Trim() == "Rows imported")
            {
                imported = int.Parse(cells[2].Trim(), NumberStyles.Integer, CultureInfo.InvariantCulture);
            }
        }

        return imported;
    }

    /// <summary>
    /// Fast bulk insert: serializes the pending rows to CSV and posts them, together with a column schema,
    /// to <c>{RestAPIExtension.BaseUrl}/imp</c>.
    /// </summary>
    /// <typeparam name="T">Entity type; its CLR type name is used as the target table name.</typeparam>
    /// <param name="that">Insert builder; must be a <c>QuestDbInsert&lt;T&gt;</c>.</param>
    /// <returns>The "Rows imported" count reported by the server, or 0 if the response contains none.</returns>
    /// <exception cref="InvalidOperationException">The builder is not a QuestDB insert, or the table is unknown.</exception>
    /// <exception cref="HttpRequestException">The server answered with a non-success status code.</exception>
    public static async Task<int> ExecuteBulkCopyAsync<T>(this IInsert<T> that) where T : class
    {
        var insert = that as QuestDbInsert<T>;
        if (insert == null)
        {
            throw new InvalidOperationException("ExecuteBulkCopy is only supported by the QuestDB provider.");
        }

        // NOTE(review): the table name is taken from the CLR type name only; a mapped table name
        // (attribute / AsTable) is not honoured here - confirm whether that is intended.
        var name = typeof(T).Name;
        var schema = BuildImportSchema(insert, name);

        // Serialize in memory: no temp file in the application directory, nothing to clean up.
        // NOTE(review): CSV values are formatted with CurrentCulture while the schema announces a
        // fixed timestamp pattern; on cultures whose DateTime format differs the import may reject
        // timestamps - confirm and pin a format if needed.
        byte[] csvBytes;
        using (var buffer = new MemoryStream())
        {
            using (var writer = new StreamWriter(buffer, new UTF8Encoding(false), 1024, true))
            using (var csv = new CsvWriter(writer, CultureInfo.CurrentCulture))
            {
                csv.WriteRecords(insert._source);
            }

            csvBytes = buffer.ToArray();
        }

        var boundary = "---------------" + DateTime.Now.Ticks.ToString("x");
        // NOTE(review): service type reconstructed as IHttpClientFactory from the CreateClient() call - confirm.
        var client = QuestDbContainer.GetService<IHttpClientFactory>().CreateClient();
        var url = $"{RestAPIExtension.BaseUrl}/imp?name={Uri.EscapeDataString(name)}";

        using (var content = new MultipartFormDataContent(boundary))
        using (var request = new HttpRequestMessage(HttpMethod.Post, url))
        {
            content.Add(new StringContent(schema), "schema");
            content.Add(new ByteArrayContent(csvBytes), "data");
            // A quoted boundary can make the server fail, so the Content-Type header is rewritten unquoted.
            content.Headers.Remove("Content-Type");
            content.Headers.TryAddWithoutValidation("Content-Type",
                "multipart/form-data; boundary=" + boundary);
            request.Content = content;

            // Per-request header instead of mutating the client's default headers.
            if (!string.IsNullOrWhiteSpace(RestAPIExtension.authorization))
            {
                request.Headers.TryAddWithoutValidation("Authorization", RestAPIExtension.authorization);
            }

            // ConfigureAwait(false): ExecuteBulkCopy blocks on this task, which can deadlock
            // under a synchronization context if continuations are posted back to it.
            using (var response = await client.SendAsync(request).ConfigureAwait(false))
            {
                var responseText = await response.Content.ReadAsStringAsync().ConfigureAwait(false);
                if (!response.IsSuccessStatusCode)
                {
                    throw new HttpRequestException(
                        $"QuestDB import failed with status {(int)response.StatusCode}: {responseText}");
                }

                return ParseRowsImported(responseText);
            }
        }
    }

    /// <summary>
    /// Fast bulk insert (synchronous wrapper over <see cref="ExecuteBulkCopyAsync{T}"/>).
    /// </summary>
    /// <typeparam name="T">Entity type.</typeparam>
    /// <param name="insert">Insert builder; must be a <c>QuestDbInsert&lt;T&gt;</c>.</param>
    /// <returns>The "Rows imported" count reported by the server.</returns>
    public static int ExecuteBulkCopy<T>(this IInsert<T> insert) where T : class
    {
        return ExecuteBulkCopyAsync(insert).GetAwaiter().GetResult();
    }
}

public static class SampleByExtension
{
    // Whether SampleBy has been called in the current async flow.
    // The initializer assigns Value only in the execution context that runs the static initializer.
    internal static AsyncLocal<bool> IsExistence = new AsyncLocal<bool>()
    {
        Value = false
    };

    // The SAMPLE BY clause text for the current async flow (identifier spelling kept as-is).
    internal static AsyncLocal<string> SamoleByString = new AsyncLocal<string>()
    {
        Value = string.Empty
    };

    /// <summary>
    /// Resets the SAMPLE BY state of the current async flow.
    /// </summary>
    internal static void Initialize()
    {
        IsExistence.Value = false;
        SamoleByString.Value = string.Empty;
    }

    /// <summary>
    /// SAMPLE BY is used on time-series data to summarize large datasets into aggregates of homogeneous
    /// time chunks as part of a SELECT statement. Users querying datasets with missing data can use the
    /// FILL keyword to specify a fill behavior.
    /// </summary>
    /// <typeparam name="T">Entity type of the select.</typeparam>
    /// <param name="select">Select builder; returned unchanged.</param>
    /// <param name="time">Duration.</param>
    /// <param name="unit">Unit; its enum member name is appended directly after the duration.</param>
    /// <returns>The same <paramref name="select"/> instance.</returns>
    public static ISelect<T> SampleBy<T>(this ISelect<T> select, double time, SampleUnits unit)
    {
        var unitName = Enum.GetName(typeof(SampleUnits), unit);
        IsExistence.Value = true;
        // Invariant culture: the number becomes SQL text, so a culture-specific decimal comma would break it.
        var duration = time.ToString(CultureInfo.InvariantCulture);
        SamoleByString.Value = $"{Environment.NewLine}SAMPLE BY {duration}{unitName}{Environment.NewLine}";
        return select;
    }
}

// NOTE(review): in this class the generic parameter lists and the member type of the timestamp
// selector (DateTime?) were reconstructed; the patch text only shows "Expression> timestamp".
public static class LatestOnExtension
{
    // Whether LatestOn has been called in the current async flow.
    internal static AsyncLocal<bool> IsExistence = new AsyncLocal<bool>()
    {
        Value = false
    };

    // The LATEST ON clause text for the current async flow.
    internal static AsyncLocal<string> LatestOnString = new AsyncLocal<string>()
    {
        Value = string.Empty
    };

    /// <summary>
    /// Resets the LATEST ON state of the current async flow.
    /// </summary>
    internal static void Initialize()
    {
        IsExistence.Value = false;
        LatestOnString.Value = string.Empty;
    }

    /// <summary>
    /// For scenarios where multiple time series are stored in the same table: retrieves the latest
    /// item for a given key or key combination based on the timestamp.
    /// </summary>
    /// <param name="select">Select builder; returned unchanged.</param>
    /// <param name="timestamp">Timestamp selector.</param>
    /// <param name="partition">Column(s) identifying the latest item.</param>
    /// <returns>The same <paramref name="select"/> instance.</returns>
    public static ISelect<T1> LatestOn<T1, TKey>(this ISelect<T1> select,
        Expression<Func<T1, DateTime?>> timestamp,
        Expression<Func<T1, TKey>> partition)
    {
        Provider(timestamp, partition);
        return select;
    }

    /// <summary>
    /// Visits both selectors and stores the resulting "LATEST ON ... PARTITION BY ..." clause.
    /// </summary>
    private static void Provider<T1, TKey>(Expression<Func<T1, DateTime?>> timestamp,
        Expression<Func<T1, TKey>> partition)
    {
        IsExistence.Value = true;
        // Statement order is preserved exactly: one visitor, Fields() read after each Visit.
        var visitor = new QuestDbExpressionVisitor();
        visitor.Visit(timestamp);
        var timestampFields = visitor.Fields();
        visitor.Visit(partition);
        var partitionFields = visitor.Fields();
        LatestOnString.Value =
            $"{Environment.NewLine}LATEST ON {timestampFields} PARTITION BY {partitionFields} ";
    }

    /// <summary>
    /// For scenarios where multiple time series are stored in the same table: retrieves the latest
    /// item for a given key or key combination based on the timestamp (two-table select).
    /// </summary>
    /// <param name="select">Select builder; returned unchanged.</param>
    /// <param name="timestamp">Timestamp selector.</param>
    /// <param name="partition">Column(s) identifying the latest item.</param>
    /// <returns>The same <paramref name="select"/> instance.</returns>
    public static ISelect<T1, T2> LatestOn<T1, T2, TKey>(this ISelect<T1, T2> select,
        Expression<Func<T1, DateTime?>> timestamp,
        Expression<Func<T1, TKey>> partition) where T2 : class
    {
        Provider(timestamp, partition);
        return select;
    }

    /// <summary>
    /// For scenarios where multiple time series are stored in the same table: retrieves the latest
    /// item for a given key or key combination based on the timestamp (three-table select).
    /// </summary>
    /// <param name="select">Select builder; returned unchanged.</param>
    /// <param name="timestamp">Timestamp selector.</param>
    /// <param name="partition">Column(s) identifying the latest item.</param>
    /// <returns>The same <paramref name="select"/> instance.</returns>
    public static ISelect<T1, T2, T3> LatestOn<T1, T2, T3, TKey>(this ISelect<T1, T2, T3> select,
        Expression<Func<T1, DateTime?>> timestamp,
        Expression<Func<T1, TKey>> partition) where T2 : class where T3 : class
    {
        Provider(timestamp, partition);
        return select;
    }

    /// <summary>
    /// For scenarios where multiple time series are stored in the same table: retrieves the latest
    /// item for a given key or key combination based on the timestamp (four-table select).
    /// </summary>
    /// <param name="select">Select builder; returned unchanged.</param>
    /// <param name="timestamp">Timestamp selector.</param>
    /// <param name="partition">Column(s) identifying the latest item.</param>
    /// <returns>The same <paramref name="select"/> instance.</returns>
    public static ISelect<T1, T2, T3, T4> LatestOn<T1, T2, T3, T4, TKey>(this ISelect<T1, T2, T3, T4> select,
        Expression<Func<T1, DateTime?>> timestamp,
        Expression<Func<T1, TKey>> partition) where T2 : class where T3 : class where T4 : class
    {
        Provider(timestamp, partition);
        return select;
    }
}

public static class RestAPIExtension
{
    // Normalized REST endpoint root (scheme included, no trailing slash); set by UseQuestDbRestAPI.
    internal static string BaseUrl = string.Empty;

    // Value of the Authorization header ("Basic ..."), or empty when no credentials were configured.
    internal static string authorization = string.Empty;

    /// <summary>
    /// Sends <paramref name="sql"/> to <c>{BaseUrl}/exec</c> as the URL-encoded "query" parameter.
    /// </summary>
    /// <param name="sql">SQL text to execute.</param>
    /// <returns>The raw response body; the HTTP status code is not inspected.</returns>
    internal static async Task<string> ExecAsync(string sql)
    {
        // NOTE(review): service type reconstructed as IHttpClientFactory from the CreateClient() call - confirm.
        var client = QuestDbContainer.GetService<IHttpClientFactory>().CreateClient();
        var url = $"{BaseUrl}/exec?query={HttpUtility.UrlEncode(sql)}";

        using (var request = new HttpRequestMessage(HttpMethod.Get, url))
        {
            // Per-request header instead of mutating the client's default headers.
            if (!string.IsNullOrWhiteSpace(authorization))
            {
                request.Headers.TryAddWithoutValidation("Authorization", authorization);
            }

            using (var response = await client.SendAsync(request).ConfigureAwait(false))
            {
                return await response.Content.ReadAsStringAsync().ConfigureAwait(false);
            }
        }
    }

    /// <summary>
    /// Configures the QuestDB REST endpoint used by <see cref="ExecAsync"/> and the bulk-copy import,
    /// and switches the builder to non-parameterized commands.
    /// </summary>
    /// <param name="buider">The builder being configured (parameter name kept for named-argument callers).</param>
    /// <param name="host">Host or URL; "http://" is prepended when it does not start with "http".</param>
    /// <param name="username">Basic-auth user name; used only together with <paramref name="password"/>.</param>
    /// <param name="password">Basic-auth password; used only together with <paramref name="username"/>.</param>
    /// <returns>The same builder instance.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="buider"/> or <paramref name="host"/> is null.</exception>
    public static FreeSqlBuilder UseQuestDbRestAPI(this FreeSqlBuilder buider, string host, string username = "",
        string password = "")
    {
        // Validate before touching static state so a bad call leaves the configuration untouched.
        if (buider == null)
        {
            throw new ArgumentNullException(nameof(buider));
        }

        if (host == null)
        {
            throw new ArgumentNullException(nameof(host));
        }

        var baseUrl = host.TrimEnd('/');
        if (!baseUrl.StartsWith("http", StringComparison.OrdinalIgnoreCase))
        {
            baseUrl = $"http://{baseUrl}";
        }

        BaseUrl = baseUrl;

        // Reset when no credentials are given so a previous configuration's header does not leak through.
        if (!string.IsNullOrWhiteSpace(username) && !string.IsNullOrWhiteSpace(password))
        {
            var base64 = Convert.ToBase64String(Encoding.UTF8.GetBytes($"{username}:{password}"));
            authorization = $"Basic {base64}";
        }
        else
        {
            authorization = string.Empty;
        }

        buider.UseNoneCommandParameter(true);
        return buider;
    }
}