Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
<PackageReference Include="Autofac.Extensions.DependencyInjection" Version="7.2.0" />
<PackageReference Include="MediatR.Extensions.Autofac.DependencyInjection" Version="8.0.1" />
<PackageReference Include="Microsoft.VisualStudio.Azure.Containers.Tools.Targets" Version="1.15.1" />
<PackageReference Include="Serilog" Version="2.11.0" />
<PackageReference Include="Serilog.AspNetCore" Version="6.0.1" />
<PackageReference Include="Serilog.Sinks.Console" Version="4.1.0" />
<PackageReference Include="Swashbuckle.AspNetCore" Version="6.3.1" />
</ItemGroup>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public static IHostBuilder ConfigureApiEnabled(this IHostBuilder hostBuilder, Co
return hostBuilder.ConfigureContainer<ContainerBuilder>(builder =>
{
builder
.AddWorkflowEngine(
.AddConductorSharp(
baseUrl: configuration.GetValue<string>("Conductor:BaseUrl"),
apiPath: configuration.GetValue<string>("Conductor:ApiUrl"),
preventErrorOnBadRequest: configuration.GetValue<bool>("Conductor:PreventErrorOnBadRequest")
Expand All @@ -21,7 +21,13 @@ public static IHostBuilder ConfigureApiEnabled(this IHostBuilder hostBuilder, Co
sleepInterval: configuration.GetValue<int>("Conductor:SleepInterval"),
longPollInterval: configuration.GetValue<int>("Conductor:LongPollInterval"),
domain: configuration.GetValue<string>("Conductor:WorkerDomain")
);
)
.AddPipelines(pipelines =>
{
pipelines.AddRequestResponseLogging();
pipelines.AddValidation();
});

builder.RegisterMediatR(typeof(Program).Assembly);
builder.RegisterModule<ConductorModule>();
});
Expand Down
10 changes: 10 additions & 0 deletions examples/ConductorSharp.ApiEnabled/Handlers/PrepareEmailHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,13 @@ public class PrepareEmailResponse
[OriginalName("EMAIL_prepare")]
public class PrepareEmailHandler : ITaskRequestHandler<PrepareEmailRequest, PrepareEmailResponse>
{
private readonly ConductorSharpExecutionContext _context;

public PrepareEmailHandler(ConductorSharpExecutionContext context)
{
_context = context;
}

public Task<PrepareEmailResponse> Handle(PrepareEmailRequest request, CancellationToken cancellationToken)
{
var emailBodyBuilder = new StringBuilder();
Expand All @@ -28,6 +35,9 @@ public Task<PrepareEmailResponse> Handle(PrepareEmailRequest request, Cancellati
emailBodyBuilder.AppendLine($"ProvisionDateTime: {DateTimeOffset.Now.ToString("r")}");
emailBodyBuilder.AppendLine($"Customer: {request.CustomerName}");
emailBodyBuilder.AppendLine($"Address: {request.Address}");
emailBodyBuilder.AppendLine("------------------");
emailBodyBuilder.AppendLine($"WorkflowId : {_context.WorkflowId}");
emailBodyBuilder.AppendLine($"WorkflowName: {_context.WorkflowName}");

return Task.FromResult(new PrepareEmailResponse { EmailBody = emailBodyBuilder.ToString() });
}
Expand Down
3 changes: 3 additions & 0 deletions examples/ConductorSharp.ApiEnabled/Program.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
using Autofac.Extensions.DependencyInjection;
using ConductorSharp.ApiEnabled.Extensions;
using Serilog;

var builder = WebApplication.CreateBuilder(args);
var configuration = builder.Configuration;

builder.Host.UseSerilog((ctx, lc) => lc.WriteTo.Console());

// Add services to the container.
builder.Services.AddControllers();

Expand Down
9 changes: 7 additions & 2 deletions examples/ConductorSharp.Definitions/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
(container, builder) =>
{
builder
.AddWorkflowEngine(
.AddConductorSharp(
baseUrl: configuration.GetValue<string>("Conductor:BaseUrl"),
apiPath: configuration.GetValue<string>("Conductor:ApiUrl"),
preventErrorOnBadRequest: configuration.GetValue<bool>("Conductor:PreventErrorOnBadRequest")
Expand All @@ -38,7 +38,12 @@
sleepInterval: configuration.GetValue<int>("Conductor:SleepInterval"),
longPollInterval: configuration.GetValue<int>("Conductor:LongPollInterval"),
domain: configuration.GetValue<string>("Conductor:WorkerDomain")
);
)
.AddPipelines(pipelines =>
{
pipelines.AddRequestResponseLogging();
pipelines.AddValidation();
});

builder.RegisterMediatR(typeof(Program).Assembly);
builder.RegisterModule<ConductorModule>();
Expand Down
9 changes: 7 additions & 2 deletions examples/ConductorSharp.NoApi/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
(container, builder) =>
{
builder
.AddWorkflowEngine(
.AddConductorSharp(
baseUrl: configuration.GetValue<string>("Conductor:BaseUrl"),
apiPath: configuration.GetValue<string>("Conductor:ApiUrl"),
preventErrorOnBadRequest: configuration.GetValue<bool>("Conductor:PreventErrorOnBadRequest")
Expand All @@ -38,7 +38,12 @@
sleepInterval: configuration.GetValue<int>("Conductor:SleepInterval"),
longPollInterval: configuration.GetValue<int>("Conductor:LongPollInterval"),
domain: configuration.GetValue<string>("Conductor:WorkerDomain")
);
)
.AddPipelines(pipelines =>
{
pipelines.AddRequestResponseLogging();
pipelines.AddValidation();
});

builder.RegisterMediatR(typeof(Program).Assembly);
builder.RegisterModule<ConductorModule>();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
using ConductorSharp.Engine.Util;
using MediatR;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace ConductorSharp.Engine.Behaviors
{
public class RequestResponseLoggingBehavior<TRequest, TResponse> : IPipelineBehavior<TRequest, TResponse> where TRequest : IRequest<TResponse>
{
private readonly ILogger<RequestResponseLoggingBehavior<TRequest, TResponse>> _logger;
private readonly ConductorSharpExecutionContext _executionContext;

public RequestResponseLoggingBehavior(
ILogger<RequestResponseLoggingBehavior<TRequest, TResponse>> logger,
ConductorSharpExecutionContext executionContext
)
{
_logger = logger;
_executionContext = executionContext;
}

public async Task<TResponse> Handle(TRequest request, CancellationToken cancellationToken, RequestHandlerDelegate<TResponse> next)
{
var stopwatch = new Stopwatch();
_logger.LogInformation($"Submitting request {{@{typeof(TRequest).Name}}} in {{@conductorSharpContext}}", request, _executionContext);
stopwatch.Start();

try
{
var response = await next();
stopwatch.Stop();

_logger.LogInformation(
$"Received response {{@{typeof(TResponse).Name}}} in {{@conductorSharpContext}} in {{ellapsedMilliseconds}}",
response,
_executionContext,
stopwatch.ElapsedMilliseconds
);

return response;
}
catch (Exception exc)
{
stopwatch.Stop();
_logger.LogInformation(
$"Exception {{exceptionMessage}} in {{@conductorSharpContext}} in {{elapsedMilliseconds}}",
exc.Message,
_executionContext,
stopwatch.ElapsedMilliseconds
);
throw;
}
}
}
}
27 changes: 23 additions & 4 deletions src/ConductorSharp.Engine/ExecutionManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Autofac;
using ConductorSharp.Engine.Util;

namespace ConductorSharp.Engine
{
Expand All @@ -20,7 +22,7 @@ public class ExecutionManager
private readonly ILogger<ExecutionManager> _logger;
private readonly ITaskService _taskManager;
private readonly IEnumerable<TaskToWorker> _registeredWorkers;
private readonly IMediator _mediator;
private readonly ILifetimeScope _lifetimeScope;

// TODO: Implement polling strategy so that if there
// are no requests incoming we poll less, and when queues are full
Expand All @@ -36,15 +38,15 @@ public ExecutionManager(
ILogger<ExecutionManager> logger,
ITaskService taskService,
IEnumerable<TaskToWorker> workerMappings,
IMediator mediator
ILifetimeScope lifetimeScope
)
{
_configuration = options;
_semaphore = new SemaphoreSlim(_configuration.MaxConcurrentWorkers);
_logger = logger;
_taskManager = taskService;
_registeredWorkers = workerMappings;
_mediator = mediator;
_lifetimeScope = lifetimeScope;
}

public async Task StartAsync(CancellationToken cancellationToken)
Expand Down Expand Up @@ -99,7 +101,24 @@ private async Task PollAndHandle(TaskToWorker scheduledWorker, CancellationToken
{
var inputType = GetInputType(scheduledWorker.TaskType);
var inputData = pollResponse.InputData.ToObject(inputType, ConductorConstants.IoJsonSerializer);
var response = await _mediator.Send(inputData, cancellationToken);

using var scope = _lifetimeScope.BeginLifetimeScope();

var context = scope.ResolveOptional<ConductorSharpExecutionContext>();
var mediator = scope.Resolve<IMediator>();

if (context != null)
{
context.WorkflowName = pollResponse.WorkflowType;
context.TaskName = pollResponse.TaskDefName;
context.TaskReferenceName = pollResponse.ReferenceTaskName;
context.WorkflowId = pollResponse.WorkflowInstanceId;
context.CorrelationId = pollResponse.CorrelationId;
context.TaskId = pollResponse.TaskId;
context.WorkerId = workerId;
}

var response = await mediator.Send(inputData, cancellationToken);
await _taskManager.UpdateTaskCompleted(response, pollResponse.TaskId, pollResponse.WorkflowInstanceId);
}
catch (Exception exception)
Expand Down
56 changes: 56 additions & 0 deletions src/ConductorSharp.Engine/Extensions/ConductorSharpBuilder.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
using Autofac;
using ConductorSharp.Engine.Behaviors;
using ConductorSharp.Engine.Interface;
using ConductorSharp.Engine.Service;
using ConductorSharp.Engine.Util;
using MediatR;
using Microsoft.Extensions.Hosting;
using System;
using System.Collections.Generic;
using System.Text;

namespace ConductorSharp.Engine.Extensions
{
public class ConductorSharpBuilder : IConductorSharpBuilder, IExecutionManagerBuilder, IPipelineBuilder
{
private readonly ContainerBuilder _builder;

public ConductorSharpBuilder(ContainerBuilder builder) => _builder = builder;

public IExecutionManagerBuilder AddExecutionManager(int maxConcurrentWorkers, int sleepInterval, int longPollInterval, string domain = null)
{
var workerConfig = new WorkerSetConfig
{
MaxConcurrentWorkers = maxConcurrentWorkers,
LongPollInterval = longPollInterval,
Domain = domain,
SleepInterval = sleepInterval
};

_builder.RegisterInstance(workerConfig).SingleInstance();

_builder.RegisterType<WorkflowEngineBackgroundService>().As<IHostedService>();

_builder.RegisterType<DeploymentService>().As<IDeploymentService>();

_builder.RegisterType<ModuleDeployment>();

_builder.RegisterType<ExecutionManager>().SingleInstance();

_builder.RegisterType<ConductorSharpExecutionContext>().InstancePerLifetimeScope();

return this;
}

public IExecutionManagerBuilder AddPipelines(Action<IPipelineBuilder> behaviorBuilder)
{
behaviorBuilder(this);
return this;
}

public void AddRequestResponseLogging() =>
_builder.RegisterGeneric(typeof(RequestResponseLoggingBehavior<,>)).As(typeof(IPipelineBehavior<,>));

public void AddValidation() => _builder.RegisterGeneric(typeof(ValidationBehavior<,>)).As(typeof(IPipelineBehavior<,>));
}
}
43 changes: 43 additions & 0 deletions src/ConductorSharp.Engine/Extensions/ContainerBuilderExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,18 @@ namespace ConductorSharp.Engine.Extensions
{
public static class ContainerBuilderExtensions
{
/// <summary>
/// This method is deprecated. Use AddConductorSharp method instead. WARNING: AddExecutionManager method will change when you move to AddConductorSharp,
/// it will no longer register validation pipeline by default, and you will have to do it manually using the AddPipelines method. Check
/// <see href="https://github.com/codaxy/conductor-sharp/blob/master/examples/ConductorSharp.ApiEnabled/Extensions/HostConfiguration.cs">example</see>.
/// </summary>
/// <param name="builder"></param>
/// <param name="baseUrl"></param>
/// <param name="apiPath"></param>
/// <param name="preventErrorOnBadRequest"></param>
/// <param name="createClient"></param>
/// <returns></returns>
[Obsolete("Use AddConductorSharp method instead")]
public static IWorkflowEngineBuilder AddWorkflowEngine(
this ContainerBuilder builder,
string baseUrl,
Expand Down Expand Up @@ -45,6 +57,37 @@ public static IWorkflowEngineBuilder AddWorkflowEngine(
return new WorkflowEngineBuilder(builder);
}

public static IConductorSharpBuilder AddConductorSharp(
this ContainerBuilder builder,
string baseUrl,
string apiPath,
bool preventErrorOnBadRequest = false,
Func<RestClient> createClient = null
)
{
builder.RegisterInstance(
new RestConfig
{
ApiPath = apiPath,
BaseUrl = baseUrl,
CreateClient = createClient,
IgnoreValidationErrors = preventErrorOnBadRequest
}
);

builder.RegisterType<ConductorClient>().As<IConductorClient>().SingleInstance();

builder.RegisterType<TaskService>().As<ITaskService>();

builder.RegisterType<HealthService>().As<IHealthService>();

builder.RegisterType<MetadataService>().As<IMetadataService>();

builder.RegisterType<WorkflowService>().As<IWorkflowService>();

return new ConductorSharpBuilder(builder);
}

public static void RegisterWorkerTask<TWorkerTask>(this ContainerBuilder builder, Action<TaskDefinitionOptions> updateOptions = null)
where TWorkerTask : IWorker
{
Expand Down
11 changes: 11 additions & 0 deletions src/ConductorSharp.Engine/Extensions/IConductorSharpBuilder.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace ConductorSharp.Engine.Extensions
{
public interface IConductorSharpBuilder
{
IExecutionManagerBuilder AddExecutionManager(int maxConcurrentWorkers, int sleepInterval, int longPollInterval, string domain = null);
}
}
11 changes: 11 additions & 0 deletions src/ConductorSharp.Engine/Extensions/IExecutionManagerBuilder.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace ConductorSharp.Engine.Extensions
{
public interface IExecutionManagerBuilder
{
IExecutionManagerBuilder AddPipelines(Action<IPipelineBuilder> pipelines);
}
}
12 changes: 12 additions & 0 deletions src/ConductorSharp.Engine/Extensions/IPipelineBuilder.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace ConductorSharp.Engine.Extensions
{
public interface IPipelineBuilder
{
void AddRequestResponseLogging();
void AddValidation();
}
}
Loading