using CsvHelper;
using FreeSql;
using FreeSql.Internal.CommonProvider;
using FreeSql.Internal.Model;
using FreeSql.QuestDb;
using FreeSql.QuestDb.Curd;
using Newtonsoft.Json;
using Npgsql;
using System;
using System.Collections;
using System.Collections.Generic;
using System.Data;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Linq.Expressions;
using System.Net.Http;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Web;
using FreeSql.Provider.QuestDb;
using System.Net;
public static partial class QuestDbGlobalExtensions
{
/// <summary>
/// Formats a SQL fragment in the manner of <c>string.Format</c>, passing the template and
/// arguments to <c>QuestDbAdo.Addslashes</c>. Intended to guard against injection and to
/// handle the IS NULL conversion.
/// </summary>
/// <param name="that">The format template.</param>
/// <param name="args">The values substituted into the template.</param>
/// <returns>The string returned by <c>Addslashes</c>.</returns>
public static string FormatQuestDb(this string that, params object[] args) =>
_QuestDbAdo.Addslashes(that, args);
// Adapter instance that FormatQuestDb delegates to.
static QuestDbAdo _QuestDbAdo = new QuestDbAdo();
/// <summary>
/// Enables the QuestDB REST API for this builder by delegating to
/// <c>RestAPIExtension.UseQuestDbRestAPI</c>, which stores the endpoint address and optional
/// Basic credentials and switches the builder to non-parameterized commands.
/// </summary>
/// <param name="buider">The builder being configured.</param>
/// <param name="host">REST endpoint address, e.g. <c>localhost:9000</c>.</param>
/// <param name="username">Optional user name; used only when a password is also supplied.</param>
/// <param name="password">Optional password; used only when a user name is also supplied.</param>
/// <returns>The builder returned by <c>RestAPIExtension.UseQuestDbRestAPI</c>.</returns>
public static FreeSqlBuilder UseQuestDbRestAPI(this FreeSqlBuilder buider, string host, string username = "",
string password = "") => RestAPIExtension.UseQuestDbRestAPI(buider, host, username, password);
/// <summary>
/// For tables that store several time series together, retrieves the latest item for a given
/// key or combination of keys, based on the timestamp.
/// Forwards both expressions to <c>LatestOnExtension.InternelImpl</c>, which records the
/// LATEST ON ... PARTITION BY clause text.
/// </summary>
/// <param name="select">The query being built; returned as-is.</param>
/// <param name="timestamp">Expression identifying the timestamp.</param>
/// <param name="partition">Expression identifying the column(s) of the latest item.</param>
/// <returns>The same <paramref name="select"/> instance.</returns>
public static ISelect LatestOn(this ISelect select, Expression> timestamp,
Expression> partition)
{
LatestOnExtension.InternelImpl(timestamp, partition);
return select;
}
/// <summary>
/// For tables that store several time series together, retrieves the latest item for a given
/// key or combination of keys, based on the timestamp. Overload constrained by
/// <c>where T2 : class</c>.
/// Forwards both expressions to <c>LatestOnExtension.InternelImpl</c>, which records the
/// LATEST ON ... PARTITION BY clause text.
/// </summary>
/// <param name="select">The query being built; returned as-is.</param>
/// <param name="timestamp">Expression identifying the timestamp.</param>
/// <param name="partition">Expression identifying the column(s) of the latest item.</param>
/// <returns>The same <paramref name="select"/> instance.</returns>
public static ISelect LatestOn(this ISelect select,
Expression> timestamp,
Expression> partition) where T2 : class
{
LatestOnExtension.InternelImpl(timestamp, partition);
return select;
}
/// <summary>
/// For tables that store several time series together, retrieves the latest item for a given
/// key or combination of keys, based on the timestamp. Overload constrained by
/// <c>where T2 : class where T3 : class</c>.
/// Forwards both expressions to <c>LatestOnExtension.InternelImpl</c>, which records the
/// LATEST ON ... PARTITION BY clause text.
/// </summary>
/// <param name="select">The query being built; returned as-is.</param>
/// <param name="timestamp">Expression identifying the timestamp.</param>
/// <param name="partition">Expression identifying the column(s) of the latest item.</param>
/// <returns>The same <paramref name="select"/> instance.</returns>
public static ISelect LatestOn(this ISelect select,
Expression> timestamp,
Expression> partition) where T2 : class where T3 : class
{
LatestOnExtension.InternelImpl(timestamp, partition);
return select;
}
/// <summary>
/// For tables that store several time series together, retrieves the latest item for a given
/// key or combination of keys, based on the timestamp. Overload constrained by
/// <c>where T2 : class where T3 : class where T4 : class</c>.
/// Forwards both expressions to <c>LatestOnExtension.InternelImpl</c>, which records the
/// LATEST ON ... PARTITION BY clause text.
/// </summary>
/// <param name="select">The query being built; returned as-is.</param>
/// <param name="timestamp">Expression identifying the timestamp.</param>
/// <param name="partition">Expression identifying the column(s) of the latest item.</param>
/// <returns>The same <paramref name="select"/> instance.</returns>
public static ISelect LatestOn(this ISelect select,
Expression> timestamp,
Expression> partition) where T2 : class where T3 : class where T4 : class
{
LatestOnExtension.InternelImpl(timestamp, partition);
return select;
}
/// <summary>
/// Records a SAMPLE BY clause for the query. SAMPLE BY is used on time-series data to
/// aggregate large data sets into homogeneous time chunks as part of a SELECT statement.
/// This method emits no FILL keyword.
/// </summary>
/// <param name="select">The query being built; returned as-is.</param>
/// <param name="time">Length of each sample bucket.</param>
/// <param name="unit">Unit of <paramref name="time"/>; the enum member name is written into the clause.</param>
/// <param name="alignToCalendar">When true, <c>ALIGN TO CALENDAR</c> is appended to the clause.</param>
/// <returns>The same <paramref name="select"/> instance.</returns>
public static ISelect SampleBy(this ISelect select, double time, SampleUnits unit, bool alignToCalendar = false)
{
    var unitName = Enum.GetName(typeof(SampleUnits), unit);
    var alignClause = alignToCalendar ? "ALIGN TO CALENDAR " : string.Empty;

    // The number becomes SQL text, so it must not follow the thread culture: a culture with a
    // comma decimal separator would otherwise produce "SAMPLE BY 1,5h".
    var duration = time.ToString(CultureInfo.InvariantCulture);

    SampleByExtension.IsExistence.Value = true;
    SampleByExtension.SamoleByString.Value =
        $"{Environment.NewLine}SAMPLE BY {duration}{unitName} {alignClause}";
    return select;
}
/// <summary>
/// Reads the text line by line and returns every line, empty ones included.
/// </summary>
/// <param name="text">The text to split.</param>
/// <returns>The lines in their original order, without line terminators.</returns>
private static List SplitByLine(string text)
{
    var collected = new List();
    using (var buffer = new MemoryStream(Encoding.UTF8.GetBytes(text)))
    using (var reader = new StreamReader(buffer))
    {
        string current;
        while ((current = reader.ReadLine()) != null)
        {
            collected.Add(current);
        }
    }
    return collected;
}
/// <summary>
/// Bulk-inserts the rows held by the insert builder by uploading them as CSV, together with a
/// JSON column schema, to the QuestDB REST <c>/imp</c> endpoint.
/// </summary>
/// <param name="that">The insert builder holding the rows; must be the QuestDb provider's insert implementation.</param>
/// <returns>
/// The number found on the "Rows imported" line of the server response, or 0 when the response
/// contains no such line.
/// </returns>
/// <exception cref="InvalidOperationException">The REST API has not been enabled via UseQuestDbRestAPI.</exception>
/// <exception cref="ArgumentException"><paramref name="that"/> is not a QuestDb insert builder.</exception>
public static async Task ExecuteBulkCopyAsync(this IInsert that) where T : class
{
    // Approach: use the REST API "imp" endpoint to perform a fast copy.
    if (string.IsNullOrWhiteSpace(RestAPIExtension.BaseUrl))
    {
        throw new InvalidOperationException("BulkCopy功能需要启用RestAPI,启用方式:new FreeSqlBuilder().UseQuestDbRestAPI(\"localhost:9000\", \"username\", \"password\")");
    }

    var insert = that as QuestDbInsert;
    if (insert == null)
    {
        throw new ArgumentException("ExecuteBulkCopyAsync requires an insert builder created by the QuestDb provider.", nameof(that));
    }

    // The timestamp text in the CSV must match the pattern announced in the schema, so both
    // are derived from the same two constants.
    const string datePattern = "yyyy/M/d";
    const string timePattern = "H:mm:ss";
    const string timestampPattern = datePattern + " " + timePattern;

    var name = typeof(T).Name;

    // Describe every column for the importer; TIMESTAMP columns additionally carry the pattern.
    var list = insert.InternalOrm.DbFirst.GetTableByName(name).Columns.Select(d =>
    {
        var column = new Hashtable()
        {
            { "name", d.Name },
            { "type", d.DbTypeText }
        };
        if (d.DbTypeText == "TIMESTAMP")
        {
            column.Add("pattern", timestampPattern);
        }
        return column;
    }).ToList();
    var schema = JsonConvert.SerializeObject(list);

    // Write with a fixed culture instead of the thread culture: the default date/time format
    // of this culture is exactly timestampPattern, and numbers and the delimiter stay
    // invariant, so the upload does not change with the machine's regional settings.
    var csvCulture = (CultureInfo)CultureInfo.InvariantCulture.Clone();
    csvCulture.DateTimeFormat.ShortDatePattern = datePattern;
    csvCulture.DateTimeFormat.LongTimePattern = timePattern;

    // Build the CSV in memory; no temporary file is needed because the payload is sent as a
    // byte array anyway.
    byte[] csvBytes;
    using (var memory = new MemoryStream())
    {
        using (var writer = new StreamWriter(memory))
        using (var csv = new CsvWriter(writer, csvCulture))
        {
            csv.WriteRecords(insert._source);
        }
        // ToArray remains valid after the writer has closed the stream.
        csvBytes = memory.ToArray();
    }

    var client = QuestDbContainer.GetService().CreateClient();
    if (!string.IsNullOrWhiteSpace(RestAPIExtension.authorization))
    {
        client.DefaultRequestHeaders.Add("Authorization", RestAPIExtension.authorization);
    }

    var boundary = "---------------" + DateTime.Now.Ticks.ToString("x");
    using (var httpContent = new MultipartFormDataContent(boundary))
    {
        httpContent.Add(new StringContent(schema), "schema");
        httpContent.Add(new ByteArrayContent(csvBytes), "data");
        // A boundary wrapped in double quotes may cause a server error, so the header is
        // rewritten with the bare boundary value.
        httpContent.Headers.Remove("Content-Type");
        httpContent.Headers.TryAddWithoutValidation("Content-Type",
            "multipart/form-data; boundary=" + boundary);

        var requestUri = $"{RestAPIExtension.BaseUrl}/imp?name={Uri.EscapeDataString(name)}";
        // ConfigureAwait(false): the synchronous ExecuteBulkCopy wrapper blocks on this task,
        // which would deadlock if a continuation needed the caller's synchronization context.
        using (var response = await client.PostAsync(requestUri, httpContent).ConfigureAwait(false))
        {
            var responseText = await response.Content.ReadAsStringAsync().ConfigureAwait(false);

            // The response is a text table; the wanted row looks like "| Rows imported | 123 |".
            var result = 0;
            foreach (var line in SplitByLine(responseText))
            {
                if (!line.Contains("Rows"))
                {
                    continue;
                }
                var cells = line.Split('|');
                if (cells.Length > 2 && cells[1].Trim() == "Rows imported")
                {
                    result = Convert.ToInt32(cells[2].Trim(), CultureInfo.InvariantCulture);
                }
            }
            return result;
        }
    }
}
/// <summary>
/// Synchronous bulk insert: starts <c>ExecuteBulkCopyAsync</c> and blocks until it completes.
/// </summary>
/// <param name="insert">The insert builder holding the rows to upload.</param>
/// <returns>The value produced by <c>ExecuteBulkCopyAsync</c>.</returns>
public static int ExecuteBulkCopy(this IInsert insert) where T : class
{
// Blocks the calling thread on the asynchronous implementation.
return ExecuteBulkCopyAsync(insert).GetAwaiter().GetResult();
}
}
/// <summary>
/// Ambient state written by the SampleBy extension method: a flag and the generated
/// SAMPLE BY clause text. Presumably read back when the SQL is generated — the reader is not
/// visible in this file.
/// </summary>
static class SampleByExtension
{
// Whether SampleBy has been used.
// NOTE(review): the initializer sets Value only in the context that runs the static
// initializer; confirm readers in other contexts tolerate the type's default value.
internal static AsyncLocal IsExistence = new AsyncLocal()
{
Value = false
};
// The SAMPLE BY clause text stored by SampleBy.
internal static AsyncLocal SamoleByString = new AsyncLocal()
{
Value = string.Empty
};
// Resets the flag to false and the clause text to the empty string.
internal static void Initialize()
{
IsExistence.Value = false;
SamoleByString.Value = string.Empty;
}
}
/// <summary>
/// Ambient state written by the LatestOn extension methods: a flag and the generated
/// LATEST ON clause text. Presumably read back when the SQL is generated — the reader is not
/// visible in this file.
/// </summary>
static class LatestOnExtension
{
// Whether LatestOn has been used.
// NOTE(review): the initializer sets Value only in the context that runs the static
// initializer; confirm readers in other contexts tolerate the type's default value.
internal static AsyncLocal IsExistence = new AsyncLocal()
{
Value = false
};
// The LATEST ON clause text stored by InternelImpl.
internal static AsyncLocal LatestOnString = new AsyncLocal()
{
Value = string.Empty
};
// Resets the flag to false and the clause text to the empty string.
internal static void Initialize()
{
IsExistence.Value = false;
LatestOnString.Value = string.Empty;
}
// Marks LatestOn as used and stores "LATEST ON {timestamp} PARTITION BY {partition} ",
// preceded by a newline, with both placeholders resolved through QuestDbExpressionVisitor.
internal static void InternelImpl(Expression> timestamp,
Expression> partition)
{
IsExistence.Value = true;
var latestOnTemple = $"{Environment.NewLine}LATEST ON {{0}} PARTITION BY {{1}} ";
var expressionVisitor = new QuestDbExpressionVisitor();
// NOTE(review): one visitor instance handles both expressions; this assumes Fields()
// reports only what the most recent Visit collected — confirm against the visitor.
expressionVisitor.Visit(timestamp);
var _timestamp = expressionVisitor.Fields();
expressionVisitor.Visit(partition);
var _partition = expressionVisitor.Fields();
LatestOnString.Value = string.Format(latestOnTemple, _timestamp, _partition);
}
}
/// <summary>
/// Process-wide settings and helpers for talking to the QuestDB REST API.
/// </summary>
static class RestAPIExtension
{
    // REST endpoint address: includes a scheme and has a single trailing slash removed.
    internal static string BaseUrl = string.Empty;

