using CsvHelper;
using FreeSql;
using FreeSql.Internal.CommonProvider;
using FreeSql.Internal.Model;
using FreeSql.QuestDb;
using FreeSql.QuestDb.Curd;
using Newtonsoft.Json;
using Npgsql;
using System;
using System.Collections;
using System.Collections.Generic;
using System.Data;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Linq.Expressions;
using System.Net.Http;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Web;
using FreeSql.Provider.QuestDb;
using System.Net;

/// <summary>
/// QuestDb-specific extension methods for FreeSql: SQL string formatting, REST API
/// registration, LATEST ON / SAMPLE BY clauses and bulk import through the REST API.
/// </summary>
public static partial class QuestDbGlobalExtensions
{
    private static readonly QuestDbAdo _questDbAdo = new QuestDbAdo();

    /// <summary>
    /// string.Format-like helper intended to guard against injection and to handle
    /// IS NULL conversion. Delegates to <c>QuestDbAdo.Addslashes</c>.
    /// </summary>
    /// <param name="that">Format string.</param>
    /// <param name="args">Values substituted into the format string.</param>
    /// <returns>The formatted SQL fragment.</returns>
    public static string FormatQuestDb(this string that, params object[] args) =>
        _questDbAdo.Addslashes(that, args);

    /// <summary>
    /// Enables the QuestDb REST API (required by the bulk copy methods).
    /// </summary>
    /// <param name="build">Builder being configured.</param>
    /// <param name="host">REST endpoint, e.g. <c>localhost:9000</c>; <c>http://</c> is prepended when missing.</param>
    /// <param name="username">Optional basic-auth user name.</param>
    /// <param name="password">Optional basic-auth password.</param>
    /// <returns>The same builder.</returns>
    public static FreeSqlBuilder UseQuestDbRestAPI(this FreeSqlBuilder build, string host, string username = "",
        string password = "") => RestAPIExtension.UseQuestDbRestAPI(build, host, username, password);

    /// <summary>
    /// For tables holding several time series: retrieves the latest entry, by timestamp,
    /// for a given key or combination of keys (LATEST ON ... PARTITION BY ...).
    /// </summary>
    /// <typeparam name="T1">Entity type.</typeparam>
    /// <typeparam name="TKey">Type of the partition expression.</typeparam>
    /// <param name="select">Query being built; returned unchanged.</param>
    /// <param name="timestamp">Designated timestamp column.</param>
    /// <param name="partition">Column(s) identifying each series.</param>
    /// <returns>The same <paramref name="select"/> instance.</returns>
    public static ISelect<T1> LatestOn<T1, TKey>(this ISelect<T1> select,
        Expression<Func<T1, DateTime?>> timestamp,
        Expression<Func<T1, TKey>> partition)
    {
        LatestOnExtension.InternelImpl(timestamp, partition);
        return select;
    }

    /// <summary>
    /// For tables holding several time series: retrieves the latest entry, by timestamp,
    /// for a given key or combination of keys (two-table select overload).
    /// </summary>
    /// <param name="select">Query being built; returned unchanged.</param>
    /// <param name="timestamp">Designated timestamp column.</param>
    /// <param name="partition">Column(s) identifying each series.</param>
    /// <returns>The same <paramref name="select"/> instance.</returns>
    public static ISelect<T1, T2> LatestOn<T1, T2, TKey>(this ISelect<T1, T2> select,
        Expression<Func<T1, DateTime?>> timestamp,
        Expression<Func<T1, TKey>> partition) where T2 : class
    {
        LatestOnExtension.InternelImpl(timestamp, partition);
        return select;
    }

    /// <summary>
    /// For tables holding several time series: retrieves the latest entry, by timestamp,
    /// for a given key or combination of keys (three-table select overload).
    /// </summary>
    /// <param name="select">Query being built; returned unchanged.</param>
    /// <param name="timestamp">Designated timestamp column.</param>
    /// <param name="partition">Column(s) identifying each series.</param>
    /// <returns>The same <paramref name="select"/> instance.</returns>
    public static ISelect<T1, T2, T3> LatestOn<T1, T2, T3, TKey>(this ISelect<T1, T2, T3> select,
        Expression<Func<T1, DateTime?>> timestamp,
        Expression<Func<T1, TKey>> partition) where T2 : class where T3 : class
    {
        LatestOnExtension.InternelImpl(timestamp, partition);
        return select;
    }

