Feature/poll consul (#392)

* WIP - implement a consul service discovery poller, lots of shared code with existing, refactor next and a todo in the docs to finish

* #374 implement polling for consul as option

* #374 updated docs to remove todo

* #374 fixed failing unit test

* #374 fixed failing unit test

* #374 fixed failing acceptance test
This commit is contained in:
Tom Pallister 2018-06-12 00:58:08 +03:00 committed by GitHub
parent 14308ff5fb
commit 0f2a9c1d0d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 649 additions and 369 deletions

View File

@ -43,6 +43,21 @@ and LeastConnection algorithm you can use. If no load balancer is specified Ocel
When this is set up Ocelot will lookup the downstream host and port from the service discover provider and load balance requests across any available services. When this is set up Ocelot will lookup the downstream host and port from the service discover provider and load balance requests across any available services.
A lot of people have asked me to implement a feature where Ocelot polls consul for latest service information rather than per request. If you want to poll consul for the latest services rather than per request (default behaviour) then you need to set the following configuration.
.. code-block:: json
"ServiceDiscoveryProvider": {
"Host": "localhost",
"Port": 9500,
"Type": "PollConsul",
"PollingInteral": 100
}
The polling interval is in milliseconds and tells Ocelot how often to call Consul for changes in service configuration.
Please note there are tradeoffs here. If you poll Consul it is possible Ocelot will not know if a service is down depending on your polling interval and you might get more errors than if you get the latest services per request. This really depends on how volitile your services are. I doubt it will matter for most people and polling may give a tiny performance improvement over calling consul per request (as sidecar agent). If you are calling a remote consul agent then polling will be a good performance improvement.
ACL Token ACL Token
--------- ---------

View File

@ -7,6 +7,7 @@ namespace Ocelot.Configuration.Builder
private string _type; private string _type;
private string _token; private string _token;
private string _configurationKey; private string _configurationKey;
private int _pollingInterval;
public ServiceProviderConfigurationBuilder WithHost(string serviceDiscoveryProviderHost) public ServiceProviderConfigurationBuilder WithHost(string serviceDiscoveryProviderHost)
{ {
@ -38,9 +39,15 @@ namespace Ocelot.Configuration.Builder
return this; return this;
} }
public ServiceProviderConfigurationBuilder WithPollingInterval(int pollingInterval)
{
_pollingInterval = pollingInterval;
return this;
}
public ServiceProviderConfiguration Build() public ServiceProviderConfiguration Build()
{ {
return new ServiceProviderConfiguration(_type, _serviceDiscoveryProviderHost, _serviceDiscoveryProviderPort, _token, _configurationKey); return new ServiceProviderConfiguration(_type, _serviceDiscoveryProviderHost, _serviceDiscoveryProviderPort, _token, _configurationKey, _pollingInterval);
} }
} }
} }

View File

@ -9,6 +9,7 @@ namespace Ocelot.Configuration.Creator
{ {
var port = globalConfiguration?.ServiceDiscoveryProvider?.Port ?? 0; var port = globalConfiguration?.ServiceDiscoveryProvider?.Port ?? 0;
var host = globalConfiguration?.ServiceDiscoveryProvider?.Host ?? "consul"; var host = globalConfiguration?.ServiceDiscoveryProvider?.Host ?? "consul";
var pollingInterval = globalConfiguration?.ServiceDiscoveryProvider?.PollingInterval ?? 0;
return new ServiceProviderConfigurationBuilder() return new ServiceProviderConfigurationBuilder()
.WithHost(host) .WithHost(host)
@ -16,6 +17,7 @@ namespace Ocelot.Configuration.Creator
.WithType(globalConfiguration?.ServiceDiscoveryProvider?.Type) .WithType(globalConfiguration?.ServiceDiscoveryProvider?.Type)
.WithToken(globalConfiguration?.ServiceDiscoveryProvider?.Token) .WithToken(globalConfiguration?.ServiceDiscoveryProvider?.Token)
.WithConfigurationKey(globalConfiguration?.ServiceDiscoveryProvider?.ConfigurationKey) .WithConfigurationKey(globalConfiguration?.ServiceDiscoveryProvider?.ConfigurationKey)
.WithPollingInterval(pollingInterval)
.Build(); .Build();
} }
} }

View File

