ensembl-hive  2.6
JobFactory.pm
Go to the documentation of this file.
1 =pod
2 
3 =head1 NAME
4 
5  Bio::EnsEMBL::Hive::RunnableDB::JobFactory
6 
7 =head1 SYNOPSIS
8 
9  standaloneJob.pl Bio::EnsEMBL::Hive::RunnableDB::JobFactory \
10  --inputcmd 'cd ${ENSEMBL_CVS_ROOT_DIR}/ensembl-hive/modules/Bio/EnsEMBL/Hive/RunnableDB; ls -1 *.pm' \
11  --flow_into "{ 2 => { 'mysql://ensadmin:${ENSADMIN_PSW}@127.0.0.1:2914/lg4_compara_families_70/meta' => {'meta_key'=>'module_name','meta_value'=>'#_0#'} } }"
12 
13 =head1 DESCRIPTION
14 
15  This is a generic RunnableDB module for creating batches of similar jobs using the dataflow mechanism
16  (a fan of jobs is created in one branch and the funnel in another).
17  Make sure you wire this building block properly from outside.
18 
19  You can supply one of 4 parameters as the source of ids from which the batches will be generated:
20 
21  param('inputlist'); The list is explicitly given in the parameters, can be abbreviated: 'inputlist' => ['a'..'z']
22 
23  param('inputfile'); The list is contained in a file whose name is supplied as parameter: 'inputfile' => 'myfile.txt'
24 
25  param('inputquery'); The list is generated by an SQL query (against the production database by default) : 'inputquery' => 'SELECT object_id FROM object WHERE x=y'
26 
27  param('inputcmd'); The list is generated by running a system command: 'inputcmd' => 'find /tmp/big_directory -type f'
28 
29  NB for developers: the fetch_input() method is intentionally missing from JobFactory.pm.
30  If JobFactory is subclassed (say, by a Compara RunnableDB) the child class should use fetch_input()
31  to set $self->param('inputlist') to whatever list of ids is specific to that particular type of data (slices, members, etc).
32  The rest of the functionality will be taken care of by the parent class code.
33 
34 =head1 LICENSE
35 
36  Copyright [1999-2015] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute
37  Copyright [2016-2024] EMBL-European Bioinformatics Institute
38 
39  Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
40  You may obtain a copy of the License at
41 
42  http://www.apache.org/licenses/LICENSE-2.0
43 
44  Unless required by applicable law or agreed to in writing, software distributed under the License
45  is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
46  See the License for the specific language governing permissions and limitations under the License.
47 
48 =head1 CONTACT
49 
50  Please subscribe to the Hive mailing list: http://listserver.ebi.ac.uk/mailman/listinfo/ehive-users to discuss Hive-related questions or to be notified of our updates
51 
52 =cut
53 
54 
55 package Bio::EnsEMBL::Hive::RunnableDB::JobFactory;
56 
57 use strict;
58 use warnings;
59 
60 use base ('Bio::EnsEMBL::Hive::Process');
61 
62 
63 sub param_defaults {
64  return {
65  'column_names' => 0,
66  'delimiter' => undef,
67  'randomize' => 0,
68  'step' => 0,
69  'contiguous' => 0,
70  'key_column' => 0,
71  'input_id' => 0, # this parameter is no longer supported and should stay at 0
72 
73  'inputlist' => undef,
74  'inputfile' => undef,
75  'inputquery' => undef,
76  'inputcmd' => undef,
77 
78  'fan_branch_code' => 2,
79 
80  'use_bash_pipefail' => 0, # Boolean. When true, the command will be run with "bash -o pipefail -c $cmd". Useful to capture errors in a command that contains pipes
81  };
82 }
83 
84 
85 
=head2 run

    Description : Implements run() interface method of Bio::EnsEMBL::Hive::Process that is used to perform the main bulk of the job (minus input and output).
                  Reads the rows from the configured source, optionally shuffles and/or minibatches them,
                  and stores the resulting list of parameter hashes in param('output_ids') for write_output().

    param('column_names'): Controls the column names that come out of the parser: 0 = "no names", 1 = "parse names from data", arrayref = "take names from this array".
                           Column names supplied by the data source itself (e.g. the columns of an SQL query) are used unless an arrayref is given.

    param('delimiter'): If you set it, your lines in file/cmd mode will be split into columns (the delimiter is used as a regular expression)
                        that you can use individually when constructing the input_id_template hash.

    param('randomize'): Shuffles the rows before creating jobs - can sometimes lead to better overall performance of the pipeline. Doesn't make any sense for minibatches (step>1).

    param('step'): The requested size of the minibatch (0 by default, meaning no minibatching: one job per row). The real size of a range may be smaller than the requested size.

    param('contiguous'): Whether the key_column range of each minibatch should be contiguous (0 by default).

    param('key_column'): If every line of your input is a list (it happens, for example, when your SQL returns multiple columns or you have set the 'delimiter' in file/cmd mode)
                         this is the way to say which column is undergoing 'ranging'

    param('use_bash_pipefail'): When true, 'inputcmd' is run as   bash -o pipefail -c '<inputcmd>'   so that a failure in any stage of a pipeline makes the job fail.


    # The following 4 parameters are alternatives (checked in this order: inputlist, inputquery, inputfile, inputcmd) and define the source of ids for the jobs:

    param('inputlist'); The list is explicitly given in the parameters, can be abbreviated: 'inputlist' => ['a'..'z']

    param('inputfile'); The list is contained in a file whose name is supplied as parameter: 'inputfile' => 'myfile.txt'

    param('inputquery'); The list is generated by an SQL query (against the production database by default) : 'inputquery' => 'SELECT object_id FROM object WHERE x=y'

    param('inputcmd'); The list is generated by running a system command: 'inputcmd' => 'find /tmp/big_directory -type f'

