Refactored k8s endpoints PR +semver: major

This commit is contained in:
TomPallister 2020-04-11 11:48:47 +01:00
commit 6e5471a714
7 changed files with 105 additions and 72 deletions

View File

@ -1,7 +1,9 @@
Kubernetes Kubernetes
============== ==============
This feature was requested as part of `Issue 345 <https://github.com/ThreeMammals/Ocelot/issues/345>`_ . to add support for kubernetes's service discovery provider. This feature was requested as part of `Issue 345 <https://github.com/ThreeMammals/Ocelot/issues/345>`_ . to add support for kubernetes's provider.
Ocelot will call the k8s endpoints API in a given namespace to get all of the endpoints for a pod and then load balance across them. Ocelot used to use the services api to send requests to the k8s service but this was changed in `PR 1134 <https://github.com/ThreeMammals/Ocelot/pull/1134>`_ because the service did not load balance as expected.
The first thing you need to do is install the NuGet package that provides kubernetes support in Ocelot. The first thing you need to do is install the NuGet package that provides kubernetes support in Ocelot.
@ -76,7 +78,7 @@ The polling interval is in milliseconds and tells Ocelot how often to call kuber
Please note there are tradeoffs here. If you poll kubernetes it is possible Ocelot will not know if a service is down depending on your polling interval and you might get more errors than if you get the latest services per request. This really depends on how volatile your services are. I doubt it will matter for most people and polling may give a tiny performance improvement over calling kubernetes per request. Please note there are tradeoffs here. If you poll kubernetes it is possible Ocelot will not know if a service is down depending on your polling interval and you might get more errors than if you get the latest services per request. This really depends on how volatile your services are. I doubt it will matter for most people and polling may give a tiny performance improvement over calling kubernetes per request.
There is no way for Ocelot to work these out for you. There is no way for Ocelot to work these out for you.
If your downstream service resides in a different namespace you can override the global setting at the ReRoute level by specifying a ServiceNamespace If your downstream service resides in a different namespace you can override the global setting at the ReRoute level by specifying a ServiceNamespace.
.. code-block:: json .. code-block:: json

View File

@ -0,0 +1,38 @@
using HTTPlease;
using KubeClient;
using KubeClient.Models;
using KubeClient.ResourceClients;
using System;
using System.Threading;
using System.Threading.Tasks;
namespace Ocelot.Provider.Kubernetes.KubeApiClientExtensions
{
public class EndPointClientV1 : KubeResourceClient
{
private readonly HttpRequest _collection = KubeRequest.Create("api/v1/namespaces/{Namespace}/endpoints/{ServiceName}");
public EndPointClientV1(IKubeApiClient client) : base(client)
{
}
public async Task<EndpointsV1> Get(string serviceName, string kubeNamespace = null, CancellationToken cancellationToken = default)
{
if (string.IsNullOrEmpty(serviceName)) throw new ArgumentNullException(nameof(serviceName));
var response = await Http.GetAsync(
_collection.WithTemplateParameters(new
{
Namespace = kubeNamespace ?? KubeClient.DefaultNamespace,
ServiceName = serviceName
}),
cancellationToken
);
if (response.IsSuccessStatusCode)
return await response.ReadContentAsAsync<EndpointsV1>();
return null;
}
}
}

View File

