wip fake consul provider

This commit is contained in:
Tom Gardham-Pallister 2017-02-05 21:08:16 +00:00
parent c46dcc05b8
commit fb0f101732
7 changed files with 174 additions and 39 deletions

View File

@ -13,6 +13,7 @@ namespace Ocelot.LoadBalancer.LoadBalancers
private readonly Func<Task<List<Service>>> _services; private readonly Func<Task<List<Service>>> _services;
private readonly List<Lease> _leases; private readonly List<Lease> _leases;
private readonly string _serviceName; private readonly string _serviceName;
private static readonly object _syncLock = new object();
public LeastConnectionLoadBalancer(Func<Task<List<Service>>> services, string serviceName) public LeastConnectionLoadBalancer(Func<Task<List<Service>>> services, string serviceName)
{ {
@ -35,32 +36,38 @@ namespace Ocelot.LoadBalancer.LoadBalancers
return new ErrorResponse<HostAndPort>(new List<Error>() { new ServicesAreEmptyError($"services were empty for {_serviceName}") }); return new ErrorResponse<HostAndPort>(new List<Error>() { new ServicesAreEmptyError($"services were empty for {_serviceName}") });
} }
//todo - maybe this should be moved somewhere else...? Maybe on a repeater on seperate thread? loop every second and update or something? lock(_syncLock)
UpdateServices(services); {
//todo - maybe this should be moved somewhere else...? Maybe on a repeater on seperate thread? loop every second and update or something?
UpdateServices(services);
var leaseWithLeastConnections = GetLeaseWithLeastConnections(); var leaseWithLeastConnections = GetLeaseWithLeastConnections();
_leases.Remove(leaseWithLeastConnections); _leases.Remove(leaseWithLeastConnections);
leaseWithLeastConnections = AddConnection(leaseWithLeastConnections); leaseWithLeastConnections = AddConnection(leaseWithLeastConnections);
_leases.Add(leaseWithLeastConnections); _leases.Add(leaseWithLeastConnections);
return new OkResponse<HostAndPort>(new HostAndPort(leaseWithLeastConnections.HostAndPort.DownstreamHost, leaseWithLeastConnections.HostAndPort.DownstreamPort)); return new OkResponse<HostAndPort>(new HostAndPort(leaseWithLeastConnections.HostAndPort.DownstreamHost, leaseWithLeastConnections.HostAndPort.DownstreamPort));
}
} }
public Response Release(HostAndPort hostAndPort) public Response Release(HostAndPort hostAndPort)
{ {
var matchingLease = _leases.FirstOrDefault(l => l.HostAndPort.DownstreamHost == hostAndPort.DownstreamHost lock(_syncLock)
&& l.HostAndPort.DownstreamPort == hostAndPort.DownstreamPort);
if (matchingLease != null)
{ {
var replacementLease = new Lease(hostAndPort, matchingLease.Connections - 1); var matchingLease = _leases.FirstOrDefault(l => l.HostAndPort.DownstreamHost == hostAndPort.DownstreamHost
&& l.HostAndPort.DownstreamPort == hostAndPort.DownstreamPort);
_leases.Remove(matchingLease); if (matchingLease != null)
{
var replacementLease = new Lease(hostAndPort, matchingLease.Connections - 1);
_leases.Add(replacementLease); _leases.Remove(matchingLease);
_leases.Add(replacementLease);
}
} }
return new OkResponse(); return new OkResponse();

View File

@ -32,8 +32,16 @@ namespace Ocelot.LoadBalancer.LoadBalancers
public Response Add(string key, ILoadBalancer loadBalancer) public Response Add(string key, ILoadBalancer loadBalancer)
{ {
_loadBalancers[key] = loadBalancer; try
return new OkResponse(); {
_loadBalancers.Add(key, loadBalancer);
return new OkResponse();
}
catch (System.Exception exception)
{
Console.WriteLine(exception.StackTrace);
throw;
}
} }
} }
} }

View File

