using Cronos;
using FreeSql.Internal;
using NetAdmin.Application.Repositories;
using NetAdmin.Application.Services;
using NetAdmin.Domain.Dto.Dependency;
using NetAdmin.Domain.Dto.Sys;
using NetAdmin.Domain.Dto.Sys.Job;
using NetAdmin.Domain.Dto.Sys.JobRecord;
using NetAdmin.Domain.Enums.Sys;
using NetAdmin.SysComponent.Application.Services.Sys.Dependency;
namespace NetAdmin.SysComponent.Application.Services.Sys;
///
public sealed class JobService(BasicRepository rpo, IJobRecordService jobRecordService) //
: RepositoryService(rpo), IJobService
{
///
public async Task BulkDeleteAsync(BulkReq req)
{
req.ThrowIfInvalid();
var ret = 0;
// ReSharper disable once LoopCanBeConvertedToQuery
foreach (var item in req.Items) {
ret += await DeleteAsync(item).ConfigureAwait(false);
}
return ret;
}
///
public Task CountAsync(QueryReq req)
{
req.ThrowIfInvalid();
return QueryInternal(req)
#if DBTYPE_SQLSERVER
.WithLock(SqlServerLock.NoLock | SqlServerLock.NoWait)
#endif
.CountAsync();
}
///
public Task CountRecordAsync(QueryReq req)
{
return jobRecordService.CountAsync(req);
}
///
public async Task CreateAsync(CreateJobReq req)
{
req.ThrowIfInvalid();
var nextExecTime = GetNextExecTime(req.ExecutionCron);
var ret = await Rpo.InsertAsync(req with {
NextExecTime = nextExecTime
, NextTimeId = nextExecTime?.TimeUnixUtc()
, RequestHeader = req.RequestHeaders?.Json()
})
.ConfigureAwait(false);
return ret.Adapt();
}
///
public async Task DeleteAsync(DelReq req)
{
req.ThrowIfInvalid();
var ret = await Rpo.DeleteCascadeByDatabaseAsync(a => a.Id == req.Id).ConfigureAwait(false);
return ret.Count;
}
///
public async Task EditAsync(EditJobReq req)
{
req.ThrowIfInvalid();
var update = Rpo.UpdateDiy.Set(a => a.ExecutionCron == req.ExecutionCron)
.Set(a => a.HttpMethod == req.HttpMethod)
.Set(a => a.JobName == req.JobName)
.SetIf(req.RequestHeaders == null, a => a.RequestHeader, null)
.SetIf(req.RequestHeaders != null, a => a.RequestHeader, req.RequestHeaders.Json())
.Set(a => a.RequestBody == req.RequestBody)
.Set(a => a.RequestUrl == req.RequestUrl)
.Set(a => a.RandomDelayBegin == req.RandomDelayBegin)
.Set(a => a.RandomDelayEnd == req.RandomDelayEnd)
.Set(a => a.UserId == req.UserId)
.Set(a => a.Summary == req.Summary)
.Where(a => a.Id == req.Id);
#if DBTYPE_SQLSERVER
return (await update.ExecuteUpdatedAsync().ConfigureAwait(false)).FirstOrDefault()?.Adapt();
#else
return await update.ExecuteAffrowsAsync().ConfigureAwait(false) <= 0
? null
: await GetAsync(new QueryJobReq { Id = req.Id }).ConfigureAwait(false);
#endif
}
///
public async Task ExecuteAsync(QueryJobReq req)
{
req.ThrowIfInvalid();
var df = new DynamicFilterInfo {
Filters = [
new DynamicFilterInfo {
Field = nameof(QueryJobReq.Enabled)
, Operator = DynamicFilterOperators.Eq
, Value = true
}
, new DynamicFilterInfo {
Field = nameof(QueryJobReq.Status)
, Operator = DynamicFilterOperators.Eq
, Value = JobStatues.Idle
}
]
};
var job = await QueryInternal(new QueryReq { Count = 1, Filter = req, DynamicFilter = df })
.ToOneAsync()
.ConfigureAwait(false) ?? throw new NetAdminInvalidOperationException(Ln.未获取到待执行任务);
var nextExecTime = GetNextExecTime(Chars.FLG_CRON_PER_SECS);
try {
_ = await UpdateAsync( //
job with { NextExecTime = nextExecTime, NextTimeId = nextExecTime?.TimeUnixUtc() }
, [nameof(job.NextExecTime), nameof(job.NextTimeId)])
.ConfigureAwait(false);
}
catch (DbUpdateVersionException) {
throw new NetAdminInvalidOperationException(Ln.并发冲突_请稍后重试);
}
}
///
public Task ExistAsync(QueryReq req)
{
req.ThrowIfInvalid();
return QueryInternal(req)
#if DBTYPE_SQLSERVER
.WithLock(SqlServerLock.NoLock | SqlServerLock.NoWait)
#endif
.AnyAsync();
}
///
public Task ExportAsync(QueryReq req)
{
req.ThrowIfInvalid();
return ExportAsync(QueryInternal, req, Ln.计划作业导出);
}
///
public Task ExportRecordAsync(QueryReq req)
{
req.ThrowIfInvalid();
return jobRecordService.ExportAsync(req);
}
///
public async Task FinishJobAsync(FinishJobReq req)
{
req.ThrowIfInvalid();
var nextExecTime = GetNextExecTime(req.ExecutionCron);
_ = await UpdateAsync(
req with {
Status = JobStatues.Idle
, NextExecTime = nextExecTime
, NextTimeId = nextExecTime?.TimeUnixUtc()
}
, [
nameof(req.Status), nameof(req.NextExecTime), nameof(req.NextTimeId), nameof(req.LastDuration)
, nameof(req.LastStatusCode)
])
.ConfigureAwait(false);
}
///
public async Task GetAsync(QueryJobReq req)
{
req.ThrowIfInvalid();
var ret = await QueryInternal(new QueryReq { Filter = req }).ToOneAsync().ConfigureAwait(false);
return ret.Adapt();
}
///
public async Task GetNextJobAsync()
{
var df = new DynamicFilterInfo {
Filters = [
new DynamicFilterInfo {
Field = nameof(QueryJobReq.NextExecTime)
, Value = DateTime.Now
, Operator = DynamicFilterOperators.LessThan
}
, new DynamicFilterInfo {
Field = nameof(QueryJobReq.Status)
, Value = JobStatues.Idle
, Operator = DynamicFilterOperators.Eq
}
, new DynamicFilterInfo {
Field = nameof(QueryJobReq.Enabled)
, Value = true
, Operator = DynamicFilterOperators.Eq
}
]
};
var job = await QueryInternal(new QueryReq { DynamicFilter = df, Order = Orders.Random })
.Take(1)
.Where(a => !Rpo.Orm.Select()
.As("b")
.Where(b => b.JobId == a.Id && b.TimeId == a.NextTimeId)
.Any())
.ToOneAsync()
.ConfigureAwait(false);
if (job == null) {
return null;
}
#if DBTYPE_SQLSERVER
var ret = await UpdateReturnListAsync( //
job with { Status = JobStatues.Running, LastExecTime = DateTime.Now }
, [nameof(job.Status), nameof(job.LastExecTime)])
.ConfigureAwait(false);
return ret.FirstOrDefault()?.Adapt();
#else
return await UpdateAsync( //
job with { Status = JobStatues.Running, LastExecTime = DateTime.Now }
, [nameof(job.Status), nameof(job.LastExecTime)])
.ConfigureAwait(false) > 0
? await GetAsync(new QueryJobReq { Id = job.Id }).ConfigureAwait(false)
: null;
#endif
}
///
public Task GetRecordAsync(QueryJobRecordReq req)
{
req.ThrowIfInvalid();
return jobRecordService.GetAsync(req);
}
///
public Task> GetRecordBarChartAsync(QueryReq req)
{
req.ThrowIfInvalid();
return jobRecordService.GetBarChartAsync(req);
}
///
public Task> GetRecordPieChartByHttpStatusCodeAsync(QueryReq req)
{
req.ThrowIfInvalid();
return jobRecordService.GetPieChartByHttpStatusCodeAsync(req);
}
///
public Task> GetRecordPieChartByNameAsync(QueryReq req)
{
req.ThrowIfInvalid();
return jobRecordService.GetPieChartByNameAsync(req);
}
///
public async Task> PagedQueryAsync(PagedQueryReq req)
{
req.ThrowIfInvalid();
var list = await QueryInternal(req)
.Page(req.Page, req.PageSize)
#if DBTYPE_SQLSERVER
.WithLock(SqlServerLock.NoLock | SqlServerLock.NoWait)
#endif
.Count(out var total)
.ToListAsync()
.ConfigureAwait(false);
return new PagedQueryRsp(req.Page, req.PageSize, total, list.Adapt>());
}
///
public Task> PagedQueryRecordAsync(PagedQueryReq req)
{
req.ThrowIfInvalid();
return jobRecordService.PagedQueryAsync(req);
}
///
public async Task> QueryAsync(QueryReq req)
{
req.ThrowIfInvalid();
var ret = await QueryInternal(req)
#if DBTYPE_SQLSERVER
.WithLock(SqlServerLock.NoLock | SqlServerLock.NoWait)
#endif
.Take(req.Count)
.ToListAsync()
.ConfigureAwait(false);
return ret.Adapt>();
}
///
public async Task ReleaseStuckTaskAsync()
{
var ret1 = await UpdateAsync( // 运行中,运行时间超过超时设定;置为空闲状态
new Sys_Job { Status = JobStatues.Idle }, [nameof(Sys_Job.Status)], null
, a => a.Status == JobStatues.Running &&
a.LastExecTime < DateTime.Now.AddSeconds(-Numbers.SECS_TIMEOUT_JOB)
, null, true)
.ConfigureAwait(false);
var ret2 = await UpdateAsync( // 空闲中,下次执行时间在当前时间减去超时时间以前;将下次执行时间调整到现在
new Sys_Job { NextExecTime = DateTime.Now, NextTimeId = DateTime.Now.TimeUnixUtc() }
, [nameof(Sys_Job.NextExecTime), nameof(Sys_Job.NextTimeId)], null
, a => a.Status == JobStatues.Idle && a.NextExecTime < DateTime.Now.AddSeconds(-Numbers.SECS_TIMEOUT_JOB)
, null, true)
.ConfigureAwait(false);
return ret1 + ret2;
}
///
public Task SetEnabledAsync(SetJobEnabledReq req)
{
req.ThrowIfInvalid();
return UpdateAsync(req, [nameof(Sys_Job.Enabled)]);
}
private static DateTime? GetNextExecTime(string cron)
{
return CronExpression.Parse(cron, CronFormat.IncludeSeconds)
.GetNextOccurrence(DateTime.UtcNow, TimeZoneInfo.Local)
?.ToLocalTime();
}
private ISelect QueryInternal(QueryReq req)
{
var ret = Rpo.Select.Include(a => a.User)
.WhereDynamicFilter(req.DynamicFilter)
.WhereDynamic(req.Filter)
.WhereIf( //
req.Keywords?.Length > 0
, a => a.Id == req.Keywords.Int64Try(0) || a.JobName.Contains(req.Keywords));
// ReSharper disable once SwitchStatementMissingSomeEnumCasesNoDefault
switch (req.Order) {
case Orders.None:
return ret;
case Orders.Random:
return ret.OrderByRandom();
}
ret = ret.OrderByPropertyNameIf(req.Prop?.Length > 0, req.Prop, req.Order == Orders.Ascending);
if (!req.Prop?.Equals(nameof(req.Filter.LastExecTime), StringComparison.OrdinalIgnoreCase) ?? true) {
ret = ret.OrderByDescending(a => a.LastExecTime);
}
return ret;
}
}