Refactor CircuitBreaker Pattern

Add Qos Config in ReRoute And Refactor CircuitBreakingDelegatingHandler
This commit is contained in:
geffzhang
2017-02-03 14:49:46 +08:00
parent e80364a1f8
commit 883be802b3
28 changed files with 600 additions and 259 deletions

View File

@ -32,6 +32,9 @@ namespace Ocelot.Configuration.Builder
private string _downstreamScheme;
private string _downstreamHost;
private int _dsPort;
private int _exceptionsAllowedBeforeBreaking;
private int _durationOfBreak;
private int _timeoutValue;
public ReRouteBuilder()
{
@ -192,6 +195,24 @@ namespace Ocelot.Configuration.Builder
return this;
}
public ReRouteBuilder WithExceptionsAllowedBeforeBreaking(int exceptionsAllowedBeforeBreaking)
{
_exceptionsAllowedBeforeBreaking = exceptionsAllowedBeforeBreaking;
return this;
}
public ReRouteBuilder WithDurationOfBreak(int durationOfBreak)
{
_durationOfBreak = durationOfBreak;
return this;
}
public ReRouteBuilder WithTimeoutValue(int timeoutValue)
{
_timeoutValue = timeoutValue;
return this;
}
public ReRoute Build()
{
Func<HostAndPort> downstreamHostFunc = () => new HostAndPort(_downstreamHost, _dsPort);
@ -200,7 +221,8 @@ namespace Ocelot.Configuration.Builder
_isAuthenticated, new AuthenticationOptions(_authenticationProvider, _authenticationProviderUrl, _scopeName,
_requireHttps, _additionalScopes, _scopeSecret), _configHeaderExtractorProperties, _claimToClaims, _routeClaimRequirement,
_isAuthorised, _claimToQueries, _requestIdHeaderKey, _isCached, _fileCacheOptions, _serviceName,
_useServiceDiscovery, _serviceDiscoveryAddress, _serviceDiscoveryProvider, downstreamHostFunc, _downstreamScheme);
_useServiceDiscovery, _serviceDiscoveryAddress, _serviceDiscoveryProvider, downstreamHostFunc, _downstreamScheme,
_exceptionsAllowedBeforeBreaking,_durationOfBreak, _timeoutValue);
}
}
}

View File

@ -96,7 +96,6 @@ namespace Ocelot.Configuration.Creator
&& !string.IsNullOrEmpty(globalConfiguration?.ServiceDiscoveryProvider?.Address)
&& !string.IsNullOrEmpty(globalConfiguration?.ServiceDiscoveryProvider?.Provider);
Func<HostAndPort> downstreamHostAndPortFunc = () => new HostAndPort(reRoute.DownstreamHost.Trim('/'), reRoute.DownstreamPort);
if (isAuthenticated)
@ -116,7 +115,8 @@ namespace Ocelot.Configuration.Creator
reRoute.RouteClaimsRequirement, isAuthorised, claimsToQueries,
requestIdKey, isCached, new CacheOptions(reRoute.FileCacheOptions.TtlSeconds),
reRoute.ServiceName, useServiceDiscovery, globalConfiguration?.ServiceDiscoveryProvider?.Provider,
globalConfiguration?.ServiceDiscoveryProvider?.Address, downstreamHostAndPortFunc, reRoute.DownstreamScheme);
globalConfiguration?.ServiceDiscoveryProvider?.Address, downstreamHostAndPortFunc, reRoute.DownstreamScheme,
reRoute.ExceptionsAllowedBeforeBreaking, reRoute.DurationOfBreak, reRoute.TimeoutValue);
}
return new ReRoute(new DownstreamPathTemplate(reRoute.DownstreamPathTemplate), reRoute.UpstreamTemplate,
@ -125,7 +125,8 @@ namespace Ocelot.Configuration.Creator
reRoute.RouteClaimsRequirement, isAuthorised, new List<ClaimToThing>(),
requestIdKey, isCached, new CacheOptions(reRoute.FileCacheOptions.TtlSeconds),
reRoute.ServiceName, useServiceDiscovery, globalConfiguration?.ServiceDiscoveryProvider?.Provider,
globalConfiguration?.ServiceDiscoveryProvider?.Address, downstreamHostAndPortFunc, reRoute.DownstreamScheme);
globalConfiguration?.ServiceDiscoveryProvider?.Address, downstreamHostAndPortFunc, reRoute.DownstreamScheme,
reRoute.ExceptionsAllowedBeforeBreaking, reRoute.DurationOfBreak, reRoute.TimeoutValue);
}
private string BuildUpstreamTemplate(FileReRoute reRoute)