@ -18,6 +18,7 @@ namespace Ocelot.Middleware
using System.Threading.Tasks; using System.Threading.Tasks;
using Authorisation.Middleware; using Authorisation.Middleware;
using Microsoft.AspNetCore.Http; using Microsoft.AspNetCore.Http;
using Ocelot.Configuration.Provider;
using Ocelot.LoadBalancer.Middleware; using Ocelot.LoadBalancer.Middleware;
public static class OcelotMiddlewareExtensions public static class OcelotMiddlewareExtensions
@ -29,6 +30,7 @@ namespace Ocelot.Middleware
/// <returns></returns> /// <returns></returns>
public static IApplicationBuilder UseOcelot(this IApplicationBuilder builder) public static IApplicationBuilder UseOcelot(this IApplicationBuilder builder)
{ {
CreateConfiguration(builder);
builder.UseOcelot(new OcelotMiddlewareConfiguration()); builder.UseOcelot(new OcelotMiddlewareConfiguration());
return builder; return builder;
} }
@ -41,6 +43,8 @@ namespace Ocelot.Middleware
/// <returns></returns> /// <returns></returns>
public static IApplicationBuilder UseOcelot(this IApplicationBuilder builder, OcelotMiddlewareConfiguration middlewareConfiguration) public static IApplicationBuilder UseOcelot(this IApplicationBuilder builder, OcelotMiddlewareConfiguration middlewareConfiguration)
{ {
CreateConfiguration(builder);
// This is registered to catch any global exceptions that are not handled // This is registered to catch any global exceptions that are not handled
builder.UseExceptionHandlerMiddleware(); builder.UseExceptionHandlerMiddleware();
@ -118,6 +122,18 @@ namespace Ocelot.Middleware
return builder; return builder;
} }
private static void CreateConfiguration(IApplicationBuilder builder)
{
var configProvider = (IOcelotConfigurationProvider)builder.ApplicationServices.GetService(typeof(IOcelotConfigurationProvider));
var config = configProvider.Get();
if(config == null)
{
throw new Exception("Unable to start Ocelot: configuration was null");
}
}
private static void UseIfNotNull(this IApplicationBuilder builder, Func<HttpContext, Func<Task>, Task> middleware) private static void UseIfNotNull(this IApplicationBuilder builder, Func<HttpContext, Func<Task>, Task> middleware)
{ {
if (middleware != null) if (middleware != null)

View File

@ -2,6 +2,7 @@ using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.IO; using System.IO;
using System.Net; using System.Net;
using Consul;
using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting; using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Http; using Microsoft.AspNetCore.Http;
@ -14,13 +15,19 @@ namespace Ocelot.AcceptanceTests
{ {
public class ServiceDiscoveryTests : IDisposable public class ServiceDiscoveryTests : IDisposable
{ {
private IWebHost _builder; private IWebHost _builderOne;
private IWebHost _builderTwo;
private IWebHost _fakeConsulBuilder; private IWebHost _fakeConsulBuilder;
private readonly Steps _steps; private readonly Steps _steps;
private List<ServiceEntry> _serviceEntries;
private int _counterOne;
private int _counterTwo;
private static readonly object _syncLock = new object();
public ServiceDiscoveryTests() public ServiceDiscoveryTests()
{ {
_steps = new Steps(); _steps = new Steps();
_serviceEntries = new List<ServiceEntry>();
} }
[Fact] [Fact]
@ -32,6 +39,28 @@ namespace Ocelot.AcceptanceTests
var fakeConsulServiceDiscoveryUrl = "http://localhost:8500"; var fakeConsulServiceDiscoveryUrl = "http://localhost:8500";
var downstreamServiceOneCounter = 0; var downstreamServiceOneCounter = 0;
var downstreamServiceTwoCounter = 0; var downstreamServiceTwoCounter = 0;
var serviceEntryOne = new ServiceEntry()
{
Service = new AgentService()
{
Service = serviceName,
Address = "localhost",
Port = 50879,
ID = Guid.NewGuid().ToString(),
Tags = new string[0]
},
};
var serviceEntryTwo = new ServiceEntry()
{
Service = new AgentService()
{
Service = serviceName,
Address = "localhost",
Port = 50880,
ID = Guid.NewGuid().ToString(),
Tags = new string[0]
},
};
var configuration = new FileConfiguration var configuration = new FileConfiguration
{ {
@ -52,38 +81,42 @@ namespace Ocelot.AcceptanceTests
ServiceDiscoveryProvider = new FileServiceDiscoveryProvider() ServiceDiscoveryProvider = new FileServiceDiscoveryProvider()
{ {
Provider = "Consul", Provider = "Consul",
Host = "localhost" Host = "localhost",
Port = 8500
} }
} }
}; };
this.Given(x => x.GivenThereIsAServiceRunningOn(downstreamServiceOneUrl, 200, downstreamServiceOneCounter)) this.Given(x => x.GivenProductServiceOneIsRunning(downstreamServiceOneUrl, 200))
.And(x => x.GivenThereIsAServiceRunningOn(downstreamServiceTwoUrl, 200, downstreamServiceTwoCounter)) .And(x => x.GivenProductServiceTwoIsRunning(downstreamServiceTwoUrl, 200))
.And(x => x.GivenThereIsAFakeConsulServiceDiscoveryProvider(fakeConsulServiceDiscoveryUrl)) .And(x => x.GivenThereIsAFakeConsulServiceDiscoveryProvider(fakeConsulServiceDiscoveryUrl))
.And(x => x.GivenTheServicesAreRegisteredWithConsul(serviceName, downstreamServiceOneUrl, downstreamServiceTwoUrl)) .And(x => x.GivenTheServicesAreRegisteredWithConsul(serviceEntryOne, serviceEntryTwo))
.And(x => _steps.GivenThereIsAConfiguration(configuration)) .And(x => _steps.GivenThereIsAConfiguration(configuration))
.And(x => _steps.GivenOcelotIsRunning()) .And(x => _steps.GivenOcelotIsRunning())
.When(x => _steps.WhenIGetUrlOnTheApiGatewayMultipleTimes("/", 50)) .When(x => _steps.WhenIGetUrlOnTheApiGatewayMultipleTimes("/", 50))
.Then(x => x.ThenTheTwoServicesShouldHaveBeenCalledTimes(50, downstreamServiceOneCounter, downstreamServiceTwoCounter)) .Then(x => x.ThenTheTwoServicesShouldHaveBeenCalledTimes(50))
.And(x => x.ThenBothServicesCalledRealisticAmountOfTimes(downstreamServiceOneCounter,downstreamServiceTwoCounter)) .And(x => x.ThenBothServicesCalledRealisticAmountOfTimes())
.BDDfy(); .BDDfy();
} }
private void ThenBothServicesCalledRealisticAmountOfTimes(int counterOne, int counterTwo) private void ThenBothServicesCalledRealisticAmountOfTimes()
{ {
counterOne.ShouldBeGreaterThan(10); _counterOne.ShouldBeGreaterThan(25);
counterTwo.ShouldBeGreaterThan(10); _counterTwo.ShouldBeGreaterThan(25);
} }
private void ThenTheTwoServicesShouldHaveBeenCalledTimes(int expected, int counterOne, int counterTwo) private void ThenTheTwoServicesShouldHaveBeenCalledTimes(int expected)
{ {
var total = counterOne + counterTwo; var total = _counterOne + _counterTwo;
total.ShouldBe(expected); total.ShouldBe(expected);
} }
private void GivenTheServicesAreRegisteredWithConsul(string serviceName, params string[] urls) private void GivenTheServicesAreRegisteredWithConsul(params ServiceEntry[] serviceEntries)
{ {
//register these services with fake consul foreach(var serviceEntry in serviceEntries)
{
_serviceEntries.Add(serviceEntry);
}
} }
private void GivenThereIsAFakeConsulServiceDiscoveryProvider(string url) private void GivenThereIsAFakeConsulServiceDiscoveryProvider(string url)
@ -98,7 +131,10 @@ namespace Ocelot.AcceptanceTests
{ {
app.Run(async context => app.Run(async context =>
{ {
//do consul shit if(context.Request.Path.Value == "/v1/health/service/product")
{
await context.Response.WriteJsonAsync(_serviceEntries);
}
}); });
}) })
.Build(); .Build();
@ -106,9 +142,9 @@ namespace Ocelot.AcceptanceTests
_fakeConsulBuilder.Start(); _fakeConsulBuilder.Start();
} }
private void GivenThereIsAServiceRunningOn(string url, int statusCode, int counter) private void GivenProductServiceOneIsRunning(string url, int statusCode)
{ {
_builder = new WebHostBuilder() _builderOne = new WebHostBuilder()
.UseUrls(url) .UseUrls(url)
.UseKestrel() .UseKestrel()
.UseContentRoot(Directory.GetCurrentDirectory()) .UseContentRoot(Directory.GetCurrentDirectory())
@ -118,18 +154,40 @@ namespace Ocelot.AcceptanceTests
{ {
app.Run(async context => app.Run(async context =>
{ {
counter++; _counterOne++;
context.Response.StatusCode = statusCode; context.Response.StatusCode = statusCode;
}); });
}) })
.Build(); .Build();
_builder.Start(); _builderOne.Start();
}
private void GivenProductServiceTwoIsRunning(string url, int statusCode)
{
_builderTwo = new WebHostBuilder()
.UseUrls(url)
.UseKestrel()
.UseContentRoot(Directory.GetCurrentDirectory())
.UseIISIntegration()
.UseUrls(url)
.Configure(app =>
{
app.Run(async context =>
{
_counterTwo++;
context.Response.StatusCode = statusCode;
});
})
.Build();
_builderTwo.Start();
} }
public void Dispose() public void Dispose()
{ {
_builder?.Dispose(); _builderOne?.Dispose();
_builderTwo?.Dispose();
_steps.Dispose(); _steps.Dispose();
} }
} }

