Feature/fix unstable int tests (#376)

* updated packages but build wont work

* #245 implementing more stable rafty

* #245 OK so these raft integration tests are passing everytime on my local mac now...lets see about the build servergit log

* #245 added donation button

* #245 removed file we dont need
This commit is contained in:
Tom Pallister
2018-05-31 22:08:50 +01:00
committed by GitHub
parent e55b27de0f
commit 7cd3ff2ff7
16 changed files with 567 additions and 484 deletions

View File

@ -5,10 +5,10 @@ using Microsoft.AspNetCore.Mvc;
using Ocelot.Configuration.File;
using Ocelot.Configuration.Setter;
using Ocelot.Raft;
using Rafty.Concensus;
namespace Ocelot.Configuration
{
using Rafty.Concensus.Node;
using Repository;
[Authorize]
@ -50,7 +50,7 @@ namespace Ocelot.Configuration
{
var node = (INode)test;
var result = await node.Accept(new UpdateFileConfiguration(fileConfiguration));
if (result.GetType() == typeof(Rafty.Concensus.ErrorResponse<UpdateFileConfiguration>))
if (result.GetType() == typeof(Rafty.Infrastructure.ErrorResponse<UpdateFileConfiguration>))
{
return new BadRequestObjectResult("There was a problem. This error message sucks raise an issue in GitHub.");
}

View File

@ -8,6 +8,8 @@ using Rafty.Log;
namespace Ocelot.DependencyInjection
{
using Rafty.Concensus.Node;
public class OcelotAdministrationBuilder : IOcelotAdministrationBuilder
{
private readonly IServiceCollection _services;
@ -21,7 +23,7 @@ namespace Ocelot.DependencyInjection
public IOcelotAdministrationBuilder AddRafty()
{
var settings = new InMemorySettings(4000, 5000, 100, 5000);
var settings = new InMemorySettings(4000, 6000, 100, 10000);
_services.AddSingleton<ILog, SqlLiteLog>();
_services.AddSingleton<IFiniteStateMachine, OcelotFiniteStateMachine>();
_services.AddSingleton<ISettings>(settings);

View File

@ -18,6 +18,7 @@
using Rafty.Infrastructure;
using Ocelot.Middleware.Pipeline;
using Pivotal.Discovery.Client;
using Rafty.Concensus.Node;
public static class OcelotMiddlewareExtensions
{
@ -91,7 +92,7 @@
applicationLifetime.ApplicationStopping.Register(() => OnShutdown(builder));
var node = (INode)builder.ApplicationServices.GetService(typeof(INode));
var nodeId = (NodeId)builder.ApplicationServices.GetService(typeof(NodeId));
node.Start(nodeId.Id);
node.Start(nodeId);
}
private static async Task<IInternalConfiguration> CreateConfiguration(IApplicationBuilder builder)

View File

@ -25,37 +25,37 @@
<DebugSymbols>True</DebugSymbols>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Butterfly.Client" Version="0.0.8" />
<PackageReference Include="Butterfly.Client" Version="0.0.8"/>
<PackageReference Include="Butterfly.Client.AspNetCore" Version="0.0.8">
<NoWarn>NU1701</NoWarn>
</PackageReference>
<PackageReference Include="FluentValidation" Version="7.5.2" />
<PackageReference Include="IdentityServer4.AccessTokenValidation" Version="2.6.0" />
<PackageReference Include="Microsoft.AspNetCore" Version="2.0.3" />
<PackageReference Include="Microsoft.AspNetCore.Mvc" Version="2.0.4" />
<PackageReference Include="Microsoft.AspNetCore.MiddlewareAnalysis" Version="2.0.3" />
<PackageReference Include="Microsoft.Data.SQLite" Version="2.0.1" />
<PackageReference Include="Microsoft.Extensions.Configuration.EnvironmentVariables" Version="2.0.2" />
<PackageReference Include="Microsoft.Extensions.Configuration.FileExtensions" Version="2.0.2" />
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="2.0.2" />
<PackageReference Include="FluentValidation" Version="7.5.2"/>
<PackageReference Include="IdentityServer4.AccessTokenValidation" Version="2.6.0"/>
<PackageReference Include="Microsoft.AspNetCore" Version="2.0.3"/>
<PackageReference Include="Microsoft.AspNetCore.Mvc" Version="2.0.4"/>
<PackageReference Include="Microsoft.AspNetCore.MiddlewareAnalysis" Version="2.0.3"/>
<PackageReference Include="Microsoft.Data.SQLite" Version="2.0.1"/>
<PackageReference Include="Microsoft.Extensions.Configuration.EnvironmentVariables" Version="2.0.2"/>
<PackageReference Include="Microsoft.Extensions.Configuration.FileExtensions" Version="2.0.2"/>
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="2.0.2"/>
<PackageReference Include="Microsoft.Extensions.DiagnosticAdapter" Version="2.0.1">
<NoWarn>NU1701</NoWarn>
</PackageReference>
<PackageReference Include="Microsoft.Extensions.Logging" Version="2.0.2" />
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="2.0.2" />
<PackageReference Include="Microsoft.Extensions.Logging.Debug" Version="2.0.2" />
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="2.0.2" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="2.0.2"/>
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="2.0.2"/>
<PackageReference Include="Microsoft.Extensions.Logging.Debug" Version="2.0.2"/>
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="2.0.2"/>
<PackageReference Include="StyleCop.Analyzers" Version="1.0.2">
<PrivateAssets>all</PrivateAssets>
</PackageReference>
<PackageReference Include="System.Text.RegularExpressions" Version="4.3.0" />
<PackageReference Include="CacheManager.Core" Version="1.1.2" />
<PackageReference Include="CacheManager.Microsoft.Extensions.Configuration" Version="1.1.2" />
<PackageReference Include="CacheManager.Microsoft.Extensions.Logging" Version="1.1.2" />
<PackageReference Include="Consul" Version="0.7.2.4" />
<PackageReference Include="Polly" Version="6.0.1" />
<PackageReference Include="IdentityServer4" Version="2.2.0" />
<PackageReference Include="Rafty" Version="0.4.3" />
<PackageReference Include="Pivotal.Discovery.ClientCore" Version="2.0.1" />
<PackageReference Include="System.Text.RegularExpressions" Version="4.3.0"/>
<PackageReference Include="CacheManager.Core" Version="1.1.2"/>
<PackageReference Include="CacheManager.Microsoft.Extensions.Configuration" Version="1.1.2"/>
<PackageReference Include="CacheManager.Microsoft.Extensions.Logging" Version="1.1.2"/>
<PackageReference Include="Consul" Version="0.7.2.4"/>
<PackageReference Include="Polly" Version="6.0.1"/>
<PackageReference Include="IdentityServer4" Version="2.2.0"/>
<PackageReference Include="Pivotal.Discovery.ClientCore" Version="2.0.1"/>
<PackageReference Include="Rafty" Version="0.4.4"/>
</ItemGroup>
</Project>

View File

@ -1,36 +0,0 @@
using System;
using System.IO;
using System.Threading.Tasks;
using Newtonsoft.Json;
using Rafty.FiniteStateMachine;
using Rafty.Infrastructure;
using Rafty.Log;
namespace Ocelot.Raft
{
[ExcludeFromCoverage]
public class FileFsm : IFiniteStateMachine
{
private string _id;
public FileFsm(NodeId nodeId)
{
_id = nodeId.Id.Replace("/","").Replace(":","");
}
public Task Handle(LogEntry log)
{
try
{
var json = JsonConvert.SerializeObject(log.CommandData);
File.AppendAllText(_id, json);
}
catch(Exception exception)
{
Console.WriteLine(exception);
}
return Task.CompletedTask;
}
}
}

View File

@ -9,6 +9,8 @@ using Rafty.Infrastructure;
namespace Ocelot.Raft
{
using Rafty.Concensus.Peers;
[ExcludeFromCoverage]
public class FilePeersProvider : IPeersProvider
{

View File

@ -10,6 +10,10 @@ using Rafty.FiniteStateMachine;
namespace Ocelot.Raft
{
using Rafty.Concensus.Messages;
using Rafty.Concensus.Peers;
using Rafty.Infrastructure;
[ExcludeFromCoverage]
public class HttpPeer : IPeer
{

View File

@ -8,7 +8,7 @@ namespace Ocelot.Raft
[ExcludeFromCoverage]
public class OcelotFiniteStateMachine : IFiniteStateMachine
{
private IFileConfigurationSetter _setter;
private readonly IFileConfigurationSetter _setter;
public OcelotFiniteStateMachine(IFileConfigurationSetter setter)
{

View File

@ -14,6 +14,9 @@ using Rafty.FiniteStateMachine;
namespace Ocelot.Raft
{
using Rafty.Concensus.Messages;
using Rafty.Concensus.Node;
[ExcludeFromCoverage]
[Authorize]
[Route("raft")]

View File

@ -1,286 +1,321 @@
using System.IO;
using Rafty.Log;
using Microsoft.Data.Sqlite;
using Newtonsoft.Json;
using System;
using Rafty.Infrastructure;
using System.Collections.Generic;
using System.Threading.Tasks;
namespace Ocelot.Raft
{
//todo - use async await
[ExcludeFromCoverage]
{
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Data.Sqlite;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Rafty.Infrastructure;
using Rafty.Log;
public class SqlLiteLog : ILog
{
private string _path;
private readonly object _lock = new object();
private readonly string _path;
private readonly SemaphoreSlim _sempaphore = new SemaphoreSlim(1, 1);
private readonly ILogger _logger;
private readonly NodeId _nodeId;
public SqlLiteLog(NodeId nodeId)
public SqlLiteLog(NodeId nodeId, ILoggerFactory loggerFactory)
{
_path = $"{nodeId.Id.Replace("/","").Replace(":","")}.db";
if(!File.Exists(_path))
{
lock(_lock)
{
FileStream fs = File.Create(_path);
fs.Dispose();
}
_logger = loggerFactory.CreateLogger<SqlLiteLog>();
_nodeId = nodeId;
_path = $"{nodeId.Id.Replace("/", "").Replace(":", "")}.db";
_sempaphore.Wait();
using(var connection = new SqliteConnection($"Data Source={_path};"))
if (!File.Exists(_path))
{
var fs = File.Create(_path);
fs.Dispose();
using (var connection = new SqliteConnection($"Data Source={_path};"))
{
connection.Open();
var sql = @"create table logs (
const string sql = @"create table logs (
id integer primary key,
data text not null
)";
using(var command = new SqliteCommand(sql, connection))
using (var command = new SqliteCommand(sql, connection))
{
var result = command.ExecuteNonQuery();
_logger.LogInformation(result == 0
? $"id: {_nodeId.Id} create database, result: {result}"
: $"id: {_nodeId.Id} did not create database., result: {result}");
}
}
}
_sempaphore.Release();
}
public Task<int> LastLogIndex()
public async Task<int> LastLogIndex()
{
lock(_lock)
_sempaphore.Wait();
var result = 1;
using (var connection = new SqliteConnection($"Data Source={_path};"))
{
var result = 1;
using(var connection = new SqliteConnection($"Data Source={_path};"))
connection.Open();
var sql = @"select id from logs order by id desc limit 1";
using (var command = new SqliteCommand(sql, connection))
{
connection.Open();
var sql = @"select id from logs order by id desc limit 1";
using(var command = new SqliteCommand(sql, connection))
var index = Convert.ToInt32(await command.ExecuteScalarAsync());
if (index > result)
{
var index = Convert.ToInt32(command.ExecuteScalar());
if(index > result)
{
result = index;
}
result = index;
}
}
return Task.FromResult(result);
}
_sempaphore.Release();
return result;
}
public Task<long> LastLogTerm ()
public async Task<long> LastLogTerm()
{
lock(_lock)
_sempaphore.Wait();
long result = 0;
using (var connection = new SqliteConnection($"Data Source={_path};"))
{
long result = 0;
using(var connection = new SqliteConnection($"Data Source={_path};"))
connection.Open();
var sql = @"select data from logs order by id desc limit 1";
using (var command = new SqliteCommand(sql, connection))
{
connection.Open();
var sql = @"select data from logs order by id desc limit 1";
using(var command = new SqliteCommand(sql, connection))
{
var data = Convert.ToString(command.ExecuteScalar());
var jsonSerializerSettings = new JsonSerializerSettings() {
TypeNameHandling = TypeNameHandling.All
};
var log = JsonConvert.DeserializeObject<LogEntry>(data, jsonSerializerSettings);
if(log != null && log.Term > result)
{
result = log.Term;
}
}
}
return Task.FromResult(result);
}
}
public Task<int> Count ()
{
lock(_lock)
{
var result = 0;
using(var connection = new SqliteConnection($"Data Source={_path};"))
{
connection.Open();
var sql = @"select count(id) from logs";
using(var command = new SqliteCommand(sql, connection))
{
var index = Convert.ToInt32(command.ExecuteScalar());
if(index > result)
{
result = index;
}
}
}
return Task.FromResult(result);
}
}
public Task<int> Apply(LogEntry log)
{
lock(_lock)
{
using(var connection = new SqliteConnection($"Data Source={_path};"))
{
connection.Open();
var jsonSerializerSettings = new JsonSerializerSettings() {
var data = Convert.ToString(await command.ExecuteScalarAsync());
var jsonSerializerSettings = new JsonSerializerSettings()
{
TypeNameHandling = TypeNameHandling.All
};
var data = JsonConvert.SerializeObject(log, jsonSerializerSettings);
//todo - sql injection dont copy this..
var sql = $"insert into logs (data) values ('{data}')";
using(var command = new SqliteCommand(sql, connection))
var log = JsonConvert.DeserializeObject<LogEntry>(data, jsonSerializerSettings);
if (log != null && log.Term > result)
{
var result = command.ExecuteNonQuery();
result = log.Term;
}
sql = "select last_insert_rowid()";
using(var command = new SqliteCommand(sql, connection))
{
var result = command.ExecuteScalar();
return Task.FromResult(Convert.ToInt32(result));
}
}
}
_sempaphore.Release();
return result;
}
public async Task<int> Count()
{
_sempaphore.Wait();
var result = 0;
using (var connection = new SqliteConnection($"Data Source={_path};"))
{
connection.Open();
var sql = @"select count(id) from logs";
using (var command = new SqliteCommand(sql, connection))
{
var index = Convert.ToInt32(await command.ExecuteScalarAsync());
if (index > result)
{
result = index;
}
}
}
_sempaphore.Release();
return result;
}
public async Task<int> Apply(LogEntry log)
{
_sempaphore.Wait();
using (var connection = new SqliteConnection($"Data Source={_path};"))
{
connection.Open();
var jsonSerializerSettings = new JsonSerializerSettings()
{
TypeNameHandling = TypeNameHandling.All
};
var data = JsonConvert.SerializeObject(log, jsonSerializerSettings);
//todo - sql injection dont copy this..
var sql = $"insert into logs (data) values ('{data}')";
_logger.LogInformation($"id: {_nodeId.Id}, sql: {sql}");
using (var command = new SqliteCommand(sql, connection))
{
var result = await command.ExecuteNonQueryAsync();
_logger.LogInformation($"id: {_nodeId.Id}, insert log result: {result}");
}
sql = "select last_insert_rowid()";
using (var command = new SqliteCommand(sql, connection))
{
var result = await command.ExecuteScalarAsync();
_logger.LogInformation($"id: {_nodeId.Id}, about to release semaphore");
_sempaphore.Release();
_logger.LogInformation($"id: {_nodeId.Id}, saved log to sqlite");
return Convert.ToInt32(result);
}
}
}
public Task DeleteConflictsFromThisLog(int index, LogEntry logEntry)
public async Task DeleteConflictsFromThisLog(int index, LogEntry logEntry)
{
lock(_lock)
_sempaphore.Wait();
using (var connection = new SqliteConnection($"Data Source={_path};"))
{
using(var connection = new SqliteConnection($"Data Source={_path};"))
connection.Open();
//todo - sql injection dont copy this..
var sql = $"select data from logs where id = {index};";
_logger.LogInformation($"id: {_nodeId.Id} sql: {sql}");
using (var command = new SqliteCommand(sql, connection))
{
connection.Open();
//todo - sql injection dont copy this..
var sql = $"select data from logs where id = {index};";
using(var command = new SqliteCommand(sql, connection))
var data = Convert.ToString(await command.ExecuteScalarAsync());
var jsonSerializerSettings = new JsonSerializerSettings()
{
TypeNameHandling = TypeNameHandling.All
};
_logger.LogInformation($"id {_nodeId.Id} got log for index: {index}, data is {data} and new log term is {logEntry.Term}");
var log = JsonConvert.DeserializeObject<LogEntry>(data, jsonSerializerSettings);
if (logEntry != null && log != null && logEntry.Term != log.Term)
{
var data = Convert.ToString(command.ExecuteScalar());
var jsonSerializerSettings = new JsonSerializerSettings() {
TypeNameHandling = TypeNameHandling.All
};
var log = JsonConvert.DeserializeObject<LogEntry>(data, jsonSerializerSettings);
if(logEntry != null && log != null && logEntry.Term != log.Term)
//todo - sql injection dont copy this..
var deleteSql = $"delete from logs where id >= {index};";
_logger.LogInformation($"id: {_nodeId.Id} sql: {deleteSql}");
using (var deleteCommand = new SqliteCommand(deleteSql, connection))
{
//todo - sql injection dont copy this..
var deleteSql = $"delete from logs where id >= {index};";
using(var deleteCommand = new SqliteCommand(deleteSql, connection))
{
var result = deleteCommand.ExecuteNonQuery();
}
var result = await deleteCommand.ExecuteNonQueryAsync();
}
}
}
}
return Task.CompletedTask;
_sempaphore.Release();
}
public Task<LogEntry> Get(int index)
public async Task<bool> IsDuplicate(int index, LogEntry logEntry)
{
lock(_lock)
_sempaphore.Wait();
using (var connection = new SqliteConnection($"Data Source={_path};"))
{
using(var connection = new SqliteConnection($"Data Source={_path};"))
connection.Open();
//todo - sql injection dont copy this..
var sql = $"select data from logs where id = {index};";
using (var command = new SqliteCommand(sql, connection))
{
connection.Open();
//todo - sql injection dont copy this..
var sql = $"select data from logs where id = {index}";
using(var command = new SqliteCommand(sql, connection))
var data = Convert.ToString(await command.ExecuteScalarAsync());
var jsonSerializerSettings = new JsonSerializerSettings()
{
TypeNameHandling = TypeNameHandling.All
};
var log = JsonConvert.DeserializeObject<LogEntry>(data, jsonSerializerSettings);
if (logEntry != null && log != null && logEntry.Term == log.Term)
{
var data = Convert.ToString(command.ExecuteScalar());
var jsonSerializerSettings = new JsonSerializerSettings() {
TypeNameHandling = TypeNameHandling.All
};
var log = JsonConvert.DeserializeObject<LogEntry>(data, jsonSerializerSettings);
return Task.FromResult(log);
_sempaphore.Release();
return true;
}
}
}
_sempaphore.Release();
return false;
}
public async Task<LogEntry> Get(int index)
{
_sempaphore.Wait();
using (var connection = new SqliteConnection($"Data Source={_path};"))
{
connection.Open();
//todo - sql injection dont copy this..
var sql = $"select data from logs where id = {index}";
using (var command = new SqliteCommand(sql, connection))
{
var data = Convert.ToString(await command.ExecuteScalarAsync());
var jsonSerializerSettings = new JsonSerializerSettings()
{
TypeNameHandling = TypeNameHandling.All
};
var log = JsonConvert.DeserializeObject<LogEntry>(data, jsonSerializerSettings);
_sempaphore.Release();
return log;
}
}
}
public Task<List<(int index, LogEntry logEntry)>> GetFrom(int index)
public async Task<List<(int index, LogEntry logEntry)>> GetFrom(int index)
{
lock(_lock)
{
var logsToReturn = new List<(int, LogEntry)>();
_sempaphore.Wait();
var logsToReturn = new List<(int, LogEntry)>();
using(var connection = new SqliteConnection($"Data Source={_path};"))
using (var connection = new SqliteConnection($"Data Source={_path};"))
{
connection.Open();
//todo - sql injection dont copy this..
var sql = $"select id, data from logs where id >= {index}";
using (var command = new SqliteCommand(sql, connection))
{
connection.Open();
//todo - sql injection dont copy this..
var sql = $"select id, data from logs where id >= {index}";
using(var command = new SqliteCommand(sql, connection))
using (var reader = await command.ExecuteReaderAsync())
{
using(var reader = command.ExecuteReader())
while (reader.Read())
{
while(reader.Read())
{
var id = Convert.ToInt32(reader[0]);
var data = (string)reader[1];
var jsonSerializerSettings = new JsonSerializerSettings() {
TypeNameHandling = TypeNameHandling.All
};
var log = JsonConvert.DeserializeObject<LogEntry>(data, jsonSerializerSettings);
logsToReturn.Add((id, log));
}
var id = Convert.ToInt32(reader[0]);
var data = (string)reader[1];
var jsonSerializerSettings = new JsonSerializerSettings()
{
TypeNameHandling = TypeNameHandling.All
};
var log = JsonConvert.DeserializeObject<LogEntry>(data, jsonSerializerSettings);
logsToReturn.Add((id, log));
}
}
}
return Task.FromResult(logsToReturn);
}
}
public Task<long> GetTermAtIndex(int index)
{
lock(_lock)
{
long result = 0;
using(var connection = new SqliteConnection($"Data Source={_path};"))
{
connection.Open();
//todo - sql injection dont copy this..
var sql = $"select data from logs where id = {index}";
using(var command = new SqliteCommand(sql, connection))
{
var data = Convert.ToString(command.ExecuteScalar());
var jsonSerializerSettings = new JsonSerializerSettings() {
TypeNameHandling = TypeNameHandling.All
};
var log = JsonConvert.DeserializeObject<LogEntry>(data, jsonSerializerSettings);
if(log != null && log.Term > result)
{
result = log.Term;
}
}
}
return Task.FromResult(result);
}
_sempaphore.Release();
return logsToReturn;
}
}
public Task Remove(int indexOfCommand)
public async Task<long> GetTermAtIndex(int index)
{
lock(_lock)
_sempaphore.Wait();
long result = 0;
using (var connection = new SqliteConnection($"Data Source={_path};"))
{
using(var connection = new SqliteConnection($"Data Source={_path};"))
connection.Open();
//todo - sql injection dont copy this..
var sql = $"select data from logs where id = {index}";
using (var command = new SqliteCommand(sql, connection))
{
connection.Open();
//todo - sql injection dont copy this..
var deleteSql = $"delete from logs where id >= {indexOfCommand};";
using(var deleteCommand = new SqliteCommand(deleteSql, connection))
var data = Convert.ToString(await command.ExecuteScalarAsync());
var jsonSerializerSettings = new JsonSerializerSettings()
{
TypeNameHandling = TypeNameHandling.All
};
var log = JsonConvert.DeserializeObject<LogEntry>(data, jsonSerializerSettings);
if (log != null && log.Term > result)
{
var result = deleteCommand.ExecuteNonQuery();
result = log.Term;
}
}
}
return Task.CompletedTask;
_sempaphore.Release();
return result;
}
public async Task Remove(int indexOfCommand)
{
_sempaphore.Wait();
using (var connection = new SqliteConnection($"Data Source={_path};"))
{
connection.Open();
//todo - sql injection dont copy this..
var deleteSql = $"delete from logs where id >= {indexOfCommand};";
_logger.LogInformation($"id: {_nodeId.Id} Remove {deleteSql}");
using (var deleteCommand = new SqliteCommand(deleteSql, connection))
{
var result = await deleteCommand.ExecuteNonQueryAsync();
}
}
_sempaphore.Release();
}
}
}
}