View File

@ -29,5 +29,8 @@ namespace Ocelot.Configuration.File
public string DownstreamScheme {get;set;}
public string DownstreamHost {get;set;}
public int DownstreamPort { get; set; }
public int ExceptionsAllowedBeforeBreaking { get; set; }
public int DurationOfBreak { get; set; }
public int TimeoutValue { get; set; }
}
}

View File

@ -10,7 +10,8 @@ namespace Ocelot.Configuration
bool isAuthenticated, AuthenticationOptions authenticationOptions, List<ClaimToThing> configurationHeaderExtractorProperties,
List<ClaimToThing> claimsToClaims, Dictionary<string, string> routeClaimsRequirement, bool isAuthorised, List<ClaimToThing> claimsToQueries,
string requestIdKey, bool isCached, CacheOptions fileCacheOptions, string serviceName, bool useServiceDiscovery,
string serviceDiscoveryProvider, string serviceDiscoveryAddress, Func<HostAndPort> downstreamHostAndPort, string downstreamScheme)
string serviceDiscoveryProvider, string serviceDiscoveryAddress, Func<HostAndPort> downstreamHostAndPort, string downstreamScheme,
int exceptionsAllowedBeforeBreaking =3, int durationofBreak =8, int timeoutValue = 5000)
{
DownstreamPathTemplate = downstreamPathTemplate;
UpstreamTemplate = upstreamTemplate;
@ -35,6 +36,9 @@ namespace Ocelot.Configuration
ServiceDiscoveryAddress = serviceDiscoveryAddress;
DownstreamHostAndPort = downstreamHostAndPort;
DownstreamScheme = downstreamScheme;
ExceptionsAllowedBeforeBreaking = exceptionsAllowedBeforeBreaking;
DurationOfBreak = durationofBreak;
TimeoutValue = timeoutValue;
}
public DownstreamPathTemplate DownstreamPathTemplate { get; private set; }
@ -57,5 +61,8 @@ namespace Ocelot.Configuration
public string ServiceDiscoveryAddress { get; private set;}
public Func<HostAndPort> DownstreamHostAndPort {get;private set;}
public string DownstreamScheme {get;private set;}
public int ExceptionsAllowedBeforeBreaking { get; private set; }
public int DurationOfBreak { get; private set; }
public int TimeoutValue { get; private set; }
}
}

View File

@ -15,7 +15,8 @@ namespace Ocelot.Request.Builder
IRequestCookieCollection cookies,
QueryString queryString,
string contentType,
RequestId.RequestId requestId)
RequestId.RequestId requestId,
Values.QoS qos)
{
var request = await new RequestBuilder()
.WithHttpMethod(httpMethod)
@ -26,6 +27,7 @@ namespace Ocelot.Request.Builder
.WithHeaders(headers)
.WithRequestId(requestId)
.WithCookies(cookies)
.WithQos(qos)
.Build();
return new OkResponse<Request>(request);

View File

@ -14,6 +14,7 @@ namespace Ocelot.Request.Builder
IRequestCookieCollection cookies,
QueryString queryString,
string contentType,
RequestId.RequestId requestId);
RequestId.RequestId requestId,
Values.QoS qos);
}
}

View File

@ -22,6 +22,7 @@ namespace Ocelot.Request.Builder
private RequestId.RequestId _requestId;
private IRequestCookieCollection _cookies;
private readonly string[] _unsupportedHeaders = {"host"};
private Values.QoS _qos;
public RequestBuilder WithHttpMethod(string httpMethod)
{
@ -71,6 +72,12 @@ namespace Ocelot.Request.Builder
return this;
}
public RequestBuilder WithQos(Values.QoS qos)
{
_qos = qos;
return this;
}
public async Task<Request> Build()
{
var uri = CreateUri();
@ -90,7 +97,7 @@ namespace Ocelot.Request.Builder
var cookieContainer = CreateCookieContainer(uri);
return new Request(httpRequestMessage, cookieContainer);
return new Request(httpRequestMessage, cookieContainer, _qos);
}
private Uri CreateUri()

View File

