Simple Pipe and Filters Implementation in C# with Fluent Interface Behavior

by jmorris 16. September 2009 21:51

Background

I am working on a project that requires a series of actions to be executed against an object and I immediatly thought: pipe and filters! Pipe and Filters is architectural pattern in which an event triggers a series of processing steps on a component, transforming it uniquely on each step. Each step is a called a filter component and the entire sequence of called the pipeline. The filter components take a message as input, do some sort of transformation on it and then send it to the next filter component for further processing. The most common implementations of a pipe and filter architecture are Unix programs where the output of one program can be linked to the output of another program, processing XML via a series of XSLT transformations, compilers (lexical analysis, parsing, semantic analysis, optomization and source code generation) and many, many other uses (think ASP.NET Http stack).

 

 
 Pipe and Filters Conceptual

The diagram above depicts the actors and message flow. The pump is the event originator which pushes messages into the pipeline to be processed by the individual filter components. The messages flow through the pipeline and become inputs for each filter component. Each filter performs it's processing/transformation/whatever on the message and then pushes the message back into the pipeline for further processing on the next filter component. The sink is the destination of the message after it has been processed by each filter component.

Pipe and Filters Implemented

For me, the best way for me to implement a design pattern is to see how others have implemented it. A quick google search and I came up with the two good examples (actually there are many, many, many examples!):

Both examples take advantage of the newer features of C# such as the Yield keyword (the purpose behind their posts?), which did not apply exactly to my needs. A little meddling however, and I came up with the following:

 
Simple Pipe and Filters Implementation

Here is the final code for the FilterBase<T>:

And the code for the Pipeline<T> class:


Here is rather weak unit test illustrating the usage.

Note that I added a fluent-interface type behavior to the Pipeline<T> class so that you can chain together the registration of the filters and finally execute the chain.

References:

 

Tags: , , ,

Comments (8) -

Peter
Peter
4/15/2010 7:51:25 AM #

What if we wanted to have parallel filters?

From your class diagrams, it looks as though it restricted to having one serial pipeline since it is done recursively.

To modify it into one which can work in parallel, do you think we can just traverse through the filters iteratively?

jmorris
jmorris
5/12/2010 9:28:11 PM #

@Peter - to be honest, I haven't thought of that scenario. Pipe and filters is inherently sequential since the outputs of the previous filter are the inputs of the next filter. I would think the way to parallel process this would be to act on the entire set of inputs in parallel.

Frank
Frank
8/26/2010 6:35:04 PM #

Hi, great post ... In order to do it multi-threaded I have just prepended an Observer pattern which instantiates a new filter chain for every new message to be sent through the pipeline and triggers the Execute on a background thread. Works for me at least. Wink

Just one question, how do you do error handling? What if one of the filters throws an exception.
Cheers, Frank

jmorris
jmorris
8/26/2010 10:19:59 PM #

@Frank - Thanks, your comments are much appreciated Smile

The only problem I see with taking a multithreaded approach is that the pattern itself is a value added proposition in that (as I stated in a comment above) the output of each filter is the input of the next, the synchronous nature cannot be guaranteed if each filter is a thread. However, you can (i have) made the action perform multithreaded operations (uploading a series of images for instance) that block the execution of the pipeline until the set of operations is completed....then continue to the next filter. However, as long as you make the pattern work for you in your situation, the better!

In terms of error handling, it really depends upon your strategy; do you want the execution to continue if an error occurs or do you want stop the whole process. You may even want a combination of the two...for example if one filter fails all fails (i.e. database transaction) or you could just handle and log an error and continue (i.e. one image fails uploading of many). I use both strategies in production code.

-Jeff

Frank
Frank
8/27/2010 2:05:08 PM #

Hi again,
what I meant to say about the multithreading was that I decorated my implementation of FilterBase<T> with an IOberserver<T> interface. Whenever the oberserver gets triggered (e.g. a new file has been found in a directory) it will do the  _root.Execute(T input) asynchronously. Only the root, all sucessors of the root will still be synchronous. So the method signatures of FilterBase do not change.

As for the error handling, unfortunately I will need both in combination as well ... Have to experiment a bit with results and states like exception counts etc.
Cheers, Frank

jmorris
jmorris
8/27/2010 6:28:29 PM #

@Frank - yeah, I gotcha Smile

IUnknown
IUnknown
6/27/2011 9:01:23 PM #

This can be parallel if the value returned to the pipe before going to the next filter. The pipe would buffer inputs and synchronize passing filter results. For example it would process the first buffered input against one filter, pass that return value to the next filter, and unbuffer the next input to the first filter while the second filter was also processing.

It would do this by scheduling threads from a thread pool of some sort. Because each filter is its own object, if you synchronize correctly the fitlers could run sequential or parallel with no code change. Only the pipe has to manage threading.

Hope that makes sense.

jmorris
jmorris
7/4/2011 6:02:53 AM #

In a real world application I have a filter that does a number of asynch operations. The next filter is run after signaling that the total operations have completed. In this case it's a workflow and each filter is an atomic piece of that workflow; some are asynch, some aren't. It works very well Smile

Jeff Morris

Tag cloud

Month List

Page List