@ -6,57 +6,52 @@ using Ocelot.Values;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Threading.Tasks; using System.Threading.Tasks;
using Ocelot.Provider.Kubernetes.KubeApiClientExtensions;
namespace Ocelot.Provider.Kubernetes namespace Ocelot.Provider.Kubernetes
{ {
public class Kube : IServiceDiscoveryProvider public class KubernetesServiceDiscoveryProvider : IServiceDiscoveryProvider
{ {
private KubeRegistryConfiguration kubeRegistryConfiguration; private readonly KubeRegistryConfiguration _kubeRegistryConfiguration;
private IOcelotLogger logger; private readonly IOcelotLogger _logger;
private IKubeApiClient kubeApi; private readonly IKubeApiClient _kubeApi;
public Kube(KubeRegistryConfiguration kubeRegistryConfiguration, IOcelotLoggerFactory factory, IKubeApiClient kubeApi) public KubernetesServiceDiscoveryProvider(KubeRegistryConfiguration kubeRegistryConfiguration, IOcelotLoggerFactory factory, IKubeApiClient kubeApi)
{ {
this.kubeRegistryConfiguration = kubeRegistryConfiguration; _kubeRegistryConfiguration = kubeRegistryConfiguration;
this.logger = factory.CreateLogger<Kube>(); _logger = factory.CreateLogger<KubernetesServiceDiscoveryProvider>();
this.kubeApi = kubeApi; _kubeApi = kubeApi;
} }
public async Task<List<Service>> Get() public async Task<List<Service>> Get()
{ {
var service = await kubeApi.ServicesV1().Get(kubeRegistryConfiguration.KeyOfServiceInK8s, kubeRegistryConfiguration.KubeNamespace); var endpoint = await _kubeApi
.ResourceClient(client => new EndPointClientV1(client))
.Get(_kubeRegistryConfiguration.KeyOfServiceInK8s, _kubeRegistryConfiguration.KubeNamespace);
var services = new List<Service>(); var services = new List<Service>();
if (IsValid(service)) if (endpoint != null && endpoint.Subsets.Any())
{ {
services.Add(BuildService(service)); services.AddRange(BuildServices(endpoint));
} }
else else
{ {
logger.LogWarning($"namespace:{kubeRegistryConfiguration.KubeNamespace }service:{kubeRegistryConfiguration.KeyOfServiceInK8s} Unable to use ,it is invalid. Address must contain host only e.g. localhost and port must be greater than 0"); _logger.LogWarning($"namespace:{_kubeRegistryConfiguration.KubeNamespace }service:{_kubeRegistryConfiguration.KeyOfServiceInK8s} Unable to use ,it is invalid. Address must contain host only e.g. localhost and port must be greater than 0");
} }
return services; return services;
} }
private bool IsValid(ServiceV1 service) private List<Service> BuildServices(EndpointsV1 endpoint)
{ {
if (string.IsNullOrEmpty(service.Spec.ClusterIP) || service.Spec.Ports.Count <= 0) var services = new List<Service>();
{
return false;
}
return true; foreach (var subset in endpoint.Subsets)
}
private Service BuildService(ServiceV1 serviceEntry)
{ {
var servicePort = serviceEntry.Spec.Ports.FirstOrDefault(); services.AddRange(subset.Addresses.Select(address => new Service(endpoint.Metadata.Name,
return new Service( new ServiceHostAndPort(address.Ip, subset.Ports.First().Port),
serviceEntry.Metadata.Name, endpoint.Metadata.Uid, string.Empty, Enumerable.Empty<string>())));
new ServiceHostAndPort(serviceEntry.Spec.ClusterIP, servicePort.Port), }
serviceEntry.Metadata.Uid, return services;
string.Empty,
Enumerable.Empty<string>());
} }
} }
} }

View File

