Dataflows

Dataflow patterns

Autoflow

“Autoflow” is the default event that happens between consecutive Analyses.

Autoflow

Upon success, each Job from Alpha will generate a Dataflow event on branch #1, which is connected to Analysis Beta. This is called autoflow as events seem to automatically flow from Alpha to Beta.

{   -logic_name => 'Alpha',
    -flow_into  => {
       1 => [ 'Beta' ],
    },
},
{   -logic_name => 'Beta',
},
digraph test {
	ratio="compress"; concentrate = "true"; name = "AnalysisWorkflow"; pad = "0";
	analysis_Alpha [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Alpha</td></tr></table>>, shape="Mrecord", style="filled"];
	analysis_Beta [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Beta</td></tr></table>>, shape="Mrecord", style="filled"];
	analysis_Alpha -> analysis_Beta [color="blue", fontcolor="blue", fontname="Helvetica", label="#1\n"];

subgraph cluster_tmp60i3x12j {
	label="";
	style="filled";
	colorscheme="blues9";
	fillcolor="1";
	color="1";
	analysis_Alpha;
	analysis_Beta;
}
}

Concise autoflow

Same as above, but more concise.

{   -logic_name => 'Alpha',
    -flow_into  => [ 'Beta' ],
},
{   -logic_name => 'Beta',
},
digraph test {
	ratio="compress"; concentrate = "true"; name = "AnalysisWorkflow"; pad = "0";
	analysis_Alpha [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Alpha</td></tr></table>>, shape="Mrecord", style="filled"];
	analysis_Beta [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Beta</td></tr></table>>, shape="Mrecord", style="filled"];
	analysis_Alpha -> analysis_Beta [color="blue", fontcolor="blue", fontname="Helvetica", label="#1\n"];

subgraph cluster_tmp60i3x12j {
	label="";
	style="filled";
	colorscheme="blues9";
	fillcolor="1";
	color="1";
	analysis_Alpha;
	analysis_Beta;
}
}

Compact autoflow

Same as above, but even more concise.

{   -logic_name => 'Alpha',
    -flow_into  => 'Beta'
},
{   -logic_name => 'Beta',
},
digraph test {
	ratio="compress"; concentrate = "true"; name = "AnalysisWorkflow"; pad = "0";
	analysis_Alpha [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Alpha</td></tr></table>>, shape="Mrecord", style="filled"];
	analysis_Beta [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Beta</td></tr></table>>, shape="Mrecord", style="filled"];
	analysis_Alpha -> analysis_Beta [color="blue", fontcolor="blue", fontname="Helvetica", label="#1\n"];

subgraph cluster_tmp60i3x12j {
	label="";
	style="filled";
	colorscheme="blues9";
	fillcolor="1";
	color="1";
	analysis_Alpha;
	analysis_Beta;
}
}
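
The three spellings above describe the same wiring. As a rough illustration only, the sketch below expands each of them into the explicit hash form. The helper name normalise_flow_into is hypothetical and is not part of eHive; it only mirrors the equivalence described in the text.

```perl
use strict;
use warnings;

# Hypothetical helper, not part of eHive: expand any of the three autoflow
# spellings into the explicit { branch => [ targets ] } form.
sub normalise_flow_into {
    my ($flow_into) = @_;
    my $type = ref($flow_into);
    return { 1 => [ $flow_into ] }  if $type eq '';        # 'Beta'
    return { 1 => [ @$flow_into ] } if $type eq 'ARRAY';   # [ 'Beta' ]
    my %explicit;                                          # { 1 => [ 'Beta' ] }
    $explicit{$_} = [ @{ $flow_into->{$_} } ] for keys %$flow_into;
    return \%explicit;
}

for my $spelling ( { 1 => [ 'Beta' ] }, [ 'Beta' ], 'Beta' ) {
    my $explicit = normalise_flow_into($spelling);
    print join( '; ', map { "#$_ => @{ $explicit->{$_} }" } sort keys %$explicit ), "\n";
}
```

Each of the three iterations prints the same line, showing that the shorter forms are shorthand for branch #1.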

Custom, independent, dataflows

The autoflow mechanism triggers only one event, and only upon successful completion. To create more events, or to create them under different circumstances, you can use “factory” patterns.

Factory

Analysis Alpha triggers zero, one or many dataflow events on branch #2 (this is the convention for non-autoflow events). In this pattern, Alpha is called the “factory”, Beta the “fan”.

{   -logic_name => 'Alpha',
    -flow_into  => {
       2 => [ 'Beta' ],
    },
},
{   -logic_name => 'Beta',
},
digraph test {
	ratio="compress"; concentrate = "true"; name = "AnalysisWorkflow"; pad = "0";
	analysis_Alpha [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Alpha</td></tr></table>>, shape="Mrecord", style="filled"];
	analysis_Beta [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Beta</td></tr></table>>, shape="Mrecord", style="filled"];
	analysis_Alpha -> analysis_Beta [color="blue", fontcolor="blue", fontname="Helvetica", label="#2\n"];

subgraph cluster_tmp60i3x12j {
	label="";
	style="filled";
	colorscheme="blues9";
	fillcolor="1";
	color="1";
	analysis_Alpha;
	analysis_Beta;
}
}
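
The following sketch shows what the factory side might look like in code. In a real Runnable the class would inherit from Bio::EnsEMBL::Hive::Process and call its dataflow_output_id method. Here Sketch::Process is a hypothetical stand-in that only records the events, so that the example runs without eHive. The chunk parameter is also invented for illustration.

```perl
use strict;
use warnings;

# Hypothetical stand-in for Bio::EnsEMBL::Hive::Process: it records dataflow
# events instead of creating Jobs, so the sketch runs without eHive.
package Sketch::Process {
    sub new { return bless { events => [] }, shift }
    sub dataflow_output_id {
        my ( $self, $output_ids, $branch ) = @_;
        $branch //= 1;
        $output_ids = [ $output_ids ] if ref($output_ids) ne 'ARRAY';
        push @{ $self->{events} }, map { [ $branch, $_ ] } @$output_ids;
    }
}

package Alpha {
    our @ISA = ('Sketch::Process');
    sub write_output {
        my $self = shift;
        # One event per chunk on branch #2; each would seed one Beta Job.
        $self->dataflow_output_id( [ map { +{ chunk => $_ } } 1 .. 3 ], 2 );
    }
}

my $alpha = Alpha->new;
$alpha->write_output;
printf "branch #%d: chunk=%d\n", $_->[0], $_->[1]{chunk} for @{ $alpha->{events} };
```

An empty list of output ids would produce no event at all, which corresponds to the "zero, one or many" wording above.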

Factory in parallel with the autoflow

In the above example, nothing was connected to the branch #1 of Analysis Alpha. The default autoflow event was thus lost. You can in fact have both branches connected.

An Analysis can use multiple branches at the same time, for instance producing a fan of Jobs on branch #2 while still producing a Job on branch #1. Both streams of Jobs (Beta and Gamma) are executed in parallel.

{   -logic_name => 'Alpha',
    -flow_into  => {
       2 => [ 'Beta' ],
       1 => [ 'Gamma' ],
    },
},
{   -logic_name => 'Beta',
},
{   -logic_name => 'Gamma',
},
digraph test {
	ratio="compress"; concentrate = "true"; name = "AnalysisWorkflow"; pad = "0";
	analysis_Alpha [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Alpha</td></tr></table>>, shape="Mrecord", style="filled"];
	analysis_Beta [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Beta</td></tr></table>>, shape="Mrecord", style="filled"];
	analysis_Gamma [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Gamma</td></tr></table>>, shape="Mrecord", style="filled"];
	analysis_Alpha -> analysis_Beta [color="blue", fontcolor="blue", fontname="Helvetica", label="#2\n"];
	analysis_Alpha -> analysis_Gamma [color="blue", fontcolor="blue", fontname="Helvetica", label="#1\n"];

subgraph cluster_tmp60i3x12j {
	label="";
	style="filled";
	colorscheme="blues9";
	fillcolor="1";
	color="1";
	analysis_Alpha;
	analysis_Beta;
	analysis_Gamma;
}
}

Many factories and an autoflow

There are virtually no restrictions on the number of branches that can be used. Branches must be identified by integers. This tutorial sticks to positive integers, because negative branch numbers have a special meaning (which is addressed in Special Dataflow when Jobs Exceed Resource Limits).

{   -logic_name => 'Alpha',
    -flow_into  => {
       2 => [ 'Beta' ],
       3 => [ 'Gamma' ],
       4 => [ 'Delta' ],
       1 => [ 'Epsilon' ],
    },
},
{   -logic_name => 'Beta',
},
{   -logic_name => 'Gamma',
},
{   -logic_name => 'Delta',
},
{   -logic_name => 'Epsilon',
},
digraph test {
	ratio="compress"; concentrate = "true"; name = "AnalysisWorkflow"; pad = "0";
	analysis_Alpha [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Alpha</td></tr></table>>, shape="Mrecord", style="filled"];
	analysis_Beta [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Beta</td></tr></table>>, shape="Mrecord", style="filled"];
	analysis_Delta [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Delta</td></tr></table>>, shape="Mrecord", style="filled"];
	analysis_Epsilon [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Epsilon</td></tr></table>>, shape="Mrecord", style="filled"];
	analysis_Gamma [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Gamma</td></tr></table>>, shape="Mrecord", style="filled"];
	analysis_Alpha -> analysis_Beta [color="blue", fontcolor="blue", fontname="Helvetica", label="#2\n"];
	analysis_Alpha -> analysis_Delta [color="blue", fontcolor="blue", fontname="Helvetica", label="#4\n"];
	analysis_Alpha -> analysis_Epsilon [color="blue", fontcolor="blue", fontname="Helvetica", label="#1\n"];
	analysis_Alpha -> analysis_Gamma [color="blue", fontcolor="blue", fontname="Helvetica", label="#3\n"];

subgraph cluster_tmp60i3x12j {
	label="";
	style="filled";
	colorscheme="blues9";
	fillcolor="1";
	color="1";
	analysis_Alpha;
	analysis_Beta;
	analysis_Delta;
	analysis_Epsilon;
	analysis_Gamma;
}
}

Events connected to multiple targets

Events on a single dataflow branch can be connected to multiple targets.

{    -logic_name => 'Alpha',
     -flow_into  => {
        2 => [ 'Beta' , 'Gamma' ],
        1 => [ 'Delta', 'Epsilon' ],
     },
},
{   -logic_name => 'Beta',
},
{   -logic_name => 'Gamma',
},
{   -logic_name => 'Delta',
},
{   -logic_name => 'Epsilon',
},
digraph test {
	ratio="compress"; concentrate = "true"; name = "AnalysisWorkflow"; pad = "0";
	analysis_Alpha [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Alpha</td></tr></table>>, shape="Mrecord", style="filled"];
	analysis_Beta [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Beta</td></tr></table>>, shape="Mrecord", style="filled"];
	analysis_Delta [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Delta</td></tr></table>>, shape="Mrecord", style="filled"];
	analysis_Epsilon [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Epsilon</td></tr></table>>, shape="Mrecord", style="filled"];
	analysis_Gamma [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Gamma</td></tr></table>>, shape="Mrecord", style="filled"];
	analysis_Alpha -> analysis_Beta [color="blue", fontcolor="blue", fontname="Helvetica", label="#2\n"];
	analysis_Alpha -> analysis_Delta [color="blue", fontcolor="blue", fontname="Helvetica", label="#1\n"];
	analysis_Alpha -> analysis_Epsilon [color="blue", fontcolor="blue", fontname="Helvetica", label="#1\n"];
	analysis_Alpha -> analysis_Gamma [color="blue", fontcolor="blue", fontname="Helvetica", label="#2\n"];

subgraph cluster_tmp60i3x12j {
	label="";
	style="filled";
	colorscheme="blues9";
	fillcolor="1";
	color="1";
	analysis_Alpha;
	analysis_Beta;
	analysis_Delta;
	analysis_Epsilon;
	analysis_Gamma;
}
}

Dependent dataflows and semaphores

eHive allows grouping of multiple branch definitions to create Job dependencies. For more detail, please see the section covering semaphores. Here is a typical example of a “factory”, “fan”, and “funnel” combining to form a “semaphore group”.

{   -logic_name => 'Alpha',
    -flow_into  => {
       '2->A' => [ 'Beta', 'Gamma' ],
       'A->1' => [ 'Delta' ],
    },
},
{   -logic_name => 'Beta',
},
{   -logic_name => 'Gamma',
},
{   -logic_name => 'Delta',
},
digraph test {
	ratio="compress"; concentrate = "true"; name = "AnalysisWorkflow"; pad = "0";
	analysis_Alpha [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Alpha</td></tr></table>>, shape="Mrecord", style="filled"];
	analysis_Beta [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Beta</td></tr></table>>, shape="Mrecord", style="filled"];
	analysis_Delta [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Delta</td></tr></table>>, shape="Mrecord", style="filled"];
	analysis_Gamma [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Gamma</td></tr></table>>, shape="Mrecord", style="filled"];
	dfr_p1_mp [fixedsize="1", height="0.01", label="dfr_p1_mp", shape="point", width="0.01"];
	dfr_p2_mp [fixedsize="1", height="0.01", label="dfr_p2_mp", shape="point", width="0.01"];
	dfr_p3_mp [fixedsize="1", height="0.01", label="dfr_p3_mp", shape="point", width="0.01"];
	analysis_Alpha -> dfr_p1_mp [arrowhead="none", color="black", fontcolor="black", fontname="Helvetica", headport="n", label="#1"];
	analysis_Alpha -> dfr_p2_mp [arrowhead="none", color="black", fontcolor="black", fontname="Helvetica", headport="n", label="#2"];
	analysis_Alpha -> dfr_p3_mp [arrowhead="none", color="black", fontcolor="black", fontname="Helvetica", headport="n", label="#2"];
	dfr_p1_mp -> analysis_Delta [color="blue", fontcolor="blue", fontname="Helvetica", label="\n", tailport="s"];
	dfr_p2_mp -> analysis_Beta [color="blue", fontcolor="blue", fontname="Helvetica", label="\n", tailport="s"];
	dfr_p2_mp -> dfr_p1_mp [arrowhead="tee", arrowtail="crow", color="red", dir="both", style="dashed"];
	dfr_p3_mp -> analysis_Gamma [color="blue", fontcolor="blue", fontname="Helvetica", label="\n", tailport="s"];
	dfr_p3_mp -> dfr_p1_mp [arrowhead="tee", arrowtail="crow", color="red", dir="both", style="dashed"];

subgraph cluster_tmp60i3x12j {
	label="";
	style="filled";
	colorscheme="blues9";
	fillcolor="1";
	color="1";
	analysis_Alpha;
	analysis_Delta;
	subgraph cluster_cl_dfr_p1_mp {
		label="";
		style="filled";
		colorscheme="blues9";
		fillcolor="2";
		color="2";
		analysis_Beta;
		analysis_Gamma;
	}
	dfr_p1_mp;
	dfr_p2_mp;
	dfr_p3_mp;
}
}
  • The -> operator groups the dataflow events together.
  • 2->A means that all the dataflow events on branch #2 will be grouped together in a group named “A”. Note that this name “A” is not related to the names of the Analyses, or the names of semaphore groups of other Analyses. Group names are single-letter codes, meaning that eHive allows up to 26 groups for each Analysis.
  • A->1 means that the Job resulting from the Dataflow event on branch #1 (the autoflow) has to wait for all the Jobs in group A before it can start. Delta is called the “funnel” Analysis.
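
To make the key syntax concrete, here is a minimal sketch that classifies a -flow_into key as a fan, a funnel, or a plain branch. The function parse_branch_key is hypothetical and is not eHive's own parser. It also assumes that group names are single uppercase letters, which the text above implies but does not state outright.

```perl
use strict;
use warnings;

# Hypothetical parser, not eHive's own: classify a -flow_into key.
# Assumes group names are single uppercase letters.
sub parse_branch_key {
    my ($key) = @_;
    return +{ role => 'fan',    branch => $1, group => $2 } if $key =~ /^(-?\d+)->([A-Z])$/;
    return +{ role => 'funnel', branch => $2, group => $1 } if $key =~ /^([A-Z])->(-?\d+)$/;
    return +{ role => 'plain',  branch => $key }            if $key =~ /^-?\d+$/;
    die "Unrecognised branch key '$key'\n";
}

for my $key ( '2->A', 'A->1', '3' ) {
    my $parsed = parse_branch_key($key);
    printf "%-5s %s\n", $key, join( ', ', map { "$_=$parsed->{$_}" } sort keys %$parsed );
}
```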

Semaphore propagation

Dataflow from Jobs in a fan can be directed to seed additional Jobs. These seeded Jobs become part of the same fan, controlling the same semaphore and funnel. In eHive, this process is referred to as “semaphore propagation”.

{   -logic_name => 'Alpha',
    -flow_into  => {
       '2->A' => [ 'Beta' ],
       'A->1' => [ 'Delta' ],
    },
},
{   -logic_name => 'Beta',
    -flow_into  => {
        1 => [ 'Gamma' ],
    },
},
{   -logic_name => 'Gamma',
},
{   -logic_name => 'Delta',
},
digraph test {
	ratio="compress"; concentrate = "true"; name = "AnalysisWorkflow"; pad = "0";
	analysis_Alpha [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Alpha</td></tr></table>>, shape="Mrecord", style="filled"];
	analysis_Beta [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Beta</td></tr></table>>, shape="Mrecord", style="filled"];
	analysis_Delta [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Delta</td></tr></table>>, shape="Mrecord", style="filled"];
	analysis_Gamma [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Gamma</td></tr></table>>, shape="Mrecord", style="filled"];
	dfr_p1_mp [fixedsize="1", height="0.01", label="dfr_p1_mp", shape="point", width="0.01"];
	dfr_p2_mp [fixedsize="1", height="0.01", label="dfr_p2_mp", shape="point", width="0.01"];
	analysis_Alpha -> dfr_p1_mp [arrowhead="none", color="black", fontcolor="black", fontname="Helvetica", headport="n", label="#1"];
	analysis_Alpha -> dfr_p2_mp [arrowhead="none", color="black", fontcolor="black", fontname="Helvetica", headport="n", label="#2"];
	analysis_Beta -> analysis_Gamma [color="blue", fontcolor="blue", fontname="Helvetica", label="#1\n"];
	dfr_p1_mp -> analysis_Delta [color="blue", fontcolor="blue", fontname="Helvetica", label="\n", tailport="s"];
	dfr_p2_mp -> analysis_Beta [color="blue", fontcolor="blue", fontname="Helvetica", label="\n", tailport="s"];
	dfr_p2_mp -> dfr_p1_mp [arrowhead="tee", arrowtail="crow", color="red", dir="both", style="dashed"];

subgraph cluster_tmp60i3x12j {
	label="";
	style="filled";
	colorscheme="blues9";
	fillcolor="1";
	color="1";
	analysis_Alpha;
	analysis_Delta;
	subgraph cluster_cl_dfr_p1_mp {
		label="";
		style="filled";
		colorscheme="blues9";
		fillcolor="2";
		color="2";
		analysis_Beta;
		analysis_Gamma;
	}
	dfr_p1_mp;
	dfr_p2_mp;
}
}

Here, Delta will be blocked until all Beta Jobs have completed. It will also be blocked until any child Gamma Jobs that may have been seeded by the Beta Jobs are complete.
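
The blocking behaviour can be pictured as a counter of unfinished Jobs in the semaphore group. The toy model below is an assumption of this sketch, not eHive code: the Job labels and the seed and finish helpers are invented, and children are registered before their parent is counted as done so that the counter cannot reach zero too early.

```perl
use strict;
use warnings;

# Toy model, not eHive code: the funnel Delta is released when the number of
# unfinished Jobs in its semaphore group drops to zero.
my $pending = 0;
my @log;

sub seed {
    my ($job) = @_;
    $pending++;
    push @log, "seed $job ($pending pending)";
}

sub finish {
    my ( $job, @children ) = @_;
    seed($_) for @children;    # children join the group before the parent leaves it
    $pending--;
    push @log, "done $job ($pending pending)";
    push @log, 'release Delta' if $pending == 0;
}

seed('Beta#1');
seed('Beta#2');
finish( 'Beta#1', 'Gamma#1' );    # Beta#1 flows on branch #1 and seeds Gamma#1
finish('Beta#2');
finish('Gamma#1');
print "$_\n" for @log;
```

In this run Delta is released only after Gamma#1 finishes, even though both Beta Jobs completed earlier.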

Dataflow using special error handling branches

The eHive system implements a limited exception handling mechanism that creates special dataflow when Jobs exceed resource limits. These events are generated on special branches: -1 (if a MEMLIMIT error is detected), -2 (if a RUNLIMIT error is detected), or 0 (any other failure, see the detailed description below). Here, if Job Low_mem_Alpha fails due to MEMLIMIT, a High_mem_Alpha Job is seeded. Otherwise, a Beta Job is seeded.

{    -logic_name => 'Low_mem_Alpha',
     -flow_into  => {
        -1 => [ 'High_mem_Alpha' ],
         1 => [ 'Beta' ],
     },
},
{    -logic_name => 'High_mem_Alpha',
     -flow_into  => {
        1 => [ 'Beta' ],
     },
},
{    -logic_name => 'Beta',
},
digraph test {
	ratio="compress"; concentrate = "true"; name = "AnalysisWorkflow"; pad = "0";
	analysis_Beta [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Beta</td></tr></table>>, shape="Mrecord", style="filled"];
	analysis_High_mem_Alpha [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">High_mem_Alpha</td></tr></table>>, shape="Mrecord", style="filled"];
	analysis_Low_mem_Alpha [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Low_mem_Alpha</td></tr></table>>, shape="Mrecord", style="filled"];
	analysis_High_mem_Alpha -> analysis_Beta [color="blue", fontcolor="blue", fontname="Helvetica", label="#1\n"];
	analysis_Low_mem_Alpha -> analysis_Beta [color="blue", fontcolor="blue", fontname="Helvetica", label="#1\n"];
	analysis_Low_mem_Alpha -> analysis_High_mem_Alpha [color="blue", fontcolor="blue", fontname="Helvetica", label="#-1\n"];

subgraph cluster_tmp60i3x12j {
	label="";
	style="filled";
	colorscheme="blues9";
	fillcolor="1";
	color="1";
	analysis_Beta;
	analysis_High_mem_Alpha;
	analysis_Low_mem_Alpha;
}
}

Note

In PipeConfig files you can use MEMLIMIT or RUNLIMIT as aliases of -1 and -2, or even MAIN instead of 1. They will automatically be transformed to numbers in the database and on diagrams (e.g. guiHive).

There is a generic event named ANYFAILURE (branch 0) that is triggered when the Worker disappears:

  • because of RUNLIMIT or MEMLIMIT, when the corresponding branch (-2 or -1) is not defined,
  • or for other reasons (KILLED_BY_USER, for instance).
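
The fallback rule described above can be sketched as a small lookup. The function failure_branch and the Cleanup Analysis are hypothetical and only illustrate the rule as stated in this section; this is not how eHive implements it internally.

```perl
use strict;
use warnings;

# Branch names used in this section, with their numeric codes.
my %BRANCH_ALIAS = ( MAIN => 1, ANYFAILURE => 0, MEMLIMIT => -1, RUNLIMIT => -2 );

# Hypothetical helper, not eHive code: which branch receives the event when a
# Worker disappears for $cause, given the branches wired in -flow_into?
sub failure_branch {
    my ( $cause, $flow_into ) = @_;
    my %wired;
    $wired{ $BRANCH_ALIAS{$_} // $_ } = 1 for keys %$flow_into;
    my $specific = $BRANCH_ALIAS{$cause};
    return $specific if defined $specific && $specific < 0 && $wired{$specific};
    return $wired{0} ? 0 : undef;    # fall back to ANYFAILURE, if it is wired
}

my $flow_into = {
    MEMLIMIT   => [ 'High_mem_Alpha' ],
    ANYFAILURE => [ 'Cleanup' ],        # hypothetical Analysis
    MAIN       => [ 'Beta' ],
};
for my $cause (qw(MEMLIMIT RUNLIMIT KILLED_BY_USER)) {
    my $branch = failure_branch( $cause, $flow_into );
    printf "%-14s -> branch %s\n", $cause, $branch // 'none';
}
```

With this wiring, MEMLIMIT resolves to branch -1, while RUNLIMIT and KILLED_BY_USER both fall back to branch 0.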

Dataflow targets

Analysis

In eHive, a Job can create another Job via a Dataflow event by wiring the branch to another Analysis.

Dataflow to one Analysis

To direct dataflow events to seed a Job for another Analysis, simply name the target Analysis after the =>.

{   -logic_name => 'Alpha',
    -flow_into  => {
       1 => [ 'Beta' ],
    },
},
{   -logic_name => 'Beta',
},
digraph test {
	ratio="compress"; concentrate = "true"; name = "AnalysisWorkflow"; pad = "0";
	analysis_Alpha [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Alpha</td></tr></table>>, shape="Mrecord", style="filled"];
	analysis_Beta [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Beta</td></tr></table>>, shape="Mrecord", style="filled"];
	analysis_Alpha -> analysis_Beta [color="blue", fontcolor="blue", fontname="Helvetica", label="#1\n"];

subgraph cluster_tmp60i3x12j {
	label="";
	style="filled";
	colorscheme="blues9";
	fillcolor="1";
	color="1";
	analysis_Alpha;
	analysis_Beta;
}
}

Dataflow to multiple Analyses

A single branch can be connected to seed Jobs for multiple Analyses. When a dataflow event happens, it will create a Job for each Analysis.

{   -logic_name => 'Alpha',
    -flow_into  => {
       1 => [ 'Beta', 'Gamma' ],
    },
},
{   -logic_name => 'Beta',
},
{   -logic_name => 'Gamma',
},
digraph test {
	ratio="compress"; concentrate = "true"; name = "AnalysisWorkflow"; pad = "0";
	analysis_Alpha [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Alpha</td></tr></table>>, shape="Mrecord", style="filled"];
	analysis_Beta [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Beta</td></tr></table>>, shape="Mrecord", style="filled"];
	analysis_Gamma [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Gamma</td></tr></table>>, shape="Mrecord", style="filled"];
	analysis_Alpha -> analysis_Beta [color="blue", fontcolor="blue", fontname="Helvetica", label="#1\n"];
	analysis_Alpha -> analysis_Gamma [color="blue", fontcolor="blue", fontname="Helvetica", label="#1\n"];

subgraph cluster_tmp60i3x12j {
	label="";
	style="filled";
	colorscheme="blues9";
	fillcolor="1";
	color="1";
	analysis_Alpha;
	analysis_Beta;
	analysis_Gamma;
}
}

Multiple dataflows to the same Analysis

Conversely, an Analysis can be the target of several branches coming from the same Analysis. Here, Jobs are created in Beta whenever there is an event on branch #2, in Gamma when there is an event on branch #2 or #3, and in Delta when there is an event on branch #1.

{   -logic_name => 'Alpha',
    -flow_into  => {
       2 => [ 'Beta', 'Gamma' ],
       3 => [ 'Gamma' ],
       1 => [ 'Delta' ],
    },
},
{   -logic_name => 'Beta',
},
{   -logic_name => 'Gamma',
},
{   -logic_name => 'Delta',
},
digraph test {
	ratio="compress"; concentrate = "true"; name = "AnalysisWorkflow"; pad = "0";
	analysis_Alpha [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Alpha</td></tr></table>>, shape="Mrecord", style="filled"];
	analysis_Beta [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Beta</td></tr></table>>, shape="Mrecord", style="filled"];
	analysis_Delta [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Delta</td></tr></table>>, shape="Mrecord", style="filled"];
	analysis_Gamma [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Gamma</td></tr></table>>, shape="Mrecord", style="filled"];
	analysis_Alpha -> analysis_Beta [color="blue", fontcolor="blue", fontname="Helvetica", label="#2\n"];
	analysis_Alpha -> analysis_Delta [color="blue", fontcolor="blue", fontname="Helvetica", label="#1\n"];
	analysis_Alpha -> analysis_Gamma [color="blue", fontcolor="blue", fontname="Helvetica", label="#2\n"];
	analysis_Alpha -> analysis_Gamma [color="blue", fontcolor="blue", fontname="Helvetica", label="#3\n"];

subgraph cluster_tmp60i3x12j {
	label="";
	style="filled";
	colorscheme="blues9";
	fillcolor="1";
	color="1";
	analysis_Alpha;
	analysis_Beta;
	analysis_Delta;
	analysis_Gamma;
}
}
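
As a quick check of the rule that each event creates one Job per connected target, the sketch below counts the Jobs that a made-up sequence of events would seed with the wiring above. The list of events is an assumption chosen for illustration.

```perl
use strict;
use warnings;

# Wiring of Alpha from the example above.
my %flow_into = ( 2 => [ 'Beta', 'Gamma' ], 3 => [ 'Gamma' ], 1 => [ 'Delta' ] );

# Hypothetical run: Alpha emits two events on #2, one on #3, then the autoflow.
my @events = ( 2, 2, 3, 1 );

my %jobs;
for my $branch (@events) {
    $jobs{$_}++ for @{ $flow_into{$branch} // [] };    # one Job per target
}
print "$_: $jobs{$_} Job(s)\n" for sort keys %jobs;
```

Gamma ends up with three Jobs because it is wired to both branch #2 and branch #3.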

Table

A Job can store data in a table via the dataflow mechanism, without the need to use raw SQL.

Dataflow to one table

To insert data passed in a dataflow event into a table, set the target after the => to a URL that contains the table_name key. URLs can be degenerate, i.e. skipping the part before the question mark (like below), or completely defined, i.e. starting with driver://user@host/database_name. Degenerate URLs will default to the eHive database.

{   -logic_name => 'Alpha',
    -flow_into  => {
       1 => [ '?table_name=Results_1' ],
    },
},
digraph test {
	ratio="compress"; concentrate = "true"; name = "AnalysisWorkflow"; pad = "0";
	analysis_Alpha [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Alpha</td></tr></table>>, shape="Mrecord", style="filled"];
	table_Results_1 [fillcolor="orange", fontcolor="black", fontname="Courier", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Results_1</td></tr></table>>, shape="tab", style="filled"];
	analysis_Alpha -> table_Results_1 [color="blue", fontcolor="blue", fontname="Helvetica", label="#1\n"];

subgraph cluster_tmp60i3x12j {
	label="";
	style="filled";
	colorscheme="blues9";
	fillcolor="1";
	color="1";
	analysis_Alpha;
	table_Results_1;
}
}

The parameters passed along with the dataflow event will determine how data is inserted into the table. Parameter values will be inserted as a row, in columns corresponding to the parameter names. For example, if the dataflow event on branch #1 has parameters foo = 42 and bar = “hello, world”, then the above example would work like the following SQL:

INSERT INTO Results_1 (foo, bar)
VALUES (42, "hello, world");
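
The mapping from event parameters to columns can be sketched as follows. The helper insert_for_event is hypothetical and is not eHive code; it builds a statement with placeholders and returns the bind values separately, which is one common way to avoid quoting problems.

```perl
use strict;
use warnings;

# Hypothetical helper, not eHive code: turn the parameters of one dataflow
# event into an INSERT statement with placeholders, plus its bind values.
sub insert_for_event {
    my ( $table, $params ) = @_;
    my @columns = sort keys %$params;    # sorted only to make the output stable
    my $sql = sprintf 'INSERT INTO %s (%s) VALUES (%s)',
        $table, join( ', ', @columns ), join( ', ', ('?') x @columns );
    return ( $sql, @{$params}{@columns} );
}

my ( $sql, @bind ) = insert_for_event( 'Results_1', { foo => 42, bar => 'hello, world' } );
print "$sql\n";
print join( ' | ', @bind ), "\n";
```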

Dataflow to multiple tables

A branch can be connected to multiple tables. When a dataflow event happens, it will create a row in each of them.

{   -logic_name => 'Alpha',
    -flow_into  => {
       1 => [ '?table_name=Results_1', '?table_name=Results_2' ],
    },
},
digraph test {
	ratio="compress"; concentrate = "true"; name = "AnalysisWorkflow"; pad = "0";
	analysis_Alpha [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Alpha</td></tr></table>>, shape="Mrecord", style="filled"];
	table_Results_1 [fillcolor="orange", fontcolor="black", fontname="Courier", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Results_1</td></tr></table>>, shape="tab", style="filled"];
	table_Results_2 [fillcolor="orange", fontcolor="black", fontname="Courier", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Results_2</td></tr></table>>, shape="tab", style="filled"];
	analysis_Alpha -> table_Results_1 [color="blue", fontcolor="blue", fontname="Helvetica", label="#1\n"];
	analysis_Alpha -> table_Results_2 [color="blue", fontcolor="blue", fontname="Helvetica", label="#1\n"];

subgraph cluster_tmp60i3x12j {
	label="";
	style="filled";
	colorscheme="blues9";
	fillcolor="1";
	color="1";
	analysis_Alpha;
	table_Results_1;
	table_Results_2;
}
}

Multiple dataflows to tables and Analyses

An Analysis can flow data to multiple targets, with Analysis and table types being freely mixed.

Rows inserted by table-dataflows are usually not linked to the emitting job_id. In the example below, a row from the table Results_1 will typically not have information about the Analysis (Job) that generated it. This can however be enabled by explicitly adding the job_id to the dataflow payload.

{   -logic_name => 'Alpha',
    -flow_into  => {
       2 => [ 'Beta', '?table_name=Results_1' ],
       1 => [ 'Gamma' ],
    },
},
{   -logic_name => 'Beta',
},
{   -logic_name => 'Gamma',
    -flow_into  => {
       3 => [ '?table_name=Results_1' ],
    },
},
digraph test {
	ratio="compress"; concentrate = "true"; name = "AnalysisWorkflow"; pad = "0";
	analysis_Alpha [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Alpha</td></tr></table>>, shape="Mrecord", style="filled"];
	analysis_Beta [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Beta</td></tr></table>>, shape="Mrecord", style="filled"];
	analysis_Gamma [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Gamma</td></tr></table>>, shape="Mrecord", style="filled"];
	table_Results_1 [fillcolor="orange", fontcolor="black", fontname="Courier", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Results_1</td></tr></table>>, shape="tab", style="filled"];
	analysis_Alpha -> analysis_Beta [color="blue", fontcolor="blue", fontname="Helvetica", label="#2\n"];
	analysis_Alpha -> analysis_Gamma [color="blue", fontcolor="blue", fontname="Helvetica", label="#1\n"];
	analysis_Alpha -> table_Results_1 [color="blue", fontcolor="blue", fontname="Helvetica", label="#2\n"];
	analysis_Gamma -> table_Results_1 [color="blue", fontcolor="blue", fontname="Helvetica", label="#3\n"];

subgraph cluster_tmp60i3x12j {
	label="";
	style="filled";
	colorscheme="blues9";
	fillcolor="1";
	color="1";
	analysis_Alpha;
	analysis_Beta;
	analysis_Gamma;
	table_Results_1;
}
}

Accumulator

The last type of dataflow target is the “Accumulator”. It is a way of passing data from fan Jobs to their funnel.

Single Accumulator

An Accumulator is defined with a special URL that contains the accu_name key. There are five types of Accumulators (scalar, pile, multiset, array and hash), all described in Accumulators.

Accumulators can only be connected to fan Analyses of a semaphore group. All the data flowing into them is accumulated for the funnel to consume after it is released.

{   -logic_name => 'Alpha',
    -flow_into  => {
       '2->A' => [ 'Beta' ],
       'A->1' => [ 'Delta' ],
    },
},
{   -logic_name => 'Beta',
    -flow_into  => {
       1 => [ '?accu_name=pile_accu&accu_input_variable=pile_content&accu_address=[]' ],
    },
},
{   -logic_name => 'Delta',
},
digraph test {
	ratio="compress"; concentrate = "true"; name = "AnalysisWorkflow"; pad = "0";
	analysis_Alpha [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Alpha</td></tr></table>>, shape="Mrecord", style="filled"];
	analysis_Beta [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Beta</td></tr></table>>, shape="Mrecord", style="filled"];
	analysis_Delta [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Delta</td></tr></table>>, shape="Mrecord", style="filled"];
	dfr_p1_mp [fixedsize="1", height="0.01", label="dfr_p1_mp", shape="point", width="0.01"];
	dfr_p2_mp [fixedsize="1", height="0.01", label="dfr_p2_mp", shape="point", width="0.01"];
	sink_dfr_p1_mp [fillcolor="darkgreen", fontcolor="white", fontname="Courier", label="Accu", shape="invhouse", style="filled"];
	analysis_Alpha -> dfr_p1_mp [arrowhead="none", color="black", fontcolor="black", fontname="Helvetica", headport="n", label="#1"];
	analysis_Alpha -> dfr_p2_mp [arrowhead="none", color="black", fontcolor="black", fontname="Helvetica", headport="n", label="#2"];
	analysis_Beta -> sink_dfr_p1_mp [arrowtail="crow", color="darkgreen", dir="both", fontcolor="darkgreen", fontname="Helvetica", label="#1\n=> pile_accu[]:=pile_content", style="dashed"];
	dfr_p1_mp -> analysis_Delta [color="blue", fontcolor="blue", fontname="Helvetica", label="\n", tailport="s"];
	dfr_p2_mp -> analysis_Beta [color="blue", fontcolor="blue", fontname="Helvetica", label="\n", tailport="s"];
	dfr_p2_mp -> dfr_p1_mp [arrowhead="tee", arrowtail="crow", color="red", dir="both", style="dashed"];

subgraph cluster_tmp60i3x12j {
	label="";
	style="filled";
	colorscheme="blues9";
	fillcolor="1";
	color="1";
	analysis_Alpha;
	analysis_Delta;
	subgraph cluster_cl_dfr_p1_mp {
		label="";
		style="filled";
		colorscheme="blues9";
		fillcolor="2";
		color="2";
		analysis_Beta;
		sink_dfr_p1_mp;
	}
	dfr_p1_mp;
	dfr_p2_mp;
}
}

Multiple Accumulators and semaphore propagation

During semaphore propagation, Jobs created downstream of a semaphored Job (here, the Gamma Jobs seeded by Beta) are added to the same semaphore-group, so they also block the funnel. Similarly, a funnel may receive data from multiple Accumulators of a semaphore-group, possibly fed by different Analyses.

{   -logic_name => 'Alpha',
    -flow_into  => {
       '2->A' => [ 'Beta' ],
       'A->1' => [ 'Delta' ],
    },
},
{   -logic_name => 'Beta',
    -flow_into  => {
       2 => [ 'Gamma' ],
       1 => [ '?accu_name=pile_accu&accu_input_variable=pile_content&accu_address=[]' ],
    },
},
{   -logic_name => 'Gamma',
    -flow_into  => {
       1 => [ '?accu_name=multiset_accu&accu_input_variable=set_content&accu_address={}' ],
    },
},
{   -logic_name => 'Delta',
}
digraph test {
	ratio="compress"; concentrate = "true"; name = "AnalysisWorkflow"; pad = "0";
	analysis_Alpha [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Alpha</td></tr></table>>, shape="Mrecord", style="filled"];
	analysis_Beta [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Beta</td></tr></table>>, shape="Mrecord", style="filled"];
	analysis_Delta [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Delta</td></tr></table>>, shape="Mrecord", style="filled"];
	analysis_Gamma [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Gamma</td></tr></table>>, shape="Mrecord", style="filled"];
	dfr_p1_mp [fixedsize="1", height="0.01", label="dfr_p1_mp", shape="point", width="0.01"];
	dfr_p2_mp [fixedsize="1", height="0.01", label="dfr_p2_mp", shape="point", width="0.01"];
	sink_dfr_p1_mp [fillcolor="darkgreen", fontcolor="white", fontname="Courier", label="Accu", shape="invhouse", style="filled"];
	analysis_Alpha -> dfr_p1_mp [arrowhead="none", color="black", fontcolor="black", fontname="Helvetica", headport="n", label="#1"];
	analysis_Alpha -> dfr_p2_mp [arrowhead="none", color="black", fontcolor="black", fontname="Helvetica", headport="n", label="#2"];
	analysis_Beta -> analysis_Gamma [color="blue", fontcolor="blue", fontname="Helvetica", label="#2\n"];
	analysis_Beta -> sink_dfr_p1_mp [arrowtail="crow", color="darkgreen", dir="both", fontcolor="darkgreen", fontname="Helvetica", label="#1\n=> pile_accu[]:=pile_content", style="dashed"];
	analysis_Gamma -> sink_dfr_p1_mp [arrowtail="crow", color="darkgreen", dir="both", fontcolor="darkgreen", fontname="Helvetica", label="#1\n=> multiset_accu{}:=set_content", style="dashed"];
	dfr_p1_mp -> analysis_Delta [color="blue", fontcolor="blue", fontname="Helvetica", label="\n", tailport="s"];
	dfr_p2_mp -> analysis_Beta [color="blue", fontcolor="blue", fontname="Helvetica", label="\n", tailport="s"];
	dfr_p2_mp -> dfr_p1_mp [arrowhead="tee", arrowtail="crow", color="red", dir="both", style="dashed"];

subgraph cluster_tmp60i3x12j {
	label="";
	style="filled";
	colorscheme="blues9";
	fillcolor="1";
	color="1";
	analysis_Alpha;
	analysis_Delta;
	subgraph cluster_cl_dfr_p1_mp {
		label="";
		style="filled";
		colorscheme="blues9";
		fillcolor="2";
		color="2";
		analysis_Beta;
		analysis_Gamma;
		sink_dfr_p1_mp;
		sink_dfr_p1_mp;
	}
	dfr_p1_mp;
	dfr_p2_mp;
}
}
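
To make the shape of the accumulated data more concrete, here is a minimal plain-Perl model of what the funnel Job (Delta) might receive in the example above. It does not use eHive itself: the emitted values are invented for illustration, and it assumes that an accu_address of [] appends each value to a list (a pile) while {} counts the occurrences of each value (a multiset).

```perl
use strict;
use warnings;

# Hypothetical values flowed by three Beta Jobs (pile_content)
# and three Gamma Jobs (set_content); not taken from a real pipeline.
my @pile_content_events = (10, 20, 10);
my @set_content_events  = ('x', 'y', 'x');

# Model of the parameters the funnel Job sees once the semaphore is released.
my %delta_params = ( pile_accu => [], multiset_accu => {} );

# accu_address=[] : every value is appended to the list (assumed pile semantics).
push @{ $delta_params{pile_accu} }, $_ for @pile_content_events;

# accu_address={} : every value becomes a key mapped to its count (assumed multiset semantics).
$delta_params{multiset_accu}{$_}++ for @set_content_events;

print 'pile_accu: ', join(', ', @{ $delta_params{pile_accu} }), "\n";
for my $value (sort keys %{ $delta_params{multiset_accu} }) {
    printf "multiset_accu{%s}: %d\n", $value, $delta_params{multiset_accu}{$value};
}
```

In a real Runnable, Delta would read the two structures as ordinary parameters, e.g. with $self->param('pile_accu') and $self->param('multiset_accu'). The order of the elements in a pile should not be relied upon, since it depends on the order in which the Jobs finish.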

Conditional dataflows

eHive provides a mechanism to filter dataflow events. It allows mapping a given branch number to some targets on certain conditions.

The filtering happens based on the values of the parameters. It uses a WHEN-ELSE syntax. It is similar to traditional IF-THEN conditions but with some important differences:

  1. A WHEN case is triggered when its condition is true.
  2. There can be multiple WHEN cases, and every case whose condition is true is triggered (not only the first one).
  3. ELSE is the catch-all: it is triggered only if none of the WHEN conditions is true.

{   -logic_name => 'Alpha',
    -flow_into  => {
       '2->A' => WHEN(
                    '#a# > 3' => [ 'Beta' ],
                    '#a# > 5' => [ 'Gamma' ],
                    ELSE         [ 'Delta' ],
                 ),
       'A->1' => [ 'Epsilon' ],
    },
},
{   -logic_name => 'Beta',
},
{   -logic_name => 'Gamma',
},
{   -logic_name => 'Delta',
},
{   -logic_name => 'Epsilon',
}
digraph test {
	ratio="compress"; concentrate = "true"; name = "AnalysisWorkflow"; pad = "0";
	analysis_Alpha [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Alpha</td></tr></table>>, shape="Mrecord", style="filled"];
	analysis_Beta [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Beta</td></tr></table>>, shape="Mrecord", style="filled"];
	analysis_Delta [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Delta</td></tr></table>>, shape="Mrecord", style="filled"];
	analysis_Epsilon [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Epsilon</td></tr></table>>, shape="Mrecord", style="filled"];
	analysis_Gamma [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Gamma</td></tr></table>>, shape="Mrecord", style="filled"];
	dfr_p1_mp [fixedsize="1", height="0.01", label="dfr_p1_mp", shape="point", width="0.01"];
	dfr_p2_mp [fillcolor="blueviolet", fontcolor="white", fontname="Courier", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td></td></tr><tr><td port="cond_0">WHEN #a# &gt; 5</td></tr><tr><td port="cond_1">WHEN #a# &gt; 3</td></tr><tr><td port="cond_2">ELSE</td></tr></table>>, shape="egg", style="filled"];
	analysis_Alpha -> dfr_p1_mp [arrowhead="none", color="black", fontcolor="black", fontname="Helvetica", headport="n", label="#1"];
	analysis_Alpha -> dfr_p2_mp [arrowhead="normal", color="black", fontcolor="black", fontname="Helvetica", headport="n", label="#2"];
	dfr_p1_mp -> analysis_Epsilon [color="blue", fontcolor="blue", fontname="Helvetica", label="\n", tailport="s"];
	dfr_p2_mp -> analysis_Beta [color="blue", fontcolor="blue", fontname="Helvetica", label="\n", tailport="cond_1"];
	dfr_p2_mp -> analysis_Delta [color="blue", fontcolor="blue", fontname="Helvetica", label="\n", tailport="cond_2"];
	dfr_p2_mp -> analysis_Gamma [color="blue", fontcolor="blue", fontname="Helvetica", label="\n", tailport="cond_0"];
	dfr_p2_mp -> dfr_p1_mp [arrowhead="tee", arrowtail="crow", color="red", dir="both", style="dashed"];

subgraph cluster_tmp60i3x12j {
	label="";
	style="filled";
	colorscheme="blues9";
	fillcolor="1";
	color="1";
	analysis_Alpha;
	analysis_Epsilon;
	subgraph cluster_cl_dfr_p1_mp {
		label="";
		style="filled";
		colorscheme="blues9";
		fillcolor="2";
		color="2";
		analysis_Beta;
		analysis_Delta;
		analysis_Gamma;
	}
	dfr_p1_mp;
	dfr_p2_mp;
}
}

This example shows how single and multiple WHEN cases are handled, together with their ELSE clause.

Value of a    Active targets
2             Delta
4             Beta
6             Beta, Gamma
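
The table can be reproduced with a small plain-Perl model of the WHEN-ELSE rules. This is only a sketch of the semantics described above, not eHive's implementation: conditions are modelled as code references instead of '#a# > 3' strings, and the names when_cases, else_targets and active_targets are made up for the illustration.

```perl
use strict;
use warnings;

# Each WHEN case pairs a condition with the targets it activates.
my @when_cases = (
    [ sub { $_[0] > 3 }, [ 'Beta'  ] ],
    [ sub { $_[0] > 5 }, [ 'Gamma' ] ],
);
# Targets used only when no WHEN condition is true.
my @else_targets = ( 'Delta' );

sub active_targets {
    my ($value) = @_;
    my @targets;
    # Every case is tested: several WHEN cases can be triggered by the same event.
    for my $case (@when_cases) {
        my ($condition, $case_targets) = @$case;
        push @targets, @$case_targets if $condition->($value);
    }
    # ELSE is the catch-all when nothing matched.
    return @targets ? @targets : @else_targets;
}

for my $value (2, 4, 6) {
    printf "a=%d -> %s\n", $value, join(', ', active_targets($value));
}
```

Unlike an IF-ELSIF chain, the loop does not stop at the first true condition, which is why a value of 6 activates both Beta and Gamma.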

Dataflow syntax

  • At the highest level, the -flow_into is either a hash associating branch tags to targets, or a target directly, in which case the branch tag is assumed to be 1.
  • Branch tags are branch numbers (integers, the same as you would use in a Runnable when calling dataflow_output_id) that may be grouped into semaphores by adding an arrow and a letter code that identifies the group.
  • Targets are usually (local) Analysis names, but can also be remote Analysis names or accumulator URLs (local or remote).
  • Dataflows to these targets can be further controlled in two ways:
    • They can be made conditional using a WHEN group and a condition. A WHEN group can have as many conditions as you wish, which can overlap, and an optional ELSE clause that acts as a catch-all (i.e. is activated when no conditions are met).
    • The hash of parameters passed to dataflow_output_id can be transformed before reaching the target with a template, which defines a new hash of parameters that will be evaluated using eHive’s parameter substitution mechanism.
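
As an illustration of these rules, the sketch below combines a branch tag grouped into a semaphore with a target that carries a template. The parameter name beta_input is hypothetical, and pipeline_analyses is written as a plain sub so that the fragment stands alone; in a real PipeConfig it would be a method of the PipeConfig class.

```perl
use strict;
use warnings;

# Stand-alone version of a PipeConfig's pipeline_analyses (illustrative only).
sub pipeline_analyses {
    return [
        {   -logic_name => 'Alpha',
            -flow_into  => {
                # Branch #2 events create Beta Jobs that join semaphore group A.
                # The template defines the new hash of parameters sent to Beta:
                # 'beta_input' (hypothetical name) is substituted from #a#.
                '2->A' => { 'Beta' => { 'beta_input' => '#a#' } },
                # Branch #1 creates the funnel Job, blocked by group A.
                'A->1' => [ 'Delta' ],
            },
        },
        {   -logic_name => 'Beta',
        },
        {   -logic_name => 'Delta',
        },
    ];
}
```

With this configuration, a Runnable for Alpha that flows a hash containing the parameter a on branch #2 would, under the substitution rules described above, create a Beta Job whose beta_input is the value of a.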

Here is a pseudo-BNF definition of the syntax used to model dataflows in PipeConfig files.

flow-into              = <dataflow-hash> | <target-group>

dataflow-hash          = "{" <branch-tag> "=>" <target-group> "," * "}"

branch-tag             = <integer>
                       | <letter> "->" <integer>
                       | <integer> "->" <letter>

target-group           = <conditional-flow>
                       | <target-names>
                       | <targets-with-template>

conditional-flow       = "WHEN(" <condition-clause> * <else-clause> ? ")"

condition-clause       = <condition> "=>" (<target-names> | <targets-with-template>) ","

else-clause            = "ELSE" (<target-names> | <targets-with-template>)

target-names           = "[" <target-name> * "]"

targets-with-template  = "{" <target-name> "=>" (<template> | "[" <template> "," * "]" ) "}"

template               = "undef"
                       | "{" <param-name> "=>" <param-value> "," * "}"

target-name            = <analysis-name>
                       | <accumulator-url>
                       | <remote-analysis-url>
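
As a worked instance of the branch-tag rule, the following plain-Perl sketch classifies a tag into one of its three alternatives. It is not how eHive parses PipeConfig files; the function name parse_branch_tag, the returned field names and the labels plain, fan and funnel are chosen here for illustration, and single-letter group codes are an assumption taken from the <letter> symbol of the grammar.

```perl
use strict;
use warnings;

# Classify a branch tag according to the three <branch-tag> alternatives.
sub parse_branch_tag {
    my ($tag) = @_;
    # <integer>
    if ($tag =~ /^(-?\d+)$/) {
        return { kind => 'plain', branch => $1 };
    }
    # <letter> "->" <integer> : the funnel of a semaphore group
    if ($tag =~ /^([A-Za-z])->(-?\d+)$/) {
        return { kind => 'funnel', group => $1, branch => $2 };
    }
    # <integer> "->" <letter> : Jobs added to a semaphore group
    if ($tag =~ /^(-?\d+)->([A-Za-z])$/) {
        return { kind => 'fan', branch => $1, group => $2 };
    }
    die "Not a valid branch tag: '$tag'\n";
}

for my $tag (1, '2->A', 'A->1') {
    my $parsed = parse_branch_tag($tag);
    print "${tag}: ", join(', ', map { "$_=$parsed->{$_}" } sort keys %$parsed), "\n";
}
```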