Dataflow targets

Analysis

In eHive, a Job can create another Job by emitting a dataflow event on a branch that is wired to another Analysis.

Dataflow to one Analysis

To direct dataflow events to seed a Job for another Analysis, simply name the target Analysis after the =>.

{   -logic_name => 'Alpha',
    -flow_into  => {
       1 => [ 'Beta' ],
    },
},
{   -logic_name => 'Beta',
},
digraph test {
	ratio="compress"; concentrate = "true"; name = "AnalysisWorkflow"; pad = "0";
	analysis_Alpha [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Alpha</td></tr></table>>, shape="Mrecord", style="filled"];
	analysis_Beta [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Beta</td></tr></table>>, shape="Mrecord", style="filled"];
	analysis_Alpha -> analysis_Beta [color="blue", fontcolor="blue", fontname="Helvetica", label="#1\n"];

subgraph "cluster_tmpl5m9wtpx" {
	label="";
	style="filled";
	colorscheme="blues9";
	fillcolor="1";
	color="1";
	analysis_Alpha;
	analysis_Beta;
}
}

Dataflow to multiple Analyses

A single branch can be connected to seed Jobs for multiple Analyses. When a dataflow event happens, it will create a Job for each Analysis.

{   -logic_name => 'Alpha',
    -flow_into  => {
       1 => [ 'Beta', 'Gamma' ],
    },
},
{   -logic_name => 'Beta',
},
{   -logic_name => 'Gamma',
},
digraph test {
	ratio="compress"; concentrate = "true"; name = "AnalysisWorkflow"; pad = "0";
	analysis_Alpha [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Alpha</td></tr></table>>, shape="Mrecord", style="filled"];
	analysis_Beta [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Beta</td></tr></table>>, shape="Mrecord", style="filled"];
	analysis_Gamma [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Gamma</td></tr></table>>, shape="Mrecord", style="filled"];
	analysis_Alpha -> analysis_Beta [color="blue", fontcolor="blue", fontname="Helvetica", label="#1\n"];
	analysis_Alpha -> analysis_Gamma [color="blue", fontcolor="blue", fontname="Helvetica", label="#1\n"];

subgraph "cluster_tmpl5m9wtpx" {
	label="";
	style="filled";
	colorscheme="blues9";
	fillcolor="1";
	color="1";
	analysis_Alpha;
	analysis_Beta;
	analysis_Gamma;
}
}

Multiple dataflows to the same Analysis

Conversely, an Analysis can be the target of several branches coming from the same Analysis. Here, Jobs are created in Beta whenever there is an event on branch #2, in Gamma whenever there is an event on branch #2 or #3, and in Delta whenever there is an event on branch #1.

{   -logic_name => 'Alpha',
    -flow_into  => {
       2 => [ 'Beta', 'Gamma' ],
       3 => [ 'Gamma' ],
       1 => [ 'Delta' ],
    },
},
{   -logic_name => 'Beta',
},
{   -logic_name => 'Gamma',
},
{   -logic_name => 'Delta',
},
digraph test {
	ratio="compress"; concentrate = "true"; name = "AnalysisWorkflow"; pad = "0";
	analysis_Alpha [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Alpha</td></tr></table>>, shape="Mrecord", style="filled"];
	analysis_Beta [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Beta</td></tr></table>>, shape="Mrecord", style="filled"];
	analysis_Delta [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Delta</td></tr></table>>, shape="Mrecord", style="filled"];
	analysis_Gamma [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Gamma</td></tr></table>>, shape="Mrecord", style="filled"];
	analysis_Alpha -> analysis_Beta [color="blue", fontcolor="blue", fontname="Helvetica", label="#2\n"];
	analysis_Alpha -> analysis_Delta [color="blue", fontcolor="blue", fontname="Helvetica", label="#1\n"];
	analysis_Alpha -> analysis_Gamma [color="blue", fontcolor="blue", fontname="Helvetica", label="#2\n"];
	analysis_Alpha -> analysis_Gamma [color="blue", fontcolor="blue", fontname="Helvetica", label="#3\n"];

subgraph "cluster_tmpl5m9wtpx" {
	label="";
	style="filled";
	colorscheme="blues9";
	fillcolor="1";
	color="1";
	analysis_Alpha;
	analysis_Beta;
	analysis_Delta;
	analysis_Gamma;
}
}
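
As an illustration only, the branch-to-target wiring above can be modelled in plain Perl as a hash lookup. This is a sketch of the routing rule, not eHive's implementation: targets_for_branch is a hypothetical helper, and treating an unwired branch as having no target is an assumption of the model.

```perl
use strict;
use warnings;

# Same wiring as the Alpha example above: branch number => list of targets.
my %flow_into = (
    2 => [ 'Beta', 'Gamma' ],
    3 => [ 'Gamma' ],
    1 => [ 'Delta' ],
);

# Hypothetical helper: return the Analyses that would each receive one new Job
# for a single dataflow event on the given branch.
sub targets_for_branch {
    my ($wiring, $branch) = @_;
    return @{ $wiring->{$branch} // [] };   # unwired branch: no target (model assumption)
}

for my $branch (1, 2, 3, 4) {
    my @targets = targets_for_branch(\%flow_into, $branch);
    printf "branch #%d -> %s\n", $branch, @targets ? join(', ', @targets) : '(no target)';
}
```

With this wiring, an event on branch #2 yields both Beta and Gamma, while Gamma is also reached from branch #3.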

Table

A Job can store data in a table via the dataflow mechanism, without the need to use raw SQL.

Dataflow to one table

To insert data passed in a dataflow event into a table, set the target after the => to a URL that contains the table_name key. URLs can be degenerate, i.e. skipping the part before the question mark (like below), or completely defined, i.e. starting with driver://user@host/database_name. Degenerate URLs default to the eHive database.

{   -logic_name => 'Alpha',
    -flow_into  => {
       1 => [ '?table_name=Results_1' ],
    },
},
digraph test {
	ratio="compress"; concentrate = "true"; name = "AnalysisWorkflow"; pad = "0";
	analysis_Alpha [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Alpha</td></tr></table>>, shape="Mrecord", style="filled"];
	table_Results_1 [fillcolor="orange", fontcolor="black", fontname="Courier", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Results_1</td></tr></table>>, shape="tab", style="filled"];
	analysis_Alpha -> table_Results_1 [color="blue", fontcolor="blue", fontname="Helvetica", label="#1\n"];

subgraph "cluster_tmpl5m9wtpx" {
	label="";
	style="filled";
	colorscheme="blues9";
	fillcolor="1";
	color="1";
	analysis_Alpha;
	table_Results_1;
}
}

The parameters passed along with the dataflow event will determine how data is inserted into the table. Parameter values will be inserted as a row, in columns corresponding to the parameter names. For example, if the dataflow event on branch #1 has parameters foo = 42 and bar = “hello, world”, then the above example would work like the following SQL:

INSERT INTO Results_1 (foo, bar)
VALUES (42, "hello, world");
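
The mapping from parameter names to columns can be sketched in plain Perl as follows. This is a minimal illustration, not eHive's actual code: insert_for_event is a hypothetical helper, and the use of "?" placeholders and alphabetically sorted columns are choices made for this example.

```perl
use strict;
use warnings;

# Hypothetical helper: turn the parameters of a dataflow event into an
# INSERT statement with placeholders, plus the list of values to bind.
sub insert_for_event {
    my ($table, $params) = @_;
    my @columns = sort keys %$params;   # sorted only to get a stable column order
    my $sql = sprintf 'INSERT INTO %s (%s) VALUES (%s)',
        $table, join(', ', @columns), join(', ', ('?') x @columns);
    return ($sql, @{$params}{@columns});
}

my ($sql, @bind) = insert_for_event('Results_1', { foo => 42, bar => 'hello, world' });
print "$sql\n";                   # the statement, one placeholder per parameter
print join(' | ', @bind), "\n";   # the values, in the same order as the columns
```

The point of the sketch is that the table must already have a column for every parameter name carried by the event.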

Dataflow to multiple tables

A branch can be connected to multiple tables. When a dataflow event happens, it will create a row in each of them.

{   -logic_name => 'Alpha',
    -flow_into  => {
       1 => [ '?table_name=Results_1', '?table_name=Results_2' ],
    },
},
digraph test {
	ratio="compress"; concentrate = "true"; name = "AnalysisWorkflow"; pad = "0";
	analysis_Alpha [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Alpha</td></tr></table>>, shape="Mrecord", style="filled"];
	table_Results_1 [fillcolor="orange", fontcolor="black", fontname="Courier", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Results_1</td></tr></table>>, shape="tab", style="filled"];
	table_Results_2 [fillcolor="orange", fontcolor="black", fontname="Courier", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Results_2</td></tr></table>>, shape="tab", style="filled"];
	analysis_Alpha -> table_Results_1 [color="blue", fontcolor="blue", fontname="Helvetica", label="#1\n"];
	analysis_Alpha -> table_Results_2 [color="blue", fontcolor="blue", fontname="Helvetica", label="#1\n"];

subgraph "cluster_tmpl5m9wtpx" {
	label="";
	style="filled";
	colorscheme="blues9";
	fillcolor="1";
	color="1";
	analysis_Alpha;
	table_Results_1;
	table_Results_2;
}
}

Multiple dataflows to tables and Analyses

An Analysis can flow data to multiple targets, with Analysis and table types being freely mixed.

Rows inserted by table-dataflows are usually not linked to the emitting job_id. In the example below, a row from the table Results_1 will typically not have information about the Analysis (Job) that generated it. This can however be enabled by explicitly adding the job_id to the dataflow payload.

{   -logic_name => 'Alpha',
    -flow_into  => {
       2 => [ 'Beta', '?table_name=Results_1' ],
       1 => [ 'Gamma' ],
    },
},
{   -logic_name => 'Beta',
},
{   -logic_name => 'Gamma',
    -flow_into  => {
       3 => [ '?table_name=Results_1' ],
    },
},
digraph test {
	ratio="compress"; concentrate = "true"; name = "AnalysisWorkflow"; pad = "0";
	analysis_Alpha [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Alpha</td></tr></table>>, shape="Mrecord", style="filled"];
	analysis_Beta [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Beta</td></tr></table>>, shape="Mrecord", style="filled"];
	analysis_Gamma [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Gamma</td></tr></table>>, shape="Mrecord", style="filled"];
	table_Results_1 [fillcolor="orange", fontcolor="black", fontname="Courier", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Results_1</td></tr></table>>, shape="tab", style="filled"];
	analysis_Alpha -> analysis_Beta [color="blue", fontcolor="blue", fontname="Helvetica", label="#2\n"];
	analysis_Alpha -> analysis_Gamma [color="blue", fontcolor="blue", fontname="Helvetica", label="#1\n"];
	analysis_Alpha -> table_Results_1 [color="blue", fontcolor="blue", fontname="Helvetica", label="#2\n"];
	analysis_Gamma -> table_Results_1 [color="blue", fontcolor="blue", fontname="Helvetica", label="#3\n"];

subgraph "cluster_tmpl5m9wtpx" {
	label="";
	style="filled";
	colorscheme="blues9";
	fillcolor="1";
	color="1";
	analysis_Alpha;
	analysis_Beta;
	analysis_Gamma;
	table_Results_1;
}
}
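
Since Analysis names and table URLs can sit side by side in the same target list, it may help to see how the two kinds of target can be told apart. The following plain-Perl sketch applies only the rules stated above; describe_target is a hypothetical helper, and the mysql:// URL is an illustrative value rather than a real database.

```perl
use strict;
use warnings;

# Hypothetical classifier for flow_into targets: a target whose query string
# has a table_name key is a table, a target without a query string is taken
# to be an Analysis name.
sub describe_target {
    my ($target) = @_;
    my ($location, $query) = split /\?/, $target, 2;
    return "Analysis '$target'" unless defined $query;

    my %keys = map { my ($k, $v) = split /=/, $_, 2; ($k => $v) } split /&/, $query;
    return "unrecognised URL '$target'" unless defined $keys{table_name};

    # A degenerate URL (nothing before the question mark) means the eHive database.
    my $where = length($location) ? "database at $location" : 'the eHive database';
    return "table '$keys{table_name}' in $where";
}

print describe_target($_), "\n"
    for 'Beta', '?table_name=Results_1', 'mysql://user@host/other_db?table_name=Results_2';
```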

Accumulator

The last type of dataflow target is the “Accumulator”. It is a way of passing data from fan Jobs to their funnel.

Single Accumulator

An Accumulator is defined with a special URL that contains the accu_name key. There are five types of Accumulators (scalar, pile, multiset, array and hash), all described in Accumulators.

Accumulators can only be connected to fan Analyses of a semaphore group. All the data sent to them is accumulated for the funnel to consume once it is released.

{   -logic_name => 'Alpha',
    -flow_into  => {
       '2->A' => [ 'Beta' ],
       'A->1' => [ 'Delta' ],
    },
},
{   -logic_name => 'Beta',
    -flow_into  => {
       1 => [ '?accu_name=pile_accu&accu_input_variable=pile_content&accu_address=[]' ],
    },
},
{   -logic_name => 'Delta',
},
digraph test {
	ratio="compress"; concentrate = "true"; name = "AnalysisWorkflow"; pad = "0";
	analysis_Alpha [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Alpha</td></tr></table>>, shape="Mrecord", style="filled"];
	analysis_Beta [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Beta</td></tr></table>>, shape="Mrecord", style="filled"];
	analysis_Delta [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Delta</td></tr></table>>, shape="Mrecord", style="filled"];
	dfr_p1_mp [fixedsize="1", height="0.01", label="dfr_p1_mp", shape="point", width="0.01"];
	dfr_p2_mp [fixedsize="1", height="0.01", label="dfr_p2_mp", shape="point", width="0.01"];
	sink_dfr_p1_mp [fillcolor="darkgreen", fontcolor="white", fontname="Courier", label="Accu", shape="invhouse", style="filled"];
	analysis_Alpha -> dfr_p1_mp [arrowhead="none", color="black", fontcolor="black", fontname="Helvetica", headport="n", label="#1"];
	analysis_Alpha -> dfr_p2_mp [arrowhead="none", color="black", fontcolor="black", fontname="Helvetica", headport="n", label="#2"];
	analysis_Beta -> sink_dfr_p1_mp [arrowtail="crow", color="darkgreen", dir="both", fontcolor="darkgreen", fontname="Helvetica", label="#1\n=> pile_accu[]:=pile_content", style="dashed"];
	dfr_p1_mp -> analysis_Delta [color="blue", fontcolor="blue", fontname="Helvetica", label="\n", tailport="s"];
	dfr_p2_mp -> analysis_Beta [color="blue", fontcolor="blue", fontname="Helvetica", label="\n", tailport="s"];
	dfr_p2_mp -> dfr_p1_mp [arrowhead="tee", arrowtail="crow", color="red", dir="both", style="dashed"];

subgraph "cluster_tmpl5m9wtpx" {
	label="";
	style="filled";
	colorscheme="blues9";
	fillcolor="1";
	color="1";
	analysis_Alpha;
	analysis_Delta;
	subgraph "cluster_cl_dfr_p1_mp" {
		label="";
		style="filled";
		colorscheme="blues9";
		fillcolor="2";
		color="2";
		analysis_Beta;
		sink_dfr_p1_mp;
	}
	dfr_p1_mp;
	dfr_p2_mp;
}
}

Multiple Accumulators and semaphore propagation

During semaphore propagation, the Jobs created by fan Jobs (here, the Gamma Jobs seeded by Beta) are added to the current semaphore group, so they also block the current funnel. Similarly, a funnel may receive data from multiple Accumulators (possibly fed by different Analyses) of a semaphore group.

{   -logic_name => 'Alpha',
    -flow_into  => {
       '2->A' => [ 'Beta' ],
       'A->1' => [ 'Delta' ],
    },
},
{   -logic_name => 'Beta',
    -flow_into  => {
       2 => [ 'Gamma' ],
       1 => [ '?accu_name=pile_accu&accu_input_variable=pile_content&accu_address=[]' ],
    },
},
{   -logic_name => 'Gamma',
    -flow_into  => {
       1 => [ '?accu_name=multiset_accu&accu_input_variable=set_content&accu_address={}' ],
    },
},
{   -logic_name => 'Delta',
}
digraph test {
	ratio="compress"; concentrate = "true"; name = "AnalysisWorkflow"; pad = "0";
	analysis_Alpha [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Alpha</td></tr></table>>, shape="Mrecord", style="filled"];
	analysis_Beta [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Beta</td></tr></table>>, shape="Mrecord", style="filled"];
	analysis_Delta [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Delta</td></tr></table>>, shape="Mrecord", style="filled"];
	analysis_Gamma [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Gamma</td></tr></table>>, shape="Mrecord", style="filled"];
	dfr_p1_mp [fixedsize="1", height="0.01", label="dfr_p1_mp", shape="point", width="0.01"];
	dfr_p2_mp [fixedsize="1", height="0.01", label="dfr_p2_mp", shape="point", width="0.01"];
	sink_dfr_p1_mp [fillcolor="darkgreen", fontcolor="white", fontname="Courier", label="Accu", shape="invhouse", style="filled"];
	analysis_Alpha -> dfr_p1_mp [arrowhead="none", color="black", fontcolor="black", fontname="Helvetica", headport="n", label="#1"];
	analysis_Alpha -> dfr_p2_mp [arrowhead="none", color="black", fontcolor="black", fontname="Helvetica", headport="n", label="#2"];
	analysis_Beta -> analysis_Gamma [color="blue", fontcolor="blue", fontname="Helvetica", label="#2\n"];
	analysis_Beta -> sink_dfr_p1_mp [arrowtail="crow", color="darkgreen", dir="both", fontcolor="darkgreen", fontname="Helvetica", label="#1\n=> pile_accu[]:=pile_content", style="dashed"];
	analysis_Gamma -> sink_dfr_p1_mp [arrowtail="crow", color="darkgreen", dir="both", fontcolor="darkgreen", fontname="Helvetica", label="#1\n=> multiset_accu{}:=set_content", style="dashed"];
	dfr_p1_mp -> analysis_Delta [color="blue", fontcolor="blue", fontname="Helvetica", label="\n", tailport="s"];
	dfr_p2_mp -> analysis_Beta [color="blue", fontcolor="blue", fontname="Helvetica", label="\n", tailport="s"];
	dfr_p2_mp -> dfr_p1_mp [arrowhead="tee", arrowtail="crow", color="red", dir="both", style="dashed"];

subgraph "cluster_tmpl5m9wtpx" {
	label="";
	style="filled";
	colorscheme="blues9";
	fillcolor="1";
	color="1";
	analysis_Alpha;
	analysis_Delta;
	subgraph "cluster_cl_dfr_p1_mp" {
		label="";
		style="filled";
		colorscheme="blues9";
		fillcolor="2";
		color="2";
		analysis_Beta;
		analysis_Gamma;
		sink_dfr_p1_mp;
	}
	dfr_p1_mp;
	dfr_p2_mp;
}
}
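
To give a feel for what the funnel receives, here is a toy model of the two Accumulators above in plain Perl. It rests on an assumption that should be checked against the Accumulators page: a pile ("[]") keeps every value it is sent, and a multiset ("{}") counts how many times each value was sent. The helper names and the emitted values are invented for the example.

```perl
use strict;
use warnings;

# Toy model: the accumulated data ends up under the accu_name of each Accumulator.
my %funnel_params = ( pile_accu => [], multiset_accu => {} );

# Hypothetical helpers standing in for the dataflow into each Accumulator.
sub flow_to_pile     { my ($value) = @_; push @{ $funnel_params{pile_accu} }, $value; }
sub flow_to_multiset { my ($value) = @_; $funnel_params{multiset_accu}{$value}++; }

# Three fan Jobs of Beta, then three fan Jobs of Gamma, each emitting one value.
flow_to_pile($_)     for 'x', 'y', 'x';
flow_to_multiset($_) for 'p', 'q', 'p';

# What the funnel (Delta) would find once the semaphore is released.
# The pile is sorted here because fan Jobs finish in no particular order.
printf "pile_accu: %s\n", join(', ', sort @{ $funnel_params{pile_accu} });
printf "multiset_accu: %s\n",
    join(', ', map { "$_ x$funnel_params{multiset_accu}{$_}" }
               sort keys %{ $funnel_params{multiset_accu} });
```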

Conditional dataflows

eHive provides a mechanism to filter dataflow events. It allows mapping a given branch number to some targets on certain conditions.

The filtering happens based on the values of the parameters. It uses a WHEN-ELSE syntax. It is similar to traditional IF-THEN conditions but with some important differences:

  1. A WHEN case fires when its condition is true.

  2. There can be multiple WHEN cases, and every WHEN whose condition is true flows, not just the first one.

  3. ELSE is the catch-all: it flows only if none of the WHEN conditions is true.

{   -logic_name => 'Alpha',
    -flow_into  => {
       '2->A' => WHEN(
                    '#a# > 3' => [ 'Beta' ],
                    '#a# > 5' => [ 'Gamma' ],
                    ELSE         [ 'Delta' ],
                 ),
       'A->1' => [ 'Epsilon' ],
    },
},
{   -logic_name => 'Beta',
},
{   -logic_name => 'Gamma',
},
{   -logic_name => 'Delta',
},
{   -logic_name => 'Epsilon',
}
digraph test {
	ratio="compress"; concentrate = "true"; name = "AnalysisWorkflow"; pad = "0";
	analysis_Alpha [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Alpha</td></tr></table>>, shape="Mrecord", style="filled"];
	analysis_Beta [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Beta</td></tr></table>>, shape="Mrecord", style="filled"];
	analysis_Delta [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Delta</td></tr></table>>, shape="Mrecord", style="filled"];
	analysis_Epsilon [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Epsilon</td></tr></table>>, shape="Mrecord", style="filled"];
	analysis_Gamma [fillcolor="white", fontname="Times-Roman", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td colspan="1">Gamma</td></tr></table>>, shape="Mrecord", style="filled"];
	dfr_p1_mp [fixedsize="1", height="0.01", label="dfr_p1_mp", shape="point", width="0.01"];
	dfr_p2_mp [fillcolor="blueviolet", fontcolor="white", fontname="Courier", label=<<table border="0" cellborder="0" cellspacing="0" cellpadding="1"><tr><td></td></tr><tr><td port="cond_0">WHEN #a# &gt; 5</td></tr><tr><td port="cond_1">WHEN #a# &gt; 3</td></tr><tr><td port="cond_2">ELSE</td></tr></table>>, shape="egg", style="filled"];
	analysis_Alpha -> dfr_p1_mp [arrowhead="none", color="black", fontcolor="black", fontname="Helvetica", headport="n", label="#1"];
	analysis_Alpha -> dfr_p2_mp [arrowhead="normal", color="black", fontcolor="black", fontname="Helvetica", headport="n", label="#2"];
	dfr_p1_mp -> analysis_Epsilon [color="blue", fontcolor="blue", fontname="Helvetica", label="\n", tailport="s"];
	dfr_p2_mp -> analysis_Beta [color="blue", fontcolor="blue", fontname="Helvetica", label="\n", tailport="cond_1"];
	dfr_p2_mp -> analysis_Delta [color="blue", fontcolor="blue", fontname="Helvetica", label="\n", tailport="cond_2"];
	dfr_p2_mp -> analysis_Gamma [color="blue", fontcolor="blue", fontname="Helvetica", label="\n", tailport="cond_0"];
	dfr_p2_mp -> dfr_p1_mp [arrowhead="tee", arrowtail="crow", color="red", dir="both", style="dashed"];

subgraph "cluster_tmpl5m9wtpx" {
	label="";
	style="filled";
	colorscheme="blues9";
	fillcolor="1";
	color="1";
	analysis_Alpha;
	analysis_Epsilon;
	subgraph "cluster_cl_dfr_p1_mp" {
		label="";
		style="filled";
		colorscheme="blues9";
		fillcolor="2";
		color="2";
		analysis_Beta;
		analysis_Delta;
		analysis_Gamma;
	}
	dfr_p1_mp;
	dfr_p2_mp;
}
}

This example shows how single and multiple WHEN cases are handled, together with their ELSE clause.

Value of a    Active targets
2             Delta
4             Beta
6             Beta, Gamma
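
The WHEN-ELSE rules can also be checked with a small plain-Perl model of the Alpha example. This is a sketch of the semantics described in this section, not of eHive's evaluator: conditions are written as Perl closures instead of '#a# > 3' strings, and active_targets is a hypothetical helper.

```perl
use strict;
use warnings;

# Toy model of the WHEN-ELSE block above: each WHEN is a [condition, targets] pair.
my @when_cases = (
    [ sub { $_[0]{a} > 3 }, [ 'Beta'  ] ],
    [ sub { $_[0]{a} > 5 }, [ 'Gamma' ] ],
);
my @else_targets = ( 'Delta' );

# Collect the targets of every WHEN whose condition holds;
# fall back to the ELSE targets only if none of them did.
sub active_targets {
    my ($params) = @_;
    my @active = map { @{ $_->[1] } } grep { $_->[0]->($params) } @when_cases;
    return @active ? @active : @else_targets;
}

printf "a = %d -> %s\n", $_, join(', ', active_targets({ a => $_ })) for 2, 4, 6;
```

Unlike an if/elsif chain, the second WHEN is still evaluated when the first one matches, which is why a = 6 activates both Beta and Gamma.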