pgsql/mysql/sqlserver适配

This commit is contained in:
28810
2018-12-18 20:09:52 +08:00
commit 9b5e34032c
130 changed files with 12283 additions and 0 deletions

View File

@ -0,0 +1,267 @@
using Microsoft.Extensions.Logging;
using SafeObjectPool;
using System;
using System.Collections.Generic;
using System.Data;
using System.Data.Common;
using System.Linq;
namespace FreeSql.Internal.CommonProvider {
abstract partial class AdoProvider : IAdo {
protected abstract void ReturnConnection(ObjectPool<DbConnection> pool, Object<DbConnection> conn, Exception ex);
protected abstract DbCommand CreateCommand();
protected abstract DbParameter[] GetDbParamtersByObject(string sql, object obj);
public bool IsTracePerformance { get; set; } = string.Compare(Environment.GetEnvironmentVariable("ASPNETCORE_ENVIRONMENT"), "Development", true) == 0;
public ObjectPool<DbConnection> MasterPool { get; protected set; }
public List<ObjectPool<DbConnection>> SlavePools { get; } = new List<ObjectPool<DbConnection>>();
protected ICache _cache { get; set; }
protected ILogger _log { get; set; }
protected int slaveUnavailables = 0;
private object slaveLock = new object();
private Random slaveRandom = new Random();
public AdoProvider(ICache cache, ILogger log) {
this._cache = cache;
this._log = log;
}
void LoggerException(ObjectPool<DbConnection> pool, DbCommand cmd, Exception e, DateTime dt, string logtxt, bool isThrowException = true) {
if (IsTracePerformance) {
TimeSpan ts = DateTime.Now.Subtract(dt);
if (e == null && ts.TotalMilliseconds > 100)
_log.LogWarning($"{pool.Policy.Name}执行SQL语句耗时过长{ts.TotalMilliseconds}ms\r\n{cmd.CommandText}\r\n{logtxt}");
}
if (e == null) return;
string log = $"{pool.Policy.Name}数据库出错执行SQL〓〓〓〓〓〓〓〓〓〓〓〓〓〓〓\r\n{cmd.CommandText}\r\n";
foreach (DbParameter parm in cmd.Parameters)
log += parm.ParameterName.PadRight(20, ' ') + " = " + (parm.Value ?? "NULL") + "\r\n";
log += e.Message;
_log.LogError(log);
RollbackTransaction();
cmd.Parameters.Clear();
if (isThrowException) throw e;
}
public List<T> Query<T>(string sql, object parms = null) => Query<T>(CommandType.Text, sql, GetDbParamtersByObject(sql, parms));
public List<T> Query<T>(CommandType cmdType, string cmdText, params DbParameter[] cmdParms) {
var names = new Dictionary<string, int>(StringComparer.CurrentCultureIgnoreCase);
var ds = new List<object[]>();
ExecuteReader(dr => {
if (names.Any() == false)
for (var a = 0; a < dr.FieldCount; a++) names.Add(dr.GetName(a), a);
object[] values = new object[dr.FieldCount];
dr.GetValues(values);
ds.Add(values);
}, cmdType, cmdText, cmdParms);
var ret = new List<T>();
foreach (var row in ds) {
var read = Utils.ExecuteArrayRowReadClassOrTuple(typeof(T), names, row);
ret.Add(read.value == null ? default(T) : (T) read.value);
}
return ret;
}
public void ExecuteReader(Action<DbDataReader> readerHander, string sql, object parms = null) => ExecuteReader(readerHander, CommandType.Text, sql, GetDbParamtersByObject(sql, parms));
public void ExecuteReader(Action<DbDataReader> readerHander, CommandType cmdType, string cmdText, params DbParameter[] cmdParms) {
DateTime dt = DateTime.Now;
string logtxt = "";
DateTime logtxt_dt = DateTime.Now;
var pool = this.MasterPool;
bool isSlave = false;
//读写分离规则
if (this.SlavePools.Any() && cmdText.StartsWith("SELECT ", StringComparison.CurrentCultureIgnoreCase)) {
var availables = slaveUnavailables == 0 ?
//查从库
this.SlavePools : (
//查主库
slaveUnavailables == this.SlavePools.Count ? new List<ObjectPool<DbConnection>>() :
//查从库可用
this.SlavePools.Where(sp => sp.IsAvailable).ToList());
if (availables.Any()) {
isSlave = true;
pool = availables.Count == 1 ? availables[0] : availables[slaveRandom.Next(availables.Count)];
}
}
Object<DbConnection> conn = null;
var pc = PrepareCommand(cmdType, cmdText, cmdParms, ref logtxt);
if (IsTracePerformance) logtxt += $"PrepareCommand: {DateTime.Now.Subtract(logtxt_dt).TotalMilliseconds}ms Total: {DateTime.Now.Subtract(dt).TotalMilliseconds}ms\r\n";
Exception ex = null;
try {
if (IsTracePerformance) logtxt_dt = DateTime.Now;
if (isSlave) {
//从库查询切换,恢复
bool isSlaveFail = false;
try {
if (pc.cmd.Connection == null) pc.cmd.Connection = (conn = pool.Get()).Value;
//if (slaveRandom.Next(100) % 2 == 0) throw new Exception("测试从库抛出异常");
} catch {
isSlaveFail = true;
}
if (isSlaveFail) {
if (conn != null) {
if (IsTracePerformance) logtxt_dt = DateTime.Now;
ReturnConnection(pool, conn, ex); //pool.Return(conn, ex);
if (IsTracePerformance) logtxt += $"ReleaseConnection: {DateTime.Now.Subtract(logtxt_dt).TotalMilliseconds}ms Total: {DateTime.Now.Subtract(dt).TotalMilliseconds}ms";
}
LoggerException(pool, pc.cmd, new Exception($"连接失败,准备切换其他可用服务器"), dt, logtxt, false);
pc.cmd.Parameters.Clear();
ExecuteReader(readerHander, cmdType, cmdText, cmdParms);
return;
}
} else {
//主库查询
if (pc.cmd.Connection == null) pc.cmd.Connection = (conn = pool.Get()).Value;
}
if (IsTracePerformance) {
logtxt += $"Open: {DateTime.Now.Subtract(logtxt_dt).TotalMilliseconds}ms Total: {DateTime.Now.Subtract(dt).TotalMilliseconds}ms\r\n";
logtxt_dt = DateTime.Now;
}
using (var dr = pc.cmd.ExecuteReader()) {
if (IsTracePerformance) logtxt += $"ExecuteReader: {DateTime.Now.Subtract(logtxt_dt).TotalMilliseconds}ms Total: {DateTime.Now.Subtract(dt).TotalMilliseconds}ms\r\n";
while (true) {
if (IsTracePerformance) logtxt_dt = DateTime.Now;
bool isread = dr.Read();
if (IsTracePerformance) logtxt += $" dr.Read: {DateTime.Now.Subtract(logtxt_dt).TotalMilliseconds}ms Total: {DateTime.Now.Subtract(dt).TotalMilliseconds}ms\r\n";
if (isread == false) break;
if (readerHander != null) {
object[] values = null;
if (IsTracePerformance) {
logtxt_dt = DateTime.Now;
values = new object[dr.FieldCount];
dr.GetValues(values);
logtxt += $" dr.GetValues: {DateTime.Now.Subtract(logtxt_dt).TotalMilliseconds}ms Total: {DateTime.Now.Subtract(dt).TotalMilliseconds}ms\r\n";
logtxt_dt = DateTime.Now;
}
readerHander(dr);
if (IsTracePerformance) logtxt += $" readerHander: {DateTime.Now.Subtract(logtxt_dt).TotalMilliseconds}ms Total: {DateTime.Now.Subtract(dt).TotalMilliseconds}ms ({string.Join(",", values)})\r\n";
}
}
if (IsTracePerformance) logtxt_dt = DateTime.Now;
dr.Close();
}
if (IsTracePerformance) logtxt += $"ExecuteReader_dispose: {DateTime.Now.Subtract(logtxt_dt).TotalMilliseconds}ms Total: {DateTime.Now.Subtract(dt).TotalMilliseconds}ms\r\n";
} catch (Exception ex2) {
ex = ex2;
}
if (conn != null) {
if (IsTracePerformance) logtxt_dt = DateTime.Now;
ReturnConnection(pool, conn, ex); //pool.Return(conn, ex);
if (IsTracePerformance) logtxt += $"ReleaseConnection: {DateTime.Now.Subtract(logtxt_dt).TotalMilliseconds}ms Total: {DateTime.Now.Subtract(dt).TotalMilliseconds}ms";
}
LoggerException(pool, pc.cmd, ex, dt, logtxt);
pc.cmd.Parameters.Clear();
}
public object[][] ExecuteArray(string sql, object parms = null) => ExecuteArray(CommandType.Text, sql, GetDbParamtersByObject(sql, parms));
public object[][] ExecuteArray(CommandType cmdType, string cmdText, params DbParameter[] cmdParms) {
List<object[]> ret = new List<object[]>();
ExecuteReader(dr => {
object[] values = new object[dr.FieldCount];
dr.GetValues(values);
ret.Add(values);
}, cmdType, cmdText, cmdParms);
return ret.ToArray();
}
public DataTable ExecuteDataTable(string sql, object parms = null) => ExecuteDataTable(CommandType.Text, sql, GetDbParamtersByObject(sql, parms));
public DataTable ExecuteDataTable(CommandType cmdType, string cmdText, params DbParameter[] cmdParms) {
var ret = new DataTable();
ExecuteReader(dr => {
if (ret.Columns.Count == 0)
for (var a = 0; a < dr.FieldCount; a++) ret.Columns.Add(dr.GetName(a));
object[] values = new object[ret.Columns.Count];
dr.GetValues(values);
ret.Rows.Add(values);
}, cmdType, cmdText, cmdParms);
return ret;
}
public int ExecuteNonQuery(string sql, object parms = null) => ExecuteNonQuery(CommandType.Text, sql, GetDbParamtersByObject(sql, parms));
public int ExecuteNonQuery(CommandType cmdType, string cmdText, params DbParameter[] cmdParms) {
DateTime dt = DateTime.Now;
string logtxt = "";
DateTime logtxt_dt = DateTime.Now;
Object<DbConnection> conn = null;
var pc = PrepareCommand(cmdType, cmdText, cmdParms, ref logtxt);
int val = 0;
Exception ex = null;
try {
if (pc.cmd.Connection == null) pc.cmd.Connection = (conn = this.MasterPool.Get()).Value;
val = pc.cmd.ExecuteNonQuery();
} catch (Exception ex2) {
ex = ex2;
}
if (conn != null) {
if (IsTracePerformance) logtxt_dt = DateTime.Now;
ReturnConnection(MasterPool, conn, ex); //this.MasterPool.Return(conn, ex);
if (IsTracePerformance) logtxt += $"ReleaseConnection: {DateTime.Now.Subtract(logtxt_dt).TotalMilliseconds}ms Total: {DateTime.Now.Subtract(dt).TotalMilliseconds}ms";
}
LoggerException(this.MasterPool, pc.cmd, ex, dt, logtxt);
pc.cmd.Parameters.Clear();
return val;
}
public object ExecuteScalar(string sql, object parms = null) => ExecuteScalar(CommandType.Text, sql, GetDbParamtersByObject(sql, parms));
public object ExecuteScalar(CommandType cmdType, string cmdText, params DbParameter[] cmdParms) {
DateTime dt = DateTime.Now;
string logtxt = "";
DateTime logtxt_dt = DateTime.Now;
Object<DbConnection> conn = null;
var pc = PrepareCommand(cmdType, cmdText, cmdParms, ref logtxt);
object val = null;
Exception ex = null;
try {
if (pc.cmd.Connection == null) pc.cmd.Connection = (conn = this.MasterPool.Get()).Value;
val = pc.cmd.ExecuteScalar();
} catch (Exception ex2) {
ex = ex2;
}
if (conn != null) {
if (IsTracePerformance) logtxt_dt = DateTime.Now;
ReturnConnection(MasterPool, conn, ex); //this.MasterPool.Return(conn, ex);
if (IsTracePerformance) logtxt += $"ReleaseConnection: {DateTime.Now.Subtract(logtxt_dt).TotalMilliseconds}ms Total: {DateTime.Now.Subtract(dt).TotalMilliseconds}ms";
}
LoggerException(this.MasterPool, pc.cmd, ex, dt, logtxt);
pc.cmd.Parameters.Clear();
return val;
}
private (DbTransaction tran, DbCommand cmd) PrepareCommand(CommandType cmdType, string cmdText, DbParameter[] cmdParms, ref string logtxt) {
var dt = DateTime.Now;
DbCommand cmd = CreateCommand();
cmd.CommandType = cmdType;
cmd.CommandText = cmdText;
if (cmdParms != null) {
foreach (var parm in cmdParms) {
if (parm == null) continue;
if (parm.Value == null) parm.Value = DBNull.Value;
cmd.Parameters.Add(parm);
}
}
var tran = TransactionCurrentThread;
if (IsTracePerformance) logtxt += $" PrepareCommand_part1: {DateTime.Now.Subtract(dt).TotalMilliseconds}ms cmdParms: {cmdParms.Length}\r\n";
if (tran != null) {
if (IsTracePerformance) dt = DateTime.Now;
cmd.Connection = tran.Connection;
cmd.Transaction = tran;
if (IsTracePerformance) logtxt += $" PrepareCommand_tran!=null: {DateTime.Now.Subtract(dt).TotalMilliseconds}ms\r\n";
}
if (IsTracePerformance) dt = DateTime.Now;
AutoCommitTransaction();
if (IsTracePerformance) logtxt += $" AutoCommitTransaction: {DateTime.Now.Subtract(dt).TotalMilliseconds}ms\r\n";
return (tran, cmd);
}
}
}

