From 88034a5f9f1a95bdea969e3972a852976a87e656 Mon Sep 17 00:00:00 2001 From: WebMed Date: Mon, 10 Feb 2020 23:52:55 +0100 Subject: [PATCH 1/2] Updated KubeProvider to use pod endpoints instead of service --- .../EndPointClientV1.cs | 41 +++++++++++++++++++ .../KubeClientExtensions.cs | 12 ++++++ .../KubeProvider.cs | 36 +++++++--------- .../Ocelot.Provider.Kubernetes.csproj | 4 +- 4 files changed, 69 insertions(+), 24 deletions(-) create mode 100644 src/Ocelot.Provider.Kubernetes/KubeApiClientExtensions/EndPointClientV1.cs create mode 100644 src/Ocelot.Provider.Kubernetes/KubeApiClientExtensions/KubeClientExtensions.cs diff --git a/src/Ocelot.Provider.Kubernetes/KubeApiClientExtensions/EndPointClientV1.cs b/src/Ocelot.Provider.Kubernetes/KubeApiClientExtensions/EndPointClientV1.cs new file mode 100644 index 00000000..dfae202b --- /dev/null +++ b/src/Ocelot.Provider.Kubernetes/KubeApiClientExtensions/EndPointClientV1.cs @@ -0,0 +1,41 @@ +using HTTPlease; +using KubeClient; +using KubeClient.Models; +using KubeClient.ResourceClients; +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace Ocelot.Provider.Kubernetes.KubeApiClientExtensions +{ + public class EndPointClientV1 : KubeResourceClient + { + public EndPointClientV1(IKubeApiClient client) : base(client) + { + } + + public async Task Get(string serviceName, string kubeNamespace = null, CancellationToken cancellationToken = default) + { + if (string.IsNullOrEmpty(serviceName)) throw new ArgumentNullException(nameof(serviceName)); + + var response = await Http.GetAsync( + Requests.Collection.WithTemplateParameters(new + { + Namespace = kubeNamespace ?? KubeClient.DefaultNamespace, + ServiceName = serviceName + }), + cancellationToken: cancellationToken + ); + + if (response.IsSuccessStatusCode) + return await response.ReadContentAsAsync(); + + return null; + } + + public static class Requests + { + public static readonly HttpRequest Collection = KubeRequest.Create("api/v1/namespaces/{Namespace}/endpoints/{ServiceName}"); + } + } +} diff --git a/src/Ocelot.Provider.Kubernetes/KubeApiClientExtensions/KubeClientExtensions.cs b/src/Ocelot.Provider.Kubernetes/KubeApiClientExtensions/KubeClientExtensions.cs new file mode 100644 index 00000000..1985dda8 --- /dev/null +++ b/src/Ocelot.Provider.Kubernetes/KubeApiClientExtensions/KubeClientExtensions.cs @@ -0,0 +1,12 @@ +using KubeClient; + +namespace Ocelot.Provider.Kubernetes.KubeApiClientExtensions +{ + public static class KubeClientExtensions + { + public static EndPointClientV1 EndPointsV1(this IKubeApiClient kubeClient) + { + return kubeClient.ResourceClient(client => new EndPointClientV1(client)); + } + } +} diff --git a/src/Ocelot.Provider.Kubernetes/KubeProvider.cs b/src/Ocelot.Provider.Kubernetes/KubeProvider.cs index 22f5b115..a4615b8c 100644 --- a/src/Ocelot.Provider.Kubernetes/KubeProvider.cs +++ b/src/Ocelot.Provider.Kubernetes/KubeProvider.cs @@ -6,14 +6,15 @@ using Ocelot.Values; using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; +using Ocelot.Provider.Kubernetes.KubeApiClientExtensions; namespace Ocelot.Provider.Kubernetes { public class Kube : IServiceDiscoveryProvider { private KubeRegistryConfiguration kubeRegistryConfiguration; - private IOcelotLogger logger; - private IKubeApiClient kubeApi; + private readonly IOcelotLogger logger; + private readonly IKubeApiClient kubeApi; public Kube(KubeRegistryConfiguration kubeRegistryConfiguration, IOcelotLoggerFactory factory, IKubeApiClient kubeApi) { @@ -22,14 +23,13 @@ namespace Ocelot.Provider.Kubernetes this.kubeApi = kubeApi; } - public async Task> Get() { - var service = await kubeApi.ServicesV1().Get(kubeRegistryConfiguration.KeyOfServiceInK8s, kubeRegistryConfiguration.KubeNamespace); + var endpoint = await kubeApi.EndPointsV1().Get(kubeRegistryConfiguration.KeyOfServiceInK8s, kubeRegistryConfiguration.KubeNamespace); var services = new List(); - if (IsValid(service)) + if (endpoint != null && endpoint.Subsets.Any()) { - services.Add(BuildService(service)); + services.AddRange(BuildServices(endpoint)); } else { @@ -38,25 +38,17 @@ namespace Ocelot.Provider.Kubernetes return services; } - private bool IsValid(ServiceV1 service) + private List BuildServices(EndpointsV1 endpoint) { - if (string.IsNullOrEmpty(service.Spec.ClusterIP) || service.Spec.Ports.Count <= 0) + var services = new List(); + + foreach (var subset in endpoint.Subsets) { - return false; + services.AddRange(subset.Addresses.Select(address => new Service(endpoint.Metadata.Name, + new ServiceHostAndPort(address.Ip, subset.Ports.First().Port), + endpoint.Metadata.Uid, string.Empty, Enumerable.Empty()))); } - - return true; - } - - private Service BuildService(ServiceV1 serviceEntry) - { - var servicePort = serviceEntry.Spec.Ports.FirstOrDefault(); - return new Service( - serviceEntry.Metadata.Name, - new ServiceHostAndPort(serviceEntry.Spec.ClusterIP, servicePort.Port), - serviceEntry.Metadata.Uid, - string.Empty, - Enumerable.Empty()); + return services; } } } diff --git a/src/Ocelot.Provider.Kubernetes/Ocelot.Provider.Kubernetes.csproj b/src/Ocelot.Provider.Kubernetes/Ocelot.Provider.Kubernetes.csproj index 980cd909..8f342865 100644 --- a/src/Ocelot.Provider.Kubernetes/Ocelot.Provider.Kubernetes.csproj +++ b/src/Ocelot.Provider.Kubernetes/Ocelot.Provider.Kubernetes.csproj @@ -28,8 +28,8 @@ - - + + From 84907bd4cee8a0358dfe040bbaebd18bfe8b8dc5 Mon Sep 17 00:00:00 2001 From: WebMed Date: Sat, 22 Feb 2020 22:39:06 +0100 Subject: [PATCH 2/2] Update KubeServiceDiscoveryProviderTests --- .../KubeServiceDiscoveryProviderTests.cs | 50 +++++++++---------- 1 file changed, 23 insertions(+), 27 deletions(-) diff --git a/test/Ocelot.UnitTests/Kubernetes/KubeServiceDiscoveryProviderTests.cs b/test/Ocelot.UnitTests/Kubernetes/KubeServiceDiscoveryProviderTests.cs index 8534194d..0c6afcf0 100644 --- a/test/Ocelot.UnitTests/Kubernetes/KubeServiceDiscoveryProviderTests.cs +++ b/test/Ocelot.UnitTests/Kubernetes/KubeServiceDiscoveryProviderTests.cs @@ -21,7 +21,7 @@ namespace Ocelot.UnitTests.Kubernetes public class KubeServiceDiscoveryProviderTests : IDisposable { private IWebHost _fakeKubeBuilder; - private ServiceV1 _serviceEntries; + private EndpointsV1 _endpointEntries; private Kube _provider; private readonly string _serviceName; private readonly string _namespaces; @@ -41,7 +41,7 @@ namespace Ocelot.UnitTests.Kubernetes _port = 8001; _kubeHost = "localhost"; _fakekubeServiceDiscoveryUrl = $"http://{_kubeHost}:{_port}"; - _serviceEntries = new ServiceV1(); + _endpointEntries = new EndpointsV1(); _factory = new Mock(); var option = new KubeClientOptions @@ -49,7 +49,7 @@ namespace Ocelot.UnitTests.Kubernetes ApiEndPoint = new Uri(_fakekubeServiceDiscoveryUrl), AccessToken = "txpc696iUhbVoudg164r93CxDTrKRVWG", AuthStrategy = KubeClient.KubeAuthStrategy.BearerToken, - AllowInsecure = true + AllowInsecure = true, }; _clientFactory = KubeApiClient.Create(option); @@ -58,7 +58,7 @@ namespace Ocelot.UnitTests.Kubernetes var config = new KubeRegistryConfiguration() { KeyOfServiceInK8s = _serviceName, - KubeNamespace = _namespaces + KubeNamespace = _namespaces, }; _provider = new Kube(config, _factory.Object, _clientFactory); } @@ -67,33 +67,29 @@ namespace Ocelot.UnitTests.Kubernetes public void should_return_service_from_k8s() { var token = "Bearer txpc696iUhbVoudg164r93CxDTrKRVWG"; - var serviceEntryOne = new ServiceV1() + var endPointEntryOne = new EndpointsV1 { - Kind = "service", + Kind = "endpoint", ApiVersion = "1.0", Metadata = new ObjectMetaV1() { - Namespace = "dev" + Namespace = "dev", }, - Spec = new ServiceSpecV1() - { - ClusterIP = "localhost" - }, - Status = new ServiceStatusV1() - { - LoadBalancer = new LoadBalancerStatusV1() - } }; - - serviceEntryOne.Spec.Ports.Add( - new ServicePortV1() - { - Port = 80 - } - ); + var endpointSubsetV1 = new EndpointSubsetV1(); + endpointSubsetV1.Addresses.Add(new EndpointAddressV1() + { + Ip = "127.0.0.1", + Hostname = "localhost", + }); + endpointSubsetV1.Ports.Add(new EndpointPortV1() + { + Port = 80, + }); + endPointEntryOne.Subsets.Add(endpointSubsetV1); this.Given(x => GivenThereIsAFakeKubeServiceDiscoveryProvider(_fakekubeServiceDiscoveryUrl, _serviceName, _namespaces)) - .And(x => GivenTheServicesAreRegisteredWithKube(serviceEntryOne)) + .And(x => GivenTheServicesAreRegisteredWithKube(endPointEntryOne)) .When(x => WhenIGetTheServices()) .Then(x => ThenTheCountIs(1)) .And(_ => _receivedToken.ShouldBe(token)) @@ -110,9 +106,9 @@ namespace Ocelot.UnitTests.Kubernetes _services = _provider.Get().GetAwaiter().GetResult(); } - private void GivenTheServicesAreRegisteredWithKube(ServiceV1 serviceEntries) + private void GivenTheServicesAreRegisteredWithKube(EndpointsV1 endpointEntries) { - _serviceEntries = serviceEntries; + _endpointEntries = endpointEntries; } private void GivenThereIsAFakeKubeServiceDiscoveryProvider(string url, string serviceName, string namespaces) @@ -127,14 +123,14 @@ namespace Ocelot.UnitTests.Kubernetes { app.Run(async context => { - if (context.Request.Path.Value == $"/api/v1/namespaces/{namespaces}/services/{serviceName}") + if (context.Request.Path.Value == $"/api/v1/namespaces/{namespaces}/endpoints/{serviceName}") { if (context.Request.Headers.TryGetValue("Authorization", out var values)) { _receivedToken = values.First(); } - var json = JsonConvert.SerializeObject(_serviceEntries); + var json = JsonConvert.SerializeObject(_endpointEntries); context.Response.Headers.Add("Content-Type", "application/json"); await context.Response.WriteAsync(json); }