10 --inputcmd 'cd ${ENSEMBL_CVS_ROOT_DIR}/ensembl-hive/modules/Bio/EnsEMBL/Hive/RunnableDB; ls -1 *.pm' \
11 --flow_into "{ 2 => { 'mysql://ensadmin:${ENSADMIN_PSW}@127.0.0.1:2914/lg4_compara_families_70/meta' => {'meta_key'=>'module_name','meta_value'=>'#_0#'} } }"
15 This is a generic RunnableDB module for creating batches of similar jobs using dataflow mechanism
16 (a fan of jobs is created in one branch and the funnel in another).
17 Make sure you wire this building block properly from outside.
19 You can supply as parameter one of 4 sources of ids from which the batches will be generated:
21 param('inputlist'); The list is explicitly given in the parameters, can be abbreviated: 'inputlist' => ['a'..'z']
23 param('inputfile'); The list is contained in a file whose name is supplied as parameter: 'inputfile' => 'myfile.txt'
25 param('inputquery'); The list is generated by an SQL query (against the production database by default) : 'inputquery' => 'SELECT object_id FROM object WHERE x=y'
27 param('inputcmd'); The list is generated by running a system command: 'inputcmd' => 'find /tmp/big_directory -type f'
29 NB for developers: fetch_input() method is intentionally missing from JobFactory.pm .
30 If JobFactory is subclassed (say, by a Compara RunnableDB) the child class should use fetch_input()
31 to set $self->param('inputlist') to whatever list of ids is specific to that particular type of data (slices, members, etc).
32 The rest of the functionality will be taken care of by the parent class code.
36 Copyright [1999-2015] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute
37 Copyright [2016-2024] EMBL-European Bioinformatics Institute
39 Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
40 You may obtain a copy of the License at
42 http://www.apache.org/licenses/LICENSE-2.0
44 Unless required by applicable law or agreed to in writing, software distributed under the License
45 is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
46 See the License for the specific language governing permissions and limitations under the License.
50 Please subscribe to the Hive mailing list: http://listserver.ebi.ac.uk/mailman/listinfo/ehive-users to discuss Hive-related questions or to be notified of our updates
55 package Bio::EnsEMBL::Hive::RunnableDB::JobFactory;
60 use base ('Bio::EnsEMBL::Hive::Process');
# Fragment of the default-parameter hash (the enclosing definition is not visible in this listing).
71 'input_id' => 0, # this parameter is no longer supported and should stay at 0
# No SQL query is configured by default.
75 'inputquery' => undef,
# Default dataflow branch into which the fan of jobs is created.
78 'fan_branch_code' => 2,
80 'use_bash_pipefail' => 0, # Boolean. When true, the command will be run with "bash -o pipefail -c $cmd
". Useful to capture errors in a command that contains pipes
88 Description : Implements run() interface method of Bio::EnsEMBL::Hive::Process that is used to perform the main bulk of the job (minus input and output).
90 param('column_names'): Controls the column names that come out of the parser: 0 = "no names", 1 = "parse names from data", arrayref = "take names from this array"
92 param('delimiter'): If you set it your lines in file/cmd mode will be split into columns that you can use individually when constructing the input_id_template hash.
94 param('randomize'): Shuffles the rows before creating jobs - can sometimes lead to better overall performance of the pipeline. Doesn't make any sense for minibatches (step>1).
96 param('step'): The requested size of the minibatch (1 by default). The real size of a range may be smaller than the requested size.
98 param('contiguous'): Whether the key_column range of each minibatch should be contiguous (0 by default).
100 param('key_column'): If every line of your input is a list (it happens, for example, when your SQL returns multiple columns or you have set the 'delimiter' in file/cmd mode)
101 this is the way to say which column is undergoing 'ranging'
104 # The following 4 parameters are mutually exclusive and define the source of ids for the jobs:
106 param('inputlist'); The list is explicitly given in the parameters, can be abbreviated: 'inputlist' => ['a'..'z']
108 param('inputfile'); The list is contained in a file whose name is supplied as parameter: 'inputfile' => 'myfile.txt'
110 param('inputquery'); The list is generated by an SQL query (against the production database by default) : 'inputquery' => 'SELECT object_id FROM object WHERE x=y'
112 param('inputcmd'); The list is generated by running a system command: 'inputcmd' => 'find /tmp/big_directory -type f'
# Main body of the run() step described in the POD above (the enclosing 'sub' line is not visible in this listing).
# It reads the job parameters, loads the rows from the first input source that is set,
# converts the rows into output ids and stores them in param('output_ids'), which write_output() reads.
119 my $column_names = $self->param('column_names'); # can be 0 (no names), 1 (names from data) or an arrayref (names from this array)
120 my $delimiter = $self->param('delimiter');
122 my $randomize = $self->param('randomize');
124 # minibatching-related:
125 my $step = $self->param('step');
126 my $contiguous = $self->param('contiguous');
127 my $key_column = $self->param('key_column');
129 my $inputlist = $self->param('inputlist');
130 my $inputfile = $self->param('inputfile');
131 my $inputquery = $self->param('inputquery');
132 my $inputcmd = $self->param('inputcmd');
# true only when column_names holds a true value that is not an arrayref, i.e. the names must be parsed from the data
134 my $parse_column_names = $column_names && (ref($column_names) ne 'ARRAY');
# the sources are tried in this order: inputlist, inputquery, inputfile, inputcmd; the job dies if none of them is set
136 my ($rows, $column_names_from_data) =
137 $inputlist ? $self->_get_rows_from_list( $inputlist )
138 : $inputquery ? $self->_get_rows_from_query( $inputquery )
139 : $inputfile ? $self->_get_rows_from_open( $inputfile , '<', $delimiter, $parse_column_names )
140 : $inputcmd ? $self->_get_rows_from_open( ($self->param('use_bash_pipefail') ? 'set -o pipefail; ': '').$inputcmd, '-|', $delimiter, $parse_column_names )
141 : die "range of values should be defined by setting
'inputlist',
'inputquery',
'inputfile' or
'inputcmd'";
# names coming from the data replace column_names unless the caller supplied an explicit arrayref of names
143 if( $column_names_from_data # column data is available
144 and ( defined($column_names) ? (ref($column_names) ne 'ARRAY') : 1 ) # and is badly needed
146 $column_names = $column_names_from_data;
148 # after this point $column_names should either contain a list or be false
150 if( $self->param('input_id') ) {
151 die "'input_id' is no longer supported, please reconfigure as the input_id_template of the dataflow_rule
";
# NOTE(review): the condition guarding this shuffle is not visible in this listing - presumably $randomize; confirm
155 _fisher_yates_shuffle_in_place($rows);
# a true 'step' selects minibatching; otherwise every row is converted on its own
158 my $output_ids = $step
159 ? $self->_substitute_minibatched_rows($rows, $column_names, $step, $contiguous, $key_column)
160 : $self->_substitute_rows($rows, $column_names);
162 $self->param('output_ids', $output_ids);
168 Description : Implements write_output() interface method of Bio::EnsEMBL::Hive::Process that is used to deal with job's output after the execution.
169 Here we rely on the dataflow mechanism to create jobs.
171 param('fan_branch_code'): defines the branch where the fan of jobs is created (2 by default).
# Sends the output ids stored in param('output_ids') by the run step
# into the dataflow branch given by param('fan_branch_code').
# NOTE(review): $self is used below but its declaration is not visible in this listing.
175 sub write_output { # nothing to write out, but some dataflow to perform:
178 my $output_ids = $self->param('output_ids');
179 my $fan_branch_code = $self->param('fan_branch_code');
181 # "fan out
" into fan_branch_code:
182 $self->dataflow_output_id($output_ids, $fan_branch_code);
186 ################################### main functionality starts here ###################
189 =head2 _get_rows_from_list
191 Description: a private method that ensures the list is 2D
# Normalizes the input list into a list of rows (arrayrefs).
# When the first element is not a reference, every element is wrapped into a one-column row.
# NOTE(review): the branch taken when the first element IS a reference is not visible in this listing -
# presumably the list is returned unchanged; confirm.
195 sub _get_rows_from_list {
196 my ($self, $inputlist) = @_;
# only the first element is inspected to decide whether the list is already 2D
198 return ref($inputlist->[0])
200 : [ map { [ $_ ] } @$inputlist ];
204 =head2 _get_rows_from_query
206 Description: a private method that loads ids from a given sql query
208 param('db_conn'): An optional hash to pass in connection parameters to the database upon which the query will have to be run.
# Prepares the given SQL query on the connection returned by data_dbc() and fetches its rows.
# Returns a pair: (reference to @rows, reference to the column names reported by the statement handle).
# NOTE(review): the declaration and filling of @rows and the execution of the statement are not visible in this listing.
212 sub _get_rows_from_query {
213 my ($self, $inputquery) = @_;
215 $self->say_with_header(qq{inputquery = "$inputquery
"});
217 my $sth = $self->data_dbc()->prepare($inputquery);
219 my @column_names_from_data = @{$sth->dbi_sth()->{'NAME'}}; # tear it off the original reference to gain some freedom
# the loop ends on the first empty row list returned by fetchrow_array()
221 while (my @cols = $sth->fetchrow_array()) {
# presumably releases the database connection when nothing else is using it - confirm against the DBConnection class
226 $self->data_dbc()->disconnect_if_idle();
228 return (\@rows, \@column_names_from_data);
232 =head2 _get_rows_from_open
234 Description: a private method that loads ids from a given file or command pipe
# Reads rows from a file or from a command pipe, depending on the 3-argument open() mode passed in
# (the run step passes '<' for files and '-|' for commands).
# Each input line becomes one row; when $delimiter is defined the line is split on it (used as a regex),
# otherwise the whole line is the single column of the row.
# Returns a pair: (reference to @rows, column names). When $parse_header is true the first row is removed
# from @rows and returned as the column names; otherwise 0 is returned in that position.
# NOTE(review): the declaration of @rows and any per-line cleanup are not visible in this listing.
238 sub _get_rows_from_open {
239 my ($self, $input_file_or_command, $open_mode, $delimiter, $parse_header) = @_;
241 $self->say_with_header(qq{input_file_or_command = "$input_file_or_command
" [$open_mode]});
243 open(my $fh, $open_mode, $input_file_or_command) or die "Could not open
'$input_file_or_command' because: $!
";
244 while(my $line = <$fh>) {
# with no delimiter the row has exactly one column holding the whole line
247 push @rows, [ defined($delimiter) ? split(/$delimiter/, $line) : $line ];
# NOTE(review): the call whose failure is reported by the next statement is not visible in this listing
250 or die "Could not read from $open_mode
'$input_file_or_command'. Received the error
".($! || $?);
# the header row, when requested, is taken off the front of the data
252 my $column_names_from_data = $parse_header ? shift @rows : 0;
254 return (\@rows, $column_names_from_data);
258 =head2 _substitute_rows
260 Description: a private method that goes through a list and transforms every row into a hash
# Converts every row into a hash of job parameters and collects the hashes in @hashes.
# With $column_names, each column value is stored under the name found at the same index.
# Without $column_names, the key '_' holds the whole row and each column is additionally stored
# under a key built from '_' followed by its index.
# NOTE(review): the declaration of @hashes and the return statement are not visible in this listing.
264 sub _substitute_rows {
265 my ($self, $rows, $column_names) = @_;
269 foreach my $row (@$rows) {
# the index range is taken from the row itself, not from the list of column names
270 my $job_param_hash = $column_names
271 ? { map { ($column_names->[$_] => $row->[$_]) } (0..scalar(@$row)-1) }
272 : { '_' => $row, map { ("_$_
" => $row->[$_]) } (0..scalar(@$row)-1) };
274 push @hashes, $job_param_hash;
280 =head2 _substitute_minibatched_rows
282 Description: a private method that minibatches a list and transforms every minibatch using param-substitution
# Groups rows into minibatches keyed on column $key_column and collects one hash per minibatch in @ranges.
# Each hash carries '_range_start', '_range_end', '_range_count' and '_range_list', plus per-column
# '_start_*' / '_end_*' entries taken from the first and the last row of the minibatch.
# Rows are shifted off @$rows as they are consumed, so the caller's array is modified.
# A minibatch stops growing once $range_count reaches $step or the rows run out; when $contiguous is true
# a row joins the current minibatch only if its key equals the string-incremented previous key.
# NOTE(review): the outer loop, the $range_count bookkeeping, the else-branch structure and the return
# statement are not visible in this listing.
286 sub _substitute_minibatched_rows {
287 my ($self, $rows, $column_names, $step, $contiguous, $key_column) = @_;
# the first remaining row opens a new minibatch
292 my $start_row = shift @$rows;
293 my $range_start = $start_row->[$key_column];
294 my @range_list = ( $range_start );
296 my $range_end = $range_start;
298 my $next_row = $start_row; # safety, in case the internal while doesn't execute even once
299 my $last_row = $next_row; # last row in the current minibatch
301 while($range_count<$step && @$rows) {
302 $next_row = shift @$rows;
303 my $next_value = $next_row->[$key_column];
305 my $predicted_next = $range_end; # ++$predicted_next is used instead of ($range_end+1) as "string increment
" to turn 'aa' into 'ab' into 'ac', etc.
306 if(!$contiguous or (++$predicted_next eq $next_value)) {
308 push @range_list, $next_value;
309 $range_end = $next_value;
310 $last_row = $next_row;
# the row is put back at the front of the list - presumably so that it opens the next minibatch; confirm
313 unshift @$rows, $next_row;
319 '_range_start' => $range_start,
320 '_range_end' => $range_end,
321 '_range_count' => $range_count,
322 '_range_list' => \@range_list,
325 ? map { ('_start_'.$column_names->[$_] => $start_row->[$_], '_end_'.$column_names->[$_] => $last_row->[$_]) } (0..scalar(@$start_row)-1)
326 : map { ("_start_$_
" => $start_row->[$_], "_end_$_
" => $last_row->[$_]) } (0..scalar(@$start_row)-1)
328 push @ranges, $job_range;
334 =head2 _fisher_yates_shuffle_in_place
336 Description: a private function (not a method) that shuffles a list of ids
# Shuffles the referenced array in place using the Fisher-Yates algorithm.
#
# Arguments : $array - reference to the array to be shuffled (modified in place)
# Returns   : nothing; the result is the reordered @$array
#
# Arrays with fewer than two elements are left untouched. The previous
# countdown loop ( for(my $upper=scalar(@$array);--$upper;) ) started from -1
# on an empty array and never reached its stop value of 0, so shuffling an
# empty list inserted a bogus element and then died.
sub _fisher_yates_shuffle_in_place {
    my $array = shift @_;

    # walk from the last index down to 1; index 0 has nothing left to swap with
    for my $upper (reverse 1 .. $#$array) {
        my $lower = int(rand($upper + 1));      # random index in 0..$upper
        next if $lower == $upper;               # swapping an element with itself is a no-op
        @$array[$lower, $upper] = @$array[$upper, $lower];
    }

    return;
}