Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 25 additions & 13 deletions src/ConductorSharp.Engine/ExecutionManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
using System.Threading.Tasks;
using Autofac;
using ConductorSharp.Engine.Util;
using ConductorSharp.Engine.Polling;

namespace ConductorSharp.Engine
{
Expand All @@ -23,22 +24,17 @@ public class ExecutionManager
private readonly ITaskService _taskManager;
private readonly IEnumerable<TaskToWorker> _registeredWorkers;
private readonly ILifetimeScope _lifetimeScope;

// TODO: Implement polling strategy so that if there
// are no requests incoming we poll less, and when queues are full
// we poll more often

// TODO: Implement load balancing strategy so we can avoid
// task starvation. One way is to create some sort of task priority
// which will dynamically change based on how many of those tasks we served
// When a task is polled we reduce its priorty and when increse the priorty of all others
private readonly IPollTimingStrategy _pollTimingStrategy;
private readonly IPollOrderStrategy _pollOrderStrategy;

public ExecutionManager(
WorkerSetConfig options,
ILogger<ExecutionManager> logger,
ITaskService taskService,
IEnumerable<TaskToWorker> workerMappings,
ILifetimeScope lifetimeScope
ILifetimeScope lifetimeScope,
IPollTimingStrategy pollTimingStrategy,
IPollOrderStrategy pollOrderStrategy
)
{
_configuration = options;
Expand All @@ -47,22 +43,38 @@ ILifetimeScope lifetimeScope
_taskManager = taskService;
_registeredWorkers = workerMappings;
_lifetimeScope = lifetimeScope;
_pollTimingStrategy = pollTimingStrategy;
_pollOrderStrategy = pollOrderStrategy;
}

public async Task StartAsync(CancellationToken cancellationToken)
{
var currentSleepInterval = _configuration.SleepInterval;

while (!cancellationToken.IsCancellationRequested)
{
var scheduleQueue = await _taskManager.GetAllQueues();
var scheduledWorkers = _registeredWorkers.Where(a => scheduleQueue.ContainsKey(a.TaskName) && scheduleQueue[a.TaskName] > 0).ToList();
var queuedTasks = (await _taskManager.GetAllQueues())
.Where(a => _registeredWorkers.Any(b => b.TaskName == a.Key) && a.Value > 0)
.ToDictionary(a => a.Key, a => a.Value);

var scheduledWorkers = _registeredWorkers.Where(a => queuedTasks.ContainsKey(a.TaskName)).ToList();

currentSleepInterval = _pollTimingStrategy.CalculateDelay(
queuedTasks,
scheduledWorkers,
_configuration.SleepInterval,
currentSleepInterval
);

scheduledWorkers = _pollOrderStrategy.CalculateOrder(queuedTasks, scheduledWorkers, _semaphore.CurrentCount);

foreach (var scheduledWorker in scheduledWorkers)
{
await _semaphore.WaitAsync(cancellationToken);
_ = PollAndHandle(scheduledWorker, cancellationToken).ContinueWith(_ => _semaphore.Release());
}

await Task.Delay(_configuration.SleepInterval, cancellationToken);
await Task.Delay(currentSleepInterval, cancellationToken);
}
}

Expand Down
5 changes: 5 additions & 0 deletions src/ConductorSharp.Engine/Extensions/ConductorSharpBuilder.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using Autofac;
using ConductorSharp.Engine.Behaviors;
using ConductorSharp.Engine.Interface;
using ConductorSharp.Engine.Polling;
using ConductorSharp.Engine.Service;
using ConductorSharp.Engine.Util;
using MediatR;
Expand Down Expand Up @@ -39,6 +40,10 @@ public IExecutionManagerBuilder AddExecutionManager(int maxConcurrentWorkers, in

_builder.RegisterType<ConductorSharpExecutionContext>().InstancePerLifetimeScope();

_builder.RegisterType<InverseExponentialBackoff>().As<IPollTimingStrategy>();

_builder.RegisterType<RandomOrdering>().As<IPollOrderStrategy>();

return this;
}

Expand Down
5 changes: 5 additions & 0 deletions src/ConductorSharp.Engine/Extensions/WorkflowEngineBuilder.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using Autofac;
using ConductorSharp.Engine.Behaviors;
using ConductorSharp.Engine.Interface;
using ConductorSharp.Engine.Polling;
using ConductorSharp.Engine.Service;
using ConductorSharp.Engine.Util;
using MediatR;
Expand Down Expand Up @@ -44,6 +45,10 @@ public IWorkflowEngineExecutionManager AddExecutionManager(

_builder.RegisterGeneric(typeof(ValidationBehavior<,>)).As(typeof(IPipelineBehavior<,>));

_builder.RegisterType<InverseExponentialBackoff>().As<IPollTimingStrategy>();

_builder.RegisterType<RandomOrdering>().As<IPollOrderStrategy>();

return this;
}
}
Expand Down
12 changes: 12 additions & 0 deletions src/ConductorSharp.Engine/Polling/IPollOrderStrategy.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
using ConductorSharp.Engine.Model;
using System;
using System.Collections.Generic;
using System.Text;

namespace ConductorSharp.Engine.Polling
{
public interface IPollOrderStrategy
{
List<TaskToWorker> CalculateOrder(IDictionary<string, int> taskQueue, List<TaskToWorker> taskToWorkerList, int limit);
}
}
12 changes: 12 additions & 0 deletions src/ConductorSharp.Engine/Polling/IPollTimingStrategy.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
using ConductorSharp.Engine.Model;
using System;
using System.Collections.Generic;
using System.Text;

namespace ConductorSharp.Engine.Polling
{
public interface IPollTimingStrategy
{
int CalculateDelay(IDictionary<string, int> taskQueue, List<TaskToWorker> taskToWorkerList, int baseSleepInterval, int currentSleepInterval);
}
}
36 changes: 36 additions & 0 deletions src/ConductorSharp.Engine/Polling/InverseExponentialBackoff.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
using ConductorSharp.Engine.Model;
using System;
using System.Collections.Generic;
using System.Text;

namespace ConductorSharp.Engine.Polling
{
public class InverseExponentialBackoff : IPollTimingStrategy
{
private const int _backoffRatio = 2;
private const int _recoveryValue = 50;
private readonly TimeSpan _recoveryInterval = TimeSpan.FromMilliseconds(5000);

private DateTimeOffset _lastRecoveryTime = DateTimeOffset.UtcNow;

public int CalculateDelay(
IDictionary<string, int> taskQueue,
List<TaskToWorker> taskToWorkerList,
int baseSleepInterval,
int currentSleepInterval
)
{
if (taskToWorkerList.Count > 0)
{
currentSleepInterval /= _backoffRatio;
}
else if (DateTimeOffset.UtcNow - _lastRecoveryTime > _recoveryInterval)
{
currentSleepInterval += 100;
_lastRecoveryTime = DateTimeOffset.UtcNow;
}

return Math.Clamp(currentSleepInterval, _recoveryValue, baseSleepInterval);
}
}
}
18 changes: 18 additions & 0 deletions src/ConductorSharp.Engine/Polling/RandomOrdering.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
using ConductorSharp.Engine.Model;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace ConductorSharp.Engine.Polling
{
public class RandomOrdering : IPollOrderStrategy
{
private readonly Random _random = new Random();

public List<TaskToWorker> CalculateOrder(IDictionary<string, int> taskQueue, List<TaskToWorker> taskToWorkerList, int limit)
{
return taskToWorkerList.OrderBy(a => _random.Next()).Take(limit).ToList();
}
}
}