=cut

sub run {
    my $self = shift @_;

    # 'input_id' is retired: refuse it before fetching any (potentially expensive) data.
    if( $self->param('input_id') ) {
        die "'input_id' is no longer supported, please reconfigure as the input_id_template of the dataflow_rule";
    }

    my $column_names    = $self->param('column_names');     # can be 0 (no names), 1 (names from data) or an arrayref (names from this array)
    my $delimiter       = $self->param('delimiter');

    my $randomize       = $self->param('randomize');

    # minibatching-related:
    my $step            = $self->param('step');
    my $contiguous      = $self->param('contiguous');
    my $key_column      = $self->param('key_column');

    my $inputlist       = $self->param('inputlist');
    my $inputfile       = $self->param('inputfile');
    my $inputquery      = $self->param('inputquery');
    my $inputcmd        = $self->param('inputcmd');

    my $parse_column_names = $column_names && (ref($column_names) ne 'ARRAY');

    my $command = $inputcmd;
    if( $inputcmd and $self->param('use_bash_pipefail') ) {
        # Invoke bash explicitly: a single-string piped open goes through /bin/sh, which is not necessarily bash
        # and may reject "set -o pipefail". Single-quote the command for the outer shell ( ' becomes '\'' ).
        (my $quoted_cmd = $inputcmd) =~ s/'/'\\''/g;
        $command = "bash -o pipefail -c '$quoted_cmd'";
    }

    my ($rows, $column_names_from_data) =
              $inputlist    ? $self->_get_rows_from_list(  $inputlist  )
            : $inputquery   ? $self->_get_rows_from_query( $inputquery )
            : $inputfile    ? $self->_get_rows_from_open(  $inputfile, '<',  $delimiter, $parse_column_names )
            : $inputcmd     ? $self->_get_rows_from_open(  $command,   '-|', $delimiter, $parse_column_names )
            : die "range of values should be defined by setting 'inputlist', 'inputquery', 'inputfile' or 'inputcmd'";

    # Names coming from the data win, unless an explicit arrayref of names was given.
    if( $column_names_from_data and ref($column_names) ne 'ARRAY' ) {
        $column_names = $column_names_from_data;
    }
    # after this point $column_names should either contain a list or be false

    if($randomize) {
        _fisher_yates_shuffle_in_place($rows);
    }

    my $output_ids = $step
        ? $self->_substitute_minibatched_rows($rows, $column_names, $step, $contiguous, $key_column)
        : $self->_substitute_rows($rows, $column_names);

    $self->param('output_ids', $output_ids);
}
164 
165 
=head2 write_output

    Description : Implements write_output() interface method of Bio::EnsEMBL::Hive::Process that is used to deal with job's output after the execution.
                  Nothing is written anywhere: the parameter hashes prepared by run() in param('output_ids')
                  are flowed out, and the dataflow mechanism creates the jobs.

    param('fan_branch_code'): the branch on which the fan of jobs is created (2 by default).