    /// <summary>
    /// For tables holding several time series: retrieves the latest entry, by timestamp,
    /// for a given key or combination of keys (four-table select overload).
    /// </summary>
    /// <param name="select">Query being built; returned unchanged.</param>
    /// <param name="timestamp">Designated timestamp column.</param>
    /// <param name="partition">Column(s) identifying each series.</param>
    /// <returns>The same <paramref name="select"/> instance.</returns>
    public static ISelect<T1, T2, T3, T4> LatestOn<T1, T2, T3, T4, TKey>(this ISelect<T1, T2, T3, T4> select,
        Expression<Func<T1, DateTime?>> timestamp,
        Expression<Func<T1, TKey>> partition) where T2 : class where T3 : class where T4 : class
    {
        LatestOnExtension.InternelImpl(timestamp, partition);
        return select;
    }

    /// <summary>
    /// SAMPLE BY summarises time-series data into aggregates over homogeneous time chunks
    /// as part of a SELECT statement. For data sets with gaps, the FILL keyword can be
    /// used to specify the fill behaviour.
    /// </summary>
    /// <typeparam name="T">Entity type.</typeparam>
    /// <param name="select">Query being built; returned unchanged.</param>
    /// <param name="time">Length of each sample.</param>
    /// <param name="unit">Unit of <paramref name="time"/>; its value is cast to <c>char</c> to form the unit suffix.</param>
    /// <param name="alignToCalendar">When true, appends <c>ALIGN TO CALENDAR</c>.</param>
    /// <returns>The same <paramref name="select"/> instance.</returns>
    public static ISelect<T> SampleBy<T>(this ISelect<T> select, double time, SampleUnit unit,
        bool alignToCalendar = false)
    {
        SampleByExtension.IsExistence.Value = true;

        var alignClause = alignToCalendar ? "ALIGN TO CALENDAR " : string.Empty;

        // SQL text must not depend on the thread culture: a comma-decimal culture would
        // otherwise render 1.5 as "1,5".
        var amount = time.ToString(CultureInfo.InvariantCulture);

        SampleByExtension.SamoleByString.Value =
            $"{Environment.NewLine}SAMPLE BY {amount}{(char)unit} {alignClause}";
        return select;
    }

    /// <summary>
    /// Splits text into lines, keeping empty lines.
    /// </summary>
    /// <param name="text">Text to split.</param>
    /// <returns>The lines in order; empty when <paramref name="text"/> is null or empty.</returns>
    private static List<string> SplitByLine(string text)
    {
        var lines = new List<string>();
        if (string.IsNullOrEmpty(text))
        {
            return lines;
        }

        using (var reader = new StringReader(text))
        {
            string line;
            while ((line = reader.ReadLine()) != null)
            {
                lines.Add(line);
            }
        }

        return lines;
    }

    /// <summary>
    /// Builds the JSON schema sent with an import: one entry per table column, with the
    /// date pattern attached to TIMESTAMP columns.
    /// </summary>
    private static string BuildImportSchema<T>(QuestDbInsert<T> insert, string tableName, string dateFormat)
        where T : class
    {
        var columns = new List<Hashtable>();
        foreach (var column in insert.InternalOrm.DbFirst.GetTableByName(tableName).Columns)
        {
            var entry = new Hashtable { { "name", column.Name }, { "type", column.DbTypeText } };
            if (column.DbTypeText == "TIMESTAMP")
            {
                entry.Add("pattern", dateFormat);
            }

            columns.Add(entry);
        }

        return JsonConvert.SerializeObject(columns);
    }

    /// <summary>
    /// Extracts the number from the "Rows imported" line of the import response text.
    /// </summary>
    /// <returns>The reported row count, or 0 when no such line is present.</returns>
    private static int ParseImportedRowCount(string responseText)
    {
        var imported = 0;
        foreach (var line in SplitByLine(responseText))
        {
            if (!line.Contains("Rows"))
            {
                continue;
            }

            var cells = line.Split('|');

            // Guard against lines that mention "Rows" but are not table rows.
            if (cells.Length > 2 && cells[1].Trim() == "Rows imported")
            {
                imported = Convert.ToInt32(cells[2].Trim(), CultureInfo.InvariantCulture);
            }
        }

        return imported;
    }

