Software Architecture: The Pipeline Design Pattern
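The pipeline pattern is a software design pattern that provides the ability to build and execute a sequence of operations.
This pattern works best in combination with the plugin pattern, so that the pipeline can be built dynamically when the application starts.
The most basic implementation of a sequential pipeline is a simple list of operations.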
```csharp
public interface IOperation<T>
{
    void Invoke(T data);
}
```
The operation interface exposes a single method that is invoked to process the data.
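```csharp
using System.Collections.Generic;

public class Pipeline<T> : IOperation<T>
{
    private readonly List<IOperation<T>> operations = new List<IOperation<T>>();

    // add an operation at the end of the pipeline
    public void Register(IOperation<T> operation)
    {
        operations.Add(operation);
    }

    // invoke every operation, in registration order
    public void Invoke(T data)
    {
        foreach (var operation in operations)
            operation.Invoke(data);
    }
}
```
The pipeline processes the operations one by one. The Pipeline class also implements the IOperation interface, so pipelines can be composed: a pipeline can itself be registered as an operation of another pipeline.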
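```csharp
using System;
using System.Linq;

public class ReverseOperation : IOperation<string>
{
    // data.Reverse() is the LINQ extension and yields an IEnumerable<char>,
    // so the characters are turned back into a string before printing
    public void Invoke(string data) =>
        Console.WriteLine($"The string is reversed: {new string(data.Reverse().ToArray())}");
}
```
An operation can be written in a dedicated class.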
```csharp
using System;

public class Operation<T> : IOperation<T>
{
    private readonly Action<T> action;

    public Operation(Action<T> action)
    {
        this.action = action;
    }

    public void Invoke(T data) => action(data);
}
```
Alternatively, a wrapper can create an operation automatically from a lambda.
```csharp
// build
var pipeline = new Pipeline<string>();

// lambda
pipeline.Register(new Operation<string>(str =>
{
    Console.WriteLine($"The string {str} contains {str.Length} characters.");
}));

// class
pipeline.Register(new ReverseOperation());

// execute
pipeline.Invoke("apple");
```
Operations should be registered before the pipeline is invoked.
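The registration rule can also be enforced by the pipeline itself. The sketch below assumes a hypothetical `FreezablePipeline<T>`, which is not part of the original code. It accepts registrations until its first `Invoke` and throws an `InvalidOperationException` afterwards. `RecordingOperation<T>` is a small hypothetical helper that is used only to observe what the pipeline does.

```csharp
using System;
using System.Collections.Generic;

public interface IOperation<T>
{
    void Invoke(T data);
}

// Hypothetical variant of Pipeline<T>: registration is closed after the first Invoke.
public class FreezablePipeline<T> : IOperation<T>
{
    private readonly List<IOperation<T>> operations = new List<IOperation<T>>();
    private bool frozen;

    public void Register(IOperation<T> operation)
    {
        if (frozen)
            throw new InvalidOperationException("Register operations before invoking the pipeline.");
        operations.Add(operation);
    }

    public void Invoke(T data)
    {
        frozen = true; // from now on the sequence of operations is fixed
        foreach (var operation in operations)
            operation.Invoke(data);
    }
}

// Hypothetical helper that records every value it receives.
public class RecordingOperation<T> : IOperation<T>
{
    public readonly List<T> Seen = new List<T>();
    public void Invoke(T data) => Seen.Add(data);
}
```

Freezing the list keeps the sequence stable while it runs. Any component that needs to change the pipeline has to do so during start-up.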
Circuit breaker
The first feature you may want to add to the pipeline is a circuit breaker.
```csharp
public interface IOperation<T>
{
    bool Invoke(T data);
}
```
Each operation now returns a result: true on success, false on failure.
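```csharp
using System;
using System.Collections.Generic;

public class Pipeline<T> : IOperation<T>
{
    private readonly List<IOperation<T>> operations = new List<IOperation<T>>();

    // add an operation at the end of the pipeline
    public void Register(IOperation<T> operation)
    {
        operations.Add(operation);
    }

    // invoke every operation, stopping at the first failure
    public bool Invoke(T data)
    {
        foreach (var operation in operations)
        {
            if (!operation.Invoke(data))
            {
                Console.WriteLine("Aborting pipeline...");
                return false;
            }
        }
        return true;
    }
}
```
If an operation fails, the pipeline stops executing and the remaining operations are not called.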
Asynchronous
Another requirement might be a pipeline that can handle asynchronous operations.
```csharp
public interface IOperation<T>
{
    void SetNext(IOperation<T> next);
    void Invoke(T data);
}
```
Each operation must now call the next operation in the pipeline once it has finished processing the data.
```csharp
using System.Collections.Generic;
using System.Linq;

public class Pipeline<T> : IOperation<T>
{
    // not the best
    private readonly List<IOperation<T>> operations = new List<IOperation<T>>();
    private readonly IOperation<T> terminate;

    public Pipeline()
    {
        terminate = new Operation<T>(data => { }); // not the best
    }

    // the operation that follows this pipeline is called by its terminate operation
    void IOperation<T>.SetNext(IOperation<T> next)
    {
        terminate.SetNext(next);
    }

    // append an operation at the end of the pipeline
    public void RegisterOperation(IOperation<T> operation)
    {
        // when the operation is finished, it will call terminate
        operation.SetNext(terminate);
        if (operations.Any())
        {
            // link the last registered operation with the newly registered one
            operations.Last().SetNext(operation);
        }
        operations.Add(operation);
    }

    public void Invoke(T data)
    {
        var operation = operations.Any() ? operations.First() : terminate;
        operation.Invoke(data);
    }
}
```
The pipeline is slightly more complex because it has to set the next operation whenever a new operation is registered. A builder would be another solution.
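The builder alternative could look roughly like the following. It is a minimal sketch against the `SetNext`/`Invoke` interface of this section. `PipelineBuilder<T>`, `Then`, `Build` and `ActionOperation<T>` are hypothetical names. The pipeline above relinks the previous operation on every registration. The builder instead collects the operations and links each one to its successor in a single pass.

```csharp
using System;
using System.Collections.Generic;

public interface IOperation<T>
{
    void SetNext(IOperation<T> next);
    void Invoke(T data);
}

// Hypothetical synchronous operation: runs an action, then forwards the data.
public class ActionOperation<T> : IOperation<T>
{
    private readonly Action<T> action;
    private IOperation<T> next;

    public ActionOperation(Action<T> action) { this.action = action; }
    public void SetNext(IOperation<T> next) { this.next = next; }

    public void Invoke(T data)
    {
        action(data);
        next?.Invoke(data);
    }
}

// Hypothetical builder: collects operations, then links them once.
public class PipelineBuilder<T>
{
    private readonly List<IOperation<T>> operations = new List<IOperation<T>>();

    public PipelineBuilder<T> Then(IOperation<T> operation)
    {
        operations.Add(operation);
        return this; // fluent style
    }

    // Links every operation to its successor and returns the entry point.
    public IOperation<T> Build()
    {
        if (operations.Count == 0)
            throw new InvalidOperationException("The pipeline is empty.");
        for (var i = 0; i < operations.Count - 1; i++)
            operations[i].SetNext(operations[i + 1]);
        operations[operations.Count - 1].SetNext(null); // end of the chain
        return operations[0];
    }
}
```

This sketch has one trade-off. `Build` returns the first operation of a chain, not a single object that can be nested in another pipeline. The `Pipeline` class above supports nesting through its `terminate` operation.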
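```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public class WriteOperation : IOperation<string>
{
    private IOperation<string> next;

    public void SetNext(IOperation<string> next)
    {
        this.next = next;
    }

    public void Invoke(string data)
    {
        // return immediately and do the work on a thread-pool thread
        Task.Run(() =>
        {
            Console.WriteLine("Writing data to the disk...");
            Thread.Sleep(100); // just kidding !
            Console.WriteLine("Data successfully written to the disk !");
            next?.Invoke(data);
        });
    }
}
```
This operation is asynchronous. `Invoke` returns immediately and the work runs on a thread-pool thread. When the work is done, the operation calls the next operation to continue the pipeline.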
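```csharp
using System;

public class Operation<T> : IOperation<T>
{
    private readonly Func<T, bool> action;
    private IOperation<T> next;

    // with a function: returning false acts as a circuit breaker
    public Operation(Func<T, bool> action)
    {
        this.action = action;
    }

    // with a simple action: always continues
    public Operation(Action<T> action)
    {
        this.action = data => { action(data); return true; };
    }

    public void SetNext(IOperation<T> next)
    {
        this.next = next;
    }

    public void Invoke(T data)
    {
        // next is null for the terminate operation of a top-level pipeline
        if (action(data))
            next?.Invoke(data);
    }
}
```
The generic operation can be built from a simple action. It can also be built from a function, in which case it provides a built-in circuit breaker.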
```csharp
// the main pipeline
var pipeline = new Pipeline<string>();
pipeline.RegisterOperation(new Operation<string>(data =>
{
    Console.WriteLine($"Everyone likes {data} !");
    return true;
}));
pipeline.RegisterOperation(new WriteOperation());
pipeline.RegisterOperation(new Operation<string>(data =>
{
    if (data == "banana")
    {
        Console.WriteLine("This banana made the pipeline abort...");
        return false;
    }
    return true;
}));
pipeline.RegisterOperation(new Operation<string>(data => Console.WriteLine("This operation should not be called !")));

// a verbose pipeline to wrap the main pipeline
var verbose = new Pipeline<string>();
verbose.RegisterOperation(new Operation<string>(data => Console.WriteLine("Beginning of the pipeline...")));
verbose.RegisterOperation(pipeline);
verbose.RegisterOperation(new Operation<string>(data => Console.WriteLine("End of the pipeline...")));

verbose.Invoke("banana");
Console.WriteLine("The pipeline is asynchronous, so we should have more messages after this one : ");

// keep the process alive until the background operation has finished
Console.ReadLine();
```
This simple example uses several of the features we have implemented. The banana check fails, so neither "This operation should not be called !" nor "End of the pipeline..." is printed.
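Now you know how to make your pipeline asynchronous!
It would be even better to have another callback to the previous operation, so that results could flow back up through the pipeline.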
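One possible shape for that backward callback is continuation passing. Every operation receives the data together with a `reply` callback, and it calls `reply` with the result. This is a hypothetical design, not the original author's, and `IBidirectionalOperation<T>` and `Stage<T>` are illustrative names. The last stage turns the data around. Each earlier stage then post-processes the result before handing it back to its own caller.

```csharp
using System;

// Hypothetical: each operation gets a callback to send its result backwards.
public interface IBidirectionalOperation<T>
{
    void SetNext(IBidirectionalOperation<T> next);
    // 'reply' is called with the result that flows back to the previous operation
    void Invoke(T data, Action<T> reply);
}

public class Stage<T> : IBidirectionalOperation<T>
{
    private readonly Func<T, T> forward;   // transforms data on the way down
    private readonly Func<T, T> backward;  // transforms the result on the way back up
    private IBidirectionalOperation<T> next;

    public Stage(Func<T, T> forward, Func<T, T> backward)
    {
        this.forward = forward;
        this.backward = backward;
    }

    public void SetNext(IBidirectionalOperation<T> next) { this.next = next; }

    public void Invoke(T data, Action<T> reply)
    {
        var processed = forward(data);
        if (next == null)
        {
            // last stage: turn around and start the backward flow
            reply(backward(processed));
            return;
        }
        // this lambda is the callback the next stage uses to answer us
        next.Invoke(processed, result => reply(backward(result)));
    }
}
```

Results travel through callbacks, not return values. The same shape therefore still works if a stage replies later from another thread, the way `WriteOperation` continues the pipeline from a background task. With stage A linked to stage B, the calls run in the order down A, down B, up B, up A.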
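Plugins
The main reason for using the pipeline design pattern is often the need to support plugins. A plugin can append operations to an existing pipeline or hook operations into the middle of it.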
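```csharp
using System;
using System.Collections.Generic;

// in this section, operations take no data
public interface IOperation
{
    void Invoke();
}

public class Operation : IOperation
{
    private readonly Action action;
    public Operation(Action action) { this.action = action; }
    public void Invoke() => action();
}

public class Pipeline : IOperation
{
    // exposes the operations so that plugins can hook into them
    public readonly LinkedList<IOperation> Operations = new LinkedList<IOperation>();

    // add an operation at the end of the pipeline
    public void Register(IOperation operation)
    {
        Operations.AddLast(operation);
    }

    // invoke every operation
    public void Invoke()
    {
        foreach (var operation in Operations)
            operation.Invoke();
    }
}
```
The pipeline is really basic, but this time the operations are exposed.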
```csharp
using System;
using System.Collections.Generic;

class Application
{
    internal abstract class Plugin
    {
        public abstract void Initialize(Application application);
    }

    private readonly List<Plugin> plugins = new List<Plugin>();
    public readonly Pipeline Pipeline = new Pipeline();

    public void RegisterPlugin(Plugin plugin)
    {
        plugins.Add(plugin);
    }

    public void Initialize()
    {
        Pipeline.Register(new Operation(() => Console.WriteLine("Step 1")));
        Pipeline.Register(new Operation(() => Console.WriteLine("Step 2")));
        Pipeline.Register(new Operation(() => Console.WriteLine("Step 3")));

        // let every plugin modify the pipeline once the default steps are registered
        foreach (var plugin in plugins)
            plugin.Initialize(this);
    }

    public void Run()
    {
        Pipeline.Invoke();
    }
}
```
Let's look at a simple application whose pipeline just displays 3 steps in the console. The application also supports plugins that can modify the pipeline.
```csharp
class HookPlugin : Application.Plugin
{
    public override void Initialize(Application application)
    {
        var operations = application.Pipeline.Operations;
        // insert right after the first node, i.e. in the second slot
        operations.AddAfter(operations.First, new Operation(() => Console.WriteLine("I really want to be second !")));
    }
}
```
The first plugin hooks another operation into the second slot of the pipeline.
```csharp
class LatePlugin : Application.Plugin
{
    public override void Initialize(Application application)
    {
        application.Pipeline.Register(new Operation(() => Console.WriteLine("Sorry guys, I am late...")));
    }
}
```
The second plugin appends a new operation at the end of the pipeline.
```csharp
var application = new Application();
application.RegisterPlugin(new HookPlugin());
application.RegisterPlugin(new LatePlugin());
application.Initialize();
application.Run();
```
With the application and the plugins put together, we can invoke the pipeline.
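Running the example should print Step 1, then the hooked operation, then Step 2 and Step 3, and finally the late operation. The following self-contained sketch reproduces the two plugin edits. It simplifies the `Pipeline`/`Operation` classes to a `LinkedList<Action>`, and `PluginOrderDemo` is a hypothetical name. The sketch records the messages instead of printing them, so the order can be checked.

```csharp
using System;
using System.Collections.Generic;

public static class PluginOrderDemo
{
    // Rebuilds the pipeline of the plugin example and returns the execution order.
    public static List<string> Run()
    {
        var log = new List<string>();
        var operations = new LinkedList<Action>();

        // Application.Initialize(): the three built-in steps
        operations.AddLast(() => log.Add("Step 1"));
        operations.AddLast(() => log.Add("Step 2"));
        operations.AddLast(() => log.Add("Step 3"));

        // HookPlugin: insert right after the first node
        operations.AddAfter(operations.First, () => log.Add("I really want to be second !"));

        // LatePlugin: append at the end
        operations.AddLast(() => log.Add("Sorry guys, I am late..."));

        // Application.Run(): invoke in list order
        foreach (var operation in operations)
            operation();
        return log;
    }
}
```

The hook is anchored on the `First` node, not on an index. It therefore still lands in the second slot if `LatePlugin` is registered before `HookPlugin`.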
Batch
Another useful feature is the ability to process batch data in the same pipeline as single items.
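```csharp
using System;
using System.Linq;

public class BatchPipeline<T> : IOperation<T[]>
{
    private readonly Pipeline<T> pipeline;

    public BatchPipeline(Pipeline<T> pipeline)
    {
        this.pipeline = pipeline;
    }

    // invoke each operation on each item
    public bool Invoke(T[] data)
    {
        // wrap the items to remember which ones have failed
        var items = data.Select(item => new Result<T>(item)).ToArray();
        foreach (var operation in pipeline.Operations)
        {
            // stays true only if every remaining item fails this operation
            var failed = true;
            foreach (var item in items)
            {
                if (!item.Success)
                    continue;
                if (!operation.Invoke(item.Data))
                    item.Fail();
                else
                    failed = false;
            }

            // circuit breaker: stop when no item is left
            if (failed)
                return false;
            Console.WriteLine("----------------------");
        }
        return true;
    }
}
```
The batch pipeline wraps a pipeline and invokes each operation on each item. It relies on two earlier pieces: the pipeline exposing its `Operations`, as in the plugin section, and the bool-returning `IOperation<T>` from the circuit-breaker section.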
```csharp
public class Result<T>
{
    public readonly T Data;
    public bool Success { get; private set; } = true;

    public Result(T data)
    {
        Data = data;
    }

    public void Fail() => Success = false;
}
```
Each item is wrapped so that we can remember its circuit-breaker result.
```csharp
using System;

public class CheckMagnitudeOperation : IOperation<int>
{
    private readonly int threshold;

    public CheckMagnitudeOperation(int magnitude)
    {
        // e.g. magnitude 2 -> threshold 100
        threshold = (int) Math.Pow(10, magnitude);
    }

    public bool Invoke(int data)
    {
        if (data < threshold)
        {
            Console.WriteLine($"{data} < {threshold} -> ko");
            return false;
        }
        Console.WriteLine($"{data} >= {threshold} -> ok");
        return true;
    }
}
```
This operation checks whether an integer has the required order of magnitude, that is, whether it is at least 10^magnitude.
```csharp
// base pipeline
var pipeline = new Pipeline<int>();
pipeline.Register(new CheckMagnitudeOperation(1));
pipeline.Register(new CheckMagnitudeOperation(2));
pipeline.Register(new CheckMagnitudeOperation(3));
pipeline.Register(new CheckMagnitudeOperation(4));

// batch pipeline
var batch = new BatchPipeline<int>(pipeline);
batch.Invoke(new[] { 12, 345, 6789 });
```
The pipeline checks the order of magnitude of a batch of integers. It only calls the next operation for the items that have not failed yet.
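Here is the example traced by hand. The thresholds are 10, 100, 1000 and 10000, and all three numbers pass the first check. 12 fails the second check (12 < 100) and 345 fails the third (345 < 1000). 6789 is the only survivor, and it fails the fourth check (6789 < 10000). After the last operation no item is left, so the circuit breaker trips and `batch.Invoke` returns false.

The sketch below condenses the same batch logic into a static method so the trace can be checked. It simplifies the original by using `Func<int, bool>` delegates instead of `IOperation<int>` classes, and `BatchTrace` is a hypothetical name.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class BatchTrace
{
    // Same algorithm as BatchPipeline.Invoke, with operations as plain delegates.
    // Also returns, for each operation that ran, the items still alive after it.
    public static (bool Success, List<int[]> Survivors) Run(int[] data, IEnumerable<Func<int, bool>> operations)
    {
        var alive = Enumerable.Repeat(true, data.Length).ToArray(); // plays the role of Result<T>.Success
        var survivors = new List<int[]>();
        foreach (var operation in operations)
        {
            var failed = true; // stays true if no remaining item passes this operation
            for (var i = 0; i < data.Length; i++)
            {
                if (!alive[i]) continue;            // skip items that already failed
                if (operation(data[i])) failed = false;
                else alive[i] = false;
            }
            survivors.Add(data.Where((value, index) => alive[index]).ToArray());
            if (failed) return (false, survivors); // circuit breaker for the whole batch
        }
        return (true, survivors);
    }

    // Delegate equivalent of CheckMagnitudeOperation: passes when data >= 10^magnitude.
    public static Func<int, bool> CheckMagnitude(int magnitude)
    {
        var threshold = (int)Math.Pow(10, magnitude);
        return data => data >= threshold;
    }
}
```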
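High-performance pipeline
The pipeline design pattern can also refer to a more specific, performance-oriented software architecture.
Some projects use pipelines to optimize the processing of large volumes of data by running each operation of the pipeline in a dedicated thread.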
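```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

abstract class Processor<T> : IOperation<T>
{
    // buffer between the previous stage and this one
    private readonly BlockingCollection<T> queue = new BlockingCollection<T>();

    public IOperation<T> Next { private get; set; }
    public IOperation<T> Terminate { private get; set; }

    // called from the previous stage's thread: just enqueue the data
    void IOperation<T>.Invoke(T data) => queue.Add(data);

    protected Processor()
    {
        // LongRunning asks the scheduler for a dedicated thread instead of a pool thread
        Task.Factory.StartNew(Run, TaskCreationOptions.LongRunning);
    }

    private void Run()
    {
        Console.WriteLine($"Thread {GetType().Name} Started !");
        while (true)
        {
            var data = queue.Take(); // blocks until data is available
            var operation = Process(data) ? Next : Terminate;
            operation?.Invoke(data);
            Sleep(); // hack to have random output ;)
        }
    }

    protected abstract bool Process(T data);

    // the rest of the class, including the Sleep() helper, is cut off in the source
}
```
Each thread consumes data from a concurrent queue that acts as a buffer, and produces data into the queue of the next thread.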
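The `Processor` above loops forever, which suits a long-running service. A batch job usually also needs the stages to shut down once the input is exhausted. The following is a minimal sketch of that variant, and `ThreadedPipelineDemo` and `Stage` are hypothetical names. Each stage drains its own `BlockingCollection<T>` with `GetConsumingEnumerable`. When its input is complete, it calls `CompleteAdding` on the downstream queue, so completion ripples through the pipeline.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ThreadedPipelineDemo
{
    // One long-running consumer per stage: read the input queue, transform, write the output queue.
    private static Task Stage<TIn, TOut>(BlockingCollection<TIn> input, BlockingCollection<TOut> output, Func<TIn, TOut> transform)
    {
        return Task.Factory.StartNew(() =>
        {
            // blocks while the queue is empty; ends once CompleteAdding was called and the queue is drained
            foreach (var item in input.GetConsumingEnumerable())
                output.Add(transform(item));
            output.CompleteAdding(); // propagate completion downstream
        }, TaskCreationOptions.LongRunning);
    }

    public static List<string> Run(IEnumerable<int> source)
    {
        // bounded queues act as buffers and slow down a producer that runs ahead
        var numbers = new BlockingCollection<int>(boundedCapacity: 16);
        var squared = new BlockingCollection<int>(boundedCapacity: 16);
        var formatted = new BlockingCollection<string>(boundedCapacity: 16);

        var tasks = new List<Task>
        {
            // the producer runs on its own task so the caller can consume concurrently
            Task.Run(() =>
            {
                foreach (var item in source)
                    numbers.Add(item);
                numbers.CompleteAdding();
            }),
            Stage(numbers, squared, x => x * x),
            Stage(squared, formatted, x => $"<{x}>"),
        };

        // the caller's thread acts as the final consumer
        var results = new List<string>();
        foreach (var item in formatted.GetConsumingEnumerable())
            results.Add(item);

        Task.WaitAll(tasks.ToArray());
        return results;
    }
}
```

Each stage has a single consumer reading a FIFO queue, so the output keeps the input order. Error handling is omitted. If a transform throws, its downstream queue is never completed and the final consumer waits forever.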
```csharp
public interface IOperation<T>
{
    IOperation<T> Next { set; }
    IOperation<T> Terminate { set; }
    void Invoke(T data);
}
```
This time, we will use asynchronous operations with a circuit breaker.
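```csharp
using System;

public class Operation<T> : IOperation<T>
{
    private readonly Func<T, bool> action;

    public Operation(Func<T, bool> action)
    {
        this.action = action;
    }

    public IOperation<T> Next { private get; set; }
    public IOperation<T> Terminate { private get; set; }

    public void Invoke(T data)
    {
        // success continues with Next, failure jumps to Terminate
        var operation = action(data) ? Next : Terminate;
        operation?.Invoke(data);
    }
}
```
If the operation succeeds, it calls Next; if it fails, it calls Terminate.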
```csharp
using System.Collections.Generic;
using System.Linq;

public class Pipeline<T> : IOperation<T>
{
    private readonly List<IOperation<T>> operations = new List<IOperation<T>>();

    public IOperation<T> Next { private get; set; }
    public IOperation<T> Terminate { private get; set; }

    private readonly IOperation<T> success;
    private readonly IOperation<T> fail;

    public Pipeline()
    {
        success = new Operation<T>(Success);
        fail = new Operation<T>(Fail);
    }

    // append an operation at the end of the pipeline
    public void RegisterOperation(IOperation<T> operation)
    {
        // when the operation is finished, it will call either success or fail
        operation.Next = success;
        operation.Terminate = fail;
        if (operations.Any())
        {
            // link the last registered operation with the newly registered one
            operations.Last().Next = operation;
        }
        operations.Add(operation);
    }

    public void Invoke(T data)
    {
        var operation = operations.Any() ? operations.First() : fail;
        operation.Invoke(data);
    }

    private bool Success(T data)
    {
        Continue(data);
        return true;
    }

    private bool Fail(T data)
    {
        // we decide to bypass the circuit breaker
        Continue(data);
        return false;
    }

    private void Continue(T data)
    {
        Next?.Invoke(data);
    }
}
```
The pipeline is designed to bypass the circuit breaker. Whether it ends in Success or Fail, it always continues the parent pipeline's sequence and calls the next operation.
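To see why the bypass matters, consider an inner pipeline whose operation fails. The failing operation's `Terminate` is the pipeline's `fail` operation, and `fail` still calls the parent's `Next`. A monitoring step placed after the inner pipeline therefore runs for every item. The following sketch wires this up by hand with synchronous operations only. It is a condensed version of the classes above with the same `Next`/`Terminate` contract, and `Step<T>`, `BypassDemo` and the odd/even payment rule are hypothetical.

```csharp
using System;
using System.Collections.Generic;

public interface IOperation<T>
{
    IOperation<T> Next { set; }
    IOperation<T> Terminate { set; }
    void Invoke(T data);
}

// Same contract as Operation<T>: success -> Next, failure -> Terminate.
public class Step<T> : IOperation<T>
{
    private readonly Func<T, bool> action;
    private IOperation<T> next, terminate;

    public Step(Func<T, bool> action) { this.action = action; }
    public IOperation<T> Next { set => next = value; }
    public IOperation<T> Terminate { set => terminate = value; }
    public void Invoke(T data) => (action(data) ? next : terminate)?.Invoke(data);
}

public static class BypassDemo
{
    // Runs one order through a hand-wired inner pipeline followed by a monitor step.
    public static List<string> Run(int order)
    {
        var log = new List<string>();
        var monitor = new Step<int>(o => { log.Add($"monitor {o}"); return true; });

        // the inner pipeline's exits: both continue the parent sequence (the bypass)
        var success = new Step<int>(o => { monitor.Invoke(o); return true; });
        var fail = new Step<int>(o => { monitor.Invoke(o); return false; });

        // hypothetical stages; payment fails for odd order numbers
        var stages = new List<IOperation<int>>
        {
            new Step<int>(o => { log.Add($"create {o}"); return true; }),
            new Step<int>(o => { var ok = o % 2 == 0; log.Add($"pay {o}: {ok}"); return ok; }),
            new Step<int>(o => { log.Add($"deliver {o}"); return true; }),
        };

        // same linking as Pipeline.RegisterOperation
        for (var i = 0; i < stages.Count; i++)
        {
            stages[i].Next = i + 1 < stages.Count ? stages[i + 1] : success;
            stages[i].Terminate = fail;
        }

        stages[0].Invoke(order);
        return log;
    }
}
```

For a failing order, delivery is skipped but the monitor still runs.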
```csharp
// build
var pipeline = new Pipeline<Order>();
pipeline.RegisterOperation(new CreateOrderProcessor());
pipeline.RegisterOperation(new PriceOrderProcessor());
pipeline.RegisterOperation(new PaymentOrderProcessor(User.Users.ToDictionary(user => user.Id, user => user.InitialBalance)));
pipeline.RegisterOperation(new DeliverOrderProcessor());

var monitor = new Pipeline<Order>();
monitor.RegisterOperation(pipeline);
monitor.RegisterOperation(new Operation<Order>(order =>
{
    var report = order.Status == OrderStatus.Delivered ? "Success" : "Fail";
    Console.WriteLine($"[IMPORTANT] Order {order.OrderId} Finished Processing : {report}");
    return true;
}));

// process
foreach (var order in GetOrders())
    monitor.Invoke(order);
```
This scenario is a bit more complex, so I won't explain everything. The idea is to have different threads process the incoming orders. Once an order has finished processing, the monitor checks its status. Because the inner pipeline bypasses the circuit breaker, the monitor reports every order, including those whose payment failed.
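```csharp
using System;
using System.Collections.Generic;

class CreateOrderProcessor : Processor<Order>
{
    // only this processor's thread touches the list, so no lock is needed
    private readonly List<Order> orders = new List<Order>();

    protected override bool Process(Order order)
    {
        order.OrderId = orders.Count;
        order.CreationTime = DateTime.UtcNow;
        order.Status = OrderStatus.Created;
        orders.Add(order);
        Console.WriteLine($"Create Order {order.OrderId}");
        return true;
    }
}
```
Each order processor is isolated in its own dedicated thread. You can therefore optimize how its data is stored and access memory directly without using locks.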
```csharp
using System;

class PaymentOrderProcessor : Processor<Order>
{
    // not shown in the source: the constructor, which receives the initial
    // balances, and the GetBalance/SetBalance helpers

    protected override bool Process(Order order)
    {
        var balance = GetBalance(order.UserId);
        var expected = balance - order.TotalPrice;
        if (expected >= 0)
        {
            Console.WriteLine($"Payment User {order.UserId} Order {order.OrderId} : {order.TotalPrice} USD | Balance {balance} -> {expected}");
            SetBalance(order.UserId, expected);
            order.Status = OrderStatus.Payed;
            return true;
        }
        else
        {
            Console.WriteLine($"Insufficient Balance : User {order.UserId} Balance {balance} USD | Order {order.OrderId} : {order.TotalPrice} USD");
            order.Status = OrderStatus.Canceled;
            return false;
        }
    }
}
```
The payment order processor is the only thread that can access user balances. It can read or update any balance without worrying about concurrency issues.
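The lock-free claim rests on one rule: only the payment thread touches the balances, and other threads communicate with it solely through its queue. Below is a minimal sketch of that ownership rule, independent of the order classes above. `BalanceOwner`, `Pay`, `Complete` and the `Dictionary<int, decimal>` layout are hypothetical. Many producer threads post payment requests, but a single consumer reads and writes the dictionary, so no lock protects it.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public class BalanceOwner
{
    private readonly BlockingCollection<(int UserId, decimal Amount)> requests =
        new BlockingCollection<(int UserId, decimal Amount)>();

    // Only the consumer task reads or writes these two fields.
    private readonly Dictionary<int, decimal> balances;
    private int rejected;

    private readonly Task consumer;

    public BalanceOwner(Dictionary<int, decimal> initialBalances)
    {
        balances = new Dictionary<int, decimal>(initialBalances);
        consumer = Task.Factory.StartNew(Consume, TaskCreationOptions.LongRunning);
    }

    // Safe to call from any thread: it only enqueues.
    public void Pay(int userId, decimal amount) => requests.Add((userId, amount));

    private void Consume()
    {
        foreach (var (userId, amount) in requests.GetConsumingEnumerable())
        {
            var expected = balances[userId] - amount;
            if (expected >= 0) balances[userId] = expected;
            else rejected++; // insufficient balance
        }
    }

    // Stops accepting requests, waits for the queue to drain, then exposes the final state.
    public (Dictionary<int, decimal> Balances, int Rejected) Complete()
    {
        requests.CompleteAdding();
        consumer.Wait(); // after this, the consumer no longer touches the state
        return (balances, rejected);
    }
}
```

In this design the queue is the only shared object, and `BlockingCollection<T>` is already thread-safe. The state it feeds stays private to one thread.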
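Every stage runs on its own thread, so different orders can be in different stages at the same time. This lets the pipeline process the sequence of operations as fast as possible.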
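Conclusion
The pipeline design pattern can be implemented in many very different ways, from a simple chain of commands to more complex workflows.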