refactoring service discovery and load balancing approach into load balancing middleware

This commit is contained in:
TomPallister
2017-02-01 22:00:01 +00:00
54 changed files with 1160 additions and 547 deletions

View File

@ -201,13 +201,12 @@ namespace Ocelot.Configuration.Builder
public ReRoute Build()
{
Func<HostAndPort> downstreamHostFunc = () => new HostAndPort(_downstreamHost, _dsPort);
return new ReRoute(new DownstreamPathTemplate(_downstreamPathTemplate), _upstreamTemplate, _upstreamHttpMethod, _upstreamTemplatePattern,
_isAuthenticated, new AuthenticationOptions(_authenticationProvider, _authenticationProviderUrl, _scopeName,
_requireHttps, _additionalScopes, _scopeSecret), _configHeaderExtractorProperties, _claimToClaims, _routeClaimRequirement,
_isAuthorised, _claimToQueries, _requestIdHeaderKey, _isCached, _fileCacheOptions, _serviceName,
_useServiceDiscovery, _serviceDiscoveryAddress, _serviceDiscoveryProvider, downstreamHostFunc, _downstreamScheme, _loadBalancer);
_useServiceDiscovery, _serviceDiscoveryAddress, _serviceDiscoveryProvider, _downstreamScheme, _loadBalancer,
_downstreamHost, _dsPort);
}
}
}

View File

@ -6,6 +6,7 @@ using Microsoft.Extensions.Options;
using Ocelot.Configuration.File;
using Ocelot.Configuration.Parser;
using Ocelot.Configuration.Validator;
using Ocelot.LoadBalancer.LoadBalancers;
using Ocelot.Responses;
using Ocelot.ServiceDiscovery;
using Ocelot.Utilities;
@ -97,31 +98,6 @@ namespace Ocelot.Configuration.Creator
&& !string.IsNullOrEmpty(globalConfiguration?.ServiceDiscoveryProvider?.Address)
&& !string.IsNullOrEmpty(globalConfiguration?.ServiceDiscoveryProvider?.Provider);
//can we do the logic in this func to get the host and port from the load balancer?
//lBfactory.GetLbForThisDownstreamTemplate
//use it in the func to get the next host and port?
//how do we release it? cant callback, could access the lb and release later?
//ideal world we would get the host and port, then make the request using it, then release the connection to the lb
Func<HostAndPort> downstreamHostAndPortFunc = () => {
//service provider factory takes the reRoute
//can return no service provider (just use ocelot config)
//can return consol service provider
//returns a service provider
//we call get on the service provider
//could reutrn services from consol or just configuration.json
//this returns a list of services and we take the first one
var hostAndPort = new HostAndPort(reRoute.DownstreamHost.Trim('/'), reRoute.DownstreamPort);
var services = new List<Service>();
var serviceProvider = new NoServiceProvider(services);
var service = serviceProvider.Get();
var firstHostAndPort = service[0].HostAndPort;
return firstHostAndPort;
};
if (isAuthenticated)
{
var authOptionsForRoute = new AuthenticationOptions(reRoute.AuthenticationOptions.Provider,
@ -139,8 +115,8 @@ namespace Ocelot.Configuration.Creator
reRoute.RouteClaimsRequirement, isAuthorised, claimsToQueries,
requestIdKey, isCached, new CacheOptions(reRoute.FileCacheOptions.TtlSeconds),
reRoute.ServiceName, useServiceDiscovery, globalConfiguration?.ServiceDiscoveryProvider?.Provider,
globalConfiguration?.ServiceDiscoveryProvider?.Address, downstreamHostAndPortFunc, reRoute.DownstreamScheme,
reRoute.LoadBalancer);
globalConfiguration?.ServiceDiscoveryProvider?.Address, reRoute.DownstreamScheme,
reRoute.LoadBalancer, reRoute.DownstreamHost, reRoute.DownstreamPort);
}
return new ReRoute(new DownstreamPathTemplate(reRoute.DownstreamPathTemplate), reRoute.UpstreamTemplate,
@ -149,8 +125,8 @@ namespace Ocelot.Configuration.Creator
reRoute.RouteClaimsRequirement, isAuthorised, new List<ClaimToThing>(),
requestIdKey, isCached, new CacheOptions(reRoute.FileCacheOptions.TtlSeconds),
reRoute.ServiceName, useServiceDiscovery, globalConfiguration?.ServiceDiscoveryProvider?.Provider,
globalConfiguration?.ServiceDiscoveryProvider?.Address, downstreamHostAndPortFunc, reRoute.DownstreamScheme,
reRoute.LoadBalancer);
globalConfiguration?.ServiceDiscoveryProvider?.Address, reRoute.DownstreamScheme,
reRoute.LoadBalancer, reRoute.DownstreamHost, reRoute.DownstreamPort);
}
private string BuildUpstreamTemplate(FileReRoute reRoute)

