implementing load balancers

This commit is contained in:
Tom Gardham-Pallister 2017-01-29 09:41:05 +00:00
parent c3a47f66c8
commit 4a43accc46
6 changed files with 427 additions and 27 deletions

View File

@ -46,12 +46,18 @@ namespace Ocelot.DownstreamUrlCreator.Middleware
var dsScheme = DownstreamRoute.ReRoute.DownstreamScheme; var dsScheme = DownstreamRoute.ReRoute.DownstreamScheme;
//here we could have a lb factory that takes stuff or we could just get the load balancer from the reRoute?
//returns the lb for this request
//lease the next address from the lb
var dsHostAndPort = DownstreamRoute.ReRoute.DownstreamHostAndPort(); var dsHostAndPort = DownstreamRoute.ReRoute.DownstreamHostAndPort();
var dsUrl = _urlBuilder.Build(dsPath.Data.Value, dsScheme, dsHostAndPort); var dsUrl = _urlBuilder.Build(dsPath.Data.Value, dsScheme, dsHostAndPort);
if (dsUrl.IsError) if (dsUrl.IsError)
{ {
//todo - release the lb connection?
_logger.LogDebug("IUrlBuilder returned an error, setting pipeline error"); _logger.LogDebug("IUrlBuilder returned an error, setting pipeline error");
SetPipelineError(dsUrl.Errors); SetPipelineError(dsUrl.Errors);
@ -66,6 +72,8 @@ namespace Ocelot.DownstreamUrlCreator.Middleware
await _next.Invoke(context); await _next.Invoke(context);
//todo - release the lb connection?
_logger.LogDebug("succesfully called next middleware"); _logger.LogDebug("succesfully called next middleware");
} }
} }

View File

@ -21,6 +21,8 @@
DownstreamPathTemplateContainsSchemeError, DownstreamPathTemplateContainsSchemeError,
DownstreamPathNullOrEmptyError, DownstreamPathNullOrEmptyError,
DownstreamSchemeNullOrEmptyError, DownstreamSchemeNullOrEmptyError,
DownstreamHostNullOrEmptyError DownstreamHostNullOrEmptyError,
ServicesAreNullError,
ServicesAreEmptyError
} }
} }

View File

@ -10,5 +10,10 @@
public string DownstreamHost { get; private set; } public string DownstreamHost { get; private set; }
public int DownstreamPort { get; private set; } public int DownstreamPort { get; private set; }
public override string ToString()
{
return $"{DownstreamHost}:{DownstreamPort}";
}
} }
} }

View File