@ -7,5 +7,6 @@ namespace Ocelot.Configuration.File
public string Type { get; set; } public string Type { get; set; }
public string Token { get; set; } public string Token { get; set; }
public string ConfigurationKey { get; set; } public string ConfigurationKey { get; set; }
public int PollingInterval { get; set; }
} }
} }

View File

@ -2,13 +2,14 @@
{ {
public class ServiceProviderConfiguration public class ServiceProviderConfiguration
{ {
public ServiceProviderConfiguration(string type, string host, int port, string token, string configurationKey) public ServiceProviderConfiguration(string type, string host, int port, string token, string configurationKey, int pollingInterval)
{ {
ConfigurationKey = configurationKey; ConfigurationKey = configurationKey;
Host = host; Host = host;
Port = port; Port = port;
Token = token; Token = token;
Type = type; Type = type;
PollingInterval = pollingInterval;
} }
public string Host { get; } public string Host { get; }
@ -16,5 +17,6 @@
public string Type { get; } public string Type { get; }
public string Token { get; } public string Token { get; }
public string ConfigurationKey { get; } public string ConfigurationKey { get; }
public int PollingInterval { get; }
} }
} }

View File

@ -0,0 +1,56 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Consul;
using Ocelot.Infrastructure.Consul;
using Ocelot.Infrastructure.Extensions;
using Ocelot.Logging;
using Ocelot.ServiceDiscovery.Configuration;
using Ocelot.Values;
namespace Ocelot.ServiceDiscovery.Providers
{
public class PollingConsulServiceDiscoveryProvider : IServiceDiscoveryProvider
{
private readonly IOcelotLogger _logger;
private readonly IServiceDiscoveryProvider _consulServiceDiscoveryProvider;
private readonly Timer _timer;
private bool _polling;
private List<Service> _services;
private string _keyOfServiceInConsul;
public PollingConsulServiceDiscoveryProvider(int pollingInterval, string keyOfServiceInConsul, IOcelotLoggerFactory factory, IServiceDiscoveryProvider consulServiceDiscoveryProvider)
{;
_logger = factory.CreateLogger<PollingConsulServiceDiscoveryProvider>();
_keyOfServiceInConsul = keyOfServiceInConsul;
_consulServiceDiscoveryProvider = consulServiceDiscoveryProvider;
_services = new List<Service>();
_timer = new Timer(async x =>
{
if(_polling)
{
return;
}
_polling = true;
await Poll();
_polling = false;
}, null, pollingInterval, pollingInterval);
}
public Task<List<Service>> Get()
{
return Task.FromResult(_services);
}
private async Task Poll()
{
_services = await _consulServiceDiscoveryProvider.Get();
}
}
}

View File

@ -56,7 +56,15 @@ namespace Ocelot.ServiceDiscovery
} }
var consulRegistryConfiguration = new ConsulRegistryConfiguration(serviceConfig.Host, serviceConfig.Port, serviceName, serviceConfig.Token); var consulRegistryConfiguration = new ConsulRegistryConfiguration(serviceConfig.Host, serviceConfig.Port, serviceName, serviceConfig.Token);
return new ConsulServiceDiscoveryProvider(consulRegistryConfiguration, _factory, _consulFactory);
var consulServiceDiscoveryProvider = new ConsulServiceDiscoveryProvider(consulRegistryConfiguration, _factory, _consulFactory);
if (serviceConfig.Type?.ToLower() == "pollconsul")
{
return new PollingConsulServiceDiscoveryProvider(serviceConfig.PollingInterval, consulRegistryConfiguration.KeyOfServiceInConsul, _factory, consulServiceDiscoveryProvider);
}
return consulServiceDiscoveryProvider;
} }
} }
} }

View File

