mirror of
https://github.com/nsnail/Ocelot.git
synced 2025-04-22 14:02:49 +08:00
implemented a send to self pattern for sticky session timeouts rather than a normal timer
This commit is contained in:
parent
c041d90e38
commit
fb3af754ab
15
src/Ocelot/Infrastructure/DelayedMessage.cs
Normal file
15
src/Ocelot/Infrastructure/DelayedMessage.cs
Normal file
@ -0,0 +1,15 @@
|
||||
namespace Ocelot.Infrastructure
|
||||
{
|
||||
internal class DelayedMessage<T>
|
||||
{
|
||||
public DelayedMessage(T message, int delay)
|
||||
{
|
||||
Delay = delay;
|
||||
Message = message;
|
||||
}
|
||||
|
||||
public T Message { get; set; }
|
||||
|
||||
public int Delay { get; set; }
|
||||
}
|
||||
}
|
@ -1,14 +1,10 @@
|
||||
using System;
|
||||
using System.Collections.Concurrent;
|
||||
using System.Collections.Generic;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace Ocelot.Infrastructure
|
||||
{
|
||||
public interface IBus<T>
|
||||
{
|
||||
void Subscribe(Action<T> action);
|
||||
Task Publish(T message, int delay);
|
||||
void Publish(T message, int delay);
|
||||
}
|
||||
}
|
||||
|
@ -8,15 +8,15 @@ namespace Ocelot.Infrastructure
|
||||
{
|
||||
public class InMemoryBus<T> : IBus<T>
|
||||
{
|
||||
private readonly BlockingCollection<T> _queue;
|
||||
private readonly BlockingCollection<DelayedMessage<T>> _queue;
|
||||
private readonly List<Action<T>> _subscriptions;
|
||||
private Thread _processing;
|
||||
|
||||
public InMemoryBus()
|
||||
{
|
||||
_queue = new BlockingCollection<T>();
|
||||
_queue = new BlockingCollection<DelayedMessage<T>>();
|
||||
_subscriptions = new List<Action<T>>();
|
||||
_processing = new Thread(Process);
|
||||
_processing = new Thread(async () => await Process());
|
||||
_processing.Start();
|
||||
}
|
||||
|
||||
@ -25,19 +25,21 @@ namespace Ocelot.Infrastructure
|
||||
_subscriptions.Add(action);
|
||||
}
|
||||
|
||||
public async Task Publish(T message, int delay)
|
||||
public void Publish(T message, int delay)
|
||||
{
|
||||
await Task.Delay(delay);
|
||||
_queue.Add(message);
|
||||
var delayed = new DelayedMessage<T>(message, delay);
|
||||
_queue.Add(delayed);
|
||||
}
|
||||
|
||||
private void Process()
|
||||
private async Task Process()
|
||||
{
|
||||
foreach(var message in _queue.GetConsumingEnumerable())
|
||||
foreach(var delayedMessage in _queue.GetConsumingEnumerable())
|
||||
{
|
||||
foreach(var subscription in _subscriptions)
|
||||
await Task.Delay(delayedMessage.Delay);
|
||||
|
||||
foreach (var subscription in _subscriptions)
|
||||
{
|
||||
subscription(message);
|
||||
subscription(delayedMessage.Message);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -3,7 +3,6 @@ namespace Ocelot.LoadBalancer.LoadBalancers
|
||||
using System;
|
||||
using System.Collections.Concurrent;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Ocelot.Infrastructure;
|
||||
@ -11,13 +10,13 @@ namespace Ocelot.LoadBalancer.LoadBalancers
|
||||
using Responses;
|
||||
using Values;
|
||||
|
||||
public class CookieStickySessions : ILoadBalancer, IDisposable
|
||||
public class CookieStickySessions : ILoadBalancer
|
||||
{
|
||||
private readonly int _keyExpiryInMs;
|
||||
private readonly string _key;
|
||||
private readonly ILoadBalancer _loadBalancer;
|
||||
private readonly ConcurrentDictionary<string, StickySession> _stored;
|
||||
private IBus<StickySession> _bus;
|
||||
private readonly IBus<StickySession> _bus;
|
||||
private readonly object _lock = new object();
|
||||
|
||||
public CookieStickySessions(ILoadBalancer loadBalancer, string key, int keyExpiryInMs, IBus<StickySession> bus)
|
||||
@ -27,12 +26,14 @@ namespace Ocelot.LoadBalancer.LoadBalancers
|
||||
_keyExpiryInMs = keyExpiryInMs;
|
||||
_loadBalancer = loadBalancer;
|
||||
_stored = new ConcurrentDictionary<string, StickySession>();
|
||||
_bus.Subscribe(ss => {
|
||||
if(_stored.TryGetValue(ss.Key, out var stickySession))
|
||||
_bus.Subscribe(ss =>
|
||||
{
|
||||
//todo - get test coverage for this.
|
||||
if (_stored.TryGetValue(ss.Key, out var stickySession))
|
||||
{
|
||||
lock(_lock)
|
||||
lock (_lock)
|
||||
{
|
||||
if(stickySession.Expiry < DateTime.Now)
|
||||
if (stickySession.Expiry < DateTime.UtcNow)
|
||||
{
|
||||
_stored.Remove(stickySession.Key, out _);
|
||||
_loadBalancer.Release(stickySession.HostAndPort);
|
||||
@ -42,26 +43,24 @@ namespace Ocelot.LoadBalancer.LoadBalancers
|
||||
});
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
_timer?.Dispose();
|
||||
}
|
||||
|
||||
public async Task<Response<ServiceHostAndPort>> Lease(DownstreamContext context)
|
||||
{
|
||||
var key = context.HttpContext.Request.Cookies[_key];
|
||||
|
||||
if (!string.IsNullOrEmpty(key) && _stored.ContainsKey(key))
|
||||
lock (_lock)
|
||||
{
|
||||
var cached = _stored[key];
|
||||
if (!string.IsNullOrEmpty(key) && _stored.ContainsKey(key))
|
||||
{
|
||||
var cached = _stored[key];
|
||||
|
||||
var updated = new StickySession(cached.HostAndPort, DateTime.UtcNow.AddMilliseconds(_keyExpiryInMs), key);
|
||||
var updated = new StickySession(cached.HostAndPort, DateTime.UtcNow.AddMilliseconds(_keyExpiryInMs), key);
|
||||
|
||||
_stored[key] = updated;
|
||||
_stored[key] = updated;
|
||||
|
||||
await _bus.Publish(updated, _keyExpiryInMs);
|
||||
_bus.Publish(updated, _keyExpiryInMs);
|
||||
|
||||
return new OkResponse<ServiceHostAndPort>(updated.HostAndPort);
|
||||
return new OkResponse<ServiceHostAndPort>(updated.HostAndPort);
|
||||
}
|
||||
}
|
||||
|
||||
var next = await _loadBalancer.Lease(context);
|
||||
@ -71,11 +70,14 @@ namespace Ocelot.LoadBalancer.LoadBalancers
|
||||
return new ErrorResponse<ServiceHostAndPort>(next.Errors);
|
||||
}
|
||||
|
||||
if (!string.IsNullOrEmpty(key) && !_stored.ContainsKey(key))
|
||||
lock (_lock)
|
||||
{
|
||||
var ss = new StickySession(next.Data, DateTime.UtcNow.AddMilliseconds(_keyExpiryInMs), key);
|
||||
_stored[key] = ss;
|
||||
await _bus.Publish(ss, _keyExpiryInMs);
|
||||
if (!string.IsNullOrEmpty(key) && !_stored.ContainsKey(key))
|
||||
{
|
||||
var ss = new StickySession(next.Data, DateTime.UtcNow.AddMilliseconds(_keyExpiryInMs), key);
|
||||
_stored[key] = ss;
|
||||
_bus.Publish(ss, _keyExpiryInMs);
|
||||
}
|
||||
}
|
||||
|
||||
return new OkResponse<ServiceHostAndPort>(next.Data);
|
||||
|
@ -99,7 +99,7 @@ namespace Ocelot.Raft
|
||||
var response = _httpClient.PostAsync($"{_hostAndPort}/administration/raft/command", content).GetAwaiter().GetResult();
|
||||
if(response.IsSuccessStatusCode)
|
||||
{
|
||||
var okResponse = JsonConvert.DeserializeObject<OkResponse<ICommand>>(response.Content.ReadAsStringAsync().GetAwaiter().GetResult(), _jsonSerializerSettings);
|
||||
var okResponse = JsonConvert.DeserializeObject<OkResponse<ICommand>>(response.Content.ReadAsStringAsync().GetAwaiter().GetResult(), _jsonSerializerSettings);
|
||||
return new OkResponse<T>((T)okResponse.Command);
|
||||
}
|
||||
else
|
||||
|
@ -701,7 +701,7 @@ namespace Ocelot.AcceptanceTests
|
||||
Task.WaitAll(tasks);
|
||||
}
|
||||
|
||||
public async Task WhenIGetUrlOnTheApiGatewayMultipleTimes(string url, int times, string cookie, string value)
|
||||
public void WhenIGetUrlOnTheApiGatewayMultipleTimes(string url, int times, string cookie, string value)
|
||||
{
|
||||
var tasks = new Task[times];
|
||||
|
||||
|
@ -76,16 +76,16 @@ namespace Ocelot.Benchmarks
|
||||
response.EnsureSuccessStatusCode();
|
||||
}
|
||||
|
||||
// * Summary*
|
||||
// BenchmarkDotNet = v0.10.13, OS = macOS 10.12.6 (16G1212) [Darwin 16.7.0]
|
||||
// Intel Core i5-4278U CPU 2.60GHz(Haswell), 1 CPU, 4 logical cores and 2 physical cores
|
||||
//.NET Core SDK = 2.1.4
|
||||
/* * Summary*
|
||||
BenchmarkDotNet = v0.10.13, OS = macOS 10.12.6 (16G1212) [Darwin 16.7.0]
|
||||
Intel Core i5-4278U CPU 2.60GHz(Haswell), 1 CPU, 4 logical cores and 2 physical cores
|
||||
.NET Core SDK = 2.1.4
|
||||
|
||||
// [Host] : .NET Core 2.0.6 (CoreCLR 4.6.0.0, CoreFX 4.6.26212.01), 64bit RyuJIT
|
||||
// DefaultJob : .NET Core 2.0.6 (CoreCLR 4.6.0.0, CoreFX 4.6.26212.01), 64bit RyuJIT
|
||||
// Method | Mean | Error | StdDev | StdErr | Min | Q1 | Median | Q3 | Max | Op/s | Scaled | Gen 0 | Gen 1 | Allocated |
|
||||
// --------- |---------:|----------:|----------:|----------:|---------:|---------:|---------:|---------:|---------:|------:|-------:|--------:|-------:|----------:|
|
||||
// Baseline | 2.102 ms | 0.0292 ms | 0.0273 ms | 0.0070 ms | 2.063 ms | 2.080 ms | 2.093 ms | 2.122 ms | 2.152 ms | 475.8 | 1.00 | 31.2500 | 3.9063 | 1.63 KB |
|
||||
[Host] : .NET Core 2.0.6 (CoreCLR 4.6.0.0, CoreFX 4.6.26212.01), 64bit RyuJIT
|
||||
DefaultJob : .NET Core 2.0.6 (CoreCLR 4.6.0.0, CoreFX 4.6.26212.01), 64bit RyuJIT
|
||||
Method | Mean | Error | StdDev | StdErr | Min | Q1 | Median | Q3 | Max | Op/s | Scaled | Gen 0 | Gen 1 | Allocated |
|
||||
--------- |---------:|----------:|----------:|----------:|---------:|---------:|---------:|---------:|---------:|------:|-------:|--------:|-------:|----------:|
|
||||
Baseline | 2.102 ms | 0.0292 ms | 0.0273 ms | 0.0070 ms | 2.063 ms | 2.080 ms | 2.093 ms | 2.122 ms | 2.152 ms | 475.8 | 1.00 | 31.2500 | 3.9063 | 1.63 KB |*/
|
||||
|
||||
private void GivenOcelotIsRunning(string url)
|
||||
{
|
||||
|
@ -281,11 +281,6 @@ namespace Ocelot.IntegrationTests
|
||||
_response = _httpClient.PostAsync(url, content).Result;
|
||||
var responseContent = _response.Content.ReadAsStringAsync().Result;
|
||||
|
||||
//Console.ForegroundColor = ConsoleColor.Green;
|
||||
//Console.WriteLine(responseContent);
|
||||
//Console.WriteLine(_response.StatusCode);
|
||||
//Console.ForegroundColor = ConsoleColor.White;
|
||||
|
||||
if(responseContent == "There was a problem. This error message sucks raise an issue in GitHub.")
|
||||
{
|
||||
return false;
|
||||
@ -330,12 +325,13 @@ namespace Ocelot.IntegrationTests
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
_token = JsonConvert.DeserializeObject<BearerToken>(responseContent);
|
||||
var configPath = $"{adminPath}/.well-known/openid-configuration";
|
||||
response = _httpClient.GetAsync(configPath).Result;
|
||||
return response.IsSuccessStatusCode;
|
||||
}
|
||||
catch(Exception e)
|
||||
catch(Exception)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
@ -343,7 +339,6 @@ namespace Ocelot.IntegrationTests
|
||||
|
||||
var addToken = WaitFor(20000).Until(() => AddToken());
|
||||
addToken.ShouldBeTrue();
|
||||
|
||||
}
|
||||
|
||||
private void GivenThereIsAConfiguration(FileConfiguration fileConfiguration)
|
||||
|
@ -7,11 +7,11 @@ namespace Ocelot.UnitTests.Infrastructure
|
||||
{
|
||||
public class InMemoryBusTests
|
||||
{
|
||||
private InMemoryBus<Message> _bus;
|
||||
private readonly InMemoryBus<object> _bus;
|
||||
|
||||
public InMemoryBusTests()
|
||||
{
|
||||
_bus = new InMemoryBus<Message>();
|
||||
_bus = new InMemoryBus<object>();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
@ -21,26 +21,20 @@ namespace Ocelot.UnitTests.Infrastructure
|
||||
_bus.Subscribe(x => {
|
||||
called = true;
|
||||
});
|
||||
await _bus.Publish(new Message(), 1);
|
||||
_bus.Publish(new object(), 1);
|
||||
await Task.Delay(10);
|
||||
called.ShouldBeTrue();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task should_not_be_publish_yet_as_no_delay_in_caller()
|
||||
public void should_not_be_publish_yet_as_no_delay_in_caller()
|
||||
{
|
||||
var called = false;
|
||||
_bus.Subscribe(x => {
|
||||
called = true;
|
||||
});
|
||||
await _bus.Publish(new Message(), 1);
|
||||
_bus.Publish(new object(), 1);
|
||||
called.ShouldBeFalse();
|
||||
}
|
||||
|
||||
|
||||
class Message
|
||||
{
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1,5 +1,6 @@
|
||||
namespace Ocelot.UnitTests.LoadBalancer
|
||||
{
|
||||
using System;
|
||||
using System.Threading.Tasks;
|
||||
using Ocelot.LoadBalancer.LoadBalancers;
|
||||
using Ocelot.Responses;
|
||||
@ -25,13 +26,13 @@ namespace Ocelot.UnitTests.LoadBalancer
|
||||
private Response<ServiceHostAndPort> _result;
|
||||
private Response<ServiceHostAndPort> _firstHostAndPort;
|
||||
private Response<ServiceHostAndPort> _secondHostAndPort;
|
||||
private IBus<StickySession> _bus;
|
||||
private readonly FakeBus<StickySession> _bus;
|
||||
|
||||
public CookieStickySessionsTests()
|
||||
{
|
||||
_bus = new InMemoryBus<StickySession>();
|
||||
_bus = new FakeBus<StickySession>();
|
||||
_loadBalancer = new Mock<ILoadBalancer>();
|
||||
_defaultExpiryInMs = 100;
|
||||
_defaultExpiryInMs = 0;
|
||||
_stickySessions = new CookieStickySessions(_loadBalancer.Object, "sessionid", _defaultExpiryInMs, _bus);
|
||||
_downstreamContext = new DownstreamContext(new DefaultHttpContext());
|
||||
}
|
||||
@ -52,6 +53,7 @@ namespace Ocelot.UnitTests.LoadBalancer
|
||||
.And(_ => GivenTheDownstreamRequestHasSessionId("321"))
|
||||
.When(_ => WhenILeaseTwiceInARow())
|
||||
.Then(_ => ThenTheFirstAndSecondResponseAreTheSame())
|
||||
.And(_ => ThenTheStickySessionWillTimeout())
|
||||
.BDDfy();
|
||||
}
|
||||
|
||||
@ -73,94 +75,12 @@ namespace Ocelot.UnitTests.LoadBalancer
|
||||
.BDDfy();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void should_expire_sticky_session()
|
||||
{
|
||||
this.Given(_ => GivenTheLoadBalancerReturnsSequence())
|
||||
.When(_ => WhenTheStickySessionExpires())
|
||||
.Then(_ => ThenANewHostAndPortIsReturned())
|
||||
.BDDfy();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void should_refresh_sticky_session()
|
||||
{
|
||||
this.Given(_ => GivenTheLoadBalancerReturnsSequence())
|
||||
.When(_ => WhenIMakeRequestsToKeepRefreshingTheSession())
|
||||
.Then(_ => ThenTheSessionIsRefreshed())
|
||||
.BDDfy();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void should_dispose()
|
||||
{
|
||||
_stickySessions.Dispose();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void should_release()
|
||||
{
|
||||
_stickySessions.Release(new ServiceHostAndPort("", 0));
|
||||
}
|
||||
|
||||
private async Task ThenTheSessionIsRefreshed()
|
||||
{
|
||||
var postExpireHostAndPort = await _stickySessions.Lease(_downstreamContext);
|
||||
postExpireHostAndPort.Data.DownstreamHost.ShouldBe("one");
|
||||
postExpireHostAndPort.Data.DownstreamPort.ShouldBe(80);
|
||||
|
||||
_loadBalancer
|
||||
.Verify(x => x.Lease(It.IsAny<DownstreamContext>()), Times.Once);
|
||||
}
|
||||
|
||||
private async Task WhenIMakeRequestsToKeepRefreshingTheSession()
|
||||
{
|
||||
var context = new DefaultHttpContext();
|
||||
var cookies = new FakeCookies();
|
||||
cookies.AddCookie("sessionid", "321");
|
||||
context.Request.Cookies = cookies;
|
||||
_downstreamContext = new DownstreamContext(context);
|
||||
|
||||
var firstHostAndPort = await _stickySessions.Lease(_downstreamContext);
|
||||
firstHostAndPort.Data.DownstreamHost.ShouldBe("one");
|
||||
firstHostAndPort.Data.DownstreamPort.ShouldBe(80);
|
||||
|
||||
Thread.Sleep(80);
|
||||
|
||||
var secondHostAndPort = await _stickySessions.Lease(_downstreamContext);
|
||||
secondHostAndPort.Data.DownstreamHost.ShouldBe("one");
|
||||
secondHostAndPort.Data.DownstreamPort.ShouldBe(80);
|
||||
|
||||
Thread.Sleep(80);
|
||||
}
|
||||
|
||||
private async Task ThenANewHostAndPortIsReturned()
|
||||
{
|
||||
var postExpireHostAndPort = await _stickySessions.Lease(_downstreamContext);
|
||||
postExpireHostAndPort.Data.DownstreamHost.ShouldBe("two");
|
||||
postExpireHostAndPort.Data.DownstreamPort.ShouldBe(80);
|
||||
}
|
||||
|
||||
private async Task WhenTheStickySessionExpires()
|
||||
{
|
||||
var context = new DefaultHttpContext();
|
||||
var cookies = new FakeCookies();
|
||||
cookies.AddCookie("sessionid", "321");
|
||||
context.Request.Cookies = cookies;
|
||||
_downstreamContext = new DownstreamContext(context);
|
||||
|
||||
var firstHostAndPort = await _stickySessions.Lease(_downstreamContext);
|
||||
var secondHostAndPort = await _stickySessions.Lease(_downstreamContext);
|
||||
|
||||
firstHostAndPort.Data.DownstreamHost.ShouldBe("one");
|
||||
firstHostAndPort.Data.DownstreamPort.ShouldBe(80);
|
||||
|
||||
secondHostAndPort.Data.DownstreamHost.ShouldBe("one");
|
||||
secondHostAndPort.Data.DownstreamPort.ShouldBe(80);
|
||||
|
||||
Thread.Sleep(300);
|
||||
}
|
||||
|
||||
private void ThenAnErrorIsReturned()
|
||||
{
|
||||
_result.IsError.ShouldBeTrue();
|
||||
@ -240,9 +160,14 @@ namespace Ocelot.UnitTests.LoadBalancer
|
||||
{
|
||||
_result.Data.ShouldNotBeNull();
|
||||
}
|
||||
|
||||
private void ThenTheStickySessionWillTimeout()
|
||||
{
|
||||
_bus.Messages.Count.ShouldBe(2);
|
||||
}
|
||||
}
|
||||
|
||||
class FakeCookies : IRequestCookieCollection
|
||||
internal class FakeCookies : IRequestCookieCollection
|
||||
{
|
||||
private readonly Dictionary<string, string> _cookies = new Dictionary<string, string>();
|
||||
|
||||
@ -277,4 +202,23 @@ namespace Ocelot.UnitTests.LoadBalancer
|
||||
return _cookies.GetEnumerator();
|
||||
}
|
||||
}
|
||||
|
||||
internal class FakeBus<T> : IBus<T>
|
||||
{
|
||||
public FakeBus()
|
||||
{
|
||||
Messages = new List<T>();
|
||||
}
|
||||
|
||||
public List<T> Messages { get; }
|
||||
|
||||
public void Subscribe(Action<T> action)
|
||||
{
|
||||
}
|
||||
|
||||
public void Publish(T message, int delay)
|
||||
{
|
||||
Messages.Add(message);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user