    // Value sent in the HTTP Authorization header; empty when no credentials are configured.
    internal static string authorization = string.Empty;

    /// <summary>
    /// Executes a SQL statement through the REST <c>/exec</c> endpoint using HTTP GET.
    /// </summary>
    /// <param name="sql">The SQL text; it is URL-encoded into the query string.</param>
    /// <returns>The response body as text, whatever the HTTP status.</returns>
    internal static async Task ExecAsync(string sql)
    {
        var client = QuestDbContainer.GetService().CreateClient();
        if (!string.IsNullOrWhiteSpace(authorization))
        {
            client.DefaultRequestHeaders.Add("Authorization", authorization);
        }

        var url = $"{BaseUrl}/exec?query={HttpUtility.UrlEncode(sql)}";
        // Dispose the response once the body has been read so the connection is released.
        using (var httpResponseMessage = await client.GetAsync(url).ConfigureAwait(false))
        {
            return await httpResponseMessage.Content.ReadAsStringAsync().ConfigureAwait(false);
        }
    }

    /// <summary>
    /// Stores the REST endpoint address and optional Basic credentials, and switches the
    /// builder to non-parameterized commands.
    /// </summary>
    /// <param name="buider">The builder being configured.</param>
    /// <param name="host">Endpoint address; <c>http://</c> is prepended when no http/https scheme is present.</param>
    /// <param name="username">User name; used only when <paramref name="password"/> is also non-blank.</param>
    /// <param name="password">Password; used only when <paramref name="username"/> is also non-blank.</param>
    /// <returns>The same <paramref name="buider"/> instance.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="host"/> is null.</exception>
    internal static FreeSqlBuilder UseQuestDbRestAPI(FreeSqlBuilder buider, string host, string username = "",
        string password = "")
    {
        if (host == null)
        {
            throw new ArgumentNullException(nameof(host));
        }

        var url = host;
        if (url.EndsWith("/", StringComparison.Ordinal))
        {
            url = url.Remove(url.Length - 1);
        }

        // Look for a real scheme prefix, compared ordinally: testing only for "http" would
        // mistake a host name such as "httpbin:9000" for an address that already has one.
        var hasScheme = url.StartsWith("http://", StringComparison.OrdinalIgnoreCase)
                        || url.StartsWith("https://", StringComparison.OrdinalIgnoreCase);
        if (!hasScheme)
        {
            url = $"http://{url}";
        }
        // Publish the normalized value in a single assignment.
        BaseUrl = url;

        // Build the Basic token; clear any earlier one so that reconfiguring without
        // credentials does not keep sending the previous user's header.
        if (!string.IsNullOrWhiteSpace(username) && !string.IsNullOrWhiteSpace(password))
        {
            var base64 = Convert.ToBase64String(Encoding.UTF8.GetBytes($"{username}:{password}"));
            authorization = $"Basic {base64}";
        }
        else
        {
            authorization = string.Empty;
        }

        // The REST API needs SQL without command parameters.
        buider.UseNoneCommandParameter(true);
        return buider;
    }
}