=cut

sub write_output {
    my ($self) = @_;

    # "fan out": every prepared parameter hash goes down the fan branch in a single call.
    return $self->dataflow_output_id( $self->param('output_ids'), $self->param('fan_branch_code') );
}
184 
185 
186 ################################### main functionality starts here ###################
187 
188 
=head2 _get_rows_from_list

    Description: a private method that ensures the list is 2D.
                 If the first element is a reference, the list is taken to already be a list of rows;
                 otherwise every element is wrapped into a one-column row.
                 A new outer arrayref is always returned: run() may shuffle the rows in place and the
                 minibatching code walks them, and neither must alter the 'inputlist' parameter itself.

=cut

sub _get_rows_from_list {
    my ($self, $inputlist) = @_;

    return ref($inputlist->[0])
        ? [ @$inputlist ]                   # shallow copy of the outer list (rows themselves are shared)
        : [ map { [ $_ ] } @$inputlist ];
}
202 
203 
=head2 _get_rows_from_query

    Description: a private method that loads ids from a given sql query.
                 The query runs on $self->data_dbc(); every fetched record becomes one row, and the
                 statement's column names are returned alongside the rows.

    param('db_conn'): An optional hash to pass in connection parameters to the database upon which the query will have to be run
                      (presumably consumed by data_dbc() in Bio::EnsEMBL::Hive::Process - confirm there).

    Returns    : (arrayref of rows, arrayref of column names)

=cut

sub _get_rows_from_query {
    my ($self, $inputquery) = @_;

    $self->say_with_header(qq{inputquery = "$inputquery"});

    my $sth = $self->data_dbc()->prepare($inputquery);
    $sth->execute();

    # Copy the names out of the handle's NAME arrayref so the caller owns an independent list.
    my $header = [ @{ $sth->dbi_sth()->{'NAME'} } ];

    my @fetched;
    while (my @record = $sth->fetchrow_array()) {
        push @fetched, [ @record ];
    }
    $sth->finish();

    $self->data_dbc()->disconnect_if_idle();

    return (\@fetched, $header);
}
230 
231 
=head2 _get_rows_from_open

    Description: a private method that loads ids from a given file or command pipe.
                 Each line becomes one row. When $delimiter is defined, it is used as a regular expression
                 to split the line into columns; otherwise the whole line is the single column.
                 When $parse_header is true, the first row is removed and returned as the column names.
                 Dies if the file/command cannot be opened, or if closing fails (for a command, this includes
                 a non-zero exit status, which is reported as an exit code or signal number).

    Returns    : (arrayref of rows, header row arrayref or 0)

=cut

sub _get_rows_from_open {
    my ($self, $input_file_or_command, $open_mode, $delimiter, $parse_header) = @_;

    $self->say_with_header(qq{input_file_or_command = "$input_file_or_command" [$open_mode]});
    my @rows = ();
    open(my $fh, $open_mode, $input_file_or_command) or die "Could not open '$input_file_or_command' because: $!";
    while(my $line = <$fh>) {
        chomp $line;

        push @rows, [ defined($delimiter) ? split(/$delimiter/, $line) : $line ];
    }

    unless( close $fh ) {
        # For a piped command, close() fails with $! == 0 when the child exits non-zero; the wait status is in $?.
        my $reason = $!           ? "$!"
                   : ($? & 127)   ? 'killed by signal ' . ($? & 127)
                   :                'exit code ' . ($? >> 8);
        die "Could not read from $open_mode '$input_file_or_command'. Received the error $reason";
    }

    my $column_names_from_data = $parse_header ? shift @rows : 0;

    return (\@rows, $column_names_from_data);
}
256 
257 
=head2 _substitute_rows

    Description: a private method that goes through a list and transforms every row into a hash.
                 With column names, column i is stored under $column_names->[i].
                 Without them, column i is stored under "_i", and the whole row under "_".

    Returns    : arrayref of hashrefs, one per row, in input order

