feat: 计划作业 (#87)

This commit is contained in:
2024-02-02 14:05:39 +08:00
committed by GitHub
parent 473b0c26f2
commit 8293ec0297
70 changed files with 2171 additions and 14 deletions

View File

@@ -0,0 +1,92 @@
using NetAdmin.Domain.Dto.Dependency;
using NetAdmin.Domain.Dto.Sys.Job;
using NetAdmin.Host.Attributes;
using NetAdmin.Host.Controllers;
using NetAdmin.SysComponent.Application.Modules.Sys;
using NetAdmin.SysComponent.Application.Services.Sys.Dependency;
using NetAdmin.SysComponent.Cache.Sys.Dependency;
namespace NetAdmin.SysComponent.Host.Controllers.Sys;
/// <summary>
/// 计划作业服务
/// </summary>
[ApiDescriptionSettings(nameof(Sys), Module = nameof(Sys))]
public sealed class JobController(IJobCache cache) : ControllerBase<IJobCache, IJobService>(cache), IJobModule
{
/// <summary>
/// 批量删除计划作业
/// </summary>
[Transaction]
public Task<int> BulkDeleteAsync(BulkReq<DelReq> req)
{
return Cache.BulkDeleteAsync(req);
}
/// <summary>
/// 创建计划作业
/// </summary>
[Transaction]
public Task<QueryJobRsp> CreateAsync(CreateJobReq req)
{
return Cache.CreateAsync(req);
}
/// <summary>
/// 删除计划作业
/// </summary>
[Transaction]
public Task<int> DeleteAsync(DelReq req)
{
return Cache.DeleteAsync(req);
}
/// <summary>
/// 计划作业是否存在
/// </summary>
public Task<bool> ExistAsync(QueryReq<QueryJobReq> req)
{
return Cache.ExistAsync(req);
}
/// <summary>
/// 获取单个计划作业
/// </summary>
public Task<QueryJobRsp> GetAsync(QueryJobReq req)
{
return Cache.GetAsync(req);
}
/// <summary>
/// 分页查询计划作业
/// </summary>
public Task<PagedQueryRsp<QueryJobRsp>> PagedQueryAsync(PagedQueryReq<QueryJobReq> req)
{
return Cache.PagedQueryAsync(req);
}
/// <summary>
/// 查询计划作业
/// </summary>
public Task<IEnumerable<QueryJobRsp>> QueryAsync(QueryReq<QueryJobReq> req)
{
return Cache.QueryAsync(req);
}
/// <summary>
/// 启用/禁用作业
/// </summary>
public Task SetEnabledAsync(UpdateJobReq req)
{
return Cache.SetEnabledAsync(req);
}
/// <summary>
/// 更新计划作业
/// </summary>
[Transaction]
public Task<QueryJobRsp> UpdateAsync(UpdateJobReq req)
{
return Cache.UpdateAsync(req);
}
}

View File

@@ -0,0 +1,21 @@
using Furion.Schedule;
using NetAdmin.SysComponent.Host.Jobs;
namespace NetAdmin.SysComponent.Host.Extensions;
/// <summary>
/// ServiceCollection 扩展方法
/// </summary>
[SuppressSniffer]
public static class ServiceCollectionExtensions
{
/// <summary>
/// 添加定时任务
/// </summary>
public static IServiceCollection AddSchedules(this IServiceCollection me)
{
return me.AddSchedule( //
builder => builder //
.AddJob<ScheduledJob>(false, Triggers.PeriodSeconds(5).SetRunOnStart(true)));
}
}

View File

@@ -0,0 +1,141 @@
using FreeSql.Internal;
using Furion.RemoteRequest;
using Furion.RemoteRequest.Extensions;
using Furion.Schedule;
using NetAdmin.Domain.DbMaps.Sys;
using NetAdmin.Domain.Dto.Sys.Job;
using NetAdmin.Domain.Dto.Sys.JobRecord;
using NetAdmin.Host.BackgroundRunning;
using NetAdmin.Host.Extensions;
using NetAdmin.SysComponent.Application.Services.Sys.Dependency;
namespace NetAdmin.SysComponent.Host.Jobs;
/// <summary>
/// 计划作业
/// </summary>
public sealed class ScheduledJob : WorkBase<ScheduledJob>, IJob
{
private static string _accessToken;
private static string _refreshToken;
private readonly IJobRecordService _jobRecordService;
private readonly IJobService _jobService;
private readonly ILogger<ScheduledJob> _logger;
private readonly IUserService _userService;
private string _requestHeader;
/// <summary>
/// Initializes a new instance of the <see cref="ScheduledJob" /> class.
/// </summary>
public ScheduledJob()
{
_jobRecordService = ServiceProvider.GetService<IJobRecordService>();
_jobService = ServiceProvider.GetService<IJobService>();
_logger = ServiceProvider.GetService<ILogger<ScheduledJob>>();
_userService = ServiceProvider.GetService<IUserService>();
}
/// <summary>
/// 具体处理逻辑
/// </summary>
/// <param name="context">作业执行前上下文</param>
/// <param name="stoppingToken">取消任务 Token</param>
/// <exception cref="NetAdminGetLockerException">加锁失败异常</exception>
public async Task ExecuteAsync(JobExecutingContext context, CancellationToken stoppingToken)
{
await WorkflowAsync(stoppingToken).ConfigureAwait(false);
}
/// <summary>
/// 通用工作流
/// </summary>
/// <exception cref="NotImplementedException">NotImplementedException</exception>
/// <exception cref="ArgumentOutOfRangeException">ArgumentOutOfRangeException</exception>
protected override async ValueTask WorkflowAsync(CancellationToken cancelToken)
{
QueryJobRsp job = null;
try {
job = await _jobService.GetNextJobAsync().ConfigureAwait(false);
}
catch (DbUpdateVersionException) {
// ignore
}
if (job == null) {
_logger.Info(Ln.);
return;
}
var request = BuildRequest(job);
var sw = new Stopwatch();
sw.Start();
var rsp = await request.SendAsync(cancelToken).ConfigureAwait(false);
if (rsp.StatusCode == HttpStatusCode.Unauthorized) {
var loginRsp = await _userService.LoginByUserIdAsync(job.UserId).ConfigureAwait(false);
#pragma warning disable S2696
_accessToken = loginRsp.AccessToken;
_refreshToken = loginRsp.RefreshToken;
#pragma warning restore S2696
request = BuildRequest(job);
rsp = await request.SendAsync(cancelToken).ConfigureAwait(false);
}
sw.Stop();
await UowManager.AtomicOperateAsync(async () => {
var rspBody = await rsp.Content.ReadAsStringAsync(cancelToken).ConfigureAwait(false);
var jobRecord = new CreateJobRecordReq //
{
Duration = sw.ElapsedMilliseconds
, HttpMethod = job.HttpMethod
, HttpStatusCode = rsp.StatusCode
, JobId = job.Id
, RequestBody = job.RequestBody
, RequestHeader = _requestHeader
, RequestUrl = job.RequestUrl
, ResponseBody = rspBody
, ResponseHeader = rsp.Headers.Json()
, TimeId = job.NextTimeId!.Value
};
_ = await _jobRecordService.CreateAsync(jobRecord).ConfigureAwait(false);
await _jobService
.FinishJobAsync(job.Adapt<UpdateJobReq>() with { LastStatusCode = rsp.StatusCode })
.ConfigureAwait(false);
})
.ConfigureAwait(false);
}
private HttpRequestPart BuildRequest(Sys_Job job)
{
var ret = job.RequestUrl.SetHttpMethod(new HttpMethod(job.HttpMethod.ToString()));
var headers = new Dictionary<string, string>();
if (!_accessToken.NullOrEmpty()) {
headers.Add(Chars.FLG_ACCESS_TOKEN_HEADER_KEY, $"{Chars.FLG_AUTH_SCHEMA} {_accessToken}");
}
if (!_refreshToken.NullOrEmpty()) {
headers.Add(Chars.FLG_X_ACCESS_TOKEN_HEADER_KEY, $"{Chars.FLG_AUTH_SCHEMA} {_refreshToken}");
}
if (!job.RequestHeader.NullOrEmpty()) {
ret = ret.SetHeaders(headers.Union(job.RequestHeader.Object<Dictionary<string, string>>())
.ToDictionary(x => x.Key, x => x.Value));
}
if (!job.RequestBody.NullOrEmpty()) {
ret = ret.SetBody(job.RequestBody);
}
return ret.OnResponsing(GetRequestHeader).OnException(GetRequestHeader);
}
private void GetRequestHeader(HttpClient _, HttpResponseMessage rsp, string __)
{
_requestHeader = rsp!.RequestMessage!.Headers.Json();
}
private void GetRequestHeader(HttpClient _, HttpResponseMessage rsp)
{
GetRequestHeader(_, rsp, null);
}
}