View File

@ -0,0 +1,215 @@
using SafeObjectPool;
using System;
using System.Collections.Generic;
using System.Data;
using System.Data.Common;
using System.Linq;
using System.Threading.Tasks;
namespace FreeSql.Internal.CommonProvider {
partial class AdoProvider {
public Task<List<T>> QueryAsync<T>(string sql, object parms = null) => QueryAsync<T>(CommandType.Text, sql, GetDbParamtersByObject(sql, parms));
async public Task<List<T>> QueryAsync<T>(CommandType cmdType, string cmdText, params DbParameter[] cmdParms) {
var names = new Dictionary<string, int>(StringComparer.CurrentCultureIgnoreCase);
var ds = new List<object[]>();
await ExecuteReaderAsync(async dr => {
if (names.Any() == false)
for (var a = 0; a < dr.FieldCount; a++) names.Add(dr.GetName(a), a);
object[] values = new object[dr.FieldCount];
for (int a = 0; a < values.Length; a++) if (!await dr.IsDBNullAsync(a)) values[a] = await dr.GetFieldValueAsync<object>(a);
ds.Add(values);
}, cmdType, cmdText, cmdParms);
var ret = new List<T>();
foreach (var row in ds) {
var read = Utils.ExecuteArrayRowReadClassOrTuple(typeof(T), names, row);
ret.Add(read.value == null ? default(T) : (T) read.value);
}
return ret;
}
public Task ExecuteReaderAsync(Func<DbDataReader, Task> readerHander, string sql, object parms = null) => ExecuteReaderAsync(readerHander, CommandType.Text, sql, GetDbParamtersByObject(sql, parms));
async public Task ExecuteReaderAsync(Func<DbDataReader, Task> readerHander, CommandType cmdType, string cmdText, params DbParameter[] cmdParms) {
DateTime dt = DateTime.Now;
string logtxt = "";
DateTime logtxt_dt = DateTime.Now;
var pool = this.MasterPool;
bool isSlave = false;
//读写分离规则
if (this.SlavePools.Any() && cmdText.StartsWith("SELECT ", StringComparison.CurrentCultureIgnoreCase)) {
var availables = slaveUnavailables == 0 ?
//查从库
this.SlavePools : (
//查主库
slaveUnavailables == this.SlavePools.Count ? new List<ObjectPool<DbConnection>>() :
//查从库可用
this.SlavePools.Where(sp => sp.IsAvailable).ToList());
if (availables.Any()) {
isSlave = true;
pool = availables.Count == 1 ? this.SlavePools[0] : availables[slaveRandom.Next(availables.Count)];
}
}
Object<DbConnection> conn = null;
var cmd = PrepareCommandAsync(cmdType, cmdText, cmdParms, ref logtxt);
if (IsTracePerformance) logtxt += $"PrepareCommand: {DateTime.Now.Subtract(logtxt_dt).TotalMilliseconds}ms Total: {DateTime.Now.Subtract(dt).TotalMilliseconds}ms\r\n";
Exception ex = null;
try {
if (IsTracePerformance) logtxt_dt = DateTime.Now;
if (isSlave) {
//从库查询切换,恢复
bool isSlaveFail = false;
try {
if (cmd.Connection == null) cmd.Connection = (conn = await pool.GetAsync()).Value;
//if (slaveRandom.Next(100) % 2 == 0) throw new Exception("测试从库抛出异常");
} catch {
isSlaveFail = true;
}
if (isSlaveFail) {
if (conn != null) {
if (IsTracePerformance) logtxt_dt = DateTime.Now;
ReturnConnection(pool, conn, ex); //pool.Return(conn, ex);
if (IsTracePerformance) logtxt += $"ReleaseConnection: {DateTime.Now.Subtract(logtxt_dt).TotalMilliseconds}ms Total: {DateTime.Now.Subtract(dt).TotalMilliseconds}ms";
}
LoggerException(pool, cmd, new Exception($"连接失败,准备切换其他可用服务器"), dt, logtxt, false);
cmd.Parameters.Clear();
await ExecuteReaderAsync(readerHander, cmdType, cmdText, cmdParms);
return;
}
} else {
//主库查询
if (cmd.Connection == null) cmd.Connection = (conn = await pool.GetAsync()).Value;
}
if (IsTracePerformance) {
logtxt += $"Open: {DateTime.Now.Subtract(logtxt_dt).TotalMilliseconds}ms Total: {DateTime.Now.Subtract(dt).TotalMilliseconds}ms\r\n";
logtxt_dt = DateTime.Now;
}
using (var dr = await cmd.ExecuteReaderAsync()) {
if (IsTracePerformance) logtxt += $"ExecuteReader: {DateTime.Now.Subtract(logtxt_dt).TotalMilliseconds}ms Total: {DateTime.Now.Subtract(dt).TotalMilliseconds}ms\r\n";
while (true) {
if (IsTracePerformance) logtxt_dt = DateTime.Now;
bool isread = await dr.ReadAsync();
if (IsTracePerformance) logtxt += $" dr.Read: {DateTime.Now.Subtract(logtxt_dt).TotalMilliseconds}ms Total: {DateTime.Now.Subtract(dt).TotalMilliseconds}ms\r\n";
if (isread == false) break;
if (readerHander != null) {
object[] values = null;
if (IsTracePerformance) {
logtxt_dt = DateTime.Now;
values = new object[dr.FieldCount];
for (int a = 0; a < values.Length; a++) if (!await dr.IsDBNullAsync(a)) values[a] = await dr.GetFieldValueAsync<object>(a);
logtxt += $" dr.GetValues: {DateTime.Now.Subtract(logtxt_dt).TotalMilliseconds}ms Total: {DateTime.Now.Subtract(dt).TotalMilliseconds}ms\r\n";
logtxt_dt = DateTime.Now;
}
await readerHander(dr);
if (IsTracePerformance) logtxt += $" readerHander: {DateTime.Now.Subtract(logtxt_dt).TotalMilliseconds}ms Total: {DateTime.Now.Subtract(dt).TotalMilliseconds}ms ({string.Join(",", values)})\r\n";
}
}
if (IsTracePerformance) logtxt_dt = DateTime.Now;
dr.Close();
}
if (IsTracePerformance) logtxt += $"ExecuteReader_dispose: {DateTime.Now.Subtract(logtxt_dt).TotalMilliseconds}ms Total: {DateTime.Now.Subtract(dt).TotalMilliseconds}ms\r\n";
} catch (Exception ex2) {
ex = ex2;
}
if (conn != null) {
if (IsTracePerformance) logtxt_dt = DateTime.Now;
ReturnConnection(pool, conn, ex); //pool.Return(conn, ex);
if (IsTracePerformance) logtxt += $"ReleaseConnection: {DateTime.Now.Subtract(logtxt_dt).TotalMilliseconds}ms Total: {DateTime.Now.Subtract(dt).TotalMilliseconds}ms";
}
LoggerException(pool, cmd, ex, dt, logtxt);
cmd.Parameters.Clear();
}
public Task ExecuteArrayAsync(string sql, object parms = null) => ExecuteArrayAsync(CommandType.Text, sql, GetDbParamtersByObject(sql, parms));
async public Task<object[][]> ExecuteArrayAsync(CommandType cmdType, string cmdText, params DbParameter[] cmdParms) {
List<object[]> ret = new List<object[]>();
await ExecuteReaderAsync(async dr => {
object[] values = new object[dr.FieldCount];
for (int a = 0; a < values.Length; a++) if (!await dr.IsDBNullAsync(a)) values[a] = await dr.GetFieldValueAsync<object>(a);
ret.Add(values);
}, cmdType, cmdText, cmdParms);
return ret.ToArray();
}
public Task<DataTable> ExecuteDataTableAsync(string sql, object parms = null) => ExecuteDataTableAsync(CommandType.Text, sql, GetDbParamtersByObject(sql, parms));
async public Task<DataTable> ExecuteDataTableAsync(CommandType cmdType, string cmdText, params DbParameter[] cmdParms) {
var ret = new DataTable();
await ExecuteReaderAsync(async dr => {
if (ret.Columns.Count == 0)
for (var a = 0; a < dr.FieldCount; a++) ret.Columns.Add(dr.GetName(a));
object[] values = new object[ret.Columns.Count];
for (int a = 0; a < values.Length; a++) if (!await dr.IsDBNullAsync(a)) values[a] = await dr.GetFieldValueAsync<object>(a);
ret.Rows.Add(values);
}, cmdType, cmdText, cmdParms);
return ret;
}
public Task<int> ExecuteNonQueryAsync(string sql, object parms = null) => ExecuteNonQueryAsync(CommandType.Text, sql, GetDbParamtersByObject(sql, parms));
async public Task<int> ExecuteNonQueryAsync(CommandType cmdType, string cmdText, params DbParameter[] cmdParms) {
DateTime dt = DateTime.Now;
string logtxt = "";
Object<DbConnection> conn = null;
var cmd = PrepareCommandAsync(cmdType, cmdText, cmdParms, ref logtxt);
var logtxt_dt = DateTime.Now;
int val = 0;
Exception ex = null;
try {
if (cmd.Connection == null) cmd.Connection = (conn = await this.MasterPool.GetAsync()).Value;
val = await cmd.ExecuteNonQueryAsync();
} catch (Exception ex2) {
ex = ex2;
}
if (conn != null) {
if (IsTracePerformance) logtxt_dt = DateTime.Now;
ReturnConnection(MasterPool, conn, ex); //this.MasterPool.Return(conn, ex);
if (IsTracePerformance) logtxt += $"ReleaseConnection: {DateTime.Now.Subtract(logtxt_dt).TotalMilliseconds}ms Total: {DateTime.Now.Subtract(dt).TotalMilliseconds}ms";
}
LoggerException(this.MasterPool, cmd, ex, dt, logtxt);
cmd.Parameters.Clear();
return val;
}
public Task<object> ExecuteScalarAsync(string sql, object parms = null) => ExecuteScalarAsync(CommandType.Text, sql, GetDbParamtersByObject(sql, parms));
async public Task<object> ExecuteScalarAsync(CommandType cmdType, string cmdText, params DbParameter[] cmdParms) {
var dt = DateTime.Now;
var logtxt = "";
Object<DbConnection> conn = null;
var cmd = PrepareCommandAsync(cmdType, cmdText, cmdParms, ref logtxt);
var logtxt_dt = DateTime.Now;
object val = null;
Exception ex = null;
try {
if (cmd.Connection == null) cmd.Connection = (conn = await this.MasterPool.GetAsync()).Value;
val = await cmd.ExecuteScalarAsync();
} catch (Exception ex2) {
ex = ex2;
}
if (conn != null) {
if (IsTracePerformance) logtxt_dt = DateTime.Now;
ReturnConnection(MasterPool, conn, ex); //this.MasterPool.Return(conn, ex);
if (IsTracePerformance) logtxt += $"ReleaseConnection: {DateTime.Now.Subtract(logtxt_dt).TotalMilliseconds}ms Total: {DateTime.Now.Subtract(dt).TotalMilliseconds}ms";
}
LoggerException(this.MasterPool, cmd, ex, dt, logtxt);
cmd.Parameters.Clear();
return val;
}
private DbCommand PrepareCommandAsync(CommandType cmdType, string cmdText, DbParameter[] cmdParms, ref string logtxt) {
DateTime dt = DateTime.Now;
DbCommand cmd = CreateCommand();
cmd.CommandType = cmdType;
cmd.CommandText = cmdText;
if (cmdParms != null) {
foreach (var parm in cmdParms) {
if (parm == null) continue;
if (parm.Value == null) parm.Value = DBNull.Value;
cmd.Parameters.Add(parm);
}
}
if (IsTracePerformance) logtxt += $" PrepareCommand_tran==null: {DateTime.Now.Subtract(dt).TotalMilliseconds}ms\r\n";
return cmd;
}
}
}