@ -0,0 +1,362 @@
using System;
using System.Collections.Generic;
using System.Linq;
using Ocelot.Errors;
using Ocelot.Responses;
using Ocelot.Values;
using Shouldly;
using TestStack.BDDfy;
using Xunit;
namespace Ocelot.UnitTests
{
public class LeastConnectionTests
{
private HostAndPort _hostAndPort;
private Response<HostAndPort> _result;
private LeastConnection _leastConnection;
private List<Service> _services;
public LeastConnectionTests()
{
}
[Fact]
public void should_get_next_url()
{
var serviceName = "products";
var hostAndPort = new HostAndPort("localhost", 80);
var availableServices = new List<Service>
{
new Service(serviceName, hostAndPort)
};
this.Given(x => x.GivenAHostAndPort(hostAndPort))
.And(x => x.GivenTheLoadBalancerStarts(availableServices, serviceName))
.When(x => x.WhenIGetTheNextHostAndPort())
.Then(x => x.ThenTheNextHostAndPortIsReturned())
.BDDfy();
}
[Fact]
public void should_serve_from_service_with_least_connections()
{
var serviceName = "products";
var availableServices = new List<Service>
{
new Service(serviceName, new HostAndPort("127.0.0.1", 80)),
new Service(serviceName, new HostAndPort("127.0.0.2", 80)),
new Service(serviceName, new HostAndPort("127.0.0.3", 80))
};
_services = availableServices;
_leastConnection = new LeastConnection(() => _services, serviceName);
var response = _leastConnection.Lease();
response.Data.DownstreamHost.ShouldBe(availableServices[0].HostAndPort.DownstreamHost);
response = _leastConnection.Lease();
response.Data.DownstreamHost.ShouldBe(availableServices[1].HostAndPort.DownstreamHost);
response = _leastConnection.Lease();
response.Data.DownstreamHost.ShouldBe(availableServices[2].HostAndPort.DownstreamHost);
}
[Fact]
public void should_build_connections_per_service()
{
var serviceName = "products";
var availableServices = new List<Service>
{
new Service(serviceName, new HostAndPort("127.0.0.1", 80)),
new Service(serviceName, new HostAndPort("127.0.0.2", 80)),
};
_services = availableServices;
_leastConnection = new LeastConnection(() => _services, serviceName);
var response = _leastConnection.Lease();
response.Data.DownstreamHost.ShouldBe(availableServices[0].HostAndPort.DownstreamHost);
response = _leastConnection.Lease();
response.Data.DownstreamHost.ShouldBe(availableServices[1].HostAndPort.DownstreamHost);
response = _leastConnection.Lease();
response.Data.DownstreamHost.ShouldBe(availableServices[0].HostAndPort.DownstreamHost);
response = _leastConnection.Lease();
response.Data.DownstreamHost.ShouldBe(availableServices[1].HostAndPort.DownstreamHost);
}
[Fact]
public void should_release_connection()
{
var serviceName = "products";
var availableServices = new List<Service>
{
new Service(serviceName, new HostAndPort("127.0.0.1", 80)),
new Service(serviceName, new HostAndPort("127.0.0.2", 80)),
};
_services = availableServices;
_leastConnection = new LeastConnection(() => _services, serviceName);
var response = _leastConnection.Lease();
response.Data.DownstreamHost.ShouldBe(availableServices[0].HostAndPort.DownstreamHost);
response = _leastConnection.Lease();
response.Data.DownstreamHost.ShouldBe(availableServices[1].HostAndPort.DownstreamHost);
response = _leastConnection.Lease();
response.Data.DownstreamHost.ShouldBe(availableServices[0].HostAndPort.DownstreamHost);
response = _leastConnection.Lease();
response.Data.DownstreamHost.ShouldBe(availableServices[1].HostAndPort.DownstreamHost);
//release this so 2 should have 1 connection and we should get 2 back as our next host and port
_leastConnection.Release(availableServices[1].HostAndPort);
response = _leastConnection.Lease();
response.Data.DownstreamHost.ShouldBe(availableServices[1].HostAndPort.DownstreamHost);
}
[Fact]
public void should_return_error_if_services_are_null()
{
var serviceName = "products";
var hostAndPort = new HostAndPort("localhost", 80);
this.Given(x => x.GivenAHostAndPort(hostAndPort))
.And(x => x.GivenTheLoadBalancerStarts(null, serviceName))
.When(x => x.WhenIGetTheNextHostAndPort())
.Then(x => x.ThenServiceAreNullErrorIsReturned())
.BDDfy();
}
[Fact]
public void should_return_error_if_services_are_empty()
{
var serviceName = "products";
var hostAndPort = new HostAndPort("localhost", 80);
this.Given(x => x.GivenAHostAndPort(hostAndPort))
.And(x => x.GivenTheLoadBalancerStarts(new List<Service>(), serviceName))
.When(x => x.WhenIGetTheNextHostAndPort())
.Then(x => x.ThenServiceAreEmptyErrorIsReturned())
.BDDfy();
}
private void ThenServiceAreNullErrorIsReturned()
{
_result.IsError.ShouldBeTrue();
_result.Errors[0].ShouldBeOfType<ServicesAreNullError>();
}
private void ThenServiceAreEmptyErrorIsReturned()
{
_result.IsError.ShouldBeTrue();
_result.Errors[0].ShouldBeOfType<ServicesAreEmptyError>();
}
private void GivenTheLoadBalancerStarts(List<Service> services, string serviceName)
{
_services = services;
_leastConnection = new LeastConnection(() => _services, serviceName);
}
private void WhenTheLoadBalancerStarts(List<Service> services, string serviceName)
{
GivenTheLoadBalancerStarts(services, serviceName);
}
private void GivenAHostAndPort(HostAndPort hostAndPort)
{
_hostAndPort = hostAndPort;
}
private void WhenIGetTheNextHostAndPort()
{
_result = _leastConnection.Lease();
}
private void ThenTheNextHostAndPortIsReturned()
{
_result.Data.DownstreamHost.ShouldBe(_hostAndPort.DownstreamHost);
_result.Data.DownstreamPort.ShouldBe(_hostAndPort.DownstreamPort);
}
}
public class LeastConnection : ILoadBalancer
{
private Func<List<Service>> _services;
private List<Lease> _leases;
private string _serviceName;
public LeastConnection(Func<List<Service>> services, string serviceName)
{
_services = services;
_serviceName = serviceName;
_leases = new List<Lease>();
}
public Response<HostAndPort> Lease()
{
var services = _services();
if(services == null)
{
return new ErrorResponse<HostAndPort>(new List<Error>(){ new ServicesAreNullError($"services were null for {_serviceName}")});
}
if(!services.Any())
{
return new ErrorResponse<HostAndPort>(new List<Error>(){ new ServicesAreEmptyError($"services were empty for {_serviceName}")});
}
//todo - maybe this should be moved somewhere else...? Maybe on a repeater on seperate thread? loop every second and update or something?
UpdateServices(services);
var leaseWithLeastConnections = GetLeaseWithLeastConnections();
_leases.Remove(leaseWithLeastConnections);
leaseWithLeastConnections = AddConnection(leaseWithLeastConnections);
_leases.Add(leaseWithLeastConnections);
return new OkResponse<HostAndPort>(new HostAndPort(leaseWithLeastConnections.HostAndPort.DownstreamHost, leaseWithLeastConnections.HostAndPort.DownstreamPort));
}
public Response Release(HostAndPort hostAndPort)
{
var matchingLease = _leases.FirstOrDefault(l => l.HostAndPort.DownstreamHost == hostAndPort.DownstreamHost
&& l.HostAndPort.DownstreamPort == hostAndPort.DownstreamPort);
if(matchingLease != null)
{
var replacementLease = new Lease(hostAndPort, matchingLease.Connections - 1);
_leases.Remove(matchingLease);
_leases.Add(replacementLease);
}
return new OkResponse();
}
private Lease AddConnection(Lease lease)
{
return new Lease(lease.HostAndPort, lease.Connections + 1);
}
private Lease GetLeaseWithLeastConnections()
{
//now get the service with the least connections?
Lease leaseWithLeastConnections = null;
for(var i = 0; i < _leases.Count; i++)
{
if(i == 0)
{
leaseWithLeastConnections = _leases[i];
}
else
{
if(_leases[i].Connections < leaseWithLeastConnections.Connections)
{
leaseWithLeastConnections = _leases[i];
}
}
}
return leaseWithLeastConnections;
}
private Response UpdateServices(List<Service> services)
{
if(_leases.Count > 0)
{
var leasesToRemove = new List<Lease>();
foreach(var lease in _leases)
{
var match = services.FirstOrDefault(s => s.HostAndPort.DownstreamHost == lease.HostAndPort.DownstreamHost
&& s.HostAndPort.DownstreamPort == lease.HostAndPort.DownstreamPort);
if(match == null)
{
leasesToRemove.Add(lease);
}
}
foreach(var lease in leasesToRemove)
{
_leases.Remove(lease);
}
foreach(var service in services)
{
var exists = _leases.FirstOrDefault(l => l.HostAndPort.ToString() == service.HostAndPort.ToString());
if(exists == null)
{
_leases.Add(new Lease(service.HostAndPort, 0));
}
}
}
else
{
foreach(var service in services)
{
_leases.Add(new Lease(service.HostAndPort, 0));
}
}
return new OkResponse();
}
}
public class Lease
{
public Lease(HostAndPort hostAndPort, int connections)
{
HostAndPort = hostAndPort;
Connections = connections;
}
public HostAndPort HostAndPort {get;private set;}
public int Connections {get;private set;}
}
public class ServicesAreNullError : Error
{
public ServicesAreNullError(string message)
: base(message, OcelotErrorCode.ServicesAreNullError)
{
}
}
public class ServicesAreEmptyError : Error
{
public ServicesAreEmptyError(string message)
: base(message, OcelotErrorCode.ServicesAreEmptyError)
{
}
}
}