    /// <summary>
    /// Fast bulk insert: uploads the pending rows as CSV to the REST API <c>/imp</c> endpoint.
    /// </summary>
    /// <typeparam name="T">Entity type.</typeparam>
    /// <param name="that">Insert created by the QuestDb provider.</param>
    /// <param name="dateFormat">Timestamp pattern used for the import. Default: yyyy/M/d H:mm:ss</param>
    /// <returns>The number of rows the server reports as imported (0 when not reported).</returns>
    /// <exception cref="InvalidOperationException">The REST API has not been enabled via UseQuestDbRestAPI.</exception>
    /// <exception cref="ArgumentException"><paramref name="that"/> is not a QuestDb insert.</exception>
    public static async Task<int> ExecuteQuestDbBulkCopyAsync<T>(this IInsert<T> that,
        string dateFormat = "yyyy/M/d H:mm:ss") where T : class
    {
        // Approach: use the REST API "imp" endpoint for a fast copy.
        if (string.IsNullOrWhiteSpace(RestAPIExtension.BaseUrl))
        {
            throw new InvalidOperationException(
                "BulkCopy功能需要启用RestAPI,启用方式:new FreeSqlBuilder().UseQuestDbRestAPI(\"localhost:9000\", \"username\", \"password\")");
        }

        var insert = that as QuestDbInsert<T>;
        if (insert == null)
        {
            throw new ArgumentException(
                "ExecuteQuestDbBulkCopy requires an insert created by the QuestDb provider.", nameof(that));
        }

        var client = QuestDbContainer.GetService<IHttpClientFactory>().CreateClient();
        if (!string.IsNullOrWhiteSpace(RestAPIExtension.authorization))
        {
            client.DefaultRequestHeaders.Add("Authorization", RestAPIExtension.authorization);
        }

        var name = insert.InternalTableRuleInvoke(); // resolve the table name
        var schema = BuildImportSchema(insert, name, dateFormat);

        // Serialize the pending rows to an in-memory CSV document.
        // NOTE(review): values are written with the current culture, so dateFormat presumably
        // has to match how DateTime renders under that culture - confirm before changing.
        byte[] csvBytes;
        using (var stream = new MemoryStream())
        {
            using (var writer = new StreamWriter(stream))
            using (var csv = new CsvWriter(writer, CultureInfo.CurrentCulture))
            {
                await csv.WriteRecordsAsync(insert._source).ConfigureAwait(false);
            }

            // ToArray remains valid after the writer has closed the stream.
            csvBytes = stream.ToArray();
        }

        var boundary = "---------------" + DateTime.Now.Ticks.ToString("x");
        using (var httpContent = new MultipartFormDataContent(boundary))
        {
            httpContent.Add(new StringContent(schema), "schema");
            httpContent.Add(new ByteArrayContent(csvBytes), "data");

            // A boundary wrapped in double quotes may cause server-side errors, so the
            // Content-Type header is replaced with an unquoted one.
            httpContent.Headers.Remove("Content-Type");
            httpContent.Headers.TryAddWithoutValidation("Content-Type",
                "multipart/form-data; boundary=" + boundary);

            // Escape the table name so reserved characters cannot corrupt the query string.
            var url = $"{RestAPIExtension.BaseUrl}/imp?name={Uri.EscapeDataString(name)}";
            using (var response = await client.PostAsync(url, httpContent).ConfigureAwait(false))
            {
                var responseText = await response.Content.ReadAsStringAsync().ConfigureAwait(false);
                return ParseImportedRowCount(responseText);
            }
        }
    }

    /// <summary>
    /// Fast bulk insert (synchronous wrapper over <c>ExecuteQuestDbBulkCopyAsync</c>).
    /// </summary>
    /// <typeparam name="T">Entity type.</typeparam>
    /// <param name="insert">Insert created by the QuestDb provider.</param>
    /// <param name="dateFormat">Timestamp pattern used for the import. Default: yyyy/M/d H:mm:ss</param>
    /// <returns>The number of rows the server reports as imported.</returns>
    public static int ExecuteQuestDbBulkCopy<T>(this IInsert<T> insert, string dateFormat = "yyyy/M/d H:mm:ss")
        where T : class
    {
        return ExecuteQuestDbBulkCopyAsync(insert, dateFormat).ConfigureAwait(false).GetAwaiter().GetResult();
    }
}

/// <summary>
/// Async-local state recording the SAMPLE BY clause requested through <c>SampleBy</c>.
/// </summary>
static class SampleByExtension
{
    // Whether SampleBy has been used.
    internal static AsyncLocal<bool> IsExistence = new AsyncLocal<bool>() { Value = false };

    // The SAMPLE BY clause text.
    internal static AsyncLocal<string> SamoleByString = new AsyncLocal<string>() { Value = string.Empty };

    /// <summary>Resets the state to "not used".</summary>
    internal static void Initialize()
    {
        IsExistence.Value = false;
        SamoleByString.Value = string.Empty;
    }
}

