- 移除 In多表表达式函数解析 #243;

- 调整 SafeObjectPool 源码移入项目;
This commit is contained in:
28810
2020-03-20 20:32:49 +08:00
parent 25312ceead
commit 72781596bd
49 changed files with 1138 additions and 174 deletions

View File

@ -1,4 +1,4 @@
using SafeObjectPool;
using FreeSql.Internal.ObjectPool;
using System;
using System.Collections.Generic;
using System.Collections.Concurrent;

View File

@ -1,5 +1,5 @@
using FreeSql.Internal.Model;
using SafeObjectPool;
using FreeSql.Internal.ObjectPool;
using System;
using System.Collections.Generic;
using System.Data;

View File

@ -1,4 +1,4 @@
using SafeObjectPool;
using FreeSql.Internal.ObjectPool;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

View File

@ -1,4 +1,4 @@
using SafeObjectPool;
using FreeSql.Internal.ObjectPool;
using System;
using System.Collections.Generic;
using System.Data;

View File

@ -1,6 +1,6 @@
using FreeSql.Internal.Model;
using FreeSql.Extensions.EntityUtil;
using SafeObjectPool;
using FreeSql.Internal.ObjectPool;
using System;
using System.Collections.Generic;
using System.Data;

View File

@ -1,5 +1,5 @@
using FreeSql.Internal.Model;
using SafeObjectPool;
using FreeSql.Internal.ObjectPool;
using System;
using System.Collections.Generic;
using System.Data;

View File

@ -2,7 +2,7 @@
using FreeSql.DatabaseModel;
using FreeSql.Extensions.EntityUtil;
using FreeSql.Internal.Model;
using SafeObjectPool;
using FreeSql.Internal.ObjectPool;
using System;
using System.Collections;
using System.Collections.Concurrent;

View File

@ -0,0 +1,74 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace FreeSql.Internal.ObjectPool
{
public class DefaultPolicy<T> : IPolicy<T>
{
public string Name { get; set; } = typeof(DefaultPolicy<T>).GetType().FullName;
public int PoolSize { get; set; } = 1000;
public TimeSpan SyncGetTimeout { get; set; } = TimeSpan.FromSeconds(10);
public TimeSpan IdleTimeout { get; set; } = TimeSpan.FromSeconds(50);
public int AsyncGetCapacity { get; set; } = 10000;
public bool IsThrowGetTimeoutException { get; set; } = true;
public int CheckAvailableInterval { get; set; } = 5;
public Func<T> CreateObject;
public Action<Object<T>> OnGetObject;
public T OnCreate()
{
return CreateObject();
}
public void OnDestroy(T obj)
{
}
public void OnGet(Object<T> obj)
{
//Console.WriteLine("Get: " + obj);
OnGetObject?.Invoke(obj);
}
#if net40
#else
public Task OnGetAsync(Object<T> obj)
{
//Console.WriteLine("GetAsync: " + obj);
OnGetObject?.Invoke(obj);
return Task.FromResult(true);
}
#endif
public void OnGetTimeout()
{
}
public void OnReturn(Object<T> obj)
{
//Console.WriteLine("Return: " + obj);
}
public bool OnCheckAvailable(Object<T> obj)
{
return true;
}
public void OnAvailable()
{
}
public void OnUnavailable()
{
}
}
}

View File

@ -0,0 +1,63 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace FreeSql.Internal.ObjectPool
{
public interface IObjectPool<T> : IDisposable
{
IPolicy<T> Policy { get; }
/// <summary>
/// 是否可用
/// </summary>
bool IsAvailable { get; }
/// <summary>
/// 不可用错误
/// </summary>
Exception UnavailableException { get; }
/// <summary>
/// 不可用时间
/// </summary>
DateTime? UnavailableTime { get; }
/// <summary>
/// 将对象池设置为不可用,后续 Get/GetAsync 均会报错,同时启动后台定时检查服务恢复可用
/// </summary>
/// <param name="exception"></param>
/// <returns>由【可用】变成【不可用】时返回true否则返回false</returns>
bool SetUnavailable(Exception exception);
/// <summary>
/// 统计对象池中的对象
/// </summary>
string Statistics { get; }
/// <summary>
/// 统计对象池中的对象(完整)
/// </summary>
string StatisticsFullily { get; }
/// <summary>
/// 获取资源
/// </summary>
/// <param name="timeout">超时</param>
/// <returns></returns>
Object<T> Get(TimeSpan? timeout = null);
#if net40
#else
/// <summary>
/// 获取资源
/// </summary>
/// <returns></returns>
Task<Object<T>> GetAsync();
#endif
/// <summary>
/// 使用完毕后,归还资源
/// </summary>
/// <param name="obj">对象</param>
/// <param name="isReset">是否重新创建</param>
void Return(Object<T> obj, bool isReset = false);
}
}