View File

@ -1,7 +1,9 @@
using System.Collections.Generic; using System.Collections.Generic;
using System.Diagnostics; using System.Diagnostics;
using Ocelot.Responses;
using Ocelot.Values; using Ocelot.Values;
using Shouldly; using Shouldly;
using TestStack.BDDfy;
using Xunit; using Xunit;
namespace Ocelot.UnitTests namespace Ocelot.UnitTests
@ -10,6 +12,7 @@ namespace Ocelot.UnitTests
{ {
private readonly RoundRobin _roundRobin; private readonly RoundRobin _roundRobin;
private readonly List<HostAndPort> _hostAndPorts; private readonly List<HostAndPort> _hostAndPorts;
private Response<HostAndPort> _hostAndPort;
public RoundRobinTests() public RoundRobinTests()
{ {
@ -26,12 +29,13 @@ namespace Ocelot.UnitTests
[Fact] [Fact]
public void should_get_next_address() public void should_get_next_address()
{ {
var address = _roundRobin.Next(); this.Given(x => x.GivenIGetTheNextAddress())
address.ShouldBe(_hostAndPorts[0]); .Then(x => x.ThenTheNextAddressIndexIs(0))
address = _roundRobin.Next(); .Given(x => x.GivenIGetTheNextAddress())
address.ShouldBe(_hostAndPorts[1]); .Then(x => x.ThenTheNextAddressIndexIs(1))
address = _roundRobin.Next(); .Given(x => x.GivenIGetTheNextAddress())
address.ShouldBe(_hostAndPorts[2]); .Then(x => x.ThenTheNextAddressIndexIs(2))
.BDDfy();
} }
[Fact] [Fact]
@ -41,19 +45,30 @@ namespace Ocelot.UnitTests
while (stopWatch.ElapsedMilliseconds < 1000) while (stopWatch.ElapsedMilliseconds < 1000)
{ {
var address = _roundRobin.Next(); var address = _roundRobin.Lease();
address.ShouldBe(_hostAndPorts[0]); address.Data.ShouldBe(_hostAndPorts[0]);
address = _roundRobin.Next(); address = _roundRobin.Lease();
address.ShouldBe(_hostAndPorts[1]); address.Data.ShouldBe(_hostAndPorts[1]);
address = _roundRobin.Next(); address = _roundRobin.Lease();
address.ShouldBe(_hostAndPorts[2]); address.Data.ShouldBe(_hostAndPorts[2]);
} }
} }
private void GivenIGetTheNextAddress()
{
_hostAndPort = _roundRobin.Lease();
}
private void ThenTheNextAddressIndexIs(int index)
{
_hostAndPort.Data.ShouldBe(_hostAndPorts[index]);
}
} }
public interface ILoadBalancer public interface ILoadBalancer
{ {
HostAndPort Next(); Response<HostAndPort> Lease();
Response Release(HostAndPort hostAndPort);
} }
public class RoundRobin : ILoadBalancer public class RoundRobin : ILoadBalancer
@ -66,7 +81,7 @@ namespace Ocelot.UnitTests
_hostAndPorts = hostAndPorts; _hostAndPorts = hostAndPorts;
} }
public HostAndPort Next() public Response<HostAndPort> Lease()
{ {
if (_last >= _hostAndPorts.Count) if (_last >= _hostAndPorts.Count)
{ {
@ -75,7 +90,12 @@ namespace Ocelot.UnitTests
var next = _hostAndPorts[_last]; var next = _hostAndPorts[_last];
_last++; _last++;
return next; return new OkResponse<HostAndPort>(next);
}
public Response Release(HostAndPort hostAndPort)
{
return new OkResponse();
} }
} }
} }

