more messing with send ot self

This commit is contained in:
Tom Gardham-Pallister 2018-05-05 12:53:19 +01:00
parent 17a515c4c0
commit c041d90e38
7 changed files with 99 additions and 36 deletions

View File

@ -0,0 +1,14 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace Ocelot.Infrastructure
{
public interface IBus<T>
{
void Subscribe(Action<T> action);
Task Publish(T message, int delay);
}
}

View File

@ -0,0 +1,45 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace Ocelot.Infrastructure
{
public class InMemoryBus<T> : IBus<T>
{
private readonly BlockingCollection<T> _queue;
private readonly List<Action<T>> _subscriptions;
private Thread _processing;
public InMemoryBus()
{
_queue = new BlockingCollection<T>();
_subscriptions = new List<Action<T>>();
_processing = new Thread(Process);
_processing.Start();
}
public void Subscribe(Action<T> action)
{
_subscriptions.Add(action);
}
public async Task Publish(T message, int delay)
{
await Task.Delay(delay);
_queue.Add(message);
}
private void Process()
{
foreach(var message in _queue.GetConsumingEnumerable())
{
foreach(var subscription in _subscriptions)
{
subscription(message);
}
}
}
}
}

View File

@ -6,6 +6,7 @@ namespace Ocelot.LoadBalancer.LoadBalancers
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Ocelot.Infrastructure;
using Ocelot.Middleware;
using Responses;
using Values;
@ -16,28 +17,29 @@ namespace Ocelot.LoadBalancer.LoadBalancers
private readonly string _key;
private readonly ILoadBalancer _loadBalancer;
private readonly ConcurrentDictionary<string, StickySession> _stored;
private readonly Timer _timer;
private bool _expiring;
private IBus<StickySession> _bus;
private readonly object _lock = new object();
public CookieStickySessions(ILoadBalancer loadBalancer, string key, int keyExpiryInMs)
public CookieStickySessions(ILoadBalancer loadBalancer, string key, int keyExpiryInMs, IBus<StickySession> bus)
{
_bus = bus;
_key = key;
_keyExpiryInMs = keyExpiryInMs;
_loadBalancer = loadBalancer;
_stored = new ConcurrentDictionary<string, StickySession>();
_timer = new Timer(x =>
_bus.Subscribe(ss => {
if(_stored.TryGetValue(ss.Key, out var stickySession))
{
if (_expiring)
lock(_lock)
{
return;
if(stickySession.Expiry < DateTime.Now)
{
_stored.Remove(stickySession.Key, out _);
_loadBalancer.Release(stickySession.HostAndPort);
}
_expiring = true;
Expire();
_expiring = false;
}, null, 0, 50);
}
}
});
}
public void Dispose()
@ -47,15 +49,17 @@ namespace Ocelot.LoadBalancer.LoadBalancers
public async Task<Response<ServiceHostAndPort>> Lease(DownstreamContext context)
{
var value = context.HttpContext.Request.Cookies[_key];
var key = context.HttpContext.Request.Cookies[_key];
if (!string.IsNullOrEmpty(value) && _stored.ContainsKey(value))
if (!string.IsNullOrEmpty(key) && _stored.ContainsKey(key))
{
var cached = _stored[value];
var cached = _stored[key];
var updated = new StickySession(cached.HostAndPort, DateTime.UtcNow.AddMilliseconds(_keyExpiryInMs));
var updated = new StickySession(cached.HostAndPort, DateTime.UtcNow.AddMilliseconds(_keyExpiryInMs), key);
_stored[value] = updated;
_stored[key] = updated;
await _bus.Publish(updated, _keyExpiryInMs);
return new OkResponse<ServiceHostAndPort>(updated.HostAndPort);
}
@ -67,9 +71,11 @@ namespace Ocelot.LoadBalancer.LoadBalancers
return new ErrorResponse<ServiceHostAndPort>(next.Errors);
}
if (!string.IsNullOrEmpty(value) && !_stored.ContainsKey(value))
if (!string.IsNullOrEmpty(key) && !_stored.ContainsKey(key))
{
_stored[value] = new StickySession(next.Data, DateTime.UtcNow.AddMilliseconds(_keyExpiryInMs));
var ss = new StickySession(next.Data, DateTime.UtcNow.AddMilliseconds(_keyExpiryInMs), key);
_stored[key] = ss;
await _bus.Publish(ss, _keyExpiryInMs);
}
return new OkResponse<ServiceHostAndPort>(next.Data);
@ -78,16 +84,5 @@ namespace Ocelot.LoadBalancer.LoadBalancers
public void Release(ServiceHostAndPort hostAndPort)
{
}
private void Expire()
{
var expired = _stored.Where(x => x.Value.Expiry < DateTime.UtcNow);
foreach (var expire in expired)
{
_stored.Remove(expire.Key, out _);
_loadBalancer.Release(expire.Value.HostAndPort);
}
}
}
}

View File

@ -1,5 +1,6 @@
using System.Threading.Tasks;
using Ocelot.Configuration;
using Ocelot.Infrastructure;
using Ocelot.ServiceDiscovery;
namespace Ocelot.LoadBalancer.LoadBalancers
@ -25,7 +26,8 @@ namespace Ocelot.LoadBalancer.LoadBalancers
return new LeastConnection(async () => await serviceProvider.Get(), reRoute.ServiceName);
case nameof(CookieStickySessions):
var loadBalancer = new RoundRobin(async () => await serviceProvider.Get());
return new CookieStickySessions(loadBalancer, reRoute.LoadBalancerOptions.Key, reRoute.LoadBalancerOptions.ExpiryInMs);
var bus = new InMemoryBus<StickySession>();
return new CookieStickySessions(loadBalancer, reRoute.LoadBalancerOptions.Key, reRoute.LoadBalancerOptions.ExpiryInMs, bus);
default:
return new NoLoadBalancer(await serviceProvider.Get());
}

View File

@ -5,14 +5,17 @@ namespace Ocelot.LoadBalancer.LoadBalancers
{
public class StickySession
{
public StickySession(ServiceHostAndPort hostAndPort, DateTime expiry)
public StickySession(ServiceHostAndPort hostAndPort, DateTime expiry, string key)
{
HostAndPort = hostAndPort;
Expiry = expiry;
Key = key;
}
public ServiceHostAndPort HostAndPort { get; }
public DateTime Expiry { get; }
public string Key {get;}
}
}

View File

@ -1,4 +1,5 @@
using System.Threading.Tasks;
using Ocelot.Infrastructure;
using Shouldly;
using Xunit;

View File

@ -14,6 +14,7 @@ namespace Ocelot.UnitTests.LoadBalancer
using Ocelot.Middleware;
using Ocelot.UnitTests.Responder;
using TestStack.BDDfy;
using Ocelot.Infrastructure;
public class CookieStickySessionsTests
{
@ -24,12 +25,14 @@ namespace Ocelot.UnitTests.LoadBalancer
private Response<ServiceHostAndPort> _result;
private Response<ServiceHostAndPort> _firstHostAndPort;
private Response<ServiceHostAndPort> _secondHostAndPort;
private IBus<StickySession> _bus;
public CookieStickySessionsTests()
{
_bus = new InMemoryBus<StickySession>();
_loadBalancer = new Mock<ILoadBalancer>();
_defaultExpiryInMs = 100;
_stickySessions = new CookieStickySessions(_loadBalancer.Object, "sessionid", _defaultExpiryInMs);
_stickySessions = new CookieStickySessions(_loadBalancer.Object, "sessionid", _defaultExpiryInMs, _bus);
_downstreamContext = new DownstreamContext(new DefaultHttpContext());
}