View File

@ -10,10 +10,12 @@ namespace Ocelot.Configuration
bool isAuthenticated, AuthenticationOptions authenticationOptions, List<ClaimToThing> configurationHeaderExtractorProperties,
List<ClaimToThing> claimsToClaims, Dictionary<string, string> routeClaimsRequirement, bool isAuthorised, List<ClaimToThing> claimsToQueries,
string requestIdKey, bool isCached, CacheOptions fileCacheOptions, string serviceName, bool useServiceDiscovery,
string serviceDiscoveryProvider, string serviceDiscoveryAddress, Func<HostAndPort> downstreamHostAndPort,
string downstreamScheme, string loadBalancer)
string serviceDiscoveryProvider, string serviceDiscoveryAddress,
string downstreamScheme, string loadBalancer, string downstreamHost, int downstreamPort)
{
LoadBalancer = loadBalancer;
DownstreamHost = downstreamHost;
DownstreamPort = downstreamPort;
DownstreamPathTemplate = downstreamPathTemplate;
UpstreamTemplate = upstreamTemplate;
UpstreamHttpMethod = upstreamHttpMethod;
@ -35,7 +37,6 @@ namespace Ocelot.Configuration
UseServiceDiscovery = useServiceDiscovery;
ServiceDiscoveryProvider = serviceDiscoveryProvider;
ServiceDiscoveryAddress = serviceDiscoveryAddress;
DownstreamHostAndPort = downstreamHostAndPort;
DownstreamScheme = downstreamScheme;
}
public DownstreamPathTemplate DownstreamPathTemplate { get; private set; }
@ -56,8 +57,9 @@ namespace Ocelot.Configuration
public bool UseServiceDiscovery { get; private set;}
public string ServiceDiscoveryProvider { get; private set;}
public string ServiceDiscoveryAddress { get; private set;}
public Func<HostAndPort> DownstreamHostAndPort {get;private set;}
public string DownstreamScheme {get;private set;}
public string LoadBalancer {get;private set;}
public string DownstreamHost { get; private set; }
public int DownstreamPort { get; private set; }
}
}

View File

@ -6,6 +6,7 @@ using Ocelot.DownstreamUrlCreator.UrlTemplateReplacer;
using Ocelot.Infrastructure.RequestData;
using Ocelot.Logging;
using Ocelot.Middleware;
using Ocelot.Values;
namespace Ocelot.DownstreamUrlCreator.Middleware
{
@ -45,15 +46,10 @@ namespace Ocelot.DownstreamUrlCreator.Middleware
}
var dsScheme = DownstreamRoute.ReRoute.DownstreamScheme;
//here we could have a lb factory that takes stuff or we could just get the load balancer from the reRoute?
//returns the lb for this request
//lease the next address from the lb
//this could return the load balancer which you call next on, that gives you the host and port then you can call release in a try catch
//and if the call works?
var dsHostAndPort = DownstreamRoute.ReRoute.DownstreamHostAndPort();
//todo - get this out of scoped data repo?
var dsHostAndPort = new HostAndPort(DownstreamRoute.ReRoute.DownstreamHost,
DownstreamRoute.ReRoute.DownstreamPort);
var dsUrl = _urlBuilder.Build(dsPath.Data.Value, dsScheme, dsHostAndPort);