View File

@ -0,0 +1,152 @@
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using SafeObjectPool;
using System;
using System.Collections.Generic;
using System.Data.Common;
using System.Linq;
using System.Threading;
namespace FreeSql.Internal.CommonProvider {
partial class AdoProvider {
class Transaction2 {
internal Object<DbConnection> Conn;
internal DbTransaction Transaction;
internal DateTime RunTime;
internal TimeSpan Timeout;
public Transaction2(Object<DbConnection> conn, DbTransaction tran, TimeSpan timeout) {
Conn = conn;
Transaction = tran;
RunTime = DateTime.Now;
Timeout = timeout;
}
}
private Dictionary<int, Transaction2> _trans = new Dictionary<int, Transaction2>();
private object _trans_lock = new object();
public DbTransaction TransactionCurrentThread => _trans.TryGetValue(Thread.CurrentThread.ManagedThreadId, out var conn) && conn.Transaction?.Connection != null ? conn.Transaction : null;
private Dictionary<int, List<string>> _preRemoveKeys = new Dictionary<int, List<string>>();
private object _preRemoveKeys_lock = new object();
public string[] PreRemove(params string[] key) {
var tid = Thread.CurrentThread.ManagedThreadId;
List<string> keys = null;
if (key == null || key.Any() == false) return _preRemoveKeys.TryGetValue(tid, out keys) ? keys.ToArray() : new string[0];
_log.LogDebug($"线程{tid}事务预删除 {JsonConvert.SerializeObject(key)}");
if (_preRemoveKeys.TryGetValue(tid, out keys) == false)
lock (_preRemoveKeys_lock)
if (_preRemoveKeys.TryGetValue(tid, out keys) == false) {
_preRemoveKeys.Add(tid, keys = new List<string>(key));
return key;
}
keys.AddRange(key);
return keys.ToArray();
}
public void TransactionPreRemoveCache(params string[] key) => PreRemove(key);
/// <summary>
/// 启动事务
/// </summary>
public void BeginTransaction(TimeSpan timeout) {
int tid = Thread.CurrentThread.ManagedThreadId;
Transaction2 tran = null;
Object<DbConnection> conn = null;
try {
conn = MasterPool.Get();
tran = new Transaction2(conn, conn.Value.BeginTransaction(), timeout);
} catch(Exception ex) {
_log.LogError($"数据库出错(开启事务){ex.Message} \r\n{ex.StackTrace}");
throw ex;
}
if (_trans.ContainsKey(tid)) CommitTransaction();
lock (_trans_lock)
_trans.Add(tid, tran);
}
/// <summary>
/// 自动提交事务
/// </summary>
private void AutoCommitTransaction() {
if (_trans.Count > 0) {
Transaction2[] trans = null;
lock (_trans_lock)
trans = _trans.Values.Where(st2 => DateTime.Now.Subtract(st2.RunTime) > st2.Timeout).ToArray();
foreach (Transaction2 tran in trans) CommitTransaction(true, tran);
}
}
private void CommitTransaction(bool isCommit, Transaction2 tran) {
if (tran == null || tran.Transaction == null || tran.Transaction.Connection == null) return;
if (_trans.ContainsKey(tran.Conn.LastGetThreadId))
lock (_trans_lock)
if (_trans.ContainsKey(tran.Conn.LastGetThreadId))
_trans.Remove(tran.Conn.LastGetThreadId);
var removeKeys = PreRemove();
if (_preRemoveKeys.ContainsKey(tran.Conn.LastGetThreadId))
lock (_preRemoveKeys_lock)
if (_preRemoveKeys.ContainsKey(tran.Conn.LastGetThreadId))
_preRemoveKeys.Remove(tran.Conn.LastGetThreadId);
Exception ex = null;
var f001 = isCommit ? "提交" : "回滚";
try {
_log.LogDebug($"线程{tran.Conn.LastGetThreadId}事务{f001}批量删除缓存key {Newtonsoft.Json.JsonConvert.SerializeObject(removeKeys)}");
_cache.Remove(removeKeys);
if (isCommit) tran.Transaction.Commit();
else tran.Transaction.Rollback();
} catch (Exception ex2) {
ex = ex2;
_log.LogError($"数据库出错({f001}事务):{ex.Message} {ex.StackTrace}");
} finally {
ReturnConnection(MasterPool, tran.Conn, ex); //MasterPool.Return(tran.Conn, ex);
}
}
private void CommitTransaction(bool isCommit) {
if (_trans.TryGetValue(Thread.CurrentThread.ManagedThreadId, out var tran)) CommitTransaction(isCommit, tran);
}
/// <summary>
/// 提交事务
/// </summary>
public void CommitTransaction() => CommitTransaction(true);
/// <summary>
/// 回滚事务
/// </summary>
public void RollbackTransaction() => CommitTransaction(false);
public void Dispose() {
Transaction2[] trans = null;
lock (_trans_lock)
trans = _trans.Values.ToArray();
foreach (Transaction2 tran in trans) CommitTransaction(false, tran);
}
/// <summary>
/// 开启事务不支持异步60秒未执行完将自动提交
/// </summary>
/// <param name="handler">事务体 () => {}</param>
public void Transaction(Action handler) {
Transaction(handler, TimeSpan.FromSeconds(60));
}
/// <summary>
/// 开启事务(不支持异步)
/// </summary>
/// <param name="handler">事务体 () => {}</param>
/// <param name="timeout">超时,未执行完将自动提交</param>
public void Transaction(Action handler, TimeSpan timeout) {
try {
BeginTransaction(timeout);
handler();
CommitTransaction();
} catch (Exception ex) {
RollbackTransaction();
throw ex;
}
}
}
}

View File

@ -0,0 +1,19 @@
using System.Text.RegularExpressions;
namespace FreeSql.Internal.CommonProvider {
partial class AdoProvider {
public abstract object AddslashesProcessParam(object param);
public string Addslashes(string filter, params object[] parms) {
if (filter == null || parms == null) return string.Empty;
if (parms.Length == 0) return filter;
var nparms = new object[parms.Length];
for (int a = 0; a < parms.Length; a++) {
if (parms[a] == null)
filter = Regex.Replace(filter, @"\s*(=|IN)\s*\{" + a + @"\}", " IS {" + a + "}", RegexOptions.IgnoreCase);
nparms[a] = AddslashesProcessParam(parms[a]);
}
try { string ret = string.Format(filter, nparms); return ret; } catch { return filter; }
}
}
}