Ein kleiner Codeauszug aus unser
Multithread Pipeline:
TPipeline.Configure( )
{} .Throttle( 10 ) // max. 10 Items in der OutputCollection
{} .Stage<Integer>(
procedure( Output: IBlockingCollection<Integer> )
var
i: Integer;
begin
for i := 1 to 100 do
Output.Add( GetDataFromWebservice( i ) )
end )
{} .NumTasks( 5 ) // Anzahl der Tasks für die nächste Stage
{} .Stage(
procedure( const Input: Integer; out Output: Integer )
begin
Output := CalculateData( Input );
end )
{} .Stage(
procedure( Input, Output: IBlockingCollection<Integer> )
var
v: Integer;
begin
for v in Input do
begin
WriteDataInDatabase( v );
end;
end )
{} .Run( );
{} .Throttle( 10 ) // max. 10 Items in der OutputCollection
{} .Stage<Integer>(
procedure( Output: IBlockingCollection<Integer> )
var
i: Integer;
begin
for i := 1 to 100 do
Output.Add( GetDataFromWebservice( i ) )
end )
{} .NumTasks( 5 ) // Anzahl der Tasks für die nächste Stage
{} .Stage(
procedure( const Input: Integer; out Output: Integer )
begin
Output := CalculateData( Input );
end )
{} .Stage(
procedure( Input, Output: IBlockingCollection<Integer> )
var
v: Integer;
begin
for v in Input do
begin
WriteDataInDatabase( v );
end;
end )
{} .Run( );
Der abgebildete Arbeitsprozess
holt in diesem Beispiel 100 Daten (hier zu Demonstration Integer-Werte) von
einem theoretischen Webserver (Im Thread). Maximal ( 10 ) Werte werden hierbei
in die Verarbeitungspipeline eingestellt.
Wenn die nachfolgende Stage ( In 5 Threads ) diese Werte nicht schnell
genug abarbeiten kann, werden die Threads die die Daten von Webserver holen,
nach Erreichen des Throttlewertes „schlafen“ gelegt. Das „Aufwachen“ passiert
natürlich sofort wenn wieder Platz in der Eingangsqueue ist. Dieses Verhalten
kontrolliert die BlockingCollection für alle Stages. Falls die Anzahl der
Threads nicht angegeben ist, gilt der default Wert.
No comments:
Post a Comment