View File

@ -28,7 +28,7 @@
{ {
var runTime = $"{oSDescription}-{osArchitecture}".ToLower(); var runTime = $"{oSDescription}-{osArchitecture}".ToLower();
var configPath = $"./bin/Debug/netcoreapp{Version}/{runTime}/configuration.json"; var configPath = $"./test/Ocelot.AcceptanceTests/bin/Debug/netcoreapp{Version}/{runTime}/configuration.json";
return configPath; return configPath;
} }

View File

@ -32,7 +32,8 @@
"Microsoft.AspNetCore.Server.Kestrel": "1.1.0", "Microsoft.AspNetCore.Server.Kestrel": "1.1.0",
"Microsoft.NETCore.App": "1.1.0", "Microsoft.NETCore.App": "1.1.0",
"Shouldly": "2.8.2", "Shouldly": "2.8.2",
"TestStack.BDDfy": "4.3.2" "TestStack.BDDfy": "4.3.2",
"Consul": "0.7.2.1"
}, },
"runtimes": { "runtimes": {
"win10-x64": {}, "win10-x64": {},

View File

@ -1,3 +1,4 @@
using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Threading.Tasks; using System.Threading.Tasks;
using Ocelot.LoadBalancer.LoadBalancers; using Ocelot.LoadBalancer.LoadBalancers;
@ -15,6 +16,50 @@ namespace Ocelot.UnitTests.LoadBalancer
private Response<HostAndPort> _result; private Response<HostAndPort> _result;
private LeastConnectionLoadBalancer _leastConnection; private LeastConnectionLoadBalancer _leastConnection;
private List<Service> _services; private List<Service> _services;
private Random _random;
public LeastConnectionTests()
{
_random = new Random();
}
[Fact]
public void should_be_able_to_lease_and_release_concurrently()
{
var serviceName = "products";
var availableServices = new List<Service>
{
new Service(serviceName, new HostAndPort("127.0.0.1", 80), string.Empty, string.Empty, new string[0]),
new Service(serviceName, new HostAndPort("127.0.0.2", 80), string.Empty, string.Empty, new string[0]),
};
_services = availableServices;
_leastConnection = new LeastConnectionLoadBalancer(() => Task.FromResult(_services), serviceName);
var tasks = new Task[100];
try
{
for(var i = 0; i < tasks.Length; i++)
{
tasks[i] = LeaseDelayAndRelease();
}
Task.WaitAll(tasks);
}
catch (System.Exception exception)
{
Console.WriteLine(exception.StackTrace);
throw;
}
}
private async Task LeaseDelayAndRelease()
{
var hostAndPort = await _leastConnection.Lease();
await Task.Delay(_random.Next(1, 100));
var response = _leastConnection.Release(hostAndPort.Data);
}
[Fact] [Fact]
public void should_get_next_url() public void should_get_next_url()