fix issue #661 for Advanced aggregations (#704)

* Add Advanced Aggregation Feature

* fix overwrite error

* distinct data for better performance

* remove constructor parameter

* fix tests issue

* fix tests

* fix tests issue

* Add UnitTest and AcceptanceTest

* fix responseKeys typo

* Update SimpleJsonResponseAggregator.cs

* change port
This commit is contained in:
Abolfazl
2019-02-04 15:51:50 +03:30
committed by Marcelo Castagna
parent 44dccf1fce
commit faaabbe7a7
11 changed files with 352 additions and 59 deletions

View File

@ -4,18 +4,21 @@
using System.Net.Http;
using Ocelot.Values;
using System.Linq;
using Ocelot.Configuration.File;
public class ReRouteBuilder
{
private UpstreamPathTemplate _upstreamTemplatePattern;
private List<HttpMethod> _upstreamHttpMethod;
private string _upstreamHost;
private List<DownstreamReRoute> _downstreamReRoutes;
private List<AggregateReRouteConfig> _downstreamReRoutesConfig;
private string _aggregator;
public ReRouteBuilder()
{
_downstreamReRoutes = new List<DownstreamReRoute>();
_downstreamReRoutesConfig = new List<AggregateReRouteConfig>();
}
public ReRouteBuilder WithDownstreamReRoute(DownstreamReRoute value)
@ -48,6 +51,12 @@
return this;
}
public ReRouteBuilder WithAggregateReRouteConfig(List<AggregateReRouteConfig> aggregateReRouteConfigs)
{
_downstreamReRoutesConfig = aggregateReRouteConfigs;
return this;
}
public ReRouteBuilder WithAggregator(string aggregator)
{
_aggregator = aggregator;
@ -57,7 +66,8 @@
public ReRoute Build()
{
return new ReRoute(
_downstreamReRoutes,
_downstreamReRoutes,
_downstreamReRoutesConfig,
_upstreamHttpMethod,
_upstreamTemplatePattern,
_upstreamHost,

View File

@ -24,14 +24,18 @@ namespace Ocelot.Configuration.Creator
private ReRoute SetUpAggregateReRoute(IEnumerable<ReRoute> reRoutes, FileAggregateReRoute aggregateReRoute, FileGlobalConfiguration globalConfiguration)
{
var applicableReRoutes = reRoutes
.SelectMany(x => x.DownstreamReRoute)
.Where(r => aggregateReRoute.ReRouteKeys.Contains(r.Key))
.ToList();
var applicableReRoutes = new List<DownstreamReRoute>();
var allReRoutes = reRoutes.SelectMany(x => x.DownstreamReRoute);
if (applicableReRoutes.Count != aggregateReRoute.ReRouteKeys.Count)
foreach (var reRouteKey in aggregateReRoute.ReRouteKeys)
{
return null;
var selec = allReRoutes.FirstOrDefault(q => q.Key == reRouteKey);
if (selec == null)
{
return null;
}
applicableReRoutes.Add(selec);
}
var upstreamTemplatePattern = _creator.Create(aggregateReRoute);
@ -40,6 +44,7 @@ namespace Ocelot.Configuration.Creator
.WithUpstreamHttpMethod(aggregateReRoute.UpstreamHttpMethod)
.WithUpstreamPathTemplate(upstreamTemplatePattern)
.WithDownstreamReRoutes(applicableReRoutes)
.WithAggregateReRouteConfig(aggregateReRoute.ReRouteKeysConfig)
.WithUpstreamHost(aggregateReRoute.UpstreamHost)
.WithAggregator(aggregateReRoute.Aggregator)
.Build();

View File

@ -0,0 +1,13 @@
using System;
using System.Collections.Generic;
using System.Text;
namespace Ocelot.Configuration.File
{
public class AggregateReRouteConfig
{
public string ReRouteKey { get; set; }
public string Parameter { get; set; }
public string JsonPath { get; set; }
}
}

View File

@ -5,6 +5,7 @@ namespace Ocelot.Configuration.File
public class FileAggregateReRoute : IReRoute
{
public List<string> ReRouteKeys { get;set; }
public List<AggregateReRouteConfig> ReRouteKeysConfig { get;set; }
public string UpstreamPathTemplate { get;set; }
public string UpstreamHost { get; set; }
public bool ReRouteIsCaseSensitive { get; set; }
@ -17,5 +18,5 @@ namespace Ocelot.Configuration.File
}
public int Priority {get;set;} = 1;
}
}
}

View File

@ -2,11 +2,13 @@
{
using System.Collections.Generic;
using System.Net.Http;
using Ocelot.Configuration.File;
using Ocelot.Values;
public class ReRoute
{
public ReRoute(List<DownstreamReRoute> downstreamReRoute,
public ReRoute(List<DownstreamReRoute> downstreamReRoute,
List<AggregateReRouteConfig> downstreamReRouteConfig,
List<HttpMethod> upstreamHttpMethod,
UpstreamPathTemplate upstreamTemplatePattern,
string upstreamHost,
@ -14,6 +16,7 @@
{
UpstreamHost = upstreamHost;
DownstreamReRoute = downstreamReRoute;
DownstreamReRouteConfig = downstreamReRouteConfig;
UpstreamHttpMethod = upstreamHttpMethod;
UpstreamTemplatePattern = upstreamTemplatePattern;
Aggregator = aggregator;
@ -23,6 +26,7 @@
public List<HttpMethod> UpstreamHttpMethod { get; private set; }
public string UpstreamHost { get; private set; }
public List<DownstreamReRoute> DownstreamReRoute { get; private set; }
public List<AggregateReRouteConfig> DownstreamReRouteConfig { get; private set; }
public string Aggregator {get; private set;}
}
}

View File

@ -5,12 +5,12 @@ namespace Ocelot.Middleware.Multiplexer
public class InMemoryResponseAggregatorFactory : IResponseAggregatorFactory
{
private readonly UserDefinedResponseAggregator _userDefined;
private readonly SimpleJsonResponseAggregator _simple;
private readonly IResponseAggregator _simple;
public InMemoryResponseAggregatorFactory(IDefinedAggregatorProvider provider)
public InMemoryResponseAggregatorFactory(IDefinedAggregatorProvider provider, IResponseAggregator responseAggregator)
{
_userDefined = new UserDefinedResponseAggregator(provider);
_simple = new SimpleJsonResponseAggregator();
_simple = responseAggregator;
}
public IResponseAggregator Get(ReRoute reRoute)

View File

@ -1,7 +1,9 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Newtonsoft.Json.Linq;
using Ocelot.Configuration;
using Ocelot.DownstreamRouteFinder.UrlMatcher;
namespace Ocelot.Middleware.Multiplexer
{
@ -16,31 +18,105 @@ namespace Ocelot.Middleware.Multiplexer
public async Task Multiplex(DownstreamContext context, ReRoute reRoute, OcelotRequestDelegate next)
{
var tasks = new Task<DownstreamContext>[reRoute.DownstreamReRoute.Count];
for (var i = 0; i < reRoute.DownstreamReRoute.Count; i++)
var reRouteKeysConfigs = reRoute.DownstreamReRouteConfig;
if (reRouteKeysConfigs == null || !reRouteKeysConfigs.Any())
{
var downstreamContext = new DownstreamContext(context.HttpContext)
var tasks = new Task<DownstreamContext>[reRoute.DownstreamReRoute.Count];
for (var i = 0; i < reRoute.DownstreamReRoute.Count; i++)
{
var downstreamContext = new DownstreamContext(context.HttpContext)
{
TemplatePlaceholderNameAndValues = context.TemplatePlaceholderNameAndValues,
Configuration = context.Configuration,
DownstreamReRoute = reRoute.DownstreamReRoute[i],
};
tasks[i] = Fire(downstreamContext, next);
}
await Task.WhenAll(tasks);
var contexts = new List<DownstreamContext>();
foreach (var task in tasks)
{
var finished = await task;
contexts.Add(finished);
}
await Map(reRoute, context, contexts);
}
else
{
var downstreamContextMain = new DownstreamContext(context.HttpContext)
{
TemplatePlaceholderNameAndValues = context.TemplatePlaceholderNameAndValues,
Configuration = context.Configuration,
DownstreamReRoute = reRoute.DownstreamReRoute[i],
DownstreamReRoute = reRoute.DownstreamReRoute[0],
};
var mainResponse = await Fire(downstreamContextMain, next);
tasks[i] = Fire(downstreamContext, next);
if (reRoute.DownstreamReRoute.Count == 1)
{
MapNotAggregate(context, new List<DownstreamContext>() { mainResponse });
return;
}
var tasks = new List<Task<DownstreamContext>>();
if (mainResponse.DownstreamResponse == null)
{
return;
}
var content = await mainResponse.DownstreamResponse.Content.ReadAsStringAsync();
var jObject = Newtonsoft.Json.Linq.JToken.Parse(content);
for (var i = 1; i < reRoute.DownstreamReRoute.Count; i++)
{
var templatePlaceholderNameAndValues = context.TemplatePlaceholderNameAndValues;
var downstreamReRoute = reRoute.DownstreamReRoute[i];
var matchAdvancedAgg = reRouteKeysConfigs.FirstOrDefault(q => q.ReRouteKey == downstreamReRoute.Key);
if (matchAdvancedAgg != null)
{
var values = jObject.SelectTokens(matchAdvancedAgg.JsonPath).Select(s => s.ToString()).Distinct().ToList();
foreach (var value in values)
{
var downstreamContext = new DownstreamContext(context.HttpContext)
{
TemplatePlaceholderNameAndValues = new List<PlaceholderNameAndValue>(templatePlaceholderNameAndValues),
Configuration = context.Configuration,
DownstreamReRoute = downstreamReRoute,
};
downstreamContext.TemplatePlaceholderNameAndValues.Add(new PlaceholderNameAndValue("{" + matchAdvancedAgg.Parameter + "}", value.ToString()));
tasks.Add(Fire(downstreamContext, next));
}
}
else
{
var downstreamContext = new DownstreamContext(context.HttpContext)
{
TemplatePlaceholderNameAndValues = new List<PlaceholderNameAndValue>(templatePlaceholderNameAndValues),
Configuration = context.Configuration,
DownstreamReRoute = downstreamReRoute,
};
tasks.Add(Fire(downstreamContext, next));
}
}
await Task.WhenAll(tasks);
var contexts = new List<DownstreamContext>() { mainResponse };
foreach (var task in tasks)
{
var finished = await task;
contexts.Add(finished);
}
await Map(reRoute, context, contexts);
}
await Task.WhenAll(tasks);
var contexts = new List<DownstreamContext>();
foreach (var task in tasks)
{
var finished = await task;
contexts.Add(finished);
}
await Map(reRoute, context, contexts);
}
private async Task Map(ReRoute reRoute, DownstreamContext context, List<DownstreamContext> contexts)
@ -55,7 +131,7 @@ namespace Ocelot.Middleware.Multiplexer
MapNotAggregate(context, contexts);
}
}
private void MapNotAggregate(DownstreamContext originalContext, List<DownstreamContext> downstreamContexts)
{
//assume at least one..if this errors then it will be caught by global exception handler

View File

@ -1,4 +1,5 @@
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Net.Http.Headers;
@ -21,19 +22,54 @@ namespace Ocelot.Middleware.Multiplexer
contentBuilder.Append("{");
for (var i = 0; i < downstreamContexts.Count; i++)
var responseKeys = downstreamContexts.Select(s => s.DownstreamReRoute.Key).Distinct().ToList();
for (var k = 0; k < responseKeys.Count; k++)
{
if (downstreamContexts[i].IsError)
var contexts = downstreamContexts.Where(w => w.DownstreamReRoute.Key == responseKeys[k]).ToList();
if (contexts.Count == 1)
{
MapAggregateError(originalContext, downstreamContexts, i);
return;
if (contexts[0].IsError)
{
MapAggregateError(originalContext, contexts[0]);
return;
}
var content = await contexts[0].DownstreamResponse.Content.ReadAsStringAsync();
contentBuilder.Append($"\"{responseKeys[k]}\":{content}");
}
else
{
contentBuilder.Append($"\"{responseKeys[k]}\":");
contentBuilder.Append("[");
for (var i = 0; i < contexts.Count; i++)
{
if (contexts[i].IsError)
{
MapAggregateError(originalContext, contexts[i]);
return;
}
var content = await contexts[i].DownstreamResponse.Content.ReadAsStringAsync();
if (string.IsNullOrWhiteSpace(content))
{
continue;
}
contentBuilder.Append($"{content}");
if (i + 1 < contexts.Count)
{
contentBuilder.Append(",");
}
}
contentBuilder.Append("]");
}
var content = await downstreamContexts[i].DownstreamResponse.Content.ReadAsStringAsync();
contentBuilder.Append($"\"{downstreamContexts[i].DownstreamReRoute.Key}\":{content}");
if (i + 1 < downstreamContexts.Count)
if (k + 1 < responseKeys.Count)
{
contentBuilder.Append(",");
}
@ -43,16 +79,16 @@ namespace Ocelot.Middleware.Multiplexer
var stringContent = new StringContent(contentBuilder.ToString())
{
Headers = {ContentType = new MediaTypeHeaderValue("application/json")}
Headers = { ContentType = new MediaTypeHeaderValue("application/json") }
};
originalContext.DownstreamResponse = new DownstreamResponse(stringContent, HttpStatusCode.OK, new List<KeyValuePair<string, IEnumerable<string>>>(), "cannot return from aggregate..which reason phrase would you use?");
}
private static void MapAggregateError(DownstreamContext originalContext, List<DownstreamContext> downstreamContexts, int i)
private static void MapAggregateError(DownstreamContext originalContext, DownstreamContext downstreamContext)
{
originalContext.Errors.AddRange(downstreamContexts[i].Errors);
originalContext.DownstreamResponse = downstreamContexts[i].DownstreamResponse;
originalContext.Errors.AddRange(downstreamContext.Errors);
originalContext.DownstreamResponse = downstreamContext.DownstreamResponse;
}
}
}