diff --git a/docs/features/kubernetes.rst b/docs/features/kubernetes.rst index fd0d3f79..90120e5b 100644 --- a/docs/features/kubernetes.rst +++ b/docs/features/kubernetes.rst @@ -1,7 +1,9 @@ Kubernetes ============== -This feature was requested as part of `Issue 345 `_ . to add support for kubernetes's service discovery provider. +This feature was requested as part of `Issue 345 `_ . to add support for kubernetes's provider. + +Ocelot will call the k8s endpoints API in a given namespace to get all of the endpoints for a pod and then load balance across them. Ocelot used to use the services api to send requests to the k8s service but this was changed in `PR 1134 `_ because the service did not load balance as expected. The first thing you need to do is install the NuGet package that provides kubernetes support in Ocelot. @@ -23,7 +25,7 @@ If you have services deployed in kubernetes you will normally use the naming ser } You can replicate a Permissive. Using RBAC role bindings. -`Permissive RBAC Permissions `_, k8s api server and token will read from pod . +`Permissive RBAC Permissions `_, k8s api server and token will read from pod. .. code-block::bash kubectl create clusterrolebinding permissive-binding --clusterrole=cluster-admin --user=admin --user=kubelet --group=system:serviceaccounts @@ -76,7 +78,7 @@ The polling interval is in milliseconds and tells Ocelot how often to call kuber Please note there are tradeoffs here. If you poll kubernetes it is possible Ocelot will not know if a service is down depending on your polling interval and you might get more errors than if you get the latest services per request. This really depends on how volatile your services are. I doubt it will matter for most people and polling may give a tiny performance improvement over calling kubernetes per request. There is no way for Ocelot to work these out for you. -If your downstream service resides in a different namespace you can override the global setting at the ReRoute level by specifying a ServiceNamespace +If your downstream service resides in a different namespace you can override the global setting at the ReRoute level by specifying a ServiceNamespace. .. code-block:: json diff --git a/src/Ocelot.Provider.Kubernetes/KubeApiClientExtensions/EndPointClientV1.cs b/src/Ocelot.Provider.Kubernetes/KubeApiClientExtensions/EndPointClientV1.cs new file mode 100644 index 00000000..3e384d32 --- /dev/null +++ b/src/Ocelot.Provider.Kubernetes/KubeApiClientExtensions/EndPointClientV1.cs @@ -0,0 +1,38 @@ +using HTTPlease; +using KubeClient; +using KubeClient.Models; +using KubeClient.ResourceClients; +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace Ocelot.Provider.Kubernetes.KubeApiClientExtensions +{ + public class EndPointClientV1 : KubeResourceClient + { + private readonly HttpRequest _collection = KubeRequest.Create("api/v1/namespaces/{Namespace}/endpoints/{ServiceName}"); + + public EndPointClientV1(IKubeApiClient client) : base(client) + { + } + + public async Task Get(string serviceName, string kubeNamespace = null, CancellationToken cancellationToken = default) + { + if (string.IsNullOrEmpty(serviceName)) throw new ArgumentNullException(nameof(serviceName)); + + var response = await Http.GetAsync( + _collection.WithTemplateParameters(new + { + Namespace = kubeNamespace ?? KubeClient.DefaultNamespace, + ServiceName = serviceName + }), + cancellationToken + ); + + if (response.IsSuccessStatusCode) + return await response.ReadContentAsAsync(); + + return null; + } + } +} diff --git a/src/Ocelot.Provider.Kubernetes/KubeProvider.cs b/src/Ocelot.Provider.Kubernetes/KubeProvider.cs index 22f5b115..927e4eea 100644 --- a/src/Ocelot.Provider.Kubernetes/KubeProvider.cs +++ b/src/Ocelot.Provider.Kubernetes/KubeProvider.cs @@ -6,57 +6,52 @@ using Ocelot.Values; using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; +using Ocelot.Provider.Kubernetes.KubeApiClientExtensions; namespace Ocelot.Provider.Kubernetes { - public class Kube : IServiceDiscoveryProvider + public class KubernetesServiceDiscoveryProvider : IServiceDiscoveryProvider { - private KubeRegistryConfiguration kubeRegistryConfiguration; - private IOcelotLogger logger; - private IKubeApiClient kubeApi; + private readonly KubeRegistryConfiguration _kubeRegistryConfiguration; + private readonly IOcelotLogger _logger; + private readonly IKubeApiClient _kubeApi; - public Kube(KubeRegistryConfiguration kubeRegistryConfiguration, IOcelotLoggerFactory factory, IKubeApiClient kubeApi) + public KubernetesServiceDiscoveryProvider(KubeRegistryConfiguration kubeRegistryConfiguration, IOcelotLoggerFactory factory, IKubeApiClient kubeApi) { - this.kubeRegistryConfiguration = kubeRegistryConfiguration; - this.logger = factory.CreateLogger(); - this.kubeApi = kubeApi; + _kubeRegistryConfiguration = kubeRegistryConfiguration; + _logger = factory.CreateLogger(); + _kubeApi = kubeApi; } - public async Task> Get() { - var service = await kubeApi.ServicesV1().Get(kubeRegistryConfiguration.KeyOfServiceInK8s, kubeRegistryConfiguration.KubeNamespace); + var endpoint = await _kubeApi + .ResourceClient(client => new EndPointClientV1(client)) + .Get(_kubeRegistryConfiguration.KeyOfServiceInK8s, _kubeRegistryConfiguration.KubeNamespace); + var services = new List(); - if (IsValid(service)) + if (endpoint != null && endpoint.Subsets.Any()) { - services.Add(BuildService(service)); + services.AddRange(BuildServices(endpoint)); } else { - logger.LogWarning($"namespace:{kubeRegistryConfiguration.KubeNamespace }service:{kubeRegistryConfiguration.KeyOfServiceInK8s} Unable to use ,it is invalid. Address must contain host only e.g. localhost and port must be greater than 0"); + _logger.LogWarning($"namespace:{_kubeRegistryConfiguration.KubeNamespace }service:{_kubeRegistryConfiguration.KeyOfServiceInK8s} Unable to use ,it is invalid. Address must contain host only e.g. localhost and port must be greater than 0"); } return services; } - private bool IsValid(ServiceV1 service) + private List BuildServices(EndpointsV1 endpoint) { - if (string.IsNullOrEmpty(service.Spec.ClusterIP) || service.Spec.Ports.Count <= 0) + var services = new List(); + + foreach (var subset in endpoint.Subsets) { - return false; + services.AddRange(subset.Addresses.Select(address => new Service(endpoint.Metadata.Name, + new ServiceHostAndPort(address.Ip, subset.Ports.First().Port), + endpoint.Metadata.Uid, string.Empty, Enumerable.Empty()))); } - - return true; - } - - private Service BuildService(ServiceV1 serviceEntry) - { - var servicePort = serviceEntry.Spec.Ports.FirstOrDefault(); - return new Service( - serviceEntry.Metadata.Name, - new ServiceHostAndPort(serviceEntry.Spec.ClusterIP, servicePort.Port), - serviceEntry.Metadata.Uid, - string.Empty, - Enumerable.Empty()); + return services; } } } diff --git a/src/Ocelot.Provider.Kubernetes/KubernetesProviderFactory.cs b/src/Ocelot.Provider.Kubernetes/KubernetesProviderFactory.cs index f12d0ebc..2918451a 100644 --- a/src/Ocelot.Provider.Kubernetes/KubernetesProviderFactory.cs +++ b/src/Ocelot.Provider.Kubernetes/KubernetesProviderFactory.cs @@ -12,22 +12,24 @@ namespace Ocelot.Provider.Kubernetes public static ServiceDiscoveryFinderDelegate Get = (provider, config, reRoute) => { var factory = provider.GetService(); - return GetkubeProvider(provider, config, reRoute, factory); + return GetKubeProvider(provider, config, reRoute, factory); }; - private static ServiceDiscovery.Providers.IServiceDiscoveryProvider GetkubeProvider(IServiceProvider provider, Configuration.ServiceProviderConfiguration config, DownstreamReRoute reRoute, IOcelotLoggerFactory factory) + private static ServiceDiscovery.Providers.IServiceDiscoveryProvider GetKubeProvider(IServiceProvider provider, ServiceProviderConfiguration config, DownstreamReRoute reRoute, IOcelotLoggerFactory factory) { var kubeClient = provider.GetService(); + var k8sRegistryConfiguration = new KubeRegistryConfiguration() { KeyOfServiceInK8s = reRoute.ServiceName, KubeNamespace = string.IsNullOrEmpty(reRoute.ServiceNamespace) ? config.Namespace : reRoute.ServiceNamespace }; - var k8sServiceDiscoveryProvider = new Kube(k8sRegistryConfiguration, factory, kubeClient); + var k8sServiceDiscoveryProvider = new KubernetesServiceDiscoveryProvider(k8sRegistryConfiguration, factory, kubeClient); + if (config.Type?.ToLower() == "pollkube") { - return new PollKube(config.PollingInterval, factory, k8sServiceDiscoveryProvider); + return new PollKubernetes(config.PollingInterval, factory, k8sServiceDiscoveryProvider); } return k8sServiceDiscoveryProvider; } diff --git a/src/Ocelot.Provider.Kubernetes/PollKube.cs b/src/Ocelot.Provider.Kubernetes/PollKubernetes.cs similarity index 81% rename from src/Ocelot.Provider.Kubernetes/PollKube.cs rename to src/Ocelot.Provider.Kubernetes/PollKubernetes.cs index 31d42a56..b1a90596 100644 --- a/src/Ocelot.Provider.Kubernetes/PollKube.cs +++ b/src/Ocelot.Provider.Kubernetes/PollKubernetes.cs @@ -7,7 +7,7 @@ using System.Threading.Tasks; namespace Ocelot.Provider.Kubernetes { - public class PollKube : IServiceDiscoveryProvider + public class PollKubernetes : IServiceDiscoveryProvider { private readonly IOcelotLogger _logger; private readonly IServiceDiscoveryProvider _kubeServiceDiscoveryProvider; @@ -15,9 +15,9 @@ namespace Ocelot.Provider.Kubernetes private bool _polling; private List _services; - public PollKube(int pollingInterval, IOcelotLoggerFactory factory, IServiceDiscoveryProvider kubeServiceDiscoveryProvider) + public PollKubernetes(int pollingInterval, IOcelotLoggerFactory factory, IServiceDiscoveryProvider kubeServiceDiscoveryProvider) { - _logger = factory.CreateLogger(); + _logger = factory.CreateLogger(); _kubeServiceDiscoveryProvider = kubeServiceDiscoveryProvider; _services = new List(); diff --git a/test/Ocelot.UnitTests/Kubernetes/KubeServiceDiscoveryProviderTests.cs b/test/Ocelot.UnitTests/Kubernetes/KubeServiceDiscoveryProviderTests.cs index 0f15ff56..b3f0193c 100644 --- a/test/Ocelot.UnitTests/Kubernetes/KubeServiceDiscoveryProviderTests.cs +++ b/test/Ocelot.UnitTests/Kubernetes/KubeServiceDiscoveryProviderTests.cs @@ -21,8 +21,8 @@ namespace Ocelot.UnitTests.Kubernetes public class KubeServiceDiscoveryProviderTests : IDisposable { private IWebHost _fakeKubeBuilder; - private ServiceV1 _serviceEntries; - private readonly Kube _provider; + private readonly KubernetesServiceDiscoveryProvider _provider; + private EndpointsV1 _endpointEntries; private readonly string _serviceName; private readonly string _namespaces; private readonly int _port; @@ -41,7 +41,7 @@ namespace Ocelot.UnitTests.Kubernetes _port = 86; _kubeHost = "localhost"; _fakekubeServiceDiscoveryUrl = $"http://{_kubeHost}:{_port}"; - _serviceEntries = new ServiceV1(); + _endpointEntries = new EndpointsV1(); _factory = new Mock(); var option = new KubeClientOptions @@ -49,51 +49,47 @@ namespace Ocelot.UnitTests.Kubernetes ApiEndPoint = new Uri(_fakekubeServiceDiscoveryUrl), AccessToken = "txpc696iUhbVoudg164r93CxDTrKRVWG", AuthStrategy = KubeClient.KubeAuthStrategy.BearerToken, - AllowInsecure = true + AllowInsecure = true, }; _clientFactory = KubeApiClient.Create(option); _logger = new Mock(); - _factory.Setup(x => x.CreateLogger()).Returns(_logger.Object); + _factory.Setup(x => x.CreateLogger()).Returns(_logger.Object); var config = new KubeRegistryConfiguration() { KeyOfServiceInK8s = _serviceName, - KubeNamespace = _namespaces + KubeNamespace = _namespaces, }; - _provider = new Kube(config, _factory.Object, _clientFactory); + _provider = new KubernetesServiceDiscoveryProvider(config, _factory.Object, _clientFactory); } [Fact] public void should_return_service_from_k8s() { var token = "Bearer txpc696iUhbVoudg164r93CxDTrKRVWG"; - var serviceEntryOne = new ServiceV1() + var endPointEntryOne = new EndpointsV1 { - Kind = "service", + Kind = "endpoint", ApiVersion = "1.0", Metadata = new ObjectMetaV1() { - Namespace = "dev" + Namespace = "dev", }, - Spec = new ServiceSpecV1() - { - ClusterIP = "localhost" - }, - Status = new ServiceStatusV1() - { - LoadBalancer = new LoadBalancerStatusV1() - } }; - - serviceEntryOne.Spec.Ports.Add( - new ServicePortV1() - { - Port = 80 - } - ); + var endpointSubsetV1 = new EndpointSubsetV1(); + endpointSubsetV1.Addresses.Add(new EndpointAddressV1() + { + Ip = "127.0.0.1", + Hostname = "localhost", + }); + endpointSubsetV1.Ports.Add(new EndpointPortV1() + { + Port = 80, + }); + endPointEntryOne.Subsets.Add(endpointSubsetV1); this.Given(x => GivenThereIsAFakeKubeServiceDiscoveryProvider(_fakekubeServiceDiscoveryUrl, _serviceName, _namespaces)) - .And(x => GivenTheServicesAreRegisteredWithKube(serviceEntryOne)) + .And(x => GivenTheServicesAreRegisteredWithKube(endPointEntryOne)) .When(x => WhenIGetTheServices()) .Then(x => ThenTheCountIs(1)) .And(_ => _receivedToken.ShouldBe(token)) @@ -110,9 +106,9 @@ namespace Ocelot.UnitTests.Kubernetes _services = _provider.Get().GetAwaiter().GetResult(); } - private void GivenTheServicesAreRegisteredWithKube(ServiceV1 serviceEntries) + private void GivenTheServicesAreRegisteredWithKube(EndpointsV1 endpointEntries) { - _serviceEntries = serviceEntries; + _endpointEntries = endpointEntries; } private void GivenThereIsAFakeKubeServiceDiscoveryProvider(string url, string serviceName, string namespaces) @@ -127,14 +123,14 @@ namespace Ocelot.UnitTests.Kubernetes { app.Run(async context => { - if (context.Request.Path.Value == $"/api/v1/namespaces/{namespaces}/services/{serviceName}") + if (context.Request.Path.Value == $"/api/v1/namespaces/{namespaces}/endpoints/{serviceName}") { if (context.Request.Headers.TryGetValue("Authorization", out var values)) { _receivedToken = values.First(); } - var json = JsonConvert.SerializeObject(_serviceEntries); + var json = JsonConvert.SerializeObject(_endpointEntries); context.Response.Headers.Add("Content-Type", "application/json"); await context.Response.WriteAsync(json); } diff --git a/test/Ocelot.UnitTests/Kubernetes/PollingKubeServiceDiscoveryProviderTests.cs b/test/Ocelot.UnitTests/Kubernetes/PollingKubeServiceDiscoveryProviderTests.cs index 0009ec02..0fed68b7 100644 --- a/test/Ocelot.UnitTests/Kubernetes/PollingKubeServiceDiscoveryProviderTests.cs +++ b/test/Ocelot.UnitTests/Kubernetes/PollingKubeServiceDiscoveryProviderTests.cs @@ -15,7 +15,7 @@ namespace Ocelot.UnitTests.Kubernetes public class PollingKubeServiceDiscoveryProviderTests { private readonly int _delay; - private PollKube _provider; + private PollKubernetes _provider; private readonly List _services; private readonly Mock _factory; private readonly Mock _logger; @@ -28,7 +28,7 @@ namespace Ocelot.UnitTests.Kubernetes _delay = 1; _factory = new Mock(); _logger = new Mock(); - _factory.Setup(x => x.CreateLogger()).Returns(_logger.Object); + _factory.Setup(x => x.CreateLogger()).Returns(_logger.Object); _kubeServiceDiscoveryProvider = new Mock(); } @@ -56,7 +56,7 @@ namespace Ocelot.UnitTests.Kubernetes private void WhenIGetTheServices(int expected) { - _provider = new PollKube(_delay, _factory.Object, _kubeServiceDiscoveryProvider.Object); + _provider = new PollKubernetes(_delay, _factory.Object, _kubeServiceDiscoveryProvider.Object); var result = Wait.WaitFor(3000).Until(() => {