/// <summary>
/// Async-local state recording the LATEST ON clause requested through <c>LatestOn</c>.
/// </summary>
static class LatestOnExtension
{
    // Whether LatestOn has been used.
    internal static AsyncLocal<bool> IsExistence = new AsyncLocal<bool>() { Value = false };

    // The LATEST ON clause text.
    internal static AsyncLocal<string> LatestOnString = new AsyncLocal<string>() { Value = string.Empty };

    /// <summary>Resets the state to "not used".</summary>
    internal static void Initialize()
    {
        IsExistence.Value = false;
        LatestOnString.Value = string.Empty;
    }

    /// <summary>
    /// Builds the "LATEST ON {timestamp} PARTITION BY {partition}" clause from the two
    /// expressions and stores it in <c>LatestOnString</c>.
    /// </summary>
    /// <param name="timestamp">Designated timestamp column.</param>
    /// <param name="partition">Column(s) identifying each series.</param>
    internal static void InternelImpl<T1, TKey>(Expression<Func<T1, DateTime?>> timestamp,
        Expression<Func<T1, TKey>> partition)
    {
        IsExistence.Value = true;
        var latestOnTemple = $"{Environment.NewLine}LATEST ON {{0}} PARTITION BY {{1}} ";

        // The same visitor is used for both expressions; Fields() is read after each Visit.
        var expressionVisitor = new QuestDbExpressionVisitor();
        expressionVisitor.Visit(timestamp);
        var _timestamp = expressionVisitor.Fields();
        expressionVisitor.Visit(partition);
        var _partition = expressionVisitor.Fields();

        LatestOnString.Value = string.Format(latestOnTemple, _timestamp, _partition);
    }
}

/// <summary>
/// Process-wide QuestDb REST API settings and a helper for executing SQL over HTTP.
/// </summary>
static class RestAPIExtension
{
    // Base address of the REST API, without a trailing slash; empty until configured.
    internal static string BaseUrl = string.Empty;

    // Value of the Authorization header ("Basic ..."); empty when no credentials are set.
    internal static string authorization = string.Empty;

    /// <summary>
    /// Executes SQL through an HTTP GET to <c>/exec</c>.
    /// </summary>
    /// <param name="sql">SQL text; URL-encoded into the query string.</param>
    /// <returns>The raw response body.</returns>
    internal static async Task<string> ExecAsync(string sql)
    {
        var client = QuestDbContainer.GetService<IHttpClientFactory>().CreateClient();
        var url = $"{BaseUrl}/exec?query={HttpUtility.UrlEncode(sql)}";
        if (!string.IsNullOrWhiteSpace(authorization))
        {
            client.DefaultRequestHeaders.Add("Authorization", authorization);
        }

        using (var response = await client.GetAsync(url).ConfigureAwait(false))
        {
            return await response.Content.ReadAsStringAsync().ConfigureAwait(false);
        }
    }

    /// <summary>
    /// Stores the REST endpoint and optional basic-auth credentials, and switches the
    /// builder to non-parameterized commands.
    /// </summary>
    /// <param name="buider">Builder being configured.</param>
    /// <param name="host">REST endpoint; one trailing slash is removed and <c>http://</c> is prepended when missing.</param>
    /// <param name="username">Optional basic-auth user name.</param>
    /// <param name="password">Optional basic-auth password.</param>
    /// <returns>The same builder.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="host"/> is null.</exception>
    internal static FreeSqlBuilder UseQuestDbRestAPI(FreeSqlBuilder buider, string host, string username = "",
        string password = "")
    {
        if (host == null)
        {
            throw new ArgumentNullException(nameof(host));
        }

        var baseUrl = host;
        if (baseUrl.EndsWith("/", StringComparison.Ordinal))
        {
            baseUrl = baseUrl.Remove(baseUrl.Length - 1);
        }

        if (!baseUrl.StartsWith("http", StringComparison.OrdinalIgnoreCase))
        {
            baseUrl = $"http://{baseUrl}";
        }

        BaseUrl = baseUrl;

        // Build the basic-auth token. Credentials from an earlier call are cleared when none
        // are supplied, so they are never sent to a different host.
        if (!string.IsNullOrWhiteSpace(username) && !string.IsNullOrWhiteSpace(password))
        {
            var base64 = Convert.ToBase64String(Encoding.UTF8.GetBytes($"{username}:{password}"));
            authorization = $"Basic {base64}";
        }
        else
        {
            authorization = string.Empty;
        }

        // The REST API requires SQL without command parameters.
        buider.UseNoneCommandParameter(true);
        return buider;
    }
}