mirror of
				https://github.com/nsnail/Ocelot.git
				synced 2025-11-04 23:30:50 +08:00 
			
		
		
		
	Merge branch 'feature/fix-unstable-raft-tests' into develop
This commit is contained in:
		@@ -1,69 +1,76 @@
 | 
			
		||||
using System;
 | 
			
		||||
using System.Threading.Tasks;
 | 
			
		||||
using Microsoft.AspNetCore.Authorization;
 | 
			
		||||
using Microsoft.AspNetCore.Mvc;
 | 
			
		||||
using Ocelot.Configuration.File;
 | 
			
		||||
using Ocelot.Configuration.Setter;
 | 
			
		||||
using Ocelot.Raft;
 | 
			
		||||
using Rafty.Concensus;
 | 
			
		||||
 | 
			
		||||
namespace Ocelot.Configuration
 | 
			
		||||
{
 | 
			
		||||
    using Repository;
 | 
			
		||||
 | 
			
		||||
    [Authorize]
 | 
			
		||||
    [Route("configuration")]
 | 
			
		||||
    public class FileConfigurationController : Controller
 | 
			
		||||
    {
 | 
			
		||||
        private readonly IFileConfigurationRepository _repo;
 | 
			
		||||
        private readonly IFileConfigurationSetter _setter;
 | 
			
		||||
        private readonly IServiceProvider _provider;
 | 
			
		||||
 | 
			
		||||
        public FileConfigurationController(IFileConfigurationRepository repo, IFileConfigurationSetter setter, IServiceProvider provider)
 | 
			
		||||
        {
 | 
			
		||||
            _repo = repo;
 | 
			
		||||
            _setter = setter;
 | 
			
		||||
            _provider = provider;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        [HttpGet]
 | 
			
		||||
        public async Task<IActionResult> Get()
 | 
			
		||||
        {
 | 
			
		||||
            var response = await _repo.Get();
 | 
			
		||||
 | 
			
		||||
            if(response.IsError)
 | 
			
		||||
            {
 | 
			
		||||
                return new BadRequestObjectResult(response.Errors);
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            return new OkObjectResult(response.Data);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        [HttpPost]
 | 
			
		||||
        public async Task<IActionResult> Post([FromBody]FileConfiguration fileConfiguration)
 | 
			
		||||
        {
 | 
			
		||||
            //todo - this code is a bit shit sort it out..
 | 
			
		||||
            var test = _provider.GetService(typeof(INode));
 | 
			
		||||
            if (test != null)
 | 
			
		||||
            {
 | 
			
		||||
                var node = (INode)test;
 | 
			
		||||
                var result = node.Accept(new UpdateFileConfiguration(fileConfiguration));
 | 
			
		||||
                if (result.GetType() == typeof(Rafty.Concensus.ErrorResponse<UpdateFileConfiguration>))
 | 
			
		||||
                {
 | 
			
		||||
                    return new BadRequestObjectResult("There was a problem. This error message sucks raise an issue in GitHub.");
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
                return new OkObjectResult(result.Command.Configuration);
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            var response = await _setter.Set(fileConfiguration);
 | 
			
		||||
 | 
			
		||||
            if (response.IsError)
 | 
			
		||||
            {
 | 
			
		||||
                return new BadRequestObjectResult(response.Errors);
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            return new OkObjectResult(fileConfiguration);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
using System;
 | 
			
		||||
using System.Threading.Tasks;
 | 
			
		||||
using Microsoft.AspNetCore.Authorization;
 | 
			
		||||
using Microsoft.AspNetCore.Mvc;
 | 
			
		||||
using Ocelot.Configuration.File;
 | 
			
		||||
using Ocelot.Configuration.Setter;
 | 
			
		||||
using Ocelot.Raft;
 | 
			
		||||
using Rafty.Concensus;
 | 
			
		||||
 | 
			
		||||
namespace Ocelot.Configuration
 | 
			
		||||
{
 | 
			
		||||
    using Repository;
 | 
			
		||||
 | 
			
		||||
    [Authorize]
 | 
			
		||||
    [Route("configuration")]
 | 
			
		||||
    public class FileConfigurationController : Controller
 | 
			
		||||
    {
 | 
			
		||||
        private readonly IFileConfigurationRepository _repo;
 | 
			
		||||
        private readonly IFileConfigurationSetter _setter;
 | 
			
		||||
        private readonly IServiceProvider _provider;
 | 
			
		||||
 | 
			
		||||
        public FileConfigurationController(IFileConfigurationRepository repo, IFileConfigurationSetter setter, IServiceProvider provider)
 | 
			
		||||
        {
 | 
			
		||||
            _repo = repo;
 | 
			
		||||
            _setter = setter;
 | 
			
		||||
            _provider = provider;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        [HttpGet]
 | 
			
		||||
        public async Task<IActionResult> Get()
 | 
			
		||||
        {
 | 
			
		||||
            var response = await _repo.Get();
 | 
			
		||||
 | 
			
		||||
            if(response.IsError)
 | 
			
		||||
            {
 | 
			
		||||
                return new BadRequestObjectResult(response.Errors);
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            return new OkObjectResult(response.Data);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        [HttpPost]
 | 
			
		||||
        public async Task<IActionResult> Post([FromBody]FileConfiguration fileConfiguration)
 | 
			
		||||
        {
 | 
			
		||||
            try
 | 
			
		||||
            {
 | 
			
		||||
                //todo - this code is a bit shit sort it out..
 | 
			
		||||
                var test = _provider.GetService(typeof(INode));
 | 
			
		||||
                if (test != null)
 | 
			
		||||
                {
 | 
			
		||||
                    var node = (INode)test;
 | 
			
		||||
                    var result = node.Accept(new UpdateFileConfiguration(fileConfiguration));
 | 
			
		||||
                    if (result.GetType() == typeof(Rafty.Concensus.ErrorResponse<UpdateFileConfiguration>))
 | 
			
		||||
                    {
 | 
			
		||||
                        return new BadRequestObjectResult("There was a problem. This error message sucks raise an issue in GitHub.");
 | 
			
		||||
                    }
 | 
			
		||||
 | 
			
		||||
                    return new OkObjectResult(result.Command.Configuration);
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
                var response = await _setter.Set(fileConfiguration);
 | 
			
		||||
 | 
			
		||||
                if (response.IsError)
 | 
			
		||||
                {
 | 
			
		||||
                    return new BadRequestObjectResult(response.Errors);
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
                return new OkObjectResult(fileConfiguration);
 | 
			
		||||
            }
 | 
			
		||||
            catch(Exception e)
 | 
			
		||||
            {
 | 
			
		||||
                return new BadRequestObjectResult($"{e.Message}:{e.StackTrace}");
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -40,7 +40,7 @@ namespace Ocelot.Configuration.Repository
 | 
			
		||||
                _polling = true;
 | 
			
		||||
                await Poll();
 | 
			
		||||
                _polling = false;
 | 
			
		||||
            }, null, 0, _config.Delay);
 | 
			
		||||
            }, null, _config.Delay, _config.Delay);
 | 
			
		||||
        }
 | 
			
		||||
        
 | 
			
		||||
        private async Task Poll()
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										15
									
								
								src/Ocelot/Infrastructure/DelayedMessage.cs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										15
									
								
								src/Ocelot/Infrastructure/DelayedMessage.cs
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,15 @@
 | 
			
		||||
namespace Ocelot.Infrastructure
 | 
			
		||||
{
 | 
			
		||||
    internal class DelayedMessage<T>
 | 
			
		||||
    {
 | 
			
		||||
        public DelayedMessage(T message, int delay)
 | 
			
		||||
        {
 | 
			
		||||
            Delay = delay;
 | 
			
		||||
            Message = message;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        public T Message { get; set; }
 | 
			
		||||
 | 
			
		||||
        public int Delay { get; set; }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										10
									
								
								src/Ocelot/Infrastructure/IBus.cs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										10
									
								
								src/Ocelot/Infrastructure/IBus.cs
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,10 @@
 | 
			
		||||
using System;
 | 
			
		||||
 | 
			
		||||
namespace Ocelot.Infrastructure
 | 
			
		||||
{
 | 
			
		||||
    public interface IBus<T>
 | 
			
		||||
    {
 | 
			
		||||
        void Subscribe(Action<T> action);   
 | 
			
		||||
        void Publish(T message, int delay);
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										47
									
								
								src/Ocelot/Infrastructure/InMemoryBus.cs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										47
									
								
								src/Ocelot/Infrastructure/InMemoryBus.cs
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,47 @@
 | 
			
		||||
using System;
 | 
			
		||||
using System.Collections.Concurrent;
 | 
			
		||||
using System.Collections.Generic;
 | 
			
		||||
using System.Threading;
 | 
			
		||||
using System.Threading.Tasks;
 | 
			
		||||
 | 
			
		||||
namespace Ocelot.Infrastructure
 | 
			
		||||
{
 | 
			
		||||
    public class InMemoryBus<T> : IBus<T>
 | 
			
		||||
    {
 | 
			
		||||
        private readonly BlockingCollection<DelayedMessage<T>> _queue;
 | 
			
		||||
        private readonly List<Action<T>> _subscriptions;
 | 
			
		||||
        private Thread _processing;
 | 
			
		||||
 | 
			
		||||
        public InMemoryBus()
 | 
			
		||||
        {
 | 
			
		||||
            _queue = new BlockingCollection<DelayedMessage<T>>();
 | 
			
		||||
            _subscriptions = new List<Action<T>>();
 | 
			
		||||
            _processing = new Thread(async () => await Process());
 | 
			
		||||
            _processing.Start();
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        public void Subscribe(Action<T> action)
 | 
			
		||||
        {
 | 
			
		||||
            _subscriptions.Add(action);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        public void Publish(T message, int delay)
 | 
			
		||||
        {
 | 
			
		||||
            var delayed = new DelayedMessage<T>(message, delay);
 | 
			
		||||
            _queue.Add(delayed);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        private async Task Process()
 | 
			
		||||
        {
 | 
			
		||||
            foreach(var delayedMessage in _queue.GetConsumingEnumerable())
 | 
			
		||||
            {
 | 
			
		||||
                await Task.Delay(delayedMessage.Delay);
 | 
			
		||||
 | 
			
		||||
                foreach (var subscription in _subscriptions)
 | 
			
		||||
                {
 | 
			
		||||
                    subscription(delayedMessage.Message);
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
@@ -3,61 +3,64 @@ namespace Ocelot.LoadBalancer.LoadBalancers
 | 
			
		||||
    using System;
 | 
			
		||||
    using System.Collections.Concurrent;
 | 
			
		||||
    using System.Collections.Generic;
 | 
			
		||||
    using System.Linq;
 | 
			
		||||
    using System.Threading;
 | 
			
		||||
    using System.Threading.Tasks;
 | 
			
		||||
    using Ocelot.Infrastructure;
 | 
			
		||||
    using Ocelot.Middleware;
 | 
			
		||||
    using Responses;
 | 
			
		||||
    using Values;
 | 
			
		||||
 | 
			
		||||
    public class CookieStickySessions : ILoadBalancer, IDisposable
 | 
			
		||||
    public class CookieStickySessions : ILoadBalancer
 | 
			
		||||
    {
 | 
			
		||||
        private readonly int _expiryInMs;
 | 
			
		||||
        private readonly int _keyExpiryInMs;
 | 
			
		||||
        private readonly string _key;
 | 
			
		||||
        private readonly ILoadBalancer _loadBalancer;
 | 
			
		||||
        private readonly ConcurrentDictionary<string, StickySession> _stored;
 | 
			
		||||
        private readonly Timer _timer;
 | 
			
		||||
        private bool _expiring;
 | 
			
		||||
        private readonly IBus<StickySession> _bus;
 | 
			
		||||
        private readonly object _lock = new object();
 | 
			
		||||
 | 
			
		||||
        public CookieStickySessions(ILoadBalancer loadBalancer, string key, int expiryInMs)
 | 
			
		||||
        public CookieStickySessions(ILoadBalancer loadBalancer, string key, int keyExpiryInMs, IBus<StickySession> bus)
 | 
			
		||||
        {
 | 
			
		||||
            _bus = bus;
 | 
			
		||||
            _key = key;
 | 
			
		||||
            _expiryInMs = expiryInMs;
 | 
			
		||||
            _keyExpiryInMs = keyExpiryInMs;
 | 
			
		||||
            _loadBalancer = loadBalancer;
 | 
			
		||||
            _stored = new ConcurrentDictionary<string, StickySession>();
 | 
			
		||||
            _timer = new Timer(x =>
 | 
			
		||||
            _bus.Subscribe(ss =>
 | 
			
		||||
            {
 | 
			
		||||
                if (_expiring)
 | 
			
		||||
                //todo - get test coverage for this.
 | 
			
		||||
                if (_stored.TryGetValue(ss.Key, out var stickySession))
 | 
			
		||||
                {
 | 
			
		||||
                    return;
 | 
			
		||||
                    lock (_lock)
 | 
			
		||||
                    {
 | 
			
		||||
                        if (stickySession.Expiry < DateTime.UtcNow)
 | 
			
		||||
                        {
 | 
			
		||||
                            _stored.Remove(stickySession.Key, out _);
 | 
			
		||||
                            _loadBalancer.Release(stickySession.HostAndPort);
 | 
			
		||||
                        }
 | 
			
		||||
                    }
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
                _expiring = true;
 | 
			
		||||
 | 
			
		||||
                Expire();
 | 
			
		||||
 | 
			
		||||
                _expiring = false;
 | 
			
		||||
            }, null, 0, 50);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        public void Dispose()
 | 
			
		||||
        {
 | 
			
		||||
            _timer?.Dispose();
 | 
			
		||||
            });
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        public async Task<Response<ServiceHostAndPort>> Lease(DownstreamContext context)
 | 
			
		||||
        {
 | 
			
		||||
            var value = context.HttpContext.Request.Cookies[_key];
 | 
			
		||||
            var key = context.HttpContext.Request.Cookies[_key];
 | 
			
		||||
 | 
			
		||||
            if (!string.IsNullOrEmpty(value) && _stored.ContainsKey(value))
 | 
			
		||||
            lock (_lock)
 | 
			
		||||
            {
 | 
			
		||||
                var cached = _stored[value];
 | 
			
		||||
                if (!string.IsNullOrEmpty(key) && _stored.ContainsKey(key))
 | 
			
		||||
                {
 | 
			
		||||
                    var cached = _stored[key];
 | 
			
		||||
 | 
			
		||||
                var updated = new StickySession(cached.HostAndPort, DateTime.UtcNow.AddMilliseconds(_expiryInMs));
 | 
			
		||||
                    var updated = new StickySession(cached.HostAndPort, DateTime.UtcNow.AddMilliseconds(_keyExpiryInMs), key);
 | 
			
		||||
 | 
			
		||||
                _stored[value] = updated;
 | 
			
		||||
                    _stored[key] = updated;
 | 
			
		||||
 | 
			
		||||
                return new OkResponse<ServiceHostAndPort>(updated.HostAndPort);
 | 
			
		||||
                    _bus.Publish(updated, _keyExpiryInMs);
 | 
			
		||||
 | 
			
		||||
                    return new OkResponse<ServiceHostAndPort>(updated.HostAndPort);
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            var next = await _loadBalancer.Lease(context);
 | 
			
		||||
@@ -67,9 +70,14 @@ namespace Ocelot.LoadBalancer.LoadBalancers
 | 
			
		||||
                return new ErrorResponse<ServiceHostAndPort>(next.Errors);
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            if (!string.IsNullOrEmpty(value) && !_stored.ContainsKey(value))
 | 
			
		||||
            lock (_lock)
 | 
			
		||||
            {
 | 
			
		||||
                _stored[value] = new StickySession(next.Data, DateTime.UtcNow.AddMilliseconds(_expiryInMs));
 | 
			
		||||
                if (!string.IsNullOrEmpty(key) && !_stored.ContainsKey(key))
 | 
			
		||||
                {
 | 
			
		||||
                    var ss = new StickySession(next.Data, DateTime.UtcNow.AddMilliseconds(_keyExpiryInMs), key);
 | 
			
		||||
                    _stored[key] = ss;
 | 
			
		||||
                    _bus.Publish(ss, _keyExpiryInMs);
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            return new OkResponse<ServiceHostAndPort>(next.Data);
 | 
			
		||||
@@ -78,16 +86,5 @@ namespace Ocelot.LoadBalancer.LoadBalancers
 | 
			
		||||
        public void Release(ServiceHostAndPort hostAndPort)
 | 
			
		||||
        {
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        private void Expire()
 | 
			
		||||
        {
 | 
			
		||||
            var expired = _stored.Where(x => x.Value.Expiry < DateTime.UtcNow);
 | 
			
		||||
 | 
			
		||||
            foreach (var expire in expired)
 | 
			
		||||
            {
 | 
			
		||||
                _stored.Remove(expire.Key, out _);
 | 
			
		||||
                _loadBalancer.Release(expire.Value.HostAndPort);
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -1,5 +1,6 @@
 | 
			
		||||
using System.Threading.Tasks;
 | 
			
		||||
using Ocelot.Configuration;
 | 
			
		||||
using Ocelot.Infrastructure;
 | 
			
		||||
using Ocelot.ServiceDiscovery;
 | 
			
		||||
 | 
			
		||||
namespace Ocelot.LoadBalancer.LoadBalancers
 | 
			
		||||
@@ -25,7 +26,8 @@ namespace Ocelot.LoadBalancer.LoadBalancers
 | 
			
		||||
                    return new LeastConnection(async () => await serviceProvider.Get(), reRoute.ServiceName);
 | 
			
		||||
                case nameof(CookieStickySessions):
 | 
			
		||||
                    var loadBalancer = new RoundRobin(async () => await serviceProvider.Get());
 | 
			
		||||
                    return new CookieStickySessions(loadBalancer, reRoute.LoadBalancerOptions.Key, reRoute.LoadBalancerOptions.ExpiryInMs);
 | 
			
		||||
                    var bus = new InMemoryBus<StickySession>();
 | 
			
		||||
                    return new CookieStickySessions(loadBalancer, reRoute.LoadBalancerOptions.Key, reRoute.LoadBalancerOptions.ExpiryInMs, bus);
 | 
			
		||||
                default:
 | 
			
		||||
                    return new NoLoadBalancer(await serviceProvider.Get());
 | 
			
		||||
            }
 | 
			
		||||
 
 | 
			
		||||
@@ -5,14 +5,17 @@ namespace Ocelot.LoadBalancer.LoadBalancers
 | 
			
		||||
{
 | 
			
		||||
    public class StickySession
 | 
			
		||||
    {
 | 
			
		||||
        public StickySession(ServiceHostAndPort hostAndPort, DateTime expiry)
 | 
			
		||||
        public StickySession(ServiceHostAndPort hostAndPort, DateTime expiry, string key)
 | 
			
		||||
        {
 | 
			
		||||
            HostAndPort = hostAndPort;
 | 
			
		||||
            Expiry = expiry;
 | 
			
		||||
            Key = key;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        public ServiceHostAndPort HostAndPort { get; }
 | 
			
		||||
        
 | 
			
		||||
        public DateTime Expiry { get; }
 | 
			
		||||
 | 
			
		||||
        public string Key {get;}
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -1,128 +1,132 @@
 | 
			
		||||
using System;
 | 
			
		||||
using System.Collections.Generic;
 | 
			
		||||
using System.Net.Http;
 | 
			
		||||
using Newtonsoft.Json;
 | 
			
		||||
using Ocelot.Configuration;
 | 
			
		||||
using Ocelot.Middleware;
 | 
			
		||||
using Rafty.Concensus;
 | 
			
		||||
using Rafty.FiniteStateMachine;
 | 
			
		||||
 | 
			
		||||
namespace Ocelot.Raft
 | 
			
		||||
{
 | 
			
		||||
    [ExcludeFromCoverage]
 | 
			
		||||
    public class HttpPeer : IPeer
 | 
			
		||||
    {
 | 
			
		||||
        private readonly string _hostAndPort;
 | 
			
		||||
        private readonly HttpClient _httpClient;
 | 
			
		||||
        private readonly JsonSerializerSettings _jsonSerializerSettings;
 | 
			
		||||
        private readonly string _baseSchemeUrlAndPort;
 | 
			
		||||
        private BearerToken _token;
 | 
			
		||||
        private readonly IInternalConfiguration _config;
 | 
			
		||||
        private readonly IIdentityServerConfiguration _identityServerConfiguration;
 | 
			
		||||
 | 
			
		||||
        public HttpPeer(string hostAndPort, HttpClient httpClient, IBaseUrlFinder finder, IInternalConfiguration config, IIdentityServerConfiguration identityServerConfiguration)
 | 
			
		||||
        {
 | 
			
		||||
            _identityServerConfiguration = identityServerConfiguration;
 | 
			
		||||
            _config = config;
 | 
			
		||||
            Id = hostAndPort;
 | 
			
		||||
            _hostAndPort = hostAndPort;
 | 
			
		||||
            _httpClient = httpClient;
 | 
			
		||||
            _jsonSerializerSettings = new JsonSerializerSettings() { 
 | 
			
		||||
                TypeNameHandling = TypeNameHandling.All
 | 
			
		||||
            };
 | 
			
		||||
            _baseSchemeUrlAndPort = finder.Find();
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        public string Id {get; private set;}
 | 
			
		||||
 | 
			
		||||
        public RequestVoteResponse Request(RequestVote requestVote)
 | 
			
		||||
        {
 | 
			
		||||
            if(_token == null)
 | 
			
		||||
            {
 | 
			
		||||
                SetToken();
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            var json = JsonConvert.SerializeObject(requestVote, _jsonSerializerSettings);
 | 
			
		||||
            var content = new StringContent(json);
 | 
			
		||||
            content.Headers.ContentType = new System.Net.Http.Headers.MediaTypeHeaderValue("application/json");
 | 
			
		||||
            var response = _httpClient.PostAsync($"{_hostAndPort}/administration/raft/requestvote", content).GetAwaiter().GetResult();
 | 
			
		||||
            if(response.IsSuccessStatusCode)
 | 
			
		||||
            {
 | 
			
		||||
                return JsonConvert.DeserializeObject<RequestVoteResponse>(response.Content.ReadAsStringAsync().GetAwaiter().GetResult(), _jsonSerializerSettings);
 | 
			
		||||
            }
 | 
			
		||||
            else
 | 
			
		||||
            {
 | 
			
		||||
                return new RequestVoteResponse(false, requestVote.Term);
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        public AppendEntriesResponse Request(AppendEntries appendEntries)
 | 
			
		||||
        {
 | 
			
		||||
            try
 | 
			
		||||
            {
 | 
			
		||||
                if(_token == null)
 | 
			
		||||
                {
 | 
			
		||||
                    SetToken();
 | 
			
		||||
                }                
 | 
			
		||||
 | 
			
		||||
                var json = JsonConvert.SerializeObject(appendEntries, _jsonSerializerSettings);
 | 
			
		||||
                var content = new StringContent(json);
 | 
			
		||||
                content.Headers.ContentType = new System.Net.Http.Headers.MediaTypeHeaderValue("application/json");
 | 
			
		||||
                var response = _httpClient.PostAsync($"{_hostAndPort}/administration/raft/appendEntries", content).GetAwaiter().GetResult();
 | 
			
		||||
                if(response.IsSuccessStatusCode)
 | 
			
		||||
                {
 | 
			
		||||
                    return JsonConvert.DeserializeObject<AppendEntriesResponse>(response.Content.ReadAsStringAsync().GetAwaiter().GetResult(),_jsonSerializerSettings);
 | 
			
		||||
                }
 | 
			
		||||
                else
 | 
			
		||||
                {
 | 
			
		||||
                    return new AppendEntriesResponse(appendEntries.Term, false);
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
            catch(Exception ex)
 | 
			
		||||
            {
 | 
			
		||||
                Console.WriteLine(ex);
 | 
			
		||||
                return new AppendEntriesResponse(appendEntries.Term, false);
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        public Response<T> Request<T>(T command)
 | 
			
		||||
            where T : ICommand
 | 
			
		||||
        {
 | 
			
		||||
            if(_token == null)
 | 
			
		||||
            {
 | 
			
		||||
                SetToken();
 | 
			
		||||
            }   
 | 
			
		||||
 | 
			
		||||
            var json = JsonConvert.SerializeObject(command, _jsonSerializerSettings);
 | 
			
		||||
            var content = new StringContent(json);
 | 
			
		||||
            content.Headers.ContentType = new System.Net.Http.Headers.MediaTypeHeaderValue("application/json");
 | 
			
		||||
            var response = _httpClient.PostAsync($"{_hostAndPort}/administration/raft/command", content).GetAwaiter().GetResult();
 | 
			
		||||
            if(response.IsSuccessStatusCode)
 | 
			
		||||
            {
 | 
			
		||||
                return JsonConvert.DeserializeObject<OkResponse<T>>(response.Content.ReadAsStringAsync().GetAwaiter().GetResult(), _jsonSerializerSettings);
 | 
			
		||||
            }
 | 
			
		||||
            else 
 | 
			
		||||
            {
 | 
			
		||||
                return new ErrorResponse<T>(response.Content.ReadAsStringAsync().GetAwaiter().GetResult(), command);
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        private void SetToken()
 | 
			
		||||
        {
 | 
			
		||||
            var tokenUrl = $"{_baseSchemeUrlAndPort}{_config.AdministrationPath}/connect/token";
 | 
			
		||||
            var formData = new List<KeyValuePair<string, string>>
 | 
			
		||||
            {
 | 
			
		||||
                new KeyValuePair<string, string>("client_id", _identityServerConfiguration.ApiName),
 | 
			
		||||
                new KeyValuePair<string, string>("client_secret", _identityServerConfiguration.ApiSecret),
 | 
			
		||||
                new KeyValuePair<string, string>("scope", _identityServerConfiguration.ApiName),
 | 
			
		||||
                new KeyValuePair<string, string>("grant_type", "client_credentials")
 | 
			
		||||
            };
 | 
			
		||||
            var content = new FormUrlEncodedContent(formData);
 | 
			
		||||
            var response = _httpClient.PostAsync(tokenUrl, content).GetAwaiter().GetResult();
 | 
			
		||||
            var responseContent = response.Content.ReadAsStringAsync().GetAwaiter().GetResult();
 | 
			
		||||
            response.EnsureSuccessStatusCode();
 | 
			
		||||
            _token = JsonConvert.DeserializeObject<BearerToken>(responseContent);
 | 
			
		||||
            _httpClient.DefaultRequestHeaders.Authorization = new System.Net.Http.Headers.AuthenticationHeaderValue(_token.TokenType, _token.AccessToken);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
using System;
 | 
			
		||||
using System.Collections.Generic;
 | 
			
		||||
using System.Net.Http;
 | 
			
		||||
using Newtonsoft.Json;
 | 
			
		||||
using Ocelot.Configuration;
 | 
			
		||||
using Ocelot.Middleware;
 | 
			
		||||
using Rafty.Concensus;
 | 
			
		||||
using Rafty.FiniteStateMachine;
 | 
			
		||||
 | 
			
		||||
namespace Ocelot.Raft
 | 
			
		||||
{
 | 
			
		||||
    [ExcludeFromCoverage]
 | 
			
		||||
    public class HttpPeer : IPeer
 | 
			
		||||
    {
 | 
			
		||||
        private readonly string _hostAndPort;
 | 
			
		||||
        private readonly HttpClient _httpClient;
 | 
			
		||||
        private readonly JsonSerializerSettings _jsonSerializerSettings;
 | 
			
		||||
        private readonly string _baseSchemeUrlAndPort;
 | 
			
		||||
        private BearerToken _token;
 | 
			
		||||
        private readonly IInternalConfiguration _config;
 | 
			
		||||
        private readonly IIdentityServerConfiguration _identityServerConfiguration;
 | 
			
		||||
 | 
			
		||||
        public HttpPeer(string hostAndPort, HttpClient httpClient, IBaseUrlFinder finder, IInternalConfiguration config, IIdentityServerConfiguration identityServerConfiguration)
 | 
			
		||||
        {
 | 
			
		||||
            _identityServerConfiguration = identityServerConfiguration;
 | 
			
		||||
            _config = config;
 | 
			
		||||
            Id = hostAndPort;
 | 
			
		||||
            _hostAndPort = hostAndPort;
 | 
			
		||||
            _httpClient = httpClient;
 | 
			
		||||
            _jsonSerializerSettings = new JsonSerializerSettings() { 
 | 
			
		||||
                TypeNameHandling = TypeNameHandling.All
 | 
			
		||||
            };
 | 
			
		||||
            _baseSchemeUrlAndPort = finder.Find();
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        public string Id {get; private set;}
 | 
			
		||||
 | 
			
		||||
        public RequestVoteResponse Request(RequestVote requestVote)
 | 
			
		||||
        {
 | 
			
		||||
            if(_token == null)
 | 
			
		||||
            {
 | 
			
		||||
                SetToken();
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            var json = JsonConvert.SerializeObject(requestVote, _jsonSerializerSettings);
 | 
			
		||||
            var content = new StringContent(json);
 | 
			
		||||
            content.Headers.ContentType = new System.Net.Http.Headers.MediaTypeHeaderValue("application/json");
 | 
			
		||||
            var response = _httpClient.PostAsync($"{_hostAndPort}/administration/raft/requestvote", content).GetAwaiter().GetResult();
 | 
			
		||||
            if(response.IsSuccessStatusCode)
 | 
			
		||||
            {
 | 
			
		||||
                return JsonConvert.DeserializeObject<RequestVoteResponse>(response.Content.ReadAsStringAsync().GetAwaiter().GetResult(), _jsonSerializerSettings);
 | 
			
		||||
            }
 | 
			
		||||
            else
 | 
			
		||||
            {
 | 
			
		||||
                return new RequestVoteResponse(false, requestVote.Term);
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        public AppendEntriesResponse Request(AppendEntries appendEntries)
 | 
			
		||||
        {
 | 
			
		||||
            try
 | 
			
		||||
            {
 | 
			
		||||
                if(_token == null)
 | 
			
		||||
                {
 | 
			
		||||
                    SetToken();
 | 
			
		||||
                }                
 | 
			
		||||
 | 
			
		||||
                var json = JsonConvert.SerializeObject(appendEntries, _jsonSerializerSettings);
 | 
			
		||||
                var content = new StringContent(json);
 | 
			
		||||
                content.Headers.ContentType = new System.Net.Http.Headers.MediaTypeHeaderValue("application/json");
 | 
			
		||||
                var response = _httpClient.PostAsync($"{_hostAndPort}/administration/raft/appendEntries", content).GetAwaiter().GetResult();
 | 
			
		||||
                if(response.IsSuccessStatusCode)
 | 
			
		||||
                {
 | 
			
		||||
                    return JsonConvert.DeserializeObject<AppendEntriesResponse>(response.Content.ReadAsStringAsync().GetAwaiter().GetResult(),_jsonSerializerSettings);
 | 
			
		||||
                }
 | 
			
		||||
                else
 | 
			
		||||
                {
 | 
			
		||||
                    return new AppendEntriesResponse(appendEntries.Term, false);
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
            catch(Exception ex)
 | 
			
		||||
            {
 | 
			
		||||
                Console.WriteLine(ex);
 | 
			
		||||
                return new AppendEntriesResponse(appendEntries.Term, false);
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        public Response<T> Request<T>(T command)
 | 
			
		||||
            where T : ICommand
 | 
			
		||||
        {
 | 
			
		||||
            Console.WriteLine("SENDING REQUEST....");
 | 
			
		||||
            if(_token == null)
 | 
			
		||||
            {
 | 
			
		||||
                SetToken();
 | 
			
		||||
            }   
 | 
			
		||||
 | 
			
		||||
            var json = JsonConvert.SerializeObject(command, _jsonSerializerSettings);
 | 
			
		||||
            var content = new StringContent(json);
 | 
			
		||||
            content.Headers.ContentType = new System.Net.Http.Headers.MediaTypeHeaderValue("application/json");
 | 
			
		||||
            var response = _httpClient.PostAsync($"{_hostAndPort}/administration/raft/command", content).GetAwaiter().GetResult();
 | 
			
		||||
            if(response.IsSuccessStatusCode)
 | 
			
		||||
            {
 | 
			
		||||
                Console.WriteLine("REQUEST OK....");
 | 
			
		||||
                var okResponse = JsonConvert.DeserializeObject<OkResponse<ICommand>>(response.Content.ReadAsStringAsync().GetAwaiter().GetResult(), _jsonSerializerSettings);
 | 
			
		||||
                return new OkResponse<T>((T)okResponse.Command);
 | 
			
		||||
            }
 | 
			
		||||
            else 
 | 
			
		||||
            {
 | 
			
		||||
                Console.WriteLine("REQUEST NOT OK....");
 | 
			
		||||
                return new ErrorResponse<T>(response.Content.ReadAsStringAsync().GetAwaiter().GetResult(), command);
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        private void SetToken()
 | 
			
		||||
        {
 | 
			
		||||
            var tokenUrl = $"{_baseSchemeUrlAndPort}{_config.AdministrationPath}/connect/token";
 | 
			
		||||
            var formData = new List<KeyValuePair<string, string>>
 | 
			
		||||
            {
 | 
			
		||||
                new KeyValuePair<string, string>("client_id", _identityServerConfiguration.ApiName),
 | 
			
		||||
                new KeyValuePair<string, string>("client_secret", _identityServerConfiguration.ApiSecret),
 | 
			
		||||
                new KeyValuePair<string, string>("scope", _identityServerConfiguration.ApiName),
 | 
			
		||||
                new KeyValuePair<string, string>("grant_type", "client_credentials")
 | 
			
		||||
            };
 | 
			
		||||
            var content = new FormUrlEncodedContent(formData);
 | 
			
		||||
            var response = _httpClient.PostAsync(tokenUrl, content).GetAwaiter().GetResult();
 | 
			
		||||
            var responseContent = response.Content.ReadAsStringAsync().GetAwaiter().GetResult();
 | 
			
		||||
            response.EnsureSuccessStatusCode();
 | 
			
		||||
            _token = JsonConvert.DeserializeObject<BearerToken>(responseContent);
 | 
			
		||||
            _httpClient.DefaultRequestHeaders.Authorization = new System.Net.Http.Headers.AuthenticationHeaderValue(_token.TokenType, _token.AccessToken);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user