Heaven is a halfpipe-line

Een proces kan je opdelen in kleinere taken. Deze worden achter elkaar uitgevoerd. Wanneer je het proces vaak moet herhalen, bijv. bij het verwerken van een datastream, kan dat lang duren.

Stel je een vliegveld voor, waarbij mensen de vertrekhal binnenlopen, moeten inchecken, bagage moeten afgeven, door de douanecontrole, en bij het vliegtuig hun ticket moeten tonen voordat ze instappen. Wanneer je dit proces in code omzet, krijg je een overzichtelijk proces met deze stappen, maar is niet efficiënt. Een vliegveld met maar één toegangsdeur en één medewerker die maar één passagier per keer kan behandelen levert een grote rij boze reizigers op.

Dit proces zou al kunnen worden versneld als je meerdere personen in dienst neemt die ieder een deel van het process uitvoert. En als je tussen iedere subtaak een wachtruimte hebt, zodat de ene subtaak niet hoeft te wachten op de volgende.

Pipeline Pattern

Dat is de oplossing die het Pipeline Pattern biedt. De  bestaat uit een TaskFactory, die de uitvoering van de taken coördineert en een BlockingCollection, een iteratie waarmee je wachtrijen tussen de stappen creëert en communiceert met de TaskFactory.

De taken worden toegevoegd aan de TaskFactory. Iedere taak is een method die een wachtij als aanlevering van objecten accepteert (consumer) en die de output weer naar de volgende wachtrij doorstuurt. De TaskFactory zorgt ervoor dat de taken tegelijkertijd hun werk doen.

Alsof je op ons vliegveld meerdere medewerkers kan inzetten. En waar voorheen een passagier nog niet weg kon bij de incheckbalie omdat de volgende stap, de bagageafgifte, nog bezet was, wordt deze ingecheckte passagier nu naar de wachtrij gestuurd zodat een nieuwe passagier alvast kan inchecken.

BlockingCollection

De BlockingCollection is geen gewone lijst: de lijst kan een object leveren aan de volgende taak en tegelijk kan de voorgaande taak er zijn bewerkte object aan toevoegen. Maar GetEnumerable() moet je niet gebruiken, want dan krijg je alleen een statische lijst met objecten die op dat moment aanwezig waren.

De BlockingCollection heeft hiervoor een ‘GetConsumingEnumerable()’ method. De lijst kan veranderen en hiermee wordt altijd gekeken naar de actuele objecten in de lijst (die nog niet zijn behandeld.

Wanneer een volgende taak in een pipeline nog bezig is, dan kan de eerste taak zijn objecten dus doorgeven aan de wachtij in de BlockingCollection en gewoon doorgaan met zijn werk. Maar een wachtrij is niet oneindig. Als de volgende wachtrij vol is, krijgt de GetConsumingEnumerable() van de eerste taak een seintje om even te pauzeren, net zolang totdat er weer ruimte in de volgende wachtrij ontstaat.

De CompleteAdding() methode moet worden aangeroepen door de eerste taak wanneer het laatste object is toegevoegd aan de pipeline. Hiermee weet iedere GetConsumingEnumerable() wanneer het werk er op zit zodat er geen resources eindeloos worden verspild door te blijven wachten.

Op ons vliegveld is het namelijk ook zo beleefd om de grondstewardess te informeren dat er geen passagiers meer gaan inchecken, zodat ze naar huis kan.

Omdat het werk parallel wordt gedaan bestaat er bij een fout het risico op bijv. een deadlock.

Dit kan worden beperkt door Exceptions goed af te vangen en aan iedere taak een CancellationTakenSource mee te geven en goed in te bouwen. Hiermee kan ook van buitenaf een proces op gecontroleerde wijze worden beëindigd. Het gebruik van de IDispose op de objecten is daarbij aan te raden.

Advertisements

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s