View File

@ -0,0 +1,11 @@
using Ocelot.Responses;
using Ocelot.Values;
namespace Ocelot.LoadBalancer.LoadBalancers
{
public interface ILoadBalancer
{
Response<HostAndPort> Lease();
Response Release(HostAndPort hostAndPort);
}
}

View File

@ -0,0 +1,7 @@
namespace Ocelot.LoadBalancer.LoadBalancers
{
public interface ILoadBalancerFactory
{
ILoadBalancer Get(string serviceName, string loadBalancer);
}
}

View File

@ -0,0 +1,15 @@
using Ocelot.Values;
namespace Ocelot.LoadBalancer.LoadBalancers
{
public class Lease
{
public Lease(HostAndPort hostAndPort, int connections)
{
HostAndPort = hostAndPort;
Connections = connections;
}
public HostAndPort HostAndPort { get; private set; }
public int Connections { get; private set; }
}
}

View File

@ -0,0 +1,139 @@
using System;
using System.Collections.Generic;
using System.Linq;
using Ocelot.Errors;
using Ocelot.Responses;
using Ocelot.Values;
namespace Ocelot.LoadBalancer.LoadBalancers
{
public class LeastConnectionLoadBalancer : ILoadBalancer
{
private Func<List<Service>> _services;
private List<Lease> _leases;
private string _serviceName;
public LeastConnectionLoadBalancer(Func<List<Service>> services, string serviceName)
{
_services = services;
_serviceName = serviceName;
_leases = new List<Lease>();
}
public Response<HostAndPort> Lease()
{
var services = _services();
if (services == null)
{
return new ErrorResponse<HostAndPort>(new List<Error>() { new ServicesAreNullError($"services were null for {_serviceName}") });
}
if (!services.Any())
{
return new ErrorResponse<HostAndPort>(new List<Error>() { new ServicesAreEmptyError($"services were empty for {_serviceName}") });
}
//todo - maybe this should be moved somewhere else...? Maybe on a repeater on seperate thread? loop every second and update or something?
UpdateServices(services);
var leaseWithLeastConnections = GetLeaseWithLeastConnections();
_leases.Remove(leaseWithLeastConnections);
leaseWithLeastConnections = AddConnection(leaseWithLeastConnections);
_leases.Add(leaseWithLeastConnections);
return new OkResponse<HostAndPort>(new HostAndPort(leaseWithLeastConnections.HostAndPort.DownstreamHost, leaseWithLeastConnections.HostAndPort.DownstreamPort));
}
public Response Release(HostAndPort hostAndPort)
{
var matchingLease = _leases.FirstOrDefault(l => l.HostAndPort.DownstreamHost == hostAndPort.DownstreamHost
&& l.HostAndPort.DownstreamPort == hostAndPort.DownstreamPort);
if (matchingLease != null)
{
var replacementLease = new Lease(hostAndPort, matchingLease.Connections - 1);
_leases.Remove(matchingLease);
_leases.Add(replacementLease);
}
return new OkResponse();
}
private Lease AddConnection(Lease lease)
{
return new Lease(lease.HostAndPort, lease.Connections + 1);
}
private Lease GetLeaseWithLeastConnections()
{
//now get the service with the least connections?
Lease leaseWithLeastConnections = null;
for (var i = 0; i < _leases.Count; i++)
{
if (i == 0)
{
leaseWithLeastConnections = _leases[i];
}
else
{
if (_leases[i].Connections < leaseWithLeastConnections.Connections)
{
leaseWithLeastConnections = _leases[i];
}
}
}
return leaseWithLeastConnections;
}
private Response UpdateServices(List<Service> services)
{
if (_leases.Count > 0)
{
var leasesToRemove = new List<Lease>();
foreach (var lease in _leases)
{
var match = services.FirstOrDefault(s => s.HostAndPort.DownstreamHost == lease.HostAndPort.DownstreamHost
&& s.HostAndPort.DownstreamPort == lease.HostAndPort.DownstreamPort);
if (match == null)
{
leasesToRemove.Add(lease);
}
}
foreach (var lease in leasesToRemove)
{
_leases.Remove(lease);
}
foreach (var service in services)
{
var exists = _leases.FirstOrDefault(l => l.HostAndPort.ToString() == service.HostAndPort.ToString());
if (exists == null)
{
_leases.Add(new Lease(service.HostAndPort, 0));
}
}
}
else
{
foreach (var service in services)
{
_leases.Add(new Lease(service.HostAndPort, 0));
}
}
return new OkResponse();
}
}
}