@ -12,22 +12,24 @@ namespace Ocelot.Provider.Kubernetes
public static ServiceDiscoveryFinderDelegate Get = (provider, config, reRoute) => public static ServiceDiscoveryFinderDelegate Get = (provider, config, reRoute) =>
{ {
var factory = provider.GetService<IOcelotLoggerFactory>(); var factory = provider.GetService<IOcelotLoggerFactory>();
return GetkubeProvider(provider, config, reRoute, factory); return GetKubeProvider(provider, config, reRoute, factory);
}; };
private static ServiceDiscovery.Providers.IServiceDiscoveryProvider GetkubeProvider(IServiceProvider provider, Configuration.ServiceProviderConfiguration config, DownstreamReRoute reRoute, IOcelotLoggerFactory factory) private static ServiceDiscovery.Providers.IServiceDiscoveryProvider GetKubeProvider(IServiceProvider provider, ServiceProviderConfiguration config, DownstreamReRoute reRoute, IOcelotLoggerFactory factory)
{ {
var kubeClient = provider.GetService<IKubeApiClient>(); var kubeClient = provider.GetService<IKubeApiClient>();
var k8sRegistryConfiguration = new KubeRegistryConfiguration() var k8sRegistryConfiguration = new KubeRegistryConfiguration()
{ {
KeyOfServiceInK8s = reRoute.ServiceName, KeyOfServiceInK8s = reRoute.ServiceName,
KubeNamespace = string.IsNullOrEmpty(reRoute.ServiceNamespace) ? config.Namespace : reRoute.ServiceNamespace KubeNamespace = string.IsNullOrEmpty(reRoute.ServiceNamespace) ? config.Namespace : reRoute.ServiceNamespace
}; };
var k8sServiceDiscoveryProvider = new Kube(k8sRegistryConfiguration, factory, kubeClient); var k8sServiceDiscoveryProvider = new KubernetesServiceDiscoveryProvider(k8sRegistryConfiguration, factory, kubeClient);
if (config.Type?.ToLower() == "pollkube") if (config.Type?.ToLower() == "pollkube")
{ {
return new PollKube(config.PollingInterval, factory, k8sServiceDiscoveryProvider); return new PollKubernetes(config.PollingInterval, factory, k8sServiceDiscoveryProvider);
} }
return k8sServiceDiscoveryProvider; return k8sServiceDiscoveryProvider;
} }

View File

