diff --git a/src/Ocelot/Raft/SqlLiteLog.cs b/src/Ocelot/Raft/SqlLiteLog.cs index 882df202..f4dfed49 100644 --- a/src/Ocelot/Raft/SqlLiteLog.cs +++ b/src/Ocelot/Raft/SqlLiteLog.cs @@ -1,321 +1,322 @@ -namespace Ocelot.Raft -{ - using System; - using System.Collections.Generic; - using System.IO; - using System.Threading; - using System.Threading.Tasks; - using Microsoft.Data.Sqlite; - using Microsoft.Extensions.Logging; - using Newtonsoft.Json; - using Rafty.Infrastructure; - using Rafty.Log; - - public class SqlLiteLog : ILog - { - private readonly string _path; - private readonly SemaphoreSlim _sempaphore = new SemaphoreSlim(1, 1); - private readonly ILogger _logger; - private readonly NodeId _nodeId; - - public SqlLiteLog(NodeId nodeId, ILoggerFactory loggerFactory) - { - _logger = loggerFactory.CreateLogger(); - _nodeId = nodeId; - _path = $"{nodeId.Id.Replace("/", "").Replace(":", "")}.db"; - _sempaphore.Wait(); - - if (!File.Exists(_path)) - { - var fs = File.Create(_path); - +namespace Ocelot.Raft +{ + using System; + using System.Collections.Generic; + using System.IO; + using System.Threading; + using System.Threading.Tasks; + using Microsoft.Data.Sqlite; + using Microsoft.Extensions.Logging; + using Newtonsoft.Json; + using Rafty.Infrastructure; + using Rafty.Log; + + [ExcludeFromCoverage] + public class SqlLiteLog : ILog + { + private readonly string _path; + private readonly SemaphoreSlim _sempaphore = new SemaphoreSlim(1, 1); + private readonly ILogger _logger; + private readonly NodeId _nodeId; + + public SqlLiteLog(NodeId nodeId, ILoggerFactory loggerFactory) + { + _logger = loggerFactory.CreateLogger(); + _nodeId = nodeId; + _path = $"{nodeId.Id.Replace("/", "").Replace(":", "")}.db"; + _sempaphore.Wait(); + + if (!File.Exists(_path)) + { + var fs = File.Create(_path); + fs.Dispose(); - using (var connection = new SqliteConnection($"Data Source={_path};")) - { - connection.Open(); - - const string sql = @"create table logs ( - id integer primary key, - data text not null - )"; - - using (var command = new SqliteCommand(sql, connection)) - { - var result = command.ExecuteNonQuery(); - - _logger.LogInformation(result == 0 - ? $"id: {_nodeId.Id} create database, result: {result}" - : $"id: {_nodeId.Id} did not create database., result: {result}"); - } - } - } - - _sempaphore.Release(); - } - - public async Task LastLogIndex() - { - _sempaphore.Wait(); - var result = 1; - using (var connection = new SqliteConnection($"Data Source={_path};")) - { - connection.Open(); - var sql = @"select id from logs order by id desc limit 1"; - using (var command = new SqliteCommand(sql, connection)) - { - var index = Convert.ToInt32(await command.ExecuteScalarAsync()); - if (index > result) - { - result = index; - } - } - } - - _sempaphore.Release(); - return result; - } - - public async Task LastLogTerm() - { - _sempaphore.Wait(); - long result = 0; - using (var connection = new SqliteConnection($"Data Source={_path};")) - { - connection.Open(); - var sql = @"select data from logs order by id desc limit 1"; - using (var command = new SqliteCommand(sql, connection)) - { - var data = Convert.ToString(await command.ExecuteScalarAsync()); + using (var connection = new SqliteConnection($"Data Source={_path};")) + { + connection.Open(); + + const string sql = @"create table logs ( + id integer primary key, + data text not null + )"; + + using (var command = new SqliteCommand(sql, connection)) + { + var result = command.ExecuteNonQuery(); + + _logger.LogInformation(result == 0 + ? $"id: {_nodeId.Id} create database, result: {result}" + : $"id: {_nodeId.Id} did not create database., result: {result}"); + } + } + } + + _sempaphore.Release(); + } + + public async Task LastLogIndex() + { + _sempaphore.Wait(); + var result = 1; + using (var connection = new SqliteConnection($"Data Source={_path};")) + { + connection.Open(); + var sql = @"select id from logs order by id desc limit 1"; + using (var command = new SqliteCommand(sql, connection)) + { + var index = Convert.ToInt32(await command.ExecuteScalarAsync()); + if (index > result) + { + result = index; + } + } + } + + _sempaphore.Release(); + return result; + } + + public async Task LastLogTerm() + { + _sempaphore.Wait(); + long result = 0; + using (var connection = new SqliteConnection($"Data Source={_path};")) + { + connection.Open(); + var sql = @"select data from logs order by id desc limit 1"; + using (var command = new SqliteCommand(sql, connection)) + { + var data = Convert.ToString(await command.ExecuteScalarAsync()); var jsonSerializerSettings = new JsonSerializerSettings() { - TypeNameHandling = TypeNameHandling.All - }; - var log = JsonConvert.DeserializeObject(data, jsonSerializerSettings); - if (log != null && log.Term > result) - { - result = log.Term; - } - } - } - _sempaphore.Release(); - return result; - } - + TypeNameHandling = TypeNameHandling.All + }; + var log = JsonConvert.DeserializeObject(data, jsonSerializerSettings); + if (log != null && log.Term > result) + { + result = log.Term; + } + } + } + _sempaphore.Release(); + return result; + } + public async Task Count() - { - _sempaphore.Wait(); - var result = 0; - using (var connection = new SqliteConnection($"Data Source={_path};")) - { - connection.Open(); - var sql = @"select count(id) from logs"; - using (var command = new SqliteCommand(sql, connection)) - { - var index = Convert.ToInt32(await command.ExecuteScalarAsync()); - if (index > result) - { - result = index; - } - } - } - _sempaphore.Release(); - return result; - } - - public async Task Apply(LogEntry log) - { - _sempaphore.Wait(); - using (var connection = new SqliteConnection($"Data Source={_path};")) - { - connection.Open(); + { + _sempaphore.Wait(); + var result = 0; + using (var connection = new SqliteConnection($"Data Source={_path};")) + { + connection.Open(); + var sql = @"select count(id) from logs"; + using (var command = new SqliteCommand(sql, connection)) + { + var index = Convert.ToInt32(await command.ExecuteScalarAsync()); + if (index > result) + { + result = index; + } + } + } + _sempaphore.Release(); + return result; + } + + public async Task Apply(LogEntry log) + { + _sempaphore.Wait(); + using (var connection = new SqliteConnection($"Data Source={_path};")) + { + connection.Open(); var jsonSerializerSettings = new JsonSerializerSettings() { - TypeNameHandling = TypeNameHandling.All - }; - var data = JsonConvert.SerializeObject(log, jsonSerializerSettings); - //todo - sql injection dont copy this.. - var sql = $"insert into logs (data) values ('{data}')"; - _logger.LogInformation($"id: {_nodeId.Id}, sql: {sql}"); - using (var command = new SqliteCommand(sql, connection)) - { - var result = await command.ExecuteNonQueryAsync(); - _logger.LogInformation($"id: {_nodeId.Id}, insert log result: {result}"); - } - - sql = "select last_insert_rowid()"; - using (var command = new SqliteCommand(sql, connection)) - { - var result = await command.ExecuteScalarAsync(); - _logger.LogInformation($"id: {_nodeId.Id}, about to release semaphore"); - _sempaphore.Release(); - _logger.LogInformation($"id: {_nodeId.Id}, saved log to sqlite"); - return Convert.ToInt32(result); + TypeNameHandling = TypeNameHandling.All + }; + var data = JsonConvert.SerializeObject(log, jsonSerializerSettings); + //todo - sql injection dont copy this.. + var sql = $"insert into logs (data) values ('{data}')"; + _logger.LogInformation($"id: {_nodeId.Id}, sql: {sql}"); + using (var command = new SqliteCommand(sql, connection)) + { + var result = await command.ExecuteNonQueryAsync(); + _logger.LogInformation($"id: {_nodeId.Id}, insert log result: {result}"); } - } - } - - public async Task DeleteConflictsFromThisLog(int index, LogEntry logEntry) - { - _sempaphore.Wait(); - using (var connection = new SqliteConnection($"Data Source={_path};")) - { - connection.Open(); - //todo - sql injection dont copy this.. - var sql = $"select data from logs where id = {index};"; - _logger.LogInformation($"id: {_nodeId.Id} sql: {sql}"); - using (var command = new SqliteCommand(sql, connection)) - { - var data = Convert.ToString(await command.ExecuteScalarAsync()); + + sql = "select last_insert_rowid()"; + using (var command = new SqliteCommand(sql, connection)) + { + var result = await command.ExecuteScalarAsync(); + _logger.LogInformation($"id: {_nodeId.Id}, about to release semaphore"); + _sempaphore.Release(); + _logger.LogInformation($"id: {_nodeId.Id}, saved log to sqlite"); + return Convert.ToInt32(result); + } + } + } + + public async Task DeleteConflictsFromThisLog(int index, LogEntry logEntry) + { + _sempaphore.Wait(); + using (var connection = new SqliteConnection($"Data Source={_path};")) + { + connection.Open(); + //todo - sql injection dont copy this.. + var sql = $"select data from logs where id = {index};"; + _logger.LogInformation($"id: {_nodeId.Id} sql: {sql}"); + using (var command = new SqliteCommand(sql, connection)) + { + var data = Convert.ToString(await command.ExecuteScalarAsync()); var jsonSerializerSettings = new JsonSerializerSettings() { - TypeNameHandling = TypeNameHandling.All - }; - - _logger.LogInformation($"id {_nodeId.Id} got log for index: {index}, data is {data} and new log term is {logEntry.Term}"); - - var log = JsonConvert.DeserializeObject(data, jsonSerializerSettings); - if (logEntry != null && log != null && logEntry.Term != log.Term) - { - //todo - sql injection dont copy this.. - var deleteSql = $"delete from logs where id >= {index};"; - _logger.LogInformation($"id: {_nodeId.Id} sql: {deleteSql}"); - using (var deleteCommand = new SqliteCommand(deleteSql, connection)) - { - var result = await deleteCommand.ExecuteNonQueryAsync(); - } - } - } - } - _sempaphore.Release(); - } - - public async Task IsDuplicate(int index, LogEntry logEntry) - { - _sempaphore.Wait(); - using (var connection = new SqliteConnection($"Data Source={_path};")) - { - connection.Open(); - //todo - sql injection dont copy this.. - var sql = $"select data from logs where id = {index};"; - using (var command = new SqliteCommand(sql, connection)) - { - var data = Convert.ToString(await command.ExecuteScalarAsync()); + TypeNameHandling = TypeNameHandling.All + }; + + _logger.LogInformation($"id {_nodeId.Id} got log for index: {index}, data is {data} and new log term is {logEntry.Term}"); + + var log = JsonConvert.DeserializeObject(data, jsonSerializerSettings); + if (logEntry != null && log != null && logEntry.Term != log.Term) + { + //todo - sql injection dont copy this.. + var deleteSql = $"delete from logs where id >= {index};"; + _logger.LogInformation($"id: {_nodeId.Id} sql: {deleteSql}"); + using (var deleteCommand = new SqliteCommand(deleteSql, connection)) + { + var result = await deleteCommand.ExecuteNonQueryAsync(); + } + } + } + } + _sempaphore.Release(); + } + + public async Task IsDuplicate(int index, LogEntry logEntry) + { + _sempaphore.Wait(); + using (var connection = new SqliteConnection($"Data Source={_path};")) + { + connection.Open(); + //todo - sql injection dont copy this.. + var sql = $"select data from logs where id = {index};"; + using (var command = new SqliteCommand(sql, connection)) + { + var data = Convert.ToString(await command.ExecuteScalarAsync()); var jsonSerializerSettings = new JsonSerializerSettings() { - TypeNameHandling = TypeNameHandling.All - }; - - var log = JsonConvert.DeserializeObject(data, jsonSerializerSettings); - - if (logEntry != null && log != null && logEntry.Term == log.Term) - { - _sempaphore.Release(); - return true; - } - } - } - - _sempaphore.Release(); - return false; - } - - public async Task Get(int index) - { - _sempaphore.Wait(); - using (var connection = new SqliteConnection($"Data Source={_path};")) - { - connection.Open(); - //todo - sql injection dont copy this.. - var sql = $"select data from logs where id = {index}"; - using (var command = new SqliteCommand(sql, connection)) - { - var data = Convert.ToString(await command.ExecuteScalarAsync()); + TypeNameHandling = TypeNameHandling.All + }; + + var log = JsonConvert.DeserializeObject(data, jsonSerializerSettings); + + if (logEntry != null && log != null && logEntry.Term == log.Term) + { + _sempaphore.Release(); + return true; + } + } + } + + _sempaphore.Release(); + return false; + } + + public async Task Get(int index) + { + _sempaphore.Wait(); + using (var connection = new SqliteConnection($"Data Source={_path};")) + { + connection.Open(); + //todo - sql injection dont copy this.. + var sql = $"select data from logs where id = {index}"; + using (var command = new SqliteCommand(sql, connection)) + { + var data = Convert.ToString(await command.ExecuteScalarAsync()); var jsonSerializerSettings = new JsonSerializerSettings() { - TypeNameHandling = TypeNameHandling.All - }; - var log = JsonConvert.DeserializeObject(data, jsonSerializerSettings); - _sempaphore.Release(); - return log; - } - } - } - - public async Task> GetFrom(int index) - { - _sempaphore.Wait(); - var logsToReturn = new List<(int, LogEntry)>(); - - using (var connection = new SqliteConnection($"Data Source={_path};")) - { - connection.Open(); - //todo - sql injection dont copy this.. - var sql = $"select id, data from logs where id >= {index}"; - using (var command = new SqliteCommand(sql, connection)) - { - using (var reader = await command.ExecuteReaderAsync()) - { - while (reader.Read()) - { - var id = Convert.ToInt32(reader[0]); - var data = (string)reader[1]; + TypeNameHandling = TypeNameHandling.All + }; + var log = JsonConvert.DeserializeObject(data, jsonSerializerSettings); + _sempaphore.Release(); + return log; + } + } + } + + public async Task> GetFrom(int index) + { + _sempaphore.Wait(); + var logsToReturn = new List<(int, LogEntry)>(); + + using (var connection = new SqliteConnection($"Data Source={_path};")) + { + connection.Open(); + //todo - sql injection dont copy this.. + var sql = $"select id, data from logs where id >= {index}"; + using (var command = new SqliteCommand(sql, connection)) + { + using (var reader = await command.ExecuteReaderAsync()) + { + while (reader.Read()) + { + var id = Convert.ToInt32(reader[0]); + var data = (string)reader[1]; var jsonSerializerSettings = new JsonSerializerSettings() { - TypeNameHandling = TypeNameHandling.All - }; - var log = JsonConvert.DeserializeObject(data, jsonSerializerSettings); - logsToReturn.Add((id, log)); - - } - } + TypeNameHandling = TypeNameHandling.All + }; + var log = JsonConvert.DeserializeObject(data, jsonSerializerSettings); + logsToReturn.Add((id, log)); + + } + } } _sempaphore.Release(); - return logsToReturn; - } - } - - public async Task GetTermAtIndex(int index) - { - _sempaphore.Wait(); - long result = 0; - using (var connection = new SqliteConnection($"Data Source={_path};")) - { - connection.Open(); - //todo - sql injection dont copy this.. - var sql = $"select data from logs where id = {index}"; - using (var command = new SqliteCommand(sql, connection)) - { - var data = Convert.ToString(await command.ExecuteScalarAsync()); + return logsToReturn; + } + } + + public async Task GetTermAtIndex(int index) + { + _sempaphore.Wait(); + long result = 0; + using (var connection = new SqliteConnection($"Data Source={_path};")) + { + connection.Open(); + //todo - sql injection dont copy this.. + var sql = $"select data from logs where id = {index}"; + using (var command = new SqliteCommand(sql, connection)) + { + var data = Convert.ToString(await command.ExecuteScalarAsync()); var jsonSerializerSettings = new JsonSerializerSettings() { - TypeNameHandling = TypeNameHandling.All - }; - var log = JsonConvert.DeserializeObject(data, jsonSerializerSettings); - if (log != null && log.Term > result) - { - result = log.Term; - } - } - } - _sempaphore.Release(); - return result; - } - public async Task Remove(int indexOfCommand) - { - _sempaphore.Wait(); - using (var connection = new SqliteConnection($"Data Source={_path};")) - { - connection.Open(); - //todo - sql injection dont copy this.. - var deleteSql = $"delete from logs where id >= {indexOfCommand};"; - _logger.LogInformation($"id: {_nodeId.Id} Remove {deleteSql}"); - using (var deleteCommand = new SqliteCommand(deleteSql, connection)) - { - var result = await deleteCommand.ExecuteNonQueryAsync(); - } - } - _sempaphore.Release(); - } - } + TypeNameHandling = TypeNameHandling.All + }; + var log = JsonConvert.DeserializeObject(data, jsonSerializerSettings); + if (log != null && log.Term > result) + { + result = log.Term; + } + } + } + _sempaphore.Release(); + return result; + } + public async Task Remove(int indexOfCommand) + { + _sempaphore.Wait(); + using (var connection = new SqliteConnection($"Data Source={_path};")) + { + connection.Open(); + //todo - sql injection dont copy this.. + var deleteSql = $"delete from logs where id >= {indexOfCommand};"; + _logger.LogInformation($"id: {_nodeId.Id} Remove {deleteSql}"); + using (var deleteCommand = new SqliteCommand(deleteSql, connection)) + { + var result = await deleteCommand.ExecuteNonQueryAsync(); + } + } + _sempaphore.Release(); + } + } }