@ -458,6 +458,64 @@ namespace Ocelot.AcceptanceTests
.BDDfy(); .BDDfy();
} }
[Fact]
public void should_handle_request_to_poll_consul_for_downstream_service_and_make_request()
{
const int consulPort = 8518;
const string serviceName = "web";
const int downstreamServicePort = 8082;
var downstreamServiceOneUrl = $"http://localhost:{downstreamServicePort}";
var fakeConsulServiceDiscoveryUrl = $"http://localhost:{consulPort}";
var serviceEntryOne = new ServiceEntry()
{
Service = new AgentService()
{
Service = serviceName,
Address = "localhost",
Port = downstreamServicePort,
ID = $"web_90_0_2_224_{downstreamServicePort}",
Tags = new[] {"version-v1"}
},
};
var configuration = new FileConfiguration
{
ReRoutes = new List<FileReRoute>
{
new FileReRoute
{
DownstreamPathTemplate = "/api/home",
DownstreamScheme = "http",
UpstreamPathTemplate = "/home",
UpstreamHttpMethod = new List<string> { "Get", "Options" },
ServiceName = serviceName,
LoadBalancerOptions = new FileLoadBalancerOptions { Type = "LeastConnection" },
UseServiceDiscovery = true,
}
},
GlobalConfiguration = new FileGlobalConfiguration()
{
ServiceDiscoveryProvider = new FileServiceDiscoveryProvider()
{
Host = "localhost",
Port = consulPort,
Type = "PollConsul",
PollingInterval = 0
}
}
};
this.Given(x => x.GivenThereIsAServiceRunningOn(downstreamServiceOneUrl, "/api/home", 200, "Hello from Laura"))
.And(x => x.GivenThereIsAFakeConsulServiceDiscoveryProvider(fakeConsulServiceDiscoveryUrl, serviceName))
.And(x => x.GivenTheServicesAreRegisteredWithConsul(serviceEntryOne))
.And(x => _steps.GivenThereIsAConfiguration(configuration))
.And(x => _steps.GivenOcelotIsRunning())
.When(x => _steps.WhenIGetUrlOnTheApiGatewayWaitingForTheResponseToBeOk("/home"))
.Then(x => _steps.ThenTheStatusCodeShouldBe(HttpStatusCode.OK))
.And(x => _steps.ThenTheResponseBodyShouldBe("Hello from Laura"))
.BDDfy();
}
private void WhenIAddAServiceBackIn(ServiceEntry serviceEntryTwo) private void WhenIAddAServiceBackIn(ServiceEntry serviceEntryTwo)
{ {
_consulServices.Add(serviceEntryTwo); _consulServices.Add(serviceEntryTwo);

View File

@ -27,6 +27,7 @@ using System.Text;
using static Ocelot.AcceptanceTests.HttpDelegatingHandlersTests; using static Ocelot.AcceptanceTests.HttpDelegatingHandlersTests;
using Ocelot.Requester; using Ocelot.Requester;
using Ocelot.Middleware.Multiplexer; using Ocelot.Middleware.Multiplexer;
using static Ocelot.Infrastructure.Wait;
namespace Ocelot.AcceptanceTests namespace Ocelot.AcceptanceTests
{ {
@ -675,6 +676,24 @@ namespace Ocelot.AcceptanceTests
_response = _ocelotClient.GetAsync(url).Result; _response = _ocelotClient.GetAsync(url).Result;
} }
public void WhenIGetUrlOnTheApiGatewayWaitingForTheResponseToBeOk(string url)
{
var result = WaitFor(2000).Until(() => {
try
{
_response = _ocelotClient.GetAsync(url).Result;
_response.EnsureSuccessStatusCode();
return true;
}
catch(Exception)
{
return false;
}
});
result.ShouldBeTrue();
}
public void WhenIGetUrlOnTheApiGateway(string url, string cookie, string value) public void WhenIGetUrlOnTheApiGateway(string url, string cookie, string value)
{ {
var request = _ocelotServer.CreateRequest(url); var request = _ocelotServer.CreateRequest(url);

View File

@ -26,7 +26,7 @@ namespace Ocelot.UnitTests.LoadBalancer
{ {
_factory = new Mock<ILoadBalancerFactory>(); _factory = new Mock<ILoadBalancerFactory>();
_loadBalancerHouse = new LoadBalancerHouse(_factory.Object); _loadBalancerHouse = new LoadBalancerHouse(_factory.Object);
_serviceProviderConfig = new ServiceProviderConfiguration("myType","myHost",123, string.Empty, "configKey"); _serviceProviderConfig = new ServiceProviderConfiguration("myType","myHost",123, string.Empty, "configKey", 0);
} }
[Fact] [Fact]

View File

@ -0,0 +1,89 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using Consul;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Http;
using Moq;
using Ocelot.Infrastructure.Consul;
using Ocelot.Logging;
using Ocelot.ServiceDiscovery.Configuration;
using Ocelot.ServiceDiscovery.Providers;
using Ocelot.Values;
using Xunit;
using TestStack.BDDfy;
using Shouldly;
using static Ocelot.Infrastructure.Wait;
namespace Ocelot.UnitTests.ServiceDiscovery
{
public class PollingConsulServiceDiscoveryProviderTests
{
private readonly int _delay;
private PollingConsulServiceDiscoveryProvider _provider;
private readonly string _serviceName;
private List<Service> _services;
private readonly Mock<IOcelotLoggerFactory> _factory;
private readonly Mock<IOcelotLogger> _logger;
private Mock<IServiceDiscoveryProvider> _consulServiceDiscoveryProvider;
private List<Service> _result;
public PollingConsulServiceDiscoveryProviderTests()
{
_services = new List<Service>();
_delay = 1;
_factory = new Mock<IOcelotLoggerFactory>();
_logger = new Mock<IOcelotLogger>();
_factory.Setup(x => x.CreateLogger<PollingConsulServiceDiscoveryProvider>()).Returns(_logger.Object);
_consulServiceDiscoveryProvider = new Mock<IServiceDiscoveryProvider>();
}
[Fact]
public void should_return_service_from_consul()
{
var service = new Service("", new ServiceHostAndPort("", 0), "", "", new List<string>());
this.Given(x => GivenConsulReturns(service))
.When(x => WhenIGetTheServices(1))
.Then(x => ThenTheCountIs(1))
.BDDfy();
}
private void GivenConsulReturns(Service service)
{
_services.Add(service);
_consulServiceDiscoveryProvider.Setup(x => x.Get()).ReturnsAsync(_services);
}
private void ThenTheCountIs(int count)
{
_result.Count.ShouldBe(count);
}
private void WhenIGetTheServices(int expected)
{
_provider = new PollingConsulServiceDiscoveryProvider(_delay, _serviceName, _factory.Object, _consulServiceDiscoveryProvider.Object);
var result = WaitFor(3000).Until(() => {
try
{
_result = _provider.Get().GetAwaiter().GetResult();
if(_result.Count == expected)
{
return true;
}
return false;
}
catch(Exception)
{
return false;
}
});
result.ShouldBeTrue();
}
}
}

View File

@ -24,12 +24,16 @@ namespace Ocelot.UnitTests.ServiceDiscovery
private DownstreamReRoute _reRoute; private DownstreamReRoute _reRoute;
private Mock<IOcelotLoggerFactory> _loggerFactory; private Mock<IOcelotLoggerFactory> _loggerFactory;
private Mock<IDiscoveryClient> _discoveryClient; private Mock<IDiscoveryClient> _discoveryClient;
private Mock<IOcelotLogger> _logger;
public ServiceProviderFactoryTests() public ServiceProviderFactoryTests()
{ {
_loggerFactory = new Mock<IOcelotLoggerFactory>(); _loggerFactory = new Mock<IOcelotLoggerFactory>();
_logger = new Mock<IOcelotLogger>();
_loggerFactory.Setup(x => x.CreateLogger<PollingConsulServiceDiscoveryProvider>()).Returns(_logger.Object);
_discoveryClient = new Mock<IDiscoveryClient>(); _discoveryClient = new Mock<IDiscoveryClient>();
_factory = new ServiceDiscoveryProviderFactory(_loggerFactory.Object, new ConsulClientFactory(), _discoveryClient.Object); var consulClient = new Mock<IConsulClientFactory>();
_factory = new ServiceDiscoveryProviderFactory(_loggerFactory.Object, consulClient.Object, _discoveryClient.Object);
} }
[Fact] [Fact]
@ -84,6 +88,25 @@ namespace Ocelot.UnitTests.ServiceDiscovery
.BDDfy(); .BDDfy();
} }
[Fact]
public void should_return_polling_consul_service_provider()
{
var reRoute = new DownstreamReRouteBuilder()
.WithServiceName("product")
.WithUseServiceDiscovery(true)
.Build();
var serviceConfig = new ServiceProviderConfigurationBuilder()
.WithType("PollConsul")
.WithPollingInterval(100000)
.Build();
this.Given(x => x.GivenTheReRoute(serviceConfig, reRoute))
.When(x => x.WhenIGetTheServiceProvider())
.Then(x => x.ThenTheServiceProviderIs<PollingConsulServiceDiscoveryProvider>())
.BDDfy();
}
[Fact] [Fact]
public void should_return_service_fabric_provider() public void should_return_service_fabric_provider()
{ {