From f50dfc8f197716fd5ad8f9d48caa0c830c345955 Mon Sep 17 00:00:00 2001 From: tk Date: Tue, 7 Jan 2025 15:41:01 +0800 Subject: [PATCH] =?UTF-8?q?refactor:=20=E2=99=BB=EF=B8=8F=20=E8=AE=A1?= =?UTF-8?q?=E5=88=92=E4=BD=9C=E4=B8=9A=E6=A8=A1=E5=9D=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../EventBus/DefaultEventPublisher.cs | 12 ++-- .../NetAdmin.Infrastructure.csproj | 2 +- .../NetAdmin.Infrastructure/Schedule/IJob.cs | 12 ++++ .../Schedule/JobConfigAttribute.cs | 23 +++++++ .../Extensions/ServiceCollectionExtensions.cs | 66 +++++++++++++++---- .../Jobs/FreeScheduledJob.cs | 16 ++--- .../Jobs/ScheduledJob.cs | 19 ++---- 7 files changed, 108 insertions(+), 42 deletions(-) create mode 100644 src/backend/NetAdmin/NetAdmin.Infrastructure/Schedule/IJob.cs create mode 100644 src/backend/NetAdmin/NetAdmin.Infrastructure/Schedule/JobConfigAttribute.cs diff --git a/src/backend/NetAdmin/NetAdmin.Infrastructure/EventBus/DefaultEventPublisher.cs b/src/backend/NetAdmin/NetAdmin.Infrastructure/EventBus/DefaultEventPublisher.cs index 825917a0..510840dc 100644 --- a/src/backend/NetAdmin/NetAdmin.Infrastructure/EventBus/DefaultEventPublisher.cs +++ b/src/backend/NetAdmin/NetAdmin.Infrastructure/EventBus/DefaultEventPublisher.cs @@ -18,11 +18,13 @@ public sealed class DefaultEventPublisher : IEventPublisher _ = new TaskFactory().StartNew( // async state => { var subscribers = (List)state; - await foreach (var msg in _eventChannel.Reader.ReadAllAsync()) { - _ = Parallel.ForEach( // - subscribers.Where(x => x.GetParameters().FirstOrDefault()?.ParameterType == msg.GetType()) - , (x, _) => x.Invoke(App.GetService(x.DeclaringType), [msg])); - } + await Parallel.ForEachAsync(_eventChannel.Reader.ReadAllAsync(), (msg, __) => { + _ = Parallel.ForEach( // + subscribers.Where(x => x.GetParameters().FirstOrDefault()?.ParameterType == msg.GetType()) + , (x, _) => x.Invoke(App.GetService(x.DeclaringType), [msg])); + return ValueTask.CompletedTask; + }) + .ConfigureAwait(false); }, App.EffectiveTypes.Where(x => typeof(IEventSubscriber).IsAssignableFrom(x) && x.IsClass && !x.IsAbstract).SelectMany(x => x.GetMethods(BindingFlags.Instance | BindingFlags.Public).Where(y => y.IsDefined(typeof(EventSubscribeAttribute)))).ToList()); } diff --git a/src/backend/NetAdmin/NetAdmin.Infrastructure/NetAdmin.Infrastructure.csproj b/src/backend/NetAdmin/NetAdmin.Infrastructure/NetAdmin.Infrastructure.csproj index 32ada115..8742fe84 100644 --- a/src/backend/NetAdmin/NetAdmin.Infrastructure/NetAdmin.Infrastructure.csproj +++ b/src/backend/NetAdmin/NetAdmin.Infrastructure/NetAdmin.Infrastructure.csproj @@ -5,7 +5,7 @@ - + diff --git a/src/backend/NetAdmin/NetAdmin.Infrastructure/Schedule/IJob.cs b/src/backend/NetAdmin/NetAdmin.Infrastructure/Schedule/IJob.cs new file mode 100644 index 00000000..88b2fbf8 --- /dev/null +++ b/src/backend/NetAdmin/NetAdmin.Infrastructure/Schedule/IJob.cs @@ -0,0 +1,12 @@ +namespace NetAdmin.Infrastructure.Schedule; + +/// +/// 作业处理程序 +/// +public interface IJob +{ + /// + /// 具体处理逻辑 + /// + Task ExecuteAsync(CancellationToken cancelToken); +} \ No newline at end of file diff --git a/src/backend/NetAdmin/NetAdmin.Infrastructure/Schedule/JobConfigAttribute.cs b/src/backend/NetAdmin/NetAdmin.Infrastructure/Schedule/JobConfigAttribute.cs new file mode 100644 index 00000000..dc1673cb --- /dev/null +++ b/src/backend/NetAdmin/NetAdmin.Infrastructure/Schedule/JobConfigAttribute.cs @@ -0,0 +1,23 @@ +namespace NetAdmin.Infrastructure.Schedule; + +/// +/// 作业配置 +/// +[AttributeUsage(AttributeTargets.Class)] +public sealed class JobConfigAttribute : Attribute +{ + /// + /// 上一次执行时间 + /// + public DateTime? LastExecutionTime { get; set; } + + /// + /// 启动时运行 + /// + public bool RunOnStart { get; init; } + + /// + /// 触发器表达式 + /// + public string TriggerCron { get; init; } +} \ No newline at end of file diff --git a/src/backend/NetAdmin/NetAdmin.SysComponent.Host/Extensions/ServiceCollectionExtensions.cs b/src/backend/NetAdmin/NetAdmin.SysComponent.Host/Extensions/ServiceCollectionExtensions.cs index 48713245..fb1b88a3 100644 --- a/src/backend/NetAdmin/NetAdmin.SysComponent.Host/Extensions/ServiceCollectionExtensions.cs +++ b/src/backend/NetAdmin/NetAdmin.SysComponent.Host/Extensions/ServiceCollectionExtensions.cs @@ -1,8 +1,9 @@ -using Gurion.Schedule; +using Cronos; using NetAdmin.Domain.Contexts; using NetAdmin.Domain.Events; using NetAdmin.Host.Filters; -using NetAdmin.SysComponent.Host.Jobs; +using NetAdmin.Host.Middlewares; +using NetAdmin.Infrastructure.Schedule; using NetAdmin.SysComponent.Host.Utils; using FreeSqlBuilder = NetAdmin.Infrastructure.Utils.FreeSqlBuilder; @@ -62,17 +63,58 @@ public static class ServiceCollectionExtensions /// /// 添加定时任务 /// - public static IServiceCollection AddSchedules(this IServiceCollection me, bool force = false, Action optionsAction = null) + public static IServiceCollection AddSchedules(this IServiceCollection me, bool force = false) { - return App.WebHostEnvironment.IsProduction() || force - ? me.AddSchedule( // - builder => { - _ = builder // - .AddJob(true, Triggers.PeriodSeconds(1).SetRunOnStart(true)) - .AddJob(true, Triggers.PeriodMinutes(1).SetRunOnStart(true)); + if (!App.WebHostEnvironment.IsProduction() && !force) { + return me; + } - optionsAction?.Invoke(builder); - }) - : me; + var jobTypes = App.EffectiveTypes + .Where(x => typeof(IJob).IsAssignableFrom(x) && x.IsClass && !x.IsAbstract && x.IsDefined(typeof(JobConfigAttribute))) + .ToDictionary(x => x, x => x.GetCustomAttribute()); + var runOnStartJobTypes = jobTypes.Where(x => // + x.Value.RunOnStart); + RunJob(runOnStartJobTypes); + _ = Task.Run(LoopTaskAsync); + return me; + + #pragma warning disable S2190 + async Task LoopTaskAsync() + #pragma warning restore S2190 + { + while (true) { + await Task.Delay(1000).ConfigureAwait(false); + if (SafetyShopHostMiddleware.IsShutdown) { + Console.WriteLine(Ln.此节点已下线); + } + else { + RunJob(jobTypes.Where(Filter)); + } + } + + bool Filter(KeyValuePair x) + { + return !x.Value.TriggerCron.NullOrEmpty() && + CronExpression.Parse(x.Value.TriggerCron, CronFormat.IncludeSeconds) + .GetNextOccurrence(x.Value.LastExecutionTime ?? DateTime.UtcNow.AddDays(-1), TimeZoneInfo.Local) + ?.ToLocalTime() <= DateTime.Now; + } + + // ReSharper disable once FunctionNeverReturns + } + } + + private static void RunJob(IEnumerable> jobTypes) + { + foreach (var job in jobTypes) { + try { + _ = typeof(IJob).GetMethod(nameof(IJob.ExecuteAsync))!.Invoke( // + Activator.CreateInstance(job.Key), [CancellationToken.None]); + job.Value.LastExecutionTime = DateTime.UtcNow; + } + catch (Exception ex) { + LogHelper.Get().Error(ex); + } + } } } \ No newline at end of file diff --git a/src/backend/NetAdmin/NetAdmin.SysComponent.Host/Jobs/FreeScheduledJob.cs b/src/backend/NetAdmin/NetAdmin.SysComponent.Host/Jobs/FreeScheduledJob.cs index 61456dd6..02aef981 100644 --- a/src/backend/NetAdmin/NetAdmin.SysComponent.Host/Jobs/FreeScheduledJob.cs +++ b/src/backend/NetAdmin/NetAdmin.SysComponent.Host/Jobs/FreeScheduledJob.cs @@ -1,12 +1,12 @@ -using Gurion.Schedule; using NetAdmin.Host.BackgroundRunning; -using NetAdmin.Host.Middlewares; +using NetAdmin.Infrastructure.Schedule; namespace NetAdmin.SysComponent.Host.Jobs; /// /// 释放计划作业 /// +[JobConfig(TriggerCron = "0 * * * * *")] public sealed class FreeScheduledJob : WorkBase, IJob { private readonly IJobService _jobService; @@ -22,17 +22,11 @@ public sealed class FreeScheduledJob : WorkBase, IJob /// /// 具体处理逻辑 /// - /// 作业执行前上下文 - /// 取消任务 Token + /// 取消任务 Token /// 加锁失败异常 - public async Task ExecuteAsync(JobExecutingContext context, CancellationToken stoppingToken) + public async Task ExecuteAsync(CancellationToken cancelToken) { - if (SafetyShopHostMiddleware.IsShutdown) { - Console.WriteLine(Ln.此节点已下线); - return; - } - - await WorkflowAsync(true, stoppingToken).ConfigureAwait(false); + await WorkflowAsync(true, cancelToken).ConfigureAwait(false); } /// diff --git a/src/backend/NetAdmin/NetAdmin.SysComponent.Host/Jobs/ScheduledJob.cs b/src/backend/NetAdmin/NetAdmin.SysComponent.Host/Jobs/ScheduledJob.cs index 59433522..18b97af5 100644 --- a/src/backend/NetAdmin/NetAdmin.SysComponent.Host/Jobs/ScheduledJob.cs +++ b/src/backend/NetAdmin/NetAdmin.SysComponent.Host/Jobs/ScheduledJob.cs @@ -1,18 +1,18 @@ using FreeSql.Internal; using Gurion.RemoteRequest; using Gurion.RemoteRequest.Extensions; -using Gurion.Schedule; using NetAdmin.Application.Extensions; using NetAdmin.Domain.Dto.Sys.Job; using NetAdmin.Domain.Dto.Sys.JobRecord; using NetAdmin.Host.BackgroundRunning; -using NetAdmin.Host.Middlewares; +using NetAdmin.Infrastructure.Schedule; namespace NetAdmin.SysComponent.Host.Jobs; /// /// 计划作业 /// +[JobConfig(TriggerCron = "* * * * * *")] public sealed class ScheduledJob : WorkBase, IJob { private static string _accessToken; @@ -30,19 +30,12 @@ public sealed class ScheduledJob : WorkBase, IJob /// /// 具体处理逻辑 /// - /// 作业执行前上下文 - /// 取消任务 Token + /// 取消任务 Token /// 加锁失败异常 - public async Task ExecuteAsync(JobExecutingContext context, CancellationToken stoppingToken) + public Task ExecuteAsync(CancellationToken cancelToken) { - if (SafetyShopHostMiddleware.IsShutdown) { - Console.WriteLine(Ln.此节点已下线); - return; - } - - // ReSharper disable once MethodSupportsCancellation - await Parallel.ForAsync(0, Numbers.SCHEDULED_JOB_PARALLEL_NUM, async (_, _) => await WorkflowAsync(stoppingToken).ConfigureAwait(false)) - .ConfigureAwait(false); + return Parallel.ForAsync(0, Numbers.SCHEDULED_JOB_PARALLEL_NUM, cancelToken + , async (_, _) => await WorkflowAsync(cancelToken).ConfigureAwait(false)); } ///