add PollingKubeServiceDiscovery

This commit is contained in:
geffzhang 2019-01-30 09:32:00 +08:00
parent 0afceb40f5
commit 580106aa50
3 changed files with 136 additions and 0 deletions

View File

@ -33,6 +33,10 @@ namespace Ocelot.Provider.Kubernetes
var k8sServiceDiscoveryProvider = new Kube(k8sRegistryConfiguration, factory, kubeClientFactory); var k8sServiceDiscoveryProvider = new Kube(k8sRegistryConfiguration, factory, kubeClientFactory);
if (config.Type?.ToLower() == "pollkube")
{
return new PollKube(config.PollingInterval, factory, k8sServiceDiscoveryProvider);
}
return k8sServiceDiscoveryProvider; return k8sServiceDiscoveryProvider;
} }
} }

View File

@ -0,0 +1,50 @@
using Ocelot.Logging;
using Ocelot.ServiceDiscovery.Providers;
using Ocelot.Values;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Ocelot.Provider.Kubernetes
{
public class PollKube : IServiceDiscoveryProvider
{
private readonly IOcelotLogger _logger;
private readonly IServiceDiscoveryProvider _kubeServiceDiscoveryProvider;
private readonly Timer _timer;
private bool _polling;
private List<Service> _services;
public PollKube(int pollingInterval, IOcelotLoggerFactory factory, IServiceDiscoveryProvider kubeServiceDiscoveryProvider)
{
_logger = factory.CreateLogger<PollKube>();
_kubeServiceDiscoveryProvider = kubeServiceDiscoveryProvider;
_services = new List<Service>();
_timer = new Timer(async x =>
{
if (_polling)
{
return;
}
_polling = true;
await Poll();
_polling = false;
}, null, pollingInterval, pollingInterval);
}
public Task<List<Service>> Get()
{
return Task.FromResult(_services);
}
private async Task Poll()
{
_services = await _kubeServiceDiscoveryProvider.Get();
}
}
}

View File

@ -0,0 +1,82 @@
using Moq;
using Ocelot.Infrastructure;
using Ocelot.Logging;
using Ocelot.Provider.Kubernetes;
using Ocelot.ServiceDiscovery.Providers;
using Ocelot.Values;
using Shouldly;
using System;
using System.Collections.Generic;
using System.Text;
using TestStack.BDDfy;
using Xunit;
namespace Ocelot.UnitTests.Kubernetes
{
public class PollingKubeServiceDiscoveryProviderTests
{
private readonly int _delay;
private PollKube _provider;
private readonly List<Service> _services;
private readonly Mock<IOcelotLoggerFactory> _factory;
private readonly Mock<IOcelotLogger> _logger;
private readonly Mock<IServiceDiscoveryProvider> _kubeServiceDiscoveryProvider;
private List<Service> _result;
public PollingKubeServiceDiscoveryProviderTests()
{
_services = new List<Service>();
_delay = 1;
_factory = new Mock<IOcelotLoggerFactory>();
_logger = new Mock<IOcelotLogger>();
_factory.Setup(x => x.CreateLogger<PollKube>()).Returns(_logger.Object);
_kubeServiceDiscoveryProvider = new Mock<IServiceDiscoveryProvider>();
}
[Fact]
public void should_return_service_from_kube()
{
var service = new Service("", new ServiceHostAndPort("", 0), "", "", new List<string>());
this.Given(x => GivenKubeReturns(service))
.When(x => WhenIGetTheServices(1))
.Then(x => ThenTheCountIs(1))
.BDDfy();
}
private void GivenKubeReturns(Service service)
{
_services.Add(service);
_kubeServiceDiscoveryProvider.Setup(x => x.Get()).ReturnsAsync(_services);
}
private void ThenTheCountIs(int count)
{
_result.Count.ShouldBe(count);
}
private void WhenIGetTheServices(int expected)
{
_provider = new PollKube(_delay, _factory.Object, _kubeServiceDiscoveryProvider.Object);
var result = Wait.WaitFor(3000).Until(() => {
try
{
_result = _provider.Get().GetAwaiter().GetResult();
if (_result.Count == expected)
{
return true;
}
return false;
}
catch (Exception)
{
return false;
}
});
result.ShouldBeTrue();
}
}
}