@ -7,7 +7,7 @@ using System.Threading.Tasks;
namespace Ocelot.Provider.Kubernetes namespace Ocelot.Provider.Kubernetes
{ {
public class PollKube : IServiceDiscoveryProvider public class PollKubernetes : IServiceDiscoveryProvider
{ {
private readonly IOcelotLogger _logger; private readonly IOcelotLogger _logger;
private readonly IServiceDiscoveryProvider _kubeServiceDiscoveryProvider; private readonly IServiceDiscoveryProvider _kubeServiceDiscoveryProvider;
@ -15,9 +15,9 @@ namespace Ocelot.Provider.Kubernetes
private bool _polling; private bool _polling;
private List<Service> _services; private List<Service> _services;
public PollKube(int pollingInterval, IOcelotLoggerFactory factory, IServiceDiscoveryProvider kubeServiceDiscoveryProvider) public PollKubernetes(int pollingInterval, IOcelotLoggerFactory factory, IServiceDiscoveryProvider kubeServiceDiscoveryProvider)
{ {
_logger = factory.CreateLogger<PollKube>(); _logger = factory.CreateLogger<PollKubernetes>();
_kubeServiceDiscoveryProvider = kubeServiceDiscoveryProvider; _kubeServiceDiscoveryProvider = kubeServiceDiscoveryProvider;
_services = new List<Service>(); _services = new List<Service>();

View File

@ -21,8 +21,8 @@ namespace Ocelot.UnitTests.Kubernetes
public class KubeServiceDiscoveryProviderTests : IDisposable public class KubeServiceDiscoveryProviderTests : IDisposable
{ {
private IWebHost _fakeKubeBuilder; private IWebHost _fakeKubeBuilder;
private ServiceV1 _serviceEntries; private readonly KubernetesServiceDiscoveryProvider _provider;
private readonly Kube _provider; private EndpointsV1 _endpointEntries;
private readonly string _serviceName; private readonly string _serviceName;
private readonly string _namespaces; private readonly string _namespaces;
private readonly int _port; private readonly int _port;
@ -41,7 +41,7 @@ namespace Ocelot.UnitTests.Kubernetes
_port = 86; _port = 86;
_kubeHost = "localhost"; _kubeHost = "localhost";
_fakekubeServiceDiscoveryUrl = $"http://{_kubeHost}:{_port}"; _fakekubeServiceDiscoveryUrl = $"http://{_kubeHost}:{_port}";
_serviceEntries = new ServiceV1(); _endpointEntries = new EndpointsV1();
_factory = new Mock<IOcelotLoggerFactory>(); _factory = new Mock<IOcelotLoggerFactory>();
var option = new KubeClientOptions var option = new KubeClientOptions
@ -49,51 +49,47 @@ namespace Ocelot.UnitTests.Kubernetes
ApiEndPoint = new Uri(_fakekubeServiceDiscoveryUrl), ApiEndPoint = new Uri(_fakekubeServiceDiscoveryUrl),
AccessToken = "txpc696iUhbVoudg164r93CxDTrKRVWG", AccessToken = "txpc696iUhbVoudg164r93CxDTrKRVWG",
AuthStrategy = KubeClient.KubeAuthStrategy.BearerToken, AuthStrategy = KubeClient.KubeAuthStrategy.BearerToken,
AllowInsecure = true AllowInsecure = true,
}; };
_clientFactory = KubeApiClient.Create(option); _clientFactory = KubeApiClient.Create(option);
_logger = new Mock<IOcelotLogger>(); _logger = new Mock<IOcelotLogger>();
_factory.Setup(x => x.CreateLogger<Kube>()).Returns(_logger.Object); _factory.Setup(x => x.CreateLogger<KubernetesServiceDiscoveryProvider>()).Returns(_logger.Object);
var config = new KubeRegistryConfiguration() var config = new KubeRegistryConfiguration()
{ {
KeyOfServiceInK8s = _serviceName, KeyOfServiceInK8s = _serviceName,
KubeNamespace = _namespaces KubeNamespace = _namespaces,
}; };
_provider = new Kube(config, _factory.Object, _clientFactory); _provider = new KubernetesServiceDiscoveryProvider(config, _factory.Object, _clientFactory);
} }
[Fact] [Fact]
public void should_return_service_from_k8s() public void should_return_service_from_k8s()
{ {
var token = "Bearer txpc696iUhbVoudg164r93CxDTrKRVWG"; var token = "Bearer txpc696iUhbVoudg164r93CxDTrKRVWG";
var serviceEntryOne = new ServiceV1() var endPointEntryOne = new EndpointsV1
{ {
Kind = "service", Kind = "endpoint",
ApiVersion = "1.0", ApiVersion = "1.0",
Metadata = new ObjectMetaV1() Metadata = new ObjectMetaV1()
{ {
Namespace = "dev" Namespace = "dev",
}, },
Spec = new ServiceSpecV1()
{
ClusterIP = "localhost"
},
Status = new ServiceStatusV1()
{
LoadBalancer = new LoadBalancerStatusV1()
}
}; };
var endpointSubsetV1 = new EndpointSubsetV1();
serviceEntryOne.Spec.Ports.Add( endpointSubsetV1.Addresses.Add(new EndpointAddressV1()
new ServicePortV1()
{ {
Port = 80 Ip = "127.0.0.1",
} Hostname = "localhost",
); });
endpointSubsetV1.Ports.Add(new EndpointPortV1()
{
Port = 80,
});
endPointEntryOne.Subsets.Add(endpointSubsetV1);
this.Given(x => GivenThereIsAFakeKubeServiceDiscoveryProvider(_fakekubeServiceDiscoveryUrl, _serviceName, _namespaces)) this.Given(x => GivenThereIsAFakeKubeServiceDiscoveryProvider(_fakekubeServiceDiscoveryUrl, _serviceName, _namespaces))
.And(x => GivenTheServicesAreRegisteredWithKube(serviceEntryOne)) .And(x => GivenTheServicesAreRegisteredWithKube(endPointEntryOne))
.When(x => WhenIGetTheServices()) .When(x => WhenIGetTheServices())
.Then(x => ThenTheCountIs(1)) .Then(x => ThenTheCountIs(1))
.And(_ => _receivedToken.ShouldBe(token)) .And(_ => _receivedToken.ShouldBe(token))
@ -110,9 +106,9 @@ namespace Ocelot.UnitTests.Kubernetes
_services = _provider.Get().GetAwaiter().GetResult(); _services = _provider.Get().GetAwaiter().GetResult();
} }
private void GivenTheServicesAreRegisteredWithKube(ServiceV1 serviceEntries) private void GivenTheServicesAreRegisteredWithKube(EndpointsV1 endpointEntries)
{ {
_serviceEntries = serviceEntries; _endpointEntries = endpointEntries;
} }
private void GivenThereIsAFakeKubeServiceDiscoveryProvider(string url, string serviceName, string namespaces) private void GivenThereIsAFakeKubeServiceDiscoveryProvider(string url, string serviceName, string namespaces)
@ -127,14 +123,14 @@ namespace Ocelot.UnitTests.Kubernetes
{ {
app.Run(async context => app.Run(async context =>
{ {
if (context.Request.Path.Value == $"/api/v1/namespaces/{namespaces}/services/{serviceName}") if (context.Request.Path.Value == $"/api/v1/namespaces/{namespaces}/endpoints/{serviceName}")
{ {
if (context.Request.Headers.TryGetValue("Authorization", out var values)) if (context.Request.Headers.TryGetValue("Authorization", out var values))
{ {
_receivedToken = values.First(); _receivedToken = values.First();
} }
var json = JsonConvert.SerializeObject(_serviceEntries); var json = JsonConvert.SerializeObject(_endpointEntries);
context.Response.Headers.Add("Content-Type", "application/json"); context.Response.Headers.Add("Content-Type", "application/json");
await context.Response.WriteAsync(json); await context.Response.WriteAsync(json);
} }

View File

@ -15,7 +15,7 @@ namespace Ocelot.UnitTests.Kubernetes
public class PollingKubeServiceDiscoveryProviderTests public class PollingKubeServiceDiscoveryProviderTests
{ {
private readonly int _delay; private readonly int _delay;
private PollKube _provider; private PollKubernetes _provider;
private readonly List<Service> _services; private readonly List<Service> _services;
private readonly Mock<IOcelotLoggerFactory> _factory; private readonly Mock<IOcelotLoggerFactory> _factory;
private readonly Mock<IOcelotLogger> _logger; private readonly Mock<IOcelotLogger> _logger;
@ -28,7 +28,7 @@ namespace Ocelot.UnitTests.Kubernetes
_delay = 1; _delay = 1;
_factory = new Mock<IOcelotLoggerFactory>(); _factory = new Mock<IOcelotLoggerFactory>();
_logger = new Mock<IOcelotLogger>(); _logger = new Mock<IOcelotLogger>();
_factory.Setup(x => x.CreateLogger<PollKube>()).Returns(_logger.Object); _factory.Setup(x => x.CreateLogger<PollKubernetes>()).Returns(_logger.Object);
_kubeServiceDiscoveryProvider = new Mock<IServiceDiscoveryProvider>(); _kubeServiceDiscoveryProvider = new Mock<IServiceDiscoveryProvider>();
} }
@ -56,7 +56,7 @@ namespace Ocelot.UnitTests.Kubernetes
private void WhenIGetTheServices(int expected) private void WhenIGetTheServices(int expected)
{ {
_provider = new PollKube(_delay, _factory.Object, _kubeServiceDiscoveryProvider.Object); _provider = new PollKubernetes(_delay, _factory.Object, _kubeServiceDiscoveryProvider.Object);
var result = Wait.WaitFor(3000).Until(() => var result = Wait.WaitFor(3000).Until(() =>
{ {