View File

@ -1,4 +1,5 @@
using System.Collections.Generic; using System.Collections.Generic;
using Ocelot.Values;
using Shouldly; using Shouldly;
using TestStack.BDDfy; using TestStack.BDDfy;
using Xunit; using Xunit;
@ -21,7 +22,7 @@ namespace Ocelot.UnitTests
[Fact] [Fact]
public void should_register_service() public void should_register_service()
{ {
this.Given(x => x.GivenAServiceToRegister("product", "localhost:5000")) this.Given(x => x.GivenAServiceToRegister("product", "localhost:5000", 80))
.When(x => x.WhenIRegisterTheService()) .When(x => x.WhenIRegisterTheService())
.Then(x => x.ThenTheServiceIsRegistered()) .Then(x => x.ThenTheServiceIsRegistered())
.BDDfy(); .BDDfy();
@ -29,7 +30,7 @@ namespace Ocelot.UnitTests
public void should_lookup_service() public void should_lookup_service()
{ {
this.Given(x => x.GivenAServiceIsRegistered("product", "localhost:600")) this.Given(x => x.GivenAServiceIsRegistered("product", "localhost:600", 80))
.When(x => x.WhenILookupTheService("product")) .When(x => x.WhenILookupTheService("product"))
.Then(x => x.ThenTheServiceDetailsAreReturned()) .Then(x => x.ThenTheServiceDetailsAreReturned())
.BDDfy(); .BDDfy();
@ -37,7 +38,8 @@ namespace Ocelot.UnitTests
private void ThenTheServiceDetailsAreReturned() private void ThenTheServiceDetailsAreReturned()
{ {
_services[0].Address.ShouldBe(_service.Address); _services[0].HostAndPort.DownstreamHost.ShouldBe(_service.HostAndPort.DownstreamHost);
_services[0].HostAndPort.DownstreamPort.ShouldBe(_service.HostAndPort.DownstreamPort);
_services[0].Name.ShouldBe(_service.Name); _services[0].Name.ShouldBe(_service.Name);
} }
@ -46,15 +48,15 @@ namespace Ocelot.UnitTests
_services = _serviceRegistry.Lookup(name); _services = _serviceRegistry.Lookup(name);
} }
private void GivenAServiceIsRegistered(string name, string address) private void GivenAServiceIsRegistered(string name, string address, int port)
{ {
_service = new Service(name, address); _service = new Service(name, new HostAndPort(address, port));
_serviceRepository.Set(_service); _serviceRepository.Set(_service);
} }
private void GivenAServiceToRegister(string name, string address) private void GivenAServiceToRegister(string name, string address, int port)
{ {
_service = new Service(name, address); _service = new Service(name, new HostAndPort(address, port));
} }
private void WhenIRegisterTheService() private void WhenIRegisterTheService()
@ -65,7 +67,8 @@ namespace Ocelot.UnitTests
private void ThenTheServiceIsRegistered() private void ThenTheServiceIsRegistered()
{ {
var serviceNameAndAddress = _serviceRepository.Get(_service.Name); var serviceNameAndAddress = _serviceRepository.Get(_service.Name);
serviceNameAndAddress[0].Address.ShouldBe(_service.Address); serviceNameAndAddress[0].HostAndPort.DownstreamHost.ShouldBe(_service.HostAndPort.DownstreamHost);
serviceNameAndAddress[0].HostAndPort.DownstreamPort.ShouldBe(_service.HostAndPort.DownstreamPort);
serviceNameAndAddress[0].Name.ShouldBe(_service.Name); serviceNameAndAddress[0].Name.ShouldBe(_service.Name);
} }
} }
@ -96,13 +99,13 @@ namespace Ocelot.UnitTests
public class Service public class Service
{ {
public Service(string name, string address) public Service(string name, HostAndPort hostAndPort)
{ {
Name = name; Name = name;
Address = address; HostAndPort = hostAndPort;
} }
public string Name {get; private set;} public string Name {get; private set;}
public string Address {get; private set;} public HostAndPort HostAndPort {get; private set;}
} }
public interface IServiceRepository public interface IServiceRepository