=cut

sub _substitute_rows {
    my ($self, $rows, $column_names) = @_;

    my @param_hashes;
    for my $row (@$rows) {
        my %job_params;
        if ($column_names) {
            @job_params{ @{$column_names}[ 0 .. $#$row ] } = @$row;
        } else {
            $job_params{'_'} = $row;
            @job_params{ map { "_$_" } 0 .. $#$row } = @$row;
        }
        push @param_hashes, \%job_params;
    }
    return \@param_hashes;
}
278 
279 
=head2 _substitute_minibatched_rows

    Description: a private method that minibatches a list and transforms every minibatch using param-substitution.
                 Consecutive rows are grouped into minibatches of at most $step rows. When $contiguous is true,
                 a minibatch is closed as soon as the next $key_column value is not the (Perl "magic" string-)increment
                 of the previous one, so 'aa' is followed by 'ab' and 5 by 6.
                 Every minibatch becomes a hash with _range_start, _range_end, _range_count, _range_list, plus
                 _start_<col>/_end_<col> holding each column of the first/last row of the minibatch
                 (<col> is the column name when $column_names is given, otherwise the column index).
                 The input arrayref is only read, never modified.

=cut

sub _substitute_minibatched_rows {
    my ($self, $rows, $column_names, $step, $contiguous, $key_column) = @_;

    my @ranges    = ();
    my $row_count = scalar(@$rows);
    my $i         = 0;      # index of the next row not yet assigned to a minibatch

    while($i < $row_count) {
        my $start_row   = $rows->[$i++];
        my $range_start = $start_row->[$key_column];
        my @range_list  = ( $range_start );

        my $range_end   = $range_start;
        my $range_count = 1;
        my $last_row    = $start_row;   # last row in the current minibatch

        while($range_count < $step && $i < $row_count) {
            my $next_row   = $rows->[$i];
            my $next_value = $next_row->[$key_column];

            my $predicted_next = $range_end;    # ++$predicted_next is used instead of ($range_end+1) as "string increment" to turn 'aa' into 'ab' into 'ac', etc.
            last if $contiguous and (++$predicted_next ne $next_value);    # gap: this row starts the next minibatch

            push @range_list, $next_value;
            $range_end = $next_value;
            $last_row  = $next_row;
            $range_count++;
            $i++;
        }

        my $job_range = {
            '_range_start'  => $range_start,
            '_range_end'    => $range_end,
            '_range_count'  => $range_count,
            '_range_list'   => \@range_list,

            $column_names
                ? map { ('_start_'.$column_names->[$_] => $start_row->[$_], '_end_'.$column_names->[$_] => $last_row->[$_]) } (0..scalar(@$start_row)-1)
                : map { ("_start_$_" => $start_row->[$_], "_end_$_" => $last_row->[$_]) } (0..scalar(@$start_row)-1)
        };
        push @ranges, $job_range;
    }
    return \@ranges;
}
332 
333 
=head2 _fisher_yates_shuffle_in_place

    Description: a private function (not a method) that shuffles a list of ids in place (Fisher-Yates shuffle).
                 Empty and single-element lists are left untouched.

=cut

sub _fisher_yates_shuffle_in_place {
    my $array = shift @_;

    # Walk $upper from the last index down to 1, swapping each slot with a random slot at or below it.
    # The "$upper > 0" bound also keeps an empty list ($#$array == -1) from entering the loop.
    for (my $upper = $#$array; $upper > 0; --$upper) {
        my $lower = int(rand($upper + 1));
        next if $lower == $upper;
        @$array[$lower, $upper] = @$array[$upper, $lower];
    }
    return;
}
349 
350 1;
Bio::EnsEMBL::Hive::RunnableDB::JobFactory
Definition: JobFactory.pm:35