diff --git a/src/ConductorSharp.Engine/ExecutionManager.cs b/src/ConductorSharp.Engine/ExecutionManager.cs index 82fd77dc..7d93fdde 100644 --- a/src/ConductorSharp.Engine/ExecutionManager.cs +++ b/src/ConductorSharp.Engine/ExecutionManager.cs @@ -12,6 +12,7 @@ using System.Threading.Tasks; using Autofac; using ConductorSharp.Engine.Util; +using ConductorSharp.Engine.Polling; namespace ConductorSharp.Engine { @@ -23,22 +24,17 @@ public class ExecutionManager private readonly ITaskService _taskManager; private readonly IEnumerable _registeredWorkers; private readonly ILifetimeScope _lifetimeScope; - - // TODO: Implement polling strategy so that if there - // are no requests incoming we poll less, and when queues are full - // we poll more often - - // TODO: Implement load balancing strategy so we can avoid - // task starvation. One way is to create some sort of task priority - // which will dynamically change based on how many of those tasks we served - // When a task is polled we reduce its priorty and when increse the priorty of all others + private readonly IPollTimingStrategy _pollTimingStrategy; + private readonly IPollOrderStrategy _pollOrderStrategy; public ExecutionManager( WorkerSetConfig options, ILogger logger, ITaskService taskService, IEnumerable workerMappings, - ILifetimeScope lifetimeScope + ILifetimeScope lifetimeScope, + IPollTimingStrategy pollTimingStrategy, + IPollOrderStrategy pollOrderStrategy ) { _configuration = options; @@ -47,14 +43,30 @@ ILifetimeScope lifetimeScope _taskManager = taskService; _registeredWorkers = workerMappings; _lifetimeScope = lifetimeScope; + _pollTimingStrategy = pollTimingStrategy; + _pollOrderStrategy = pollOrderStrategy; } public async Task StartAsync(CancellationToken cancellationToken) { + var currentSleepInterval = _configuration.SleepInterval; + while (!cancellationToken.IsCancellationRequested) { - var scheduleQueue = await _taskManager.GetAllQueues(); - var scheduledWorkers = _registeredWorkers.Where(a => scheduleQueue.ContainsKey(a.TaskName) && scheduleQueue[a.TaskName] > 0).ToList(); + var queuedTasks = (await _taskManager.GetAllQueues()) + .Where(a => _registeredWorkers.Any(b => b.TaskName == a.Key) && a.Value > 0) + .ToDictionary(a => a.Key, a => a.Value); + + var scheduledWorkers = _registeredWorkers.Where(a => queuedTasks.ContainsKey(a.TaskName)).ToList(); + + currentSleepInterval = _pollTimingStrategy.CalculateDelay( + queuedTasks, + scheduledWorkers, + _configuration.SleepInterval, + currentSleepInterval + ); + + scheduledWorkers = _pollOrderStrategy.CalculateOrder(queuedTasks, scheduledWorkers, _semaphore.CurrentCount); foreach (var scheduledWorker in scheduledWorkers) { @@ -62,7 +74,7 @@ public async Task StartAsync(CancellationToken cancellationToken) _ = PollAndHandle(scheduledWorker, cancellationToken).ContinueWith(_ => _semaphore.Release()); } - await Task.Delay(_configuration.SleepInterval, cancellationToken); + await Task.Delay(currentSleepInterval, cancellationToken); } } diff --git a/src/ConductorSharp.Engine/Extensions/ConductorSharpBuilder.cs b/src/ConductorSharp.Engine/Extensions/ConductorSharpBuilder.cs index 4d55e2b6..ec4dd4b9 100644 --- a/src/ConductorSharp.Engine/Extensions/ConductorSharpBuilder.cs +++ b/src/ConductorSharp.Engine/Extensions/ConductorSharpBuilder.cs @@ -1,6 +1,7 @@ using Autofac; using ConductorSharp.Engine.Behaviors; using ConductorSharp.Engine.Interface; +using ConductorSharp.Engine.Polling; using ConductorSharp.Engine.Service; using ConductorSharp.Engine.Util; using MediatR; @@ -39,6 +40,10 @@ public IExecutionManagerBuilder AddExecutionManager(int maxConcurrentWorkers, in _builder.RegisterType().InstancePerLifetimeScope(); + _builder.RegisterType().As(); + + _builder.RegisterType().As(); + return this; } diff --git a/src/ConductorSharp.Engine/Extensions/WorkflowEngineBuilder.cs b/src/ConductorSharp.Engine/Extensions/WorkflowEngineBuilder.cs index 3ddcc654..3bbb1e99 100644 --- a/src/ConductorSharp.Engine/Extensions/WorkflowEngineBuilder.cs +++ b/src/ConductorSharp.Engine/Extensions/WorkflowEngineBuilder.cs @@ -1,6 +1,7 @@ using Autofac; using ConductorSharp.Engine.Behaviors; using ConductorSharp.Engine.Interface; +using ConductorSharp.Engine.Polling; using ConductorSharp.Engine.Service; using ConductorSharp.Engine.Util; using MediatR; @@ -44,6 +45,10 @@ public IWorkflowEngineExecutionManager AddExecutionManager( _builder.RegisterGeneric(typeof(ValidationBehavior<,>)).As(typeof(IPipelineBehavior<,>)); + _builder.RegisterType().As(); + + _builder.RegisterType().As(); + return this; } } diff --git a/src/ConductorSharp.Engine/Polling/IPollOrderStrategy.cs b/src/ConductorSharp.Engine/Polling/IPollOrderStrategy.cs new file mode 100644 index 00000000..29c90a54 --- /dev/null +++ b/src/ConductorSharp.Engine/Polling/IPollOrderStrategy.cs @@ -0,0 +1,12 @@ +using ConductorSharp.Engine.Model; +using System; +using System.Collections.Generic; +using System.Text; + +namespace ConductorSharp.Engine.Polling +{ + public interface IPollOrderStrategy + { + List CalculateOrder(IDictionary taskQueue, List taskToWorkerList, int limit); + } +} diff --git a/src/ConductorSharp.Engine/Polling/IPollTimingStrategy.cs b/src/ConductorSharp.Engine/Polling/IPollTimingStrategy.cs new file mode 100644 index 00000000..995783f1 --- /dev/null +++ b/src/ConductorSharp.Engine/Polling/IPollTimingStrategy.cs @@ -0,0 +1,12 @@ +using ConductorSharp.Engine.Model; +using System; +using System.Collections.Generic; +using System.Text; + +namespace ConductorSharp.Engine.Polling +{ + public interface IPollTimingStrategy + { + int CalculateDelay(IDictionary taskQueue, List taskToWorkerList, int baseSleepInterval, int currentSleepInterval); + } +} diff --git a/src/ConductorSharp.Engine/Polling/InverseExponentialBackoff.cs b/src/ConductorSharp.Engine/Polling/InverseExponentialBackoff.cs new file mode 100644 index 00000000..b572129d --- /dev/null +++ b/src/ConductorSharp.Engine/Polling/InverseExponentialBackoff.cs @@ -0,0 +1,36 @@ +using ConductorSharp.Engine.Model; +using System; +using System.Collections.Generic; +using System.Text; + +namespace ConductorSharp.Engine.Polling +{ + public class InverseExponentialBackoff : IPollTimingStrategy + { + private const int _backoffRatio = 2; + private const int _recoveryValue = 50; + private readonly TimeSpan _recoveryInterval = TimeSpan.FromMilliseconds(5000); + + private DateTimeOffset _lastRecoveryTime = DateTimeOffset.UtcNow; + + public int CalculateDelay( + IDictionary taskQueue, + List taskToWorkerList, + int baseSleepInterval, + int currentSleepInterval + ) + { + if (taskToWorkerList.Count > 0) + { + currentSleepInterval /= _backoffRatio; + } + else if (DateTimeOffset.UtcNow - _lastRecoveryTime > _recoveryInterval) + { + currentSleepInterval += 100; + _lastRecoveryTime = DateTimeOffset.UtcNow; + } + + return Math.Clamp(currentSleepInterval, _recoveryValue, baseSleepInterval); + } + } +} diff --git a/src/ConductorSharp.Engine/Polling/RandomOrdering.cs b/src/ConductorSharp.Engine/Polling/RandomOrdering.cs new file mode 100644 index 00000000..c311762f --- /dev/null +++ b/src/ConductorSharp.Engine/Polling/RandomOrdering.cs @@ -0,0 +1,18 @@ +using ConductorSharp.Engine.Model; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; + +namespace ConductorSharp.Engine.Polling +{ + public class RandomOrdering : IPollOrderStrategy + { + private readonly Random _random = new Random(); + + public List CalculateOrder(IDictionary taskQueue, List taskToWorkerList, int limit) + { + return taskToWorkerList.OrderBy(a => _random.Next()).Take(limit).ToList(); + } + } +}