From 0ac81caaece9316fe0978e8403436f7de0fd5869 Mon Sep 17 00:00:00 2001 From: Daily <963922242@qq.com> Date: Sat, 23 Mar 2024 09:09:56 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96QuestDb=20BulkCopy=E5=8A=9F?= =?UTF-8?q?=E8=83=BD=EF=BC=8C=E5=B9=B6=E9=87=8D=E5=91=BD=E5=90=8D=E4=B8=BA?= =?UTF-8?q?=EF=BC=9AExecuteQuestDbBulkCopy?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../QuestDbGlobalExtensions.cs | 98 +++++++++---------- 1 file changed, 48 insertions(+), 50 deletions(-) diff --git a/Providers/FreeSql.Provider.QuestDb/QuestDbGlobalExtensions.cs b/Providers/FreeSql.Provider.QuestDb/QuestDbGlobalExtensions.cs index d47197c9..79d395bb 100644 --- a/Providers/FreeSql.Provider.QuestDb/QuestDbGlobalExtensions.cs +++ b/Providers/FreeSql.Provider.QuestDb/QuestDbGlobalExtensions.cs @@ -31,12 +31,12 @@ public static partial class QuestDbGlobalExtensions /// /// public static string FormatQuestDb(this string that, params object[] args) => - _QuestDbAdo.Addslashes(that, args); + _questDbAdo.Addslashes(that, args); - static QuestDbAdo _QuestDbAdo = new QuestDbAdo(); + private static readonly QuestDbAdo _questDbAdo = new QuestDbAdo(); - public static FreeSqlBuilder UseQuestDbRestAPI(this FreeSqlBuilder buider, string host, string username = "", - string password = "") => RestAPIExtension.UseQuestDbRestAPI(buider, host, username, password); + public static FreeSqlBuilder UseQuestDbRestAPI(this FreeSqlBuilder build, string host, string username = "", + string password = "") => RestAPIExtension.UseQuestDbRestAPI(build, host, username, password); /// /// 对于多个时间序列存储在同一个表中的场景,根据时间戳检索给定键或键组合的最新项。 @@ -53,6 +53,7 @@ public static partial class QuestDbGlobalExtensions LatestOnExtension.InternelImpl(timestamp, partition); return select; } + /// /// 对于多个时间序列存储在同一个表中的场景,根据时间戳检索给定键或键组合的最新项。 /// @@ -113,13 +114,15 @@ public static partial class QuestDbGlobalExtensions /// 单位 /// 对准日历 /// - public static ISelect SampleBy(this ISelect select, double time, SampleUnit unit, bool alignToCalendar = false) + public static ISelect SampleBy(this ISelect select, double time, SampleUnit unit, + bool alignToCalendar = false) { SampleByExtension.IsExistence.Value = true; var samoleByTemple = $"{Environment.NewLine}SAMPLE BY {{0}}{{1}} {{2}}"; string alignToCalendarTemple = ""; if (alignToCalendar) alignToCalendarTemple = "ALIGN TO CALENDAR "; - SampleByExtension.SamoleByString.Value = string.Format(samoleByTemple, time.ToString(), (char)unit, alignToCalendarTemple); + SampleByExtension.SamoleByString.Value = + string.Format(samoleByTemple, time.ToString(), (char)unit, alignToCalendarTemple); return select; } @@ -156,23 +159,25 @@ public static partial class QuestDbGlobalExtensions /// /// 导入时,时间格式 默认:yyyy/M/d H:mm:ss /// - public static async Task ExecuteBulkCopyAsync(this IInsert that,string dateFormat = "yyyy/M/d H:mm:ss") where T : class + public static async Task ExecuteQuestDbBulkCopyAsync(this IInsert that, + string dateFormat = "yyyy/M/d H:mm:ss") where T : class { //思路:通过提供的RestAPI imp,实现快速复制 if (string.IsNullOrWhiteSpace(RestAPIExtension.BaseUrl)) { - throw new Exception("BulkCopy功能需要启用RestAPI,启用方式:new FreeSqlBuilder().UseQuestDbRestAPI(\"localhost:9000\", \"username\", \"password\")"); + throw new Exception( + "BulkCopy功能需要启用RestAPI,启用方式:new FreeSqlBuilder().UseQuestDbRestAPI(\"localhost:9000\", \"username\", \"password\")"); } + var result = 0; - var fileName = $"{Guid.NewGuid()}.csv"; - var filePath = Path.Combine(AppContext.BaseDirectory, fileName); + try { var client = QuestDbContainer.GetService().CreateClient(); var boundary = "---------------" + DateTime.Now.Ticks.ToString("x"); var list = new List(); var insert = that as QuestDbInsert; - var name = insert.InternalTableRuleInvoke(); //获取表名 + var name = insert.InternalTableRuleInvoke(); //获取表名 insert.InternalOrm.DbFirst.GetTableByName(name).Columns.ForEach(d => { if (d.DbTypeText == "TIMESTAMP") @@ -181,7 +186,7 @@ public static partial class QuestDbGlobalExtensions { { "name", d.Name }, { "type", d.DbTypeText }, - { "pattern", dateFormat} + { "pattern", dateFormat } }); } else @@ -194,34 +199,37 @@ public static partial class QuestDbGlobalExtensions } }); var schema = JsonConvert.SerializeObject(list); - //写入CSV文件 - using (var writer = new StreamWriter(filePath)) - using (var csv = new CsvWriter(writer, CultureInfo.CurrentCulture)) + using (MemoryStream stream = new MemoryStream()) { - await csv.WriteRecordsAsync(insert._source); - } - - var httpContent = new MultipartFormDataContent(boundary); - if (!string.IsNullOrWhiteSpace(RestAPIExtension.authorization)) - client.DefaultRequestHeaders.Add("Authorization", RestAPIExtension.authorization); - httpContent.Add(new StringContent(schema), "schema"); - httpContent.Add(new ByteArrayContent(File.ReadAllBytes(filePath)), "data"); - //boundary带双引号 可能导致服务器错误情况 - httpContent.Headers.Remove("Content-Type"); - httpContent.Headers.TryAddWithoutValidation("Content-Type", - "multipart/form-data; boundary=" + boundary); - var httpResponseMessage = - await client.PostAsync($"{RestAPIExtension.BaseUrl}/imp?name={name}", httpContent); - var readAsStringAsync = await httpResponseMessage.Content.ReadAsStringAsync(); - var splitByLine = SplitByLine(readAsStringAsync); - foreach (var s in splitByLine) - { - if (s.Contains("Rows")) + //写入CSV文件 + using (var writer = new StreamWriter(stream)) + using (var csv = new CsvWriter(writer, CultureInfo.CurrentCulture)) { - var strings = s.Split('|'); - if (strings[1].Trim() == "Rows imported") + await csv.WriteRecordsAsync(insert._source); + } + + var httpContent = new MultipartFormDataContent(boundary); + if (!string.IsNullOrWhiteSpace(RestAPIExtension.authorization)) + client.DefaultRequestHeaders.Add("Authorization", RestAPIExtension.authorization); + httpContent.Add(new StringContent(schema), "schema"); + httpContent.Add(new ByteArrayContent(stream.ToArray()), "data"); + //boundary带双引号 可能导致服务器错误情况 + httpContent.Headers.Remove("Content-Type"); + httpContent.Headers.TryAddWithoutValidation("Content-Type", + "multipart/form-data; boundary=" + boundary); + var httpResponseMessage = + await client.PostAsync($"{RestAPIExtension.BaseUrl}/imp?name={name}", httpContent); + var readAsStringAsync = await httpResponseMessage.Content.ReadAsStringAsync(); + var splitByLine = SplitByLine(readAsStringAsync); + foreach (var s in splitByLine) + { + if (s.Contains("Rows")) { - result = Convert.ToInt32(strings[2].Trim()); + var strings = s.Split('|'); + if (strings[1].Trim() == "Rows imported") + { + result = Convert.ToInt32(strings[2].Trim()); + } } } } @@ -230,17 +238,6 @@ public static partial class QuestDbGlobalExtensions { throw e; } - finally - { - try - { - File.Delete(filePath); - } - catch - { - // ignored - } - } return result; } @@ -252,9 +249,9 @@ public static partial class QuestDbGlobalExtensions /// /// 导入时,时间格式 默认:yyyy/M/d H:mm:ss /// - public static int ExecuteBulkCopy(this IInsert insert,string dateFormat = "yyyy/M/d H:mm:ss") where T : class + public static int ExecuteBulkCopy(this IInsert insert, string dateFormat = "yyyy/M/d H:mm:ss") where T : class { - return ExecuteBulkCopyAsync(insert,dateFormat).ConfigureAwait(false).GetAwaiter().GetResult(); + return ExecuteQuestDbBulkCopyAsync(insert, dateFormat).ConfigureAwait(false).GetAwaiter().GetResult(); } } @@ -344,6 +341,7 @@ static class RestAPIExtension var base64 = Convert.ToBase64String(Encoding.UTF8.GetBytes($"{username}:{password}")); authorization = $"Basic {base64}"; } + //RestApi需要无参数 buider.UseNoneCommandParameter(true); return buider;