View File

@ -0,0 +1,99 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace FreeSql.Internal.ObjectPool
{
public interface IPolicy<T>
{
/// <summary>
/// 名称
/// </summary>
string Name { get; set; }
/// <summary>
/// 池容量
/// </summary>
int PoolSize { get; set; }
/// <summary>
/// 默认获取超时设置
/// </summary>
TimeSpan SyncGetTimeout { get; set; }
/// <summary>
/// 空闲时间,获取时若超出,则重新创建
/// </summary>
TimeSpan IdleTimeout { get; set; }
/// <summary>
/// 异步获取排队队列大小小于等于0不生效
/// </summary>
int AsyncGetCapacity { get; set; }
/// <summary>
/// 获取超时后,是否抛出异常
/// </summary>
bool IsThrowGetTimeoutException { get; set; }
/// <summary>
/// 后台定时检查可用性间隔秒数
/// </summary>
int CheckAvailableInterval { get; set; }
/// <summary>
/// 对象池的对象被创建时
/// </summary>
/// <returns>返回被创建的对象</returns>
T OnCreate();
/// <summary>
/// 销毁对象
/// </summary>
/// <param name="obj">资源对象</param>
void OnDestroy(T obj);
/// <summary>
/// 从对象池获取对象超时的时候触发,通过该方法统计
/// </summary>
void OnGetTimeout();
/// <summary>
/// 从对象池获取对象成功的时候触发,通过该方法统计或初始化对象
/// </summary>
/// <param name="obj">资源对象</param>
void OnGet(Object<T> obj);
#if net40
#else
/// <summary>
/// 从对象池获取对象成功的时候触发,通过该方法统计或初始化对象
/// </summary>
/// <param name="obj">资源对象</param>
Task OnGetAsync(Object<T> obj);
#endif
/// <summary>
/// 归还对象给对象池的时候触发
/// </summary>
/// <param name="obj">资源对象</param>
void OnReturn(Object<T> obj);
/// <summary>
/// 检查可用性
/// </summary>
/// <param name="obj">资源对象</param>
/// <returns></returns>
bool OnCheckAvailable(Object<T> obj);
/// <summary>
/// 事件:可用时触发
/// </summary>
void OnAvailable();
/// <summary>
/// 事件:不可用时触发
/// </summary>
void OnUnavailable();
}
}

View File

@ -0,0 +1,93 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
namespace FreeSql.Internal.ObjectPool
{
public class Object<T> : IDisposable
{
public static Object<T> InitWith(IObjectPool<T> pool, int id, T value)
{
return new Object<T>
{
Pool = pool,
Id = id,
Value = value,
LastGetThreadId = Thread.CurrentThread.ManagedThreadId,
LastGetTime = DateTime.Now
};
}
/// <summary>
/// 所属对象池
/// </summary>
public IObjectPool<T> Pool { get; internal set; }
/// <summary>
/// 在对象池中的唯一标识
/// </summary>
public int Id { get; internal set; }
/// <summary>
/// 资源对象
/// </summary>
public T Value { get; internal set; }
internal long _getTimes;
/// <summary>
/// 被获取的总次数
/// </summary>
public long GetTimes => _getTimes;
/// 最后获取时的时间
public DateTime LastGetTime { get; internal set; }
/// <summary>
/// 最后归还时的时间
/// </summary>
public DateTime LastReturnTime { get; internal set; }
/// <summary>
/// 创建时间
/// </summary>
public DateTime CreateTime { get; internal set; } = DateTime.Now;
/// <summary>
/// 最后获取时的线程id
/// </summary>
public int LastGetThreadId { get; internal set; }
/// <summary>
/// 最后归还时的线程id
/// </summary>
public int LastReturnThreadId { get; internal set; }
public override string ToString()
{
return $"{this.Value}, Times: {this.GetTimes}, ThreadId(R/G): {this.LastReturnThreadId}/{this.LastGetThreadId}, Time(R/G): {this.LastReturnTime.ToString("yyyy-MM-dd HH:mm:ss:ms")}/{this.LastGetTime.ToString("yyyy-MM-dd HH:mm:ss:ms")}";
}
/// <summary>
/// 重置 Value 值
/// </summary>
public void ResetValue()
{
if (this.Value != null)
{
try { this.Pool.Policy.OnDestroy(this.Value); } catch { }
try { (this.Value as IDisposable)?.Dispose(); } catch { }
}
T value = default(T);
try { value = this.Pool.Policy.OnCreate(); } catch { }
this.Value = value;
this.LastReturnTime = DateTime.Now;
}
internal bool _isReturned = false;
public void Dispose()
{
Pool?.Return(this);
}
}
}

