优化QuestDb BulkCopy功能,并重命名为:ExecuteQuestDbBulkCopy

This commit is contained in:
Daily 2024-03-23 09:09:56 +08:00
parent ce8d699424
commit 0ac81caaec

View File

@ -31,12 +31,12 @@ public static partial class QuestDbGlobalExtensions
/// <param name="args"></param> /// <param name="args"></param>
/// <returns></returns> /// <returns></returns>
public static string FormatQuestDb(this string that, params object[] args) => public static string FormatQuestDb(this string that, params object[] args) =>
_QuestDbAdo.Addslashes(that, args); _questDbAdo.Addslashes(that, args);
static QuestDbAdo _QuestDbAdo = new QuestDbAdo(); private static readonly QuestDbAdo _questDbAdo = new QuestDbAdo();
public static FreeSqlBuilder UseQuestDbRestAPI(this FreeSqlBuilder buider, string host, string username = "", public static FreeSqlBuilder UseQuestDbRestAPI(this FreeSqlBuilder build, string host, string username = "",
string password = "") => RestAPIExtension.UseQuestDbRestAPI(buider, host, username, password); string password = "") => RestAPIExtension.UseQuestDbRestAPI(build, host, username, password);
/// <summary> /// <summary>
/// 对于多个时间序列存储在同一个表中的场景,根据时间戳检索给定键或键组合的最新项。 /// 对于多个时间序列存储在同一个表中的场景,根据时间戳检索给定键或键组合的最新项。
@ -53,6 +53,7 @@ public static partial class QuestDbGlobalExtensions
LatestOnExtension.InternelImpl(timestamp, partition); LatestOnExtension.InternelImpl(timestamp, partition);
return select; return select;
} }
/// <summary> /// <summary>
/// 对于多个时间序列存储在同一个表中的场景,根据时间戳检索给定键或键组合的最新项。 /// 对于多个时间序列存储在同一个表中的场景,根据时间戳检索给定键或键组合的最新项。
/// </summary> /// </summary>
@ -113,13 +114,15 @@ public static partial class QuestDbGlobalExtensions
/// <param name="unit">单位</param> /// <param name="unit">单位</param>
/// <param name="alignToCalendar">对准日历</param> /// <param name="alignToCalendar">对准日历</param>
/// <returns></returns> /// <returns></returns>
public static ISelect<T> SampleBy<T>(this ISelect<T> select, double time, SampleUnit unit, bool alignToCalendar = false) public static ISelect<T> SampleBy<T>(this ISelect<T> select, double time, SampleUnit unit,
bool alignToCalendar = false)
{ {
SampleByExtension.IsExistence.Value = true; SampleByExtension.IsExistence.Value = true;
var samoleByTemple = $"{Environment.NewLine}SAMPLE BY {{0}}{{1}} {{2}}"; var samoleByTemple = $"{Environment.NewLine}SAMPLE BY {{0}}{{1}} {{2}}";
string alignToCalendarTemple = ""; string alignToCalendarTemple = "";
if (alignToCalendar) alignToCalendarTemple = "ALIGN TO CALENDAR "; if (alignToCalendar) alignToCalendarTemple = "ALIGN TO CALENDAR ";
SampleByExtension.SamoleByString.Value = string.Format(samoleByTemple, time.ToString(), (char)unit, alignToCalendarTemple); SampleByExtension.SamoleByString.Value =
string.Format(samoleByTemple, time.ToString(), (char)unit, alignToCalendarTemple);
return select; return select;
} }
@ -156,23 +159,25 @@ public static partial class QuestDbGlobalExtensions
/// <param name="that"></param> /// <param name="that"></param>
/// <param name="dateFormat">导入时,时间格式 默认:yyyy/M/d H:mm:ss</param> /// <param name="dateFormat">导入时,时间格式 默认:yyyy/M/d H:mm:ss</param>
/// <returns></returns> /// <returns></returns>
public static async Task<int> ExecuteBulkCopyAsync<T>(this IInsert<T> that,string dateFormat = "yyyy/M/d H:mm:ss") where T : class public static async Task<int> ExecuteQuestDbBulkCopyAsync<T>(this IInsert<T> that,
string dateFormat = "yyyy/M/d H:mm:ss") where T : class
{ {
//思路通过提供的RestAPI imp实现快速复制 //思路通过提供的RestAPI imp实现快速复制
if (string.IsNullOrWhiteSpace(RestAPIExtension.BaseUrl)) if (string.IsNullOrWhiteSpace(RestAPIExtension.BaseUrl))
{ {
throw new Exception("BulkCopy功能需要启用RestAPI启用方式new FreeSqlBuilder().UseQuestDbRestAPI(\"localhost:9000\", \"username\", \"password\")"); throw new Exception(
"BulkCopy功能需要启用RestAPI启用方式new FreeSqlBuilder().UseQuestDbRestAPI(\"localhost:9000\", \"username\", \"password\")");
} }
var result = 0; var result = 0;
var fileName = $"{Guid.NewGuid()}.csv";
var filePath = Path.Combine(AppContext.BaseDirectory, fileName);
try try
{ {
var client = QuestDbContainer.GetService<IHttpClientFactory>().CreateClient(); var client = QuestDbContainer.GetService<IHttpClientFactory>().CreateClient();
var boundary = "---------------" + DateTime.Now.Ticks.ToString("x"); var boundary = "---------------" + DateTime.Now.Ticks.ToString("x");
var list = new List<Hashtable>(); var list = new List<Hashtable>();
var insert = that as QuestDbInsert<T>; var insert = that as QuestDbInsert<T>;
var name = insert.InternalTableRuleInvoke(); //获取表名 var name = insert.InternalTableRuleInvoke(); //获取表名
insert.InternalOrm.DbFirst.GetTableByName(name).Columns.ForEach(d => insert.InternalOrm.DbFirst.GetTableByName(name).Columns.ForEach(d =>
{ {
if (d.DbTypeText == "TIMESTAMP") if (d.DbTypeText == "TIMESTAMP")
@ -181,7 +186,7 @@ public static partial class QuestDbGlobalExtensions
{ {
{ "name", d.Name }, { "name", d.Name },
{ "type", d.DbTypeText }, { "type", d.DbTypeText },
{ "pattern", dateFormat} { "pattern", dateFormat }
}); });
} }
else else
@ -194,34 +199,37 @@ public static partial class QuestDbGlobalExtensions
} }
}); });
var schema = JsonConvert.SerializeObject(list); var schema = JsonConvert.SerializeObject(list);
//写入CSV文件 using (MemoryStream stream = new MemoryStream())
using (var writer = new StreamWriter(filePath))
using (var csv = new CsvWriter(writer, CultureInfo.CurrentCulture))
{ {
await csv.WriteRecordsAsync(insert._source); //写入CSV文件
} using (var writer = new StreamWriter(stream))
using (var csv = new CsvWriter(writer, CultureInfo.CurrentCulture))
var httpContent = new MultipartFormDataContent(boundary);
if (!string.IsNullOrWhiteSpace(RestAPIExtension.authorization))
client.DefaultRequestHeaders.Add("Authorization", RestAPIExtension.authorization);
httpContent.Add(new StringContent(schema), "schema");
httpContent.Add(new ByteArrayContent(File.ReadAllBytes(filePath)), "data");
//boundary带双引号 可能导致服务器错误情况
httpContent.Headers.Remove("Content-Type");
httpContent.Headers.TryAddWithoutValidation("Content-Type",
"multipart/form-data; boundary=" + boundary);
var httpResponseMessage =
await client.PostAsync($"{RestAPIExtension.BaseUrl}/imp?name={name}", httpContent);
var readAsStringAsync = await httpResponseMessage.Content.ReadAsStringAsync();
var splitByLine = SplitByLine(readAsStringAsync);
foreach (var s in splitByLine)
{
if (s.Contains("Rows"))
{ {
var strings = s.Split('|'); await csv.WriteRecordsAsync(insert._source);
if (strings[1].Trim() == "Rows imported") }
var httpContent = new MultipartFormDataContent(boundary);
if (!string.IsNullOrWhiteSpace(RestAPIExtension.authorization))
client.DefaultRequestHeaders.Add("Authorization", RestAPIExtension.authorization);
httpContent.Add(new StringContent(schema), "schema");
httpContent.Add(new ByteArrayContent(stream.ToArray()), "data");
//boundary带双引号 可能导致服务器错误情况
httpContent.Headers.Remove("Content-Type");
httpContent.Headers.TryAddWithoutValidation("Content-Type",
"multipart/form-data; boundary=" + boundary);
var httpResponseMessage =
await client.PostAsync($"{RestAPIExtension.BaseUrl}/imp?name={name}", httpContent);
var readAsStringAsync = await httpResponseMessage.Content.ReadAsStringAsync();
var splitByLine = SplitByLine(readAsStringAsync);
foreach (var s in splitByLine)
{
if (s.Contains("Rows"))
{ {
result = Convert.ToInt32(strings[2].Trim()); var strings = s.Split('|');
if (strings[1].Trim() == "Rows imported")
{
result = Convert.ToInt32(strings[2].Trim());
}
} }
} }
} }
@ -230,17 +238,6 @@ public static partial class QuestDbGlobalExtensions
{ {
throw e; throw e;
} }
finally
{
try
{
File.Delete(filePath);
}
catch
{
// ignored
}
}
return result; return result;
} }
@ -252,9 +249,9 @@ public static partial class QuestDbGlobalExtensions
/// <param name="insert"></param> /// <param name="insert"></param>
/// <param name="dateFormat">导入时,时间格式 默认:yyyy/M/d H:mm:ss</param> /// <param name="dateFormat">导入时,时间格式 默认:yyyy/M/d H:mm:ss</param>
/// <returns></returns> /// <returns></returns>
public static int ExecuteBulkCopy<T>(this IInsert<T> insert,string dateFormat = "yyyy/M/d H:mm:ss") where T : class public static int ExecuteBulkCopy<T>(this IInsert<T> insert, string dateFormat = "yyyy/M/d H:mm:ss") where T : class
{ {
return ExecuteBulkCopyAsync(insert,dateFormat).ConfigureAwait(false).GetAwaiter().GetResult(); return ExecuteQuestDbBulkCopyAsync(insert, dateFormat).ConfigureAwait(false).GetAwaiter().GetResult();
} }
} }
@ -344,6 +341,7 @@ static class RestAPIExtension
var base64 = Convert.ToBase64String(Encoding.UTF8.GetBytes($"{username}:{password}")); var base64 = Convert.ToBase64String(Encoding.UTF8.GetBytes($"{username}:{password}"));
authorization = $"Basic {base64}"; authorization = $"Basic {base64}";
} }
//RestApi需要无参数 //RestApi需要无参数
buider.UseNoneCommandParameter(true); buider.UseNoneCommandParameter(true);
return buider; return buider;