View File

@ -0,0 +1,27 @@
using Ocelot.ServiceDiscovery;
namespace Ocelot.LoadBalancer.LoadBalancers
{
public class LoadBalancerFactory : ILoadBalancerFactory
{
private readonly IServiceProvider _serviceProvider;
public LoadBalancerFactory(IServiceProvider serviceProvider)
{
_serviceProvider = serviceProvider;
}
public ILoadBalancer Get(string serviceName, string loadBalancer)
{
switch (loadBalancer)
{
case "RoundRobin":
return new RoundRobinLoadBalancer(_serviceProvider.Get());
case "LeastConnection":
return new LeastConnectionLoadBalancer(() => _serviceProvider.Get(), serviceName);
default:
return new NoLoadBalancer(_serviceProvider.Get());
}
}
}
}

View File

@ -0,0 +1,28 @@
using System.Collections.Generic;
using System.Linq;
using Ocelot.Responses;
using Ocelot.Values;
namespace Ocelot.LoadBalancer.LoadBalancers
{
public class NoLoadBalancer : ILoadBalancer
{
private readonly List<Service> _services;
public NoLoadBalancer(List<Service> services)
{
_services = services;
}
public Response<HostAndPort> Lease()
{
var service = _services.FirstOrDefault();
return new OkResponse<HostAndPort>(service.HostAndPort);
}
public Response Release(HostAndPort hostAndPort)
{
return new OkResponse();
}
}
}

View File

@ -0,0 +1,34 @@
using System.Collections.Generic;
using Ocelot.Responses;
using Ocelot.Values;
namespace Ocelot.LoadBalancer.LoadBalancers
{
public class RoundRobinLoadBalancer : ILoadBalancer
{
private readonly List<Service> _services;
private int _last;
public RoundRobinLoadBalancer(List<Service> services)
{
_services = services;
}
public Response<HostAndPort> Lease()
{
if (_last >= _services.Count)
{
_last = 0;
}
var next = _services[_last];
_last++;
return new OkResponse<HostAndPort>(next.HostAndPort);
}
public Response Release(HostAndPort hostAndPort)
{
return new OkResponse();
}
}
}

View File

@ -0,0 +1,12 @@
using Ocelot.Errors;
namespace Ocelot.LoadBalancer.LoadBalancers
{
public class ServicesAreEmptyError : Error
{
public ServicesAreEmptyError(string message)
: base(message, OcelotErrorCode.ServicesAreEmptyError)
{
}
}
}

View File

@ -0,0 +1,12 @@
using Ocelot.Errors;
namespace Ocelot.LoadBalancer.LoadBalancers
{
public class ServicesAreNullError : Error
{
public ServicesAreNullError(string message)
: base(message, OcelotErrorCode.ServicesAreNullError)
{
}
}
}

View File