View File

@ -0,0 +1,547 @@
using System;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Linq;
using System.Text;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
namespace FreeSql.Internal.ObjectPool
{
/// <summary>
/// 对象池管理类
/// </summary>
/// <typeparam name="T">对象类型</typeparam>
public partial class ObjectPool<T> : IObjectPool<T>
{
public IPolicy<T> Policy { get; protected set; }
private List<Object<T>> _allObjects = new List<Object<T>>();
private object _allObjectsLock = new object();
private ConcurrentStack<Object<T>> _freeObjects = new ConcurrentStack<Object<T>>();
private ConcurrentQueue<GetSyncQueueInfo> _getSyncQueue = new ConcurrentQueue<GetSyncQueueInfo>();
private ConcurrentQueue<TaskCompletionSource<Object<T>>> _getAsyncQueue = new ConcurrentQueue<TaskCompletionSource<Object<T>>>();
private ConcurrentQueue<bool> _getQueue = new ConcurrentQueue<bool>();
public bool IsAvailable => this.UnavailableException == null;
public Exception UnavailableException { get; private set; }
public DateTime? UnavailableTime { get; private set; }
private object UnavailableLock = new object();
private bool running = true;
public bool SetUnavailable(Exception exception)
{
bool isseted = false;
if (exception != null && UnavailableException == null)
{
lock (UnavailableLock)
{
if (UnavailableException == null)
{
UnavailableException = exception;
UnavailableTime = DateTime.Now;
isseted = true;
}
}
}
if (isseted)
{
Policy.OnUnavailable();
CheckAvailable(Policy.CheckAvailableInterval);
}
return isseted;
}
/// <summary>
/// 后台定时检查可用性
/// </summary>
/// <param name="interval"></param>
private void CheckAvailable(int interval)
{
new Thread(() =>
{
if (UnavailableException != null)
{
var bgcolor = Console.BackgroundColor;
var forecolor = Console.ForegroundColor;
Console.BackgroundColor = ConsoleColor.DarkYellow;
Console.ForegroundColor = ConsoleColor.White;
Console.Write($"【{Policy.Name}】恢复检查时间:{DateTime.Now.AddSeconds(interval)}");
Console.BackgroundColor = bgcolor;
Console.ForegroundColor = forecolor;
Console.WriteLine();
}
while (UnavailableException != null)
{
if (running == false) return;
Thread.CurrentThread.Join(TimeSpan.FromSeconds(interval));
if (running == false) return;
try
{
var conn = getFree(false);
if (conn == null) throw new Exception($"CheckAvailable 无法获得资源,{this.Statistics}");
try
{
if (Policy.OnCheckAvailable(conn) == false) throw new Exception("CheckAvailable 应抛出异常,代表仍然不可用。");
break;
}
finally
{
Return(conn);
}
}
catch (Exception ex)
{
var bgcolor = Console.BackgroundColor;
var forecolor = Console.ForegroundColor;
Console.BackgroundColor = ConsoleColor.DarkYellow;
Console.ForegroundColor = ConsoleColor.White;
Console.Write($"【{Policy.Name}】仍然不可用,下一次恢复检查时间:{DateTime.Now.AddSeconds(interval)},错误:({ex.Message})");
Console.BackgroundColor = bgcolor;
Console.ForegroundColor = forecolor;
Console.WriteLine();
}
}
RestoreToAvailable();
}).Start();
}
private void RestoreToAvailable()
{
bool isRestored = false;
if (UnavailableException != null)
{
lock (UnavailableLock)
{
if (UnavailableException != null)
{
UnavailableException = null;
UnavailableTime = null;
isRestored = true;
}
}
}
if (isRestored)
{
lock (_allObjectsLock)
_allObjects.ForEach(a => a.LastGetTime = a.LastReturnTime = new DateTime(2000, 1, 1));
Policy.OnAvailable();
var bgcolor = Console.BackgroundColor;
var forecolor = Console.ForegroundColor;
Console.BackgroundColor = ConsoleColor.DarkGreen;
Console.ForegroundColor = ConsoleColor.White;
Console.Write($"【{Policy.Name}】已恢复工作");
Console.BackgroundColor = bgcolor;
Console.ForegroundColor = forecolor;
Console.WriteLine();
}
}
protected bool LiveCheckAvailable()
{
try
{
var conn = getFree(false);
if (conn == null) throw new Exception($"LiveCheckAvailable 无法获得资源,{this.Statistics}");
try
{
if (Policy.OnCheckAvailable(conn) == false) throw new Exception("LiveCheckAvailable 应抛出异常,代表仍然不可用。");
}
finally
{
Return(conn);
}
}
catch
{
return false;
}
RestoreToAvailable();
return true;
}
public string Statistics => $"Pool: {_freeObjects.Count}/{_allObjects.Count}, Get wait: {_getSyncQueue.Count}, GetAsync wait: {_getAsyncQueue.Count}";
public string StatisticsFullily
{
get
{
var sb = new StringBuilder();
sb.AppendLine(Statistics);
sb.AppendLine("");
foreach (var obj in _allObjects)
{
sb.AppendLine($"{obj.Value}, Times: {obj.GetTimes}, ThreadId(R/G): {obj.LastReturnThreadId}/{obj.LastGetThreadId}, Time(R/G): {obj.LastReturnTime.ToString("yyyy-MM-dd HH:mm:ss:ms")}/{obj.LastGetTime.ToString("yyyy-MM-dd HH:mm:ss:ms")}, ");
}
return sb.ToString();
}
}
/// <summary>
/// 创建对象池
/// </summary>
/// <param name="poolsize">池大小</param>
/// <param name="createObject">池内对象的创建委托</param>
/// <param name="onGetObject">获取池内对象成功后,进行使用前操作</param>
public ObjectPool(int poolsize, Func<T> createObject, Action<Object<T>> onGetObject = null) : this(new DefaultPolicy<T> { PoolSize = poolsize, CreateObject = createObject, OnGetObject = onGetObject })
{
}
/// <summary>
/// 创建对象池
/// </summary>
/// <param name="policy">策略</param>
public ObjectPool(IPolicy<T> policy)
{
Policy = policy;
AppDomain.CurrentDomain.ProcessExit += (s1, e1) =>
{
running = false;
};
try
{
Console.CancelKeyPress += (s1, e1) =>
{
if (e1.Cancel) return;
running = false;
};
}
catch { }
}
/// <summary>
/// 获取可用资源,或创建资源
/// </summary>
/// <returns></returns>
private Object<T> getFree(bool checkAvailable)
{
if (running == false)
throw new ObjectDisposedException($"【{Policy.Name}】对象池已释放,无法访问。");
if (checkAvailable && UnavailableException != null)
throw new Exception($"【{Policy.Name}】状态不可用,等待后台检查程序恢复方可使用。{UnavailableException?.Message}");
if ((_freeObjects.TryPop(out var obj) == false || obj == null) && _allObjects.Count < Policy.PoolSize)
{
lock (_allObjectsLock)
if (_allObjects.Count < Policy.PoolSize)
_allObjects.Add(obj = new Object<T> { Pool = this, Id = _allObjects.Count + 1 });
}
if (obj != null)
obj._isReturned = false;
if (obj != null && obj.Value == null ||
obj != null && Policy.IdleTimeout > TimeSpan.Zero && DateTime.Now.Subtract(obj.LastReturnTime) > Policy.IdleTimeout)
{
try
{
obj.ResetValue();
}
catch
{
Return(obj);
throw;
}
}
return obj;
}
public Object<T> Get(TimeSpan? timeout = null)
{
var obj = getFree(true);
if (obj == null)
{
var queueItem = new GetSyncQueueInfo();
_getSyncQueue.Enqueue(queueItem);
_getQueue.Enqueue(false);
if (timeout == null) timeout = Policy.SyncGetTimeout;
try
{
if (queueItem.Wait.Wait(timeout.Value))
obj = queueItem.ReturnValue;
}
catch { }
if (obj == null) obj = queueItem.ReturnValue;
if (obj == null) lock (queueItem.Lock) queueItem.IsTimeout = (obj = queueItem.ReturnValue) == null;
if (obj == null) obj = queueItem.ReturnValue;
if (obj == null)
{
Policy.OnGetTimeout();
if (Policy.IsThrowGetTimeoutException)
throw new TimeoutException($"SafeObjectPool.Get 获取超时({timeout.Value.TotalSeconds}秒)。");
return null;
}
}
try
{
Policy.OnGet(obj);
}
catch
{
Return(obj);
throw;
}
obj.LastGetThreadId = Thread.CurrentThread.ManagedThreadId;
obj.LastGetTime = DateTime.Now;
Interlocked.Increment(ref obj._getTimes);
return obj;
}
#if net40
#else
async public Task<Object<T>> GetAsync()
{
var obj = getFree(true);
if (obj == null)
{
if (Policy.AsyncGetCapacity > 0 && _getAsyncQueue.Count >= Policy.AsyncGetCapacity - 1)
throw new OutOfMemoryException($"SafeObjectPool.GetAsync 无可用资源且队列过长Policy.AsyncGetCapacity = {Policy.AsyncGetCapacity}。");
var tcs = new TaskCompletionSource<Object<T>>();
_getAsyncQueue.Enqueue(tcs);
_getQueue.Enqueue(true);
obj = await tcs.Task;
//if (timeout == null) timeout = Policy.SyncGetTimeout;
//if (tcs.Task.Wait(timeout.Value))
// obj = tcs.Task.Result;
//if (obj == null) {
// tcs.TrySetCanceled();
// Policy.GetTimeout();
// if (Policy.IsThrowGetTimeoutException)
// throw new Exception($"SafeObjectPool.GetAsync 获取超时({timeout.Value.TotalSeconds}秒)。");
// return null;
//}
}
try
{
await Policy.OnGetAsync(obj);
}
catch
{
Return(obj);
throw;
}
obj.LastGetThreadId = Thread.CurrentThread.ManagedThreadId;
obj.LastGetTime = DateTime.Now;
Interlocked.Increment(ref obj._getTimes);
return obj;
}
#endif
public void Return(Object<T> obj, bool isReset = false)
{
if (obj == null) return;
if (obj._isReturned) return;
if (running == false)
{
Policy.OnDestroy(obj.Value);
try { (obj.Value as IDisposable)?.Dispose(); } catch { }
return;
}
if (isReset) obj.ResetValue();
bool isReturn = false;
while (isReturn == false && _getQueue.TryDequeue(out var isAsync))
{
if (isAsync == false)
{
if (_getSyncQueue.TryDequeue(out var queueItem) && queueItem != null)
{
lock (queueItem.Lock)
if (queueItem.IsTimeout == false)
queueItem.ReturnValue = obj;
if (queueItem.ReturnValue != null)
{
obj.LastReturnThreadId = Thread.CurrentThread.ManagedThreadId;
obj.LastReturnTime = DateTime.Now;
try
{
queueItem.Wait.Set();
isReturn = true;
}
catch
{
}
}
try { queueItem.Dispose(); } catch { }
}
}
else
{
if (_getAsyncQueue.TryDequeue(out var tcs) && tcs != null && tcs.Task.IsCanceled == false)
{
obj.LastReturnThreadId = Thread.CurrentThread.ManagedThreadId;
obj.LastReturnTime = DateTime.Now;
try { isReturn = tcs.TrySetResult(obj); } catch { }
}
}
}
//无排队,直接归还
if (isReturn == false)
{
try
{
Policy.OnReturn(obj);
}
catch
{
throw;
}
finally
{
obj.LastReturnThreadId = Thread.CurrentThread.ManagedThreadId;
obj.LastReturnTime = DateTime.Now;
obj._isReturned = true;
_freeObjects.Push(obj);
}
}
}
public void Dispose()
{
running = false;
while (_freeObjects.TryPop(out var fo)) ;
while (_getSyncQueue.TryDequeue(out var sync))
{
try { sync.Wait.Set(); } catch { }
}
while (_getAsyncQueue.TryDequeue(out var async))
async.TrySetCanceled();
while (_getQueue.TryDequeue(out var qs)) ;
for (var a = 0; a < _allObjects.Count; a++)
{
Policy.OnDestroy(_allObjects[a].Value);
try { (_allObjects[a].Value as IDisposable)?.Dispose(); } catch { }
}
_allObjects.Clear();
}
class GetSyncQueueInfo : IDisposable
{
internal ManualResetEventSlim Wait { get; set; } = new ManualResetEventSlim();
internal Object<T> ReturnValue { get; set; }
internal object Lock = new object();
internal bool IsTimeout { get; set; } = false;
public void Dispose()
{
try
{
if (Wait != null)
Wait.Dispose();
}
catch
{
}
}
}
}
}