diff --git a/src/Ocelot/Infrastructure/IBus.cs b/src/Ocelot/Infrastructure/IBus.cs new file mode 100644 index 00000000..7d4e82d1 --- /dev/null +++ b/src/Ocelot/Infrastructure/IBus.cs @@ -0,0 +1,14 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; + +namespace Ocelot.Infrastructure +{ + public interface IBus + { + void Subscribe(Action action); + Task Publish(T message, int delay); + } +} diff --git a/src/Ocelot/Infrastructure/InMemoryBus.cs b/src/Ocelot/Infrastructure/InMemoryBus.cs index e69de29b..7d2b6836 100644 --- a/src/Ocelot/Infrastructure/InMemoryBus.cs +++ b/src/Ocelot/Infrastructure/InMemoryBus.cs @@ -0,0 +1,45 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; + +namespace Ocelot.Infrastructure +{ + public class InMemoryBus : IBus + { + private readonly BlockingCollection _queue; + private readonly List> _subscriptions; + private Thread _processing; + + public InMemoryBus() + { + _queue = new BlockingCollection(); + _subscriptions = new List>(); + _processing = new Thread(Process); + _processing.Start(); + } + + public void Subscribe(Action action) + { + _subscriptions.Add(action); + } + + public async Task Publish(T message, int delay) + { + await Task.Delay(delay); + _queue.Add(message); + } + + private void Process() + { + foreach(var message in _queue.GetConsumingEnumerable()) + { + foreach(var subscription in _subscriptions) + { + subscription(message); + } + } + } + } +} diff --git a/src/Ocelot/LoadBalancer/LoadBalancers/CookieStickySessions.cs b/src/Ocelot/LoadBalancer/LoadBalancers/CookieStickySessions.cs index 49af2079..25411e88 100644 --- a/src/Ocelot/LoadBalancer/LoadBalancers/CookieStickySessions.cs +++ b/src/Ocelot/LoadBalancer/LoadBalancers/CookieStickySessions.cs @@ -6,6 +6,7 @@ namespace Ocelot.LoadBalancer.LoadBalancers using System.Linq; using System.Threading; using System.Threading.Tasks; + using Ocelot.Infrastructure; using Ocelot.Middleware; using Responses; using Values; @@ -16,28 +17,29 @@ namespace Ocelot.LoadBalancer.LoadBalancers private readonly string _key; private readonly ILoadBalancer _loadBalancer; private readonly ConcurrentDictionary _stored; - private readonly Timer _timer; - private bool _expiring; + private IBus _bus; + private readonly object _lock = new object(); - public CookieStickySessions(ILoadBalancer loadBalancer, string key, int keyExpiryInMs) + public CookieStickySessions(ILoadBalancer loadBalancer, string key, int keyExpiryInMs, IBus bus) { + _bus = bus; _key = key; _keyExpiryInMs = keyExpiryInMs; _loadBalancer = loadBalancer; _stored = new ConcurrentDictionary(); - _timer = new Timer(x => - { - if (_expiring) + _bus.Subscribe(ss => { + if(_stored.TryGetValue(ss.Key, out var stickySession)) { - return; + lock(_lock) + { + if(stickySession.Expiry < DateTime.Now) + { + _stored.Remove(stickySession.Key, out _); + _loadBalancer.Release(stickySession.HostAndPort); + } + } } - - _expiring = true; - - Expire(); - - _expiring = false; - }, null, 0, 50); + }); } public void Dispose() @@ -47,15 +49,17 @@ namespace Ocelot.LoadBalancer.LoadBalancers public async Task> Lease(DownstreamContext context) { - var value = context.HttpContext.Request.Cookies[_key]; - - if (!string.IsNullOrEmpty(value) && _stored.ContainsKey(value)) + var key = context.HttpContext.Request.Cookies[_key]; + + if (!string.IsNullOrEmpty(key) && _stored.ContainsKey(key)) { - var cached = _stored[value]; + var cached = _stored[key]; - var updated = new StickySession(cached.HostAndPort, DateTime.UtcNow.AddMilliseconds(_keyExpiryInMs)); + var updated = new StickySession(cached.HostAndPort, DateTime.UtcNow.AddMilliseconds(_keyExpiryInMs), key); - _stored[value] = updated; + _stored[key] = updated; + + await _bus.Publish(updated, _keyExpiryInMs); return new OkResponse(updated.HostAndPort); } @@ -67,9 +71,11 @@ namespace Ocelot.LoadBalancer.LoadBalancers return new ErrorResponse(next.Errors); } - if (!string.IsNullOrEmpty(value) && !_stored.ContainsKey(value)) + if (!string.IsNullOrEmpty(key) && !_stored.ContainsKey(key)) { - _stored[value] = new StickySession(next.Data, DateTime.UtcNow.AddMilliseconds(_keyExpiryInMs)); + var ss = new StickySession(next.Data, DateTime.UtcNow.AddMilliseconds(_keyExpiryInMs), key); + _stored[key] = ss; + await _bus.Publish(ss, _keyExpiryInMs); } return new OkResponse(next.Data); @@ -78,16 +84,5 @@ namespace Ocelot.LoadBalancer.LoadBalancers public void Release(ServiceHostAndPort hostAndPort) { } - - private void Expire() - { - var expired = _stored.Where(x => x.Value.Expiry < DateTime.UtcNow); - - foreach (var expire in expired) - { - _stored.Remove(expire.Key, out _); - _loadBalancer.Release(expire.Value.HostAndPort); - } - } } } diff --git a/src/Ocelot/LoadBalancer/LoadBalancers/LoadBalancerFactory.cs b/src/Ocelot/LoadBalancer/LoadBalancers/LoadBalancerFactory.cs index 1f0ecc72..58f6ff4e 100644 --- a/src/Ocelot/LoadBalancer/LoadBalancers/LoadBalancerFactory.cs +++ b/src/Ocelot/LoadBalancer/LoadBalancers/LoadBalancerFactory.cs @@ -1,5 +1,6 @@ using System.Threading.Tasks; using Ocelot.Configuration; +using Ocelot.Infrastructure; using Ocelot.ServiceDiscovery; namespace Ocelot.LoadBalancer.LoadBalancers @@ -25,7 +26,8 @@ namespace Ocelot.LoadBalancer.LoadBalancers return new LeastConnection(async () => await serviceProvider.Get(), reRoute.ServiceName); case nameof(CookieStickySessions): var loadBalancer = new RoundRobin(async () => await serviceProvider.Get()); - return new CookieStickySessions(loadBalancer, reRoute.LoadBalancerOptions.Key, reRoute.LoadBalancerOptions.ExpiryInMs); + var bus = new InMemoryBus(); + return new CookieStickySessions(loadBalancer, reRoute.LoadBalancerOptions.Key, reRoute.LoadBalancerOptions.ExpiryInMs, bus); default: return new NoLoadBalancer(await serviceProvider.Get()); } diff --git a/src/Ocelot/LoadBalancer/LoadBalancers/StickySession.cs b/src/Ocelot/LoadBalancer/LoadBalancers/StickySession.cs index 152bdf4b..bce476c0 100644 --- a/src/Ocelot/LoadBalancer/LoadBalancers/StickySession.cs +++ b/src/Ocelot/LoadBalancer/LoadBalancers/StickySession.cs @@ -5,14 +5,17 @@ namespace Ocelot.LoadBalancer.LoadBalancers { public class StickySession { - public StickySession(ServiceHostAndPort hostAndPort, DateTime expiry) + public StickySession(ServiceHostAndPort hostAndPort, DateTime expiry, string key) { HostAndPort = hostAndPort; Expiry = expiry; + Key = key; } public ServiceHostAndPort HostAndPort { get; } public DateTime Expiry { get; } + + public string Key {get;} } } diff --git a/test/Ocelot.UnitTests/Infrastructure/InMemoryBusTests.cs b/test/Ocelot.UnitTests/Infrastructure/InMemoryBusTests.cs index 918cfb9f..81cae7c6 100644 --- a/test/Ocelot.UnitTests/Infrastructure/InMemoryBusTests.cs +++ b/test/Ocelot.UnitTests/Infrastructure/InMemoryBusTests.cs @@ -1,4 +1,5 @@ using System.Threading.Tasks; +using Ocelot.Infrastructure; using Shouldly; using Xunit; diff --git a/test/Ocelot.UnitTests/LoadBalancer/CookieStickySessionsTests.cs b/test/Ocelot.UnitTests/LoadBalancer/CookieStickySessionsTests.cs index ff908123..0d6437cb 100644 --- a/test/Ocelot.UnitTests/LoadBalancer/CookieStickySessionsTests.cs +++ b/test/Ocelot.UnitTests/LoadBalancer/CookieStickySessionsTests.cs @@ -14,6 +14,7 @@ namespace Ocelot.UnitTests.LoadBalancer using Ocelot.Middleware; using Ocelot.UnitTests.Responder; using TestStack.BDDfy; + using Ocelot.Infrastructure; public class CookieStickySessionsTests { @@ -24,12 +25,14 @@ namespace Ocelot.UnitTests.LoadBalancer private Response _result; private Response _firstHostAndPort; private Response _secondHostAndPort; + private IBus _bus; public CookieStickySessionsTests() { + _bus = new InMemoryBus(); _loadBalancer = new Mock(); _defaultExpiryInMs = 100; - _stickySessions = new CookieStickySessions(_loadBalancer.Object, "sessionid", _defaultExpiryInMs); + _stickySessions = new CookieStickySessions(_loadBalancer.Object, "sessionid", _defaultExpiryInMs, _bus); _downstreamContext = new DownstreamContext(new DefaultHttpContext()); }