@ -0,0 +1,67 @@
using System;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Ocelot.Infrastructure.RequestData;
using Ocelot.LoadBalancer.LoadBalancers;
using Ocelot.Logging;
using Ocelot.Middleware;
using Ocelot.QueryStrings.Middleware;
using Ocelot.ServiceDiscovery;
namespace Ocelot.LoadBalancer.Middleware
{
public class LoadBalancingMiddleware : OcelotMiddleware
{
private readonly RequestDelegate _next;
private readonly IOcelotLogger _logger;
public LoadBalancingMiddleware(RequestDelegate next,
IOcelotLoggerFactory loggerFactory,
IRequestScopedDataRepository requestScopedDataRepository)
: base(requestScopedDataRepository)
{
_next = next;
_logger = loggerFactory.CreateLogger<QueryStringBuilderMiddleware>();
}
public async Task Invoke(HttpContext context)
{
_logger.LogDebug("started calling query string builder middleware");
//todo - get out of di? or do this when we bootstrap?
var serviceProviderFactory = new ServiceProviderFactory();
var serviceConfig = new ServiceConfiguraion(
DownstreamRoute.ReRoute.ServiceName,
DownstreamRoute.ReRoute.DownstreamHost,
DownstreamRoute.ReRoute.DownstreamPort,
DownstreamRoute.ReRoute.UseServiceDiscovery);
//todo - get this out of some kind of service provider house?
var serviceProvider = serviceProviderFactory.Get(serviceConfig);
//todo - get out of di? or do this when we bootstrap?
var loadBalancerFactory = new LoadBalancerFactory(serviceProvider);
//todo - currently instanciates a load balancer per request which is wrong,
//need some kind of load balance house! :)
var loadBalancer = loadBalancerFactory.Get(DownstreamRoute.ReRoute.ServiceName, DownstreamRoute.ReRoute.LoadBalancer);
var response = loadBalancer.Lease();
_logger.LogDebug("calling next middleware");
//todo - try next middleware if we get an exception make sure we release
//the host and port? Not sure if this is the way to go but we shall see!
try
{
await _next.Invoke(context);
loadBalancer.Release(response.Data);
}
catch (Exception exception)
{
loadBalancer.Release(response.Data);
throw;
}
_logger.LogDebug("succesfully called next middleware");
}
}
}

View File

@ -3,11 +3,11 @@ using Ocelot.Values;
namespace Ocelot.ServiceDiscovery
{
public class NoServiceProvider : IServiceProvider
public class ConfigurationServiceProvider : IServiceProvider
{
private List<Service> _services;
public NoServiceProvider(List<Service> services)
public ConfigurationServiceProvider(List<Service> services)
{
_services = services;
}

View File

@ -1,9 +1,7 @@
using Ocelot.Configuration;
namespace Ocelot.ServiceDiscovery
{
public interface IServiceProviderFactory
{
Ocelot.ServiceDiscovery.IServiceProvider Get(ReRoute reRoute);
IServiceProvider Get(ServiceConfiguraion serviceConfig);
}
}

View File

@ -1,13 +1,34 @@
using System;
using Ocelot.Configuration;
using System.Collections.Generic;
using Ocelot.Values;
namespace Ocelot.ServiceDiscovery
{
public class ServiceProviderFactory : IServiceProviderFactory
{
public Ocelot.ServiceDiscovery.IServiceProvider Get(ReRoute reRoute)
public IServiceProvider Get(ServiceConfiguraion serviceConfig)
{
throw new NotImplementedException();
var services = new List<Service>()
{
new Service(serviceConfig.ServiceName, new HostAndPort(serviceConfig.DownstreamHost, serviceConfig.DownstreamPort))
};
return new ConfigurationServiceProvider(services);
}
}
public class ServiceConfiguraion
{
public ServiceConfiguraion(string serviceName, string downstreamHost, int downstreamPort, bool useServiceDiscovery)
{
ServiceName = serviceName;
DownstreamHost = downstreamHost;
DownstreamPort = downstreamPort;
UseServiceDiscovery = useServiceDiscovery;
}
public string ServiceName { get; }
public string DownstreamHost { get; }
public int DownstreamPort { get; }
public bool UseServiceDiscovery { get; }
}
}

View File

@ -4,16 +4,11 @@
{
public HostAndPort(string downstreamHost, int downstreamPort)
{
DownstreamHost = downstreamHost;
DownstreamHost = downstreamHost?.Trim('/');
DownstreamPort = downstreamPort;
}
public string DownstreamHost { get; private set; }
public int DownstreamPort { get; private set; }
public override string ToString()
{
return $"{DownstreamHost}:{DownstreamPort}";
}
}
}

View File

@ -1,5 +1,5 @@
{
"version": "1.0.0-*",
"version": "0.0.0-dev",
"dependencies": {
"Microsoft.AspNetCore.Server.IISIntegration": "1.1.0",