@ -32,7 +32,8 @@ namespace Ocelot.Request.Middleware
var buildResult = await _requestCreator
.Build(context.Request.Method, DownstreamUrl, context.Request.Body,
context.Request.Headers, context.Request.Cookies, context.Request.QueryString,
context.Request.ContentType, new RequestId.RequestId(DownstreamRoute?.ReRoute?.RequestIdKey, context.TraceIdentifier));
context.Request.ContentType, new RequestId.RequestId(DownstreamRoute?.ReRoute?.RequestIdKey, context.TraceIdentifier),
new Values.QoS(DownstreamRoute.ReRoute.ExceptionsAllowedBeforeBreaking, DownstreamRoute.ReRoute.DurationOfBreak, DownstreamRoute.ReRoute.TimeoutValue));
if (buildResult.IsError)
{

View File

@ -1,17 +1,20 @@
using System.Net;
using Ocelot.Values;
using System.Net;
using System.Net.Http;
namespace Ocelot.Request
{
public class Request
{
public Request(HttpRequestMessage httpRequestMessage, CookieContainer cookieContainer)
public Request(HttpRequestMessage httpRequestMessage, CookieContainer cookieContainer, QoS qos)
{
HttpRequestMessage = httpRequestMessage;
CookieContainer = cookieContainer;
Qos = qos;
}
public HttpRequestMessage HttpRequestMessage { get; private set; }
public CookieContainer CookieContainer { get; private set; }
public QoS Qos { get; private set; }
}
}

View File

@ -0,0 +1,74 @@
using Ocelot.Logging;
using Polly;
using Polly.CircuitBreaker;
using Polly.Timeout;
using System;
using System.Net;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
namespace Ocelot.Requester
{
public class CircuitBreakingDelegatingHandler : DelegatingHandler
{
private readonly IOcelotLogger _logger;
private readonly int _exceptionsAllowedBeforeBreaking;
private readonly TimeSpan _durationOfBreak;
private readonly Policy _circuitBreakerPolicy;
private readonly TimeoutPolicy _timeoutPolicy;
public CircuitBreakingDelegatingHandler(int exceptionsAllowedBeforeBreaking, TimeSpan durationOfBreak,TimeSpan timeoutValue
,TimeoutStrategy timeoutStrategy, IOcelotLogger logger, HttpMessageHandler innerHandler)
: base(innerHandler)
{
this._exceptionsAllowedBeforeBreaking = exceptionsAllowedBeforeBreaking;
this._durationOfBreak = durationOfBreak;
_circuitBreakerPolicy = Policy
.Handle<HttpRequestException>()
.Or<TimeoutRejectedException>()
.Or<TimeoutException>()
.CircuitBreakerAsync(
exceptionsAllowedBeforeBreaking: exceptionsAllowedBeforeBreaking,
durationOfBreak: durationOfBreak,
onBreak: (ex, breakDelay) =>
{
_logger.LogError(".Breaker logging: Breaking the circuit for " + breakDelay.TotalMilliseconds + "ms!", ex);
},
onReset: () => _logger.LogDebug(".Breaker logging: Call ok! Closed the circuit again."),
onHalfOpen: () => _logger.LogDebug(".Breaker logging: Half-open; next call is a trial.")
);
_timeoutPolicy = Policy.TimeoutAsync(timeoutValue, timeoutStrategy);
_logger = logger;
}
protected override Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
{
Task<HttpResponseMessage> responseTask = null;
try
{
responseTask = Policy.WrapAsync(_circuitBreakerPolicy, _timeoutPolicy).ExecuteAsync<HttpResponseMessage>(() =>
{
return base.SendAsync(request,cancellationToken);
});
return responseTask;
}
catch (BrokenCircuitException ex)
{
_logger.LogError($"Reached to allowed number of exceptions. Circuit is open. AllowedExceptionCount: {_exceptionsAllowedBeforeBreaking}, DurationOfBreak: {_durationOfBreak}",ex);
throw;
}
catch (HttpRequestException)
{
return responseTask;
}
}
private static bool IsTransientFailure(HttpResponseMessage result)
{
return result.StatusCode >= HttpStatusCode.InternalServerError;
}
}
}

View File

@ -0,0 +1,41 @@
using Ocelot.Logging;
using Ocelot.Values;
using Polly.Timeout;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks;
namespace Ocelot.Requester
{
internal class HttpClientBuilder
{
private readonly Dictionary<int, Func<DelegatingHandler>> handlers = new Dictionary<int, Func<DelegatingHandler>>();
public HttpClientBuilder WithCircuitBreaker(QoS qos, IOcelotLogger logger, HttpMessageHandler innerHandler)
{
handlers.Add(5000, () => new CircuitBreakingDelegatingHandler(qos.ExceptionsAllowedBeforeBreaking, qos.DurationOfBreak, qos.TimeoutValue, qos.TimeoutStrategy, logger, innerHandler));
return this;
}
internal HttpClient Build()
{
return handlers.Any() ? new HttpClient(CreateHttpMessageHandler()) : new HttpClient();
}
private HttpMessageHandler CreateHttpMessageHandler()
{
HttpMessageHandler httpMessageHandler = new HttpClientHandler();
handlers.OrderByDescending(handler => handler.Key).Select(handler => handler.Value).Reverse().ToList().ForEach(handler =>
{
var delegatingHandler = handler();
delegatingHandler.InnerHandler = httpMessageHandler;
httpMessageHandler = delegatingHandler;
});
return httpMessageHandler;
}
}
}

View File

@ -4,9 +4,6 @@ using System.Net.Http;
using System.Threading.Tasks;
using Ocelot.Errors;
using Ocelot.Responses;
using Polly;
using Polly.Timeout;
using Polly.CircuitBreaker;
using Ocelot.Logging;
namespace Ocelot.Requester
@ -14,7 +11,7 @@ namespace Ocelot.Requester
public class HttpClientHttpRequester : IHttpRequester
{
private readonly IOcelotLogger _logger;
public HttpClientHttpRequester(IOcelotLoggerFactory loggerFactory)
{
_logger = loggerFactory.CreateLogger<HttpClientHttpRequester>();
@ -22,46 +19,26 @@ namespace Ocelot.Requester
public async Task<Response<HttpResponseMessage>> GetResponse(Request.Request request)
{
double timeoutvalue = 5000;
TimeoutStrategy timeoutStrategy = TimeoutStrategy.Pessimistic;
var timeoutPolicy = Policy
.TimeoutAsync(TimeSpan.FromMilliseconds(timeoutvalue), timeoutStrategy);
var circuitBreakerPolicy = Policy
.Handle<Exception>()
.Or<TimeoutRejectedException>()
.Or<TimeoutException>()
.CircuitBreakerAsync(
exceptionsAllowedBeforeBreaking: 4,
durationOfBreak: TimeSpan.FromSeconds(8),
onBreak: (ex, breakDelay) =>
{
_logger.LogError(".Breaker logging: Breaking the circuit for " + breakDelay.TotalMilliseconds + "ms!", ex);
},
onReset: () => _logger.LogDebug(".Breaker logging: Call ok! Closed the circuit again."),
onHalfOpen: () => _logger.LogDebug(".Breaker logging: Half-open; next call is a trial.")
);
HttpClientBuilder builder = new HttpClientBuilder();
using (var handler = new HttpClientHandler { CookieContainer = request.CookieContainer })
using (var httpClient = new HttpClient(handler))
{
try
builder.WithCircuitBreaker(request.Qos, _logger, handler);
using (var httpClient = builder.Build())
{
// Retry the following call according to the policy - 3 times.
HttpResponseMessage response = await Policy.WrapAsync(circuitBreakerPolicy, timeoutPolicy).ExecuteAsync<HttpResponseMessage>(() =>
try
{
return httpClient.SendAsync(request.HttpRequestMessage);
});
return new OkResponse<HttpResponseMessage>(response);
}
catch (BrokenCircuitException exception)
{
return
new ErrorResponse<HttpResponseMessage>(new List<Error>
{
new UnableToCompleteRequestError(exception)
});
var response = await httpClient.SendAsync(request.HttpRequestMessage);
return new OkResponse<HttpResponseMessage>(response);
}
catch (Exception exception)
{
return
new ErrorResponse<HttpResponseMessage>(new List<Error>
{
new UnableToCompleteRequestError(exception)
});
}
}
}
}

View File

@ -7,5 +7,7 @@ namespace Ocelot.Requester
public interface IHttpRequester
{
Task<Response<HttpResponseMessage>> GetResponse(Request.Request request);
}
}

27
src/Ocelot/Values/QoS.cs Normal file
View File

@ -0,0 +1,27 @@
using Polly.Timeout;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace Ocelot.Values
{
public class QoS
{
public QoS(int exceptionsAllowedBeforeBreaking, int durationofBreak, int timeoutValue, TimeoutStrategy timeoutStrategy = TimeoutStrategy.Pessimistic)
{
ExceptionsAllowedBeforeBreaking = exceptionsAllowedBeforeBreaking;
DurationOfBreak = TimeSpan.FromMilliseconds(durationofBreak);
TimeoutValue = TimeSpan.FromMilliseconds(timeoutValue);
TimeoutStrategy = timeoutStrategy;
}
public int ExceptionsAllowedBeforeBreaking { get; private set; }
public TimeSpan DurationOfBreak { get; private set; }
public TimeSpan TimeoutValue { get; private set; }
public TimeoutStrategy TimeoutStrategy { get; private set; }
}
}