From 4a43accc4688012bbb6d07ee6d24a1dcf8789214 Mon Sep 17 00:00:00 2001 From: Tom Gardham-Pallister Date: Sun, 29 Jan 2017 09:41:05 +0000 Subject: [PATCH] implementing load balancers --- .../DownstreamUrlCreatorMiddleware.cs | 8 + src/Ocelot/Errors/OcelotErrorCode.cs | 4 +- src/Ocelot/Values/HostAndPort.cs | 5 + test/Ocelot.UnitTests/LeastConnectionTests.cs | 362 ++++++++++++++++++ test/Ocelot.UnitTests/RoundRobinTests.cs | 50 ++- test/Ocelot.UnitTests/ServiceRegistryTests.cs | 25 +- 6 files changed, 427 insertions(+), 27 deletions(-) create mode 100644 test/Ocelot.UnitTests/LeastConnectionTests.cs diff --git a/src/Ocelot/DownstreamUrlCreator/Middleware/DownstreamUrlCreatorMiddleware.cs b/src/Ocelot/DownstreamUrlCreator/Middleware/DownstreamUrlCreatorMiddleware.cs index 8144b42b..b4d73c7c 100644 --- a/src/Ocelot/DownstreamUrlCreator/Middleware/DownstreamUrlCreatorMiddleware.cs +++ b/src/Ocelot/DownstreamUrlCreator/Middleware/DownstreamUrlCreatorMiddleware.cs @@ -46,12 +46,18 @@ namespace Ocelot.DownstreamUrlCreator.Middleware var dsScheme = DownstreamRoute.ReRoute.DownstreamScheme; + //here we could have a lb factory that takes stuff or we could just get the load balancer from the reRoute? + //returns the lb for this request + + //lease the next address from the lb + var dsHostAndPort = DownstreamRoute.ReRoute.DownstreamHostAndPort(); var dsUrl = _urlBuilder.Build(dsPath.Data.Value, dsScheme, dsHostAndPort); if (dsUrl.IsError) { + //todo - release the lb connection? _logger.LogDebug("IUrlBuilder returned an error, setting pipeline error"); SetPipelineError(dsUrl.Errors); @@ -66,6 +72,8 @@ namespace Ocelot.DownstreamUrlCreator.Middleware await _next.Invoke(context); + //todo - release the lb connection? + _logger.LogDebug("succesfully called next middleware"); } } diff --git a/src/Ocelot/Errors/OcelotErrorCode.cs b/src/Ocelot/Errors/OcelotErrorCode.cs index 5de770cd..85c3e097 100644 --- a/src/Ocelot/Errors/OcelotErrorCode.cs +++ b/src/Ocelot/Errors/OcelotErrorCode.cs @@ -21,6 +21,8 @@ DownstreamPathTemplateContainsSchemeError, DownstreamPathNullOrEmptyError, DownstreamSchemeNullOrEmptyError, - DownstreamHostNullOrEmptyError + DownstreamHostNullOrEmptyError, + ServicesAreNullError, + ServicesAreEmptyError } } diff --git a/src/Ocelot/Values/HostAndPort.cs b/src/Ocelot/Values/HostAndPort.cs index cd336dec..f8769743 100644 --- a/src/Ocelot/Values/HostAndPort.cs +++ b/src/Ocelot/Values/HostAndPort.cs @@ -10,5 +10,10 @@ public string DownstreamHost { get; private set; } public int DownstreamPort { get; private set; } + + public override string ToString() + { + return $"{DownstreamHost}:{DownstreamPort}"; + } } } \ No newline at end of file diff --git a/test/Ocelot.UnitTests/LeastConnectionTests.cs b/test/Ocelot.UnitTests/LeastConnectionTests.cs new file mode 100644 index 00000000..758d1917 --- /dev/null +++ b/test/Ocelot.UnitTests/LeastConnectionTests.cs @@ -0,0 +1,362 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using Ocelot.Errors; +using Ocelot.Responses; +using Ocelot.Values; +using Shouldly; +using TestStack.BDDfy; +using Xunit; + +namespace Ocelot.UnitTests +{ + public class LeastConnectionTests + { + private HostAndPort _hostAndPort; + private Response _result; + private LeastConnection _leastConnection; + private List _services; + + public LeastConnectionTests() + { + } + + [Fact] + public void should_get_next_url() + { + var serviceName = "products"; + + var hostAndPort = new HostAndPort("localhost", 80); + + var availableServices = new List + { + new Service(serviceName, hostAndPort) + }; + + this.Given(x => x.GivenAHostAndPort(hostAndPort)) + .And(x => x.GivenTheLoadBalancerStarts(availableServices, serviceName)) + .When(x => x.WhenIGetTheNextHostAndPort()) + .Then(x => x.ThenTheNextHostAndPortIsReturned()) + .BDDfy(); + } + + [Fact] + public void should_serve_from_service_with_least_connections() + { + var serviceName = "products"; + + var availableServices = new List + { + new Service(serviceName, new HostAndPort("127.0.0.1", 80)), + new Service(serviceName, new HostAndPort("127.0.0.2", 80)), + new Service(serviceName, new HostAndPort("127.0.0.3", 80)) + }; + + _services = availableServices; + _leastConnection = new LeastConnection(() => _services, serviceName); + + var response = _leastConnection.Lease(); + + response.Data.DownstreamHost.ShouldBe(availableServices[0].HostAndPort.DownstreamHost); + + response = _leastConnection.Lease(); + + response.Data.DownstreamHost.ShouldBe(availableServices[1].HostAndPort.DownstreamHost); + + response = _leastConnection.Lease(); + + response.Data.DownstreamHost.ShouldBe(availableServices[2].HostAndPort.DownstreamHost); + } + + [Fact] + public void should_build_connections_per_service() + { + var serviceName = "products"; + + var availableServices = new List + { + new Service(serviceName, new HostAndPort("127.0.0.1", 80)), + new Service(serviceName, new HostAndPort("127.0.0.2", 80)), + }; + + _services = availableServices; + _leastConnection = new LeastConnection(() => _services, serviceName); + + var response = _leastConnection.Lease(); + + response.Data.DownstreamHost.ShouldBe(availableServices[0].HostAndPort.DownstreamHost); + + response = _leastConnection.Lease(); + + response.Data.DownstreamHost.ShouldBe(availableServices[1].HostAndPort.DownstreamHost); + + response = _leastConnection.Lease(); + + response.Data.DownstreamHost.ShouldBe(availableServices[0].HostAndPort.DownstreamHost); + + response = _leastConnection.Lease(); + + response.Data.DownstreamHost.ShouldBe(availableServices[1].HostAndPort.DownstreamHost); + } + + [Fact] + public void should_release_connection() + { + var serviceName = "products"; + + var availableServices = new List + { + new Service(serviceName, new HostAndPort("127.0.0.1", 80)), + new Service(serviceName, new HostAndPort("127.0.0.2", 80)), + }; + + _services = availableServices; + _leastConnection = new LeastConnection(() => _services, serviceName); + + var response = _leastConnection.Lease(); + + response.Data.DownstreamHost.ShouldBe(availableServices[0].HostAndPort.DownstreamHost); + + response = _leastConnection.Lease(); + + response.Data.DownstreamHost.ShouldBe(availableServices[1].HostAndPort.DownstreamHost); + + response = _leastConnection.Lease(); + + response.Data.DownstreamHost.ShouldBe(availableServices[0].HostAndPort.DownstreamHost); + + response = _leastConnection.Lease(); + + response.Data.DownstreamHost.ShouldBe(availableServices[1].HostAndPort.DownstreamHost); + + //release this so 2 should have 1 connection and we should get 2 back as our next host and port + _leastConnection.Release(availableServices[1].HostAndPort); + + response = _leastConnection.Lease(); + + response.Data.DownstreamHost.ShouldBe(availableServices[1].HostAndPort.DownstreamHost); + } + + [Fact] + public void should_return_error_if_services_are_null() + { + var serviceName = "products"; + + var hostAndPort = new HostAndPort("localhost", 80); + this.Given(x => x.GivenAHostAndPort(hostAndPort)) + .And(x => x.GivenTheLoadBalancerStarts(null, serviceName)) + .When(x => x.WhenIGetTheNextHostAndPort()) + .Then(x => x.ThenServiceAreNullErrorIsReturned()) + .BDDfy(); + } + + [Fact] + public void should_return_error_if_services_are_empty() + { + var serviceName = "products"; + + var hostAndPort = new HostAndPort("localhost", 80); + this.Given(x => x.GivenAHostAndPort(hostAndPort)) + .And(x => x.GivenTheLoadBalancerStarts(new List(), serviceName)) + .When(x => x.WhenIGetTheNextHostAndPort()) + .Then(x => x.ThenServiceAreEmptyErrorIsReturned()) + .BDDfy(); + } + + private void ThenServiceAreNullErrorIsReturned() + { + _result.IsError.ShouldBeTrue(); + _result.Errors[0].ShouldBeOfType(); + } + + private void ThenServiceAreEmptyErrorIsReturned() + { + _result.IsError.ShouldBeTrue(); + _result.Errors[0].ShouldBeOfType(); + } + + private void GivenTheLoadBalancerStarts(List services, string serviceName) + { + _services = services; + _leastConnection = new LeastConnection(() => _services, serviceName); + } + + private void WhenTheLoadBalancerStarts(List services, string serviceName) + { + GivenTheLoadBalancerStarts(services, serviceName); + } + + private void GivenAHostAndPort(HostAndPort hostAndPort) + { + _hostAndPort = hostAndPort; + } + + private void WhenIGetTheNextHostAndPort() + { + _result = _leastConnection.Lease(); + } + + private void ThenTheNextHostAndPortIsReturned() + { + _result.Data.DownstreamHost.ShouldBe(_hostAndPort.DownstreamHost); + _result.Data.DownstreamPort.ShouldBe(_hostAndPort.DownstreamPort); + } + } + + public class LeastConnection : ILoadBalancer + { + private Func> _services; + private List _leases; + private string _serviceName; + + public LeastConnection(Func> services, string serviceName) + { + _services = services; + _serviceName = serviceName; + _leases = new List(); + } + + public Response Lease() + { + var services = _services(); + + if(services == null) + { + return new ErrorResponse(new List(){ new ServicesAreNullError($"services were null for {_serviceName}")}); + } + + if(!services.Any()) + { + return new ErrorResponse(new List(){ new ServicesAreEmptyError($"services were empty for {_serviceName}")}); + } + + //todo - maybe this should be moved somewhere else...? Maybe on a repeater on seperate thread? loop every second and update or something? + UpdateServices(services); + + var leaseWithLeastConnections = GetLeaseWithLeastConnections(); + + _leases.Remove(leaseWithLeastConnections); + + leaseWithLeastConnections = AddConnection(leaseWithLeastConnections); + + _leases.Add(leaseWithLeastConnections); + + return new OkResponse(new HostAndPort(leaseWithLeastConnections.HostAndPort.DownstreamHost, leaseWithLeastConnections.HostAndPort.DownstreamPort)); + } + + public Response Release(HostAndPort hostAndPort) + { + var matchingLease = _leases.FirstOrDefault(l => l.HostAndPort.DownstreamHost == hostAndPort.DownstreamHost + && l.HostAndPort.DownstreamPort == hostAndPort.DownstreamPort); + + if(matchingLease != null) + { + var replacementLease = new Lease(hostAndPort, matchingLease.Connections - 1); + + _leases.Remove(matchingLease); + + _leases.Add(replacementLease); + } + + return new OkResponse(); + } + + private Lease AddConnection(Lease lease) + { + return new Lease(lease.HostAndPort, lease.Connections + 1); + } + + private Lease GetLeaseWithLeastConnections() + { + //now get the service with the least connections? + Lease leaseWithLeastConnections = null; + + for(var i = 0; i < _leases.Count; i++) + { + if(i == 0) + { + leaseWithLeastConnections = _leases[i]; + } + else + { + if(_leases[i].Connections < leaseWithLeastConnections.Connections) + { + leaseWithLeastConnections = _leases[i]; + } + } + } + + return leaseWithLeastConnections; + } + + private Response UpdateServices(List services) + { + if(_leases.Count > 0) + { + var leasesToRemove = new List(); + + foreach(var lease in _leases) + { + var match = services.FirstOrDefault(s => s.HostAndPort.DownstreamHost == lease.HostAndPort.DownstreamHost + && s.HostAndPort.DownstreamPort == lease.HostAndPort.DownstreamPort); + + if(match == null) + { + leasesToRemove.Add(lease); + } + } + + foreach(var lease in leasesToRemove) + { + _leases.Remove(lease); + } + + foreach(var service in services) + { + var exists = _leases.FirstOrDefault(l => l.HostAndPort.ToString() == service.HostAndPort.ToString()); + + if(exists == null) + { + _leases.Add(new Lease(service.HostAndPort, 0)); + } + } + } + else + { + foreach(var service in services) + { + _leases.Add(new Lease(service.HostAndPort, 0)); + } + } + + return new OkResponse(); + } + } + + public class Lease + { + public Lease(HostAndPort hostAndPort, int connections) + { + HostAndPort = hostAndPort; + Connections = connections; + } + public HostAndPort HostAndPort {get;private set;} + public int Connections {get;private set;} + } + + public class ServicesAreNullError : Error + { + public ServicesAreNullError(string message) + : base(message, OcelotErrorCode.ServicesAreNullError) + { + } + } + + public class ServicesAreEmptyError : Error + { + public ServicesAreEmptyError(string message) + : base(message, OcelotErrorCode.ServicesAreEmptyError) + { + } + } +} \ No newline at end of file diff --git a/test/Ocelot.UnitTests/RoundRobinTests.cs b/test/Ocelot.UnitTests/RoundRobinTests.cs index 82689b62..5b7da070 100644 --- a/test/Ocelot.UnitTests/RoundRobinTests.cs +++ b/test/Ocelot.UnitTests/RoundRobinTests.cs @@ -1,7 +1,9 @@ using System.Collections.Generic; using System.Diagnostics; +using Ocelot.Responses; using Ocelot.Values; using Shouldly; +using TestStack.BDDfy; using Xunit; namespace Ocelot.UnitTests @@ -10,6 +12,7 @@ namespace Ocelot.UnitTests { private readonly RoundRobin _roundRobin; private readonly List _hostAndPorts; + private Response _hostAndPort; public RoundRobinTests() { @@ -26,12 +29,13 @@ namespace Ocelot.UnitTests [Fact] public void should_get_next_address() { - var address = _roundRobin.Next(); - address.ShouldBe(_hostAndPorts[0]); - address = _roundRobin.Next(); - address.ShouldBe(_hostAndPorts[1]); - address = _roundRobin.Next(); - address.ShouldBe(_hostAndPorts[2]); + this.Given(x => x.GivenIGetTheNextAddress()) + .Then(x => x.ThenTheNextAddressIndexIs(0)) + .Given(x => x.GivenIGetTheNextAddress()) + .Then(x => x.ThenTheNextAddressIndexIs(1)) + .Given(x => x.GivenIGetTheNextAddress()) + .Then(x => x.ThenTheNextAddressIndexIs(2)) + .BDDfy(); } [Fact] @@ -41,19 +45,30 @@ namespace Ocelot.UnitTests while (stopWatch.ElapsedMilliseconds < 1000) { - var address = _roundRobin.Next(); - address.ShouldBe(_hostAndPorts[0]); - address = _roundRobin.Next(); - address.ShouldBe(_hostAndPorts[1]); - address = _roundRobin.Next(); - address.ShouldBe(_hostAndPorts[2]); + var address = _roundRobin.Lease(); + address.Data.ShouldBe(_hostAndPorts[0]); + address = _roundRobin.Lease(); + address.Data.ShouldBe(_hostAndPorts[1]); + address = _roundRobin.Lease(); + address.Data.ShouldBe(_hostAndPorts[2]); } } + + private void GivenIGetTheNextAddress() + { + _hostAndPort = _roundRobin.Lease(); + } + + private void ThenTheNextAddressIndexIs(int index) + { + _hostAndPort.Data.ShouldBe(_hostAndPorts[index]); + } } public interface ILoadBalancer { - HostAndPort Next(); + Response Lease(); + Response Release(HostAndPort hostAndPort); } public class RoundRobin : ILoadBalancer @@ -66,7 +81,7 @@ namespace Ocelot.UnitTests _hostAndPorts = hostAndPorts; } - public HostAndPort Next() + public Response Lease() { if (_last >= _hostAndPorts.Count) { @@ -75,7 +90,12 @@ namespace Ocelot.UnitTests var next = _hostAndPorts[_last]; _last++; - return next; + return new OkResponse(next); + } + + public Response Release(HostAndPort hostAndPort) + { + return new OkResponse(); } } } diff --git a/test/Ocelot.UnitTests/ServiceRegistryTests.cs b/test/Ocelot.UnitTests/ServiceRegistryTests.cs index 8866f1ae..27987d42 100644 --- a/test/Ocelot.UnitTests/ServiceRegistryTests.cs +++ b/test/Ocelot.UnitTests/ServiceRegistryTests.cs @@ -1,4 +1,5 @@ using System.Collections.Generic; +using Ocelot.Values; using Shouldly; using TestStack.BDDfy; using Xunit; @@ -21,7 +22,7 @@ namespace Ocelot.UnitTests [Fact] public void should_register_service() { - this.Given(x => x.GivenAServiceToRegister("product", "localhost:5000")) + this.Given(x => x.GivenAServiceToRegister("product", "localhost:5000", 80)) .When(x => x.WhenIRegisterTheService()) .Then(x => x.ThenTheServiceIsRegistered()) .BDDfy(); @@ -29,7 +30,7 @@ namespace Ocelot.UnitTests public void should_lookup_service() { - this.Given(x => x.GivenAServiceIsRegistered("product", "localhost:600")) + this.Given(x => x.GivenAServiceIsRegistered("product", "localhost:600", 80)) .When(x => x.WhenILookupTheService("product")) .Then(x => x.ThenTheServiceDetailsAreReturned()) .BDDfy(); @@ -37,7 +38,8 @@ namespace Ocelot.UnitTests private void ThenTheServiceDetailsAreReturned() { - _services[0].Address.ShouldBe(_service.Address); + _services[0].HostAndPort.DownstreamHost.ShouldBe(_service.HostAndPort.DownstreamHost); + _services[0].HostAndPort.DownstreamPort.ShouldBe(_service.HostAndPort.DownstreamPort); _services[0].Name.ShouldBe(_service.Name); } @@ -46,15 +48,15 @@ namespace Ocelot.UnitTests _services = _serviceRegistry.Lookup(name); } - private void GivenAServiceIsRegistered(string name, string address) + private void GivenAServiceIsRegistered(string name, string address, int port) { - _service = new Service(name, address); + _service = new Service(name, new HostAndPort(address, port)); _serviceRepository.Set(_service); } - private void GivenAServiceToRegister(string name, string address) + private void GivenAServiceToRegister(string name, string address, int port) { - _service = new Service(name, address); + _service = new Service(name, new HostAndPort(address, port)); } private void WhenIRegisterTheService() @@ -65,7 +67,8 @@ namespace Ocelot.UnitTests private void ThenTheServiceIsRegistered() { var serviceNameAndAddress = _serviceRepository.Get(_service.Name); - serviceNameAndAddress[0].Address.ShouldBe(_service.Address); + serviceNameAndAddress[0].HostAndPort.DownstreamHost.ShouldBe(_service.HostAndPort.DownstreamHost); + serviceNameAndAddress[0].HostAndPort.DownstreamPort.ShouldBe(_service.HostAndPort.DownstreamPort); serviceNameAndAddress[0].Name.ShouldBe(_service.Name); } } @@ -96,13 +99,13 @@ namespace Ocelot.UnitTests public class Service { - public Service(string name, string address) + public Service(string name, HostAndPort hostAndPort) { Name = name; - Address = address; + HostAndPort = hostAndPort; } public string Name {get; private set;} - public string Address {get; private set;} + public HostAndPort HostAndPort {get; private set;} } public interface IServiceRepository