ensembl-hive  2.5
AnalysisStats.pm
Go to the documentation of this file.
1 =pod
2 
3 =head1 NAME
4 
5  Bio::EnsEMBL::Hive::AnalysisStats
6 
7 =head1 DESCRIPTION
8 
9  An object that maintains counters for jobs in different states. This data is used by the Scheduler.
10 
11 =head1 LICENSE
12 
13  Copyright [1999-2015] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute
14  Copyright [2016-2022] EMBL-European Bioinformatics Institute
15 
16  Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
17  You may obtain a copy of the License at
18 
19  http://www.apache.org/licenses/LICENSE-2.0
20 
21  Unless required by applicable law or agreed to in writing, software distributed under the License
22  is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
23  See the License for the specific language governing permissions and limitations under the License.
24 
25 =head1 CONTACT
26 
27  Please subscribe to the Hive mailing list: http://listserver.ebi.ac.uk/mailman/listinfo/ehive-users to discuss Hive-related questions or to be notified of our updates
28 
29 =head1 APPENDIX
30 
31  The rest of the documentation details each of the object methods.
32  Internal methods are usually preceded with a _
33 
34 =cut
35 
36 
37 package Bio::EnsEMBL::Hive::AnalysisStats;
38 
39 use strict;
40 use warnings;
41 use List::Util 'sum';
42 use POSIX;
43 use Term::ANSIColor;
44 
45 use base ( 'Bio::EnsEMBL::Hive::Storable' );
46 
47 
# Overrides the default from the Cacheable parent.
# Returns a reference to a one-element list holding the attribute name 'analysis'.
sub unikey {
    my @key_fields = ('analysis');

    return \@key_fields;
}
51 
52 
## Minimum amount of time (in milliseconds) that a worker should run before
## reporting back to the hive. This is used when the batch_size is set automatically.
sub min_batch_time {
    my $minutes = 2;

    return $minutes * 60 * 1000;    # minutes -> seconds -> milliseconds
}
58 
59 
60 =head1 AUTOLOADED
61 
62  analysis_id / analysis
63 
64 =cut
65 
66 
# Alias for analysis_id(): all arguments are forwarded unchanged and the
# result of analysis_id() is returned as-is.
sub dbID {
    my ($self, @args) = @_;

    return $self->analysis_id(@args);
}
72 
73 
# Getter/setter for the '_status' slot.
# When called with an argument (even undef) the slot is overwritten first;
# the current value of the slot is always returned.
sub status {
    my ($self, @new_value) = @_;

    if (@new_value) {
        $self->{'_status'} = $new_value[0];
    }

    return $self->{'_status'};
}
79 
# Getter/setter for the '_is_excluded' slot.
# Any argument (including undef) replaces the stored value; the stored value is returned.
sub is_excluded {
    my ($self, @new_value) = @_;

    if (@new_value) {
        $self->{'_is_excluded'} = $new_value[0];
    }

    return $self->{'_is_excluded'};
}
85 
86 ## counters of jobs in different states:
87 
88 
# Getter/setter for the '_total_job_count' slot.
# No default is applied: the slot stays undef until it is explicitly set.
sub total_job_count {
    my ($self, @new_value) = @_;

    $self->{'_total_job_count'} = $new_value[0] if @new_value;

    return $self->{'_total_job_count'};
}
94 
# Getter/setter for the '_semaphored_job_count' slot.
# No default is applied: the slot stays undef until it is explicitly set.
sub semaphored_job_count {
    my ($self, @new_value) = @_;

    $self->{'_semaphored_job_count'} = $new_value[0] if @new_value;

    return $self->{'_semaphored_job_count'};
}
100 
# Getter/setter for the '_ready_job_count' slot.
# No default is applied: the slot stays undef until it is explicitly set.
sub ready_job_count {
    my ($self, @new_value) = @_;

    $self->{'_ready_job_count'} = $new_value[0] if @new_value;

    return $self->{'_ready_job_count'};
}
106 
# Getter/setter for the '_done_job_count' slot.
# No default is applied: the slot stays undef until it is explicitly set.
sub done_job_count {
    my ($self, @new_value) = @_;

    $self->{'_done_job_count'} = $new_value[0] if @new_value;

    return $self->{'_done_job_count'};
}
112 
# Getter/setter for the '_failed_job_count' slot.
# Unlike the other job counters this one never returns undef: an undefined
# slot (never set, or explicitly set to undef) is replaced by 0 before returning.
sub failed_job_count {
    my ($self, @new_value) = @_;

    $self->{'_failed_job_count'} = $new_value[0] if @new_value;
    $self->{'_failed_job_count'} //= 0;

    return $self->{'_failed_job_count'};
}
119 
# Getter/setter for the '_num_running_workers' slot.
# No default is applied: the slot stays undef until it is explicitly set.
sub num_running_workers {
    my ($self, @new_value) = @_;

    $self->{'_num_running_workers'} = $new_value[0] if @new_value;

    return $self->{'_num_running_workers'};
}
125 
126 
127 ## runtime stats:
128 
129 
# Getter/setter for the '_avg_msec_per_job' slot.
# Any false value in the slot (undef, empty string, 0) is normalised to 0,
# so the caller always receives a value that is safe to use as a number.
sub avg_msec_per_job {
    my ($self, @new_value) = @_;

    $self->{'_avg_msec_per_job'} = $new_value[0] if @new_value;
    $self->{'_avg_msec_per_job'} ||= 0;

    return $self->{'_avg_msec_per_job'};
}
136 
# Getter/setter for the '_avg_input_msec_per_job' slot.
# Any false value in the slot (undef, empty string, 0) is normalised to 0.
sub avg_input_msec_per_job {
    my ($self, @new_value) = @_;

    $self->{'_avg_input_msec_per_job'} = $new_value[0] if @new_value;
    $self->{'_avg_input_msec_per_job'} ||= 0;

    return $self->{'_avg_input_msec_per_job'};
}
143 
# Getter/setter for the '_avg_run_msec_per_job' slot.
# Any false value in the slot (undef, empty string, 0) is normalised to 0.
sub avg_run_msec_per_job {
    my ($self, @new_value) = @_;

    $self->{'_avg_run_msec_per_job'} = $new_value[0] if @new_value;
    $self->{'_avg_run_msec_per_job'} ||= 0;

    return $self->{'_avg_run_msec_per_job'};
}
150 
# Getter/setter for the '_avg_output_msec_per_job' slot.
# Any false value in the slot (undef, empty string, 0) is normalised to 0.
sub avg_output_msec_per_job {
    my ($self, @new_value) = @_;

    $self->{'_avg_output_msec_per_job'} = $new_value[0] if @new_value;
    $self->{'_avg_output_msec_per_job'} ||= 0;

    return $self->{'_avg_output_msec_per_job'};
}
157 
158 
159 ## other storable attributes:
160 
# Getter/setter for the '_when_updated' slot.
# This method is called by the initial store() [at which point it returns undef].
# The same slot is also written and read by seconds_since_when_updated().
sub when_updated {
    my ($self, @new_value) = @_;

    $self->{'_when_updated'} = $new_value[0] if @new_value;

    return $self->{'_when_updated'};
}
166 
# We fetch the server-side difference, store the corresponding local time in
# the in-memory object, and from then on report the local difference.
#   $value (optional) : number of seconds elapsed since the update; when defined,
#                       the '_when_updated' slot is set to (now - $value).
# Returns the number of seconds between the stored time and now, or undef
# when nothing has been stored yet.
sub seconds_since_when_updated {
    my $self  = shift;
    my $value = shift;

    if ( defined $value ) {
        $self->{'_when_updated'} = time() - $value;
    }

    my $updated_at = $self->{'_when_updated'};

    return defined($updated_at) ? time() - $updated_at : undef;
}
172 
# Tracks the freshness of the object (stores local time, reports the local difference).
#   $value (optional) : number of seconds elapsed since the last fetch; when defined,
#                       the '_last_fetch' slot is set to (now - $value).
# Returns the number of seconds between the stored time and now, or undef
# when nothing has been stored yet.
sub seconds_since_last_fetch {
    my $self  = shift;
    my $value = shift;

    if ( defined $value ) {
        $self->{'_last_fetch'} = time() - $value;
    }

    my $fetched_at = $self->{'_last_fetch'};

    return defined($fetched_at) ? time() - $fetched_at : undef;
}
178 
# Getter/setter for the '_sync_lock' slot.
# Any argument (including undef) replaces the stored value; the stored value is returned.
sub sync_lock {
    my ($self, @new_value) = @_;

    if (@new_value) {
        $self->{'_sync_lock'} = $new_value[0];
    }

    return $self->{'_sync_lock'};
}
184 
185 
186 # non-storable attributes and other helper-methods:
187 
188 
# Asks the adaptor to refresh this object, unless the in-memory copy is still fresh enough.
#   $seconds_fresh (optional) : maximum tolerated age, in seconds, of the in-memory copy.
# The adaptor's refresh() is invoked (and its result returned) when the object has an
# adaptor and either no limit was given, the age of the copy is unknown, or the copy
# is older than the limit. Nothing is done for an object without an adaptor.
sub refresh {
    my $self          = shift;
    my $seconds_fresh = shift;

    my $age = $self->seconds_since_last_fetch;

    # The copy only counts as fresh when both the limit and the age are known
    # and the age does not exceed the limit.
    my $copy_is_fresh = ( defined($seconds_fresh) and defined($age) and not( $seconds_fresh < $age ) );

    if ( $self->adaptor and !$copy_is_fresh ) {
        return $self->adaptor->refresh($self);
    }
}
198 
199 
# Passes this object to the adaptor's update_stats_and_monitor().
# Nothing is done for an object without an adaptor.
sub update {
    my ($self) = @_;

    $self->adaptor->update_stats_and_monitor($self) if $self->adaptor;
}
207 
208 
# Returns the number of jobs a worker should claim in one go.
#   $remaining_job_count (optional, defaults to 0) : extra jobs to count on top of the READY ones.
#       FIXME: a better estimate would be $self->claimed_job_count when it is introduced
#
# A positive batch_size configured on the analysis is used as the starting point.
# Otherwise it is a request for dynamic estimation: from the average job runtime when
# stats are available, or from the negated configured value (or 1) as a first guess.
# The starting point is then subject to the TailTrimming correction.
sub get_or_estimate_batch_size {
    my ($self, $remaining_job_count) = @_;
    $remaining_job_count ||= 0;

    my $batch_size = $self->analysis->batch_size;

    unless ( $batch_size > 0 ) {    # not a fixed positive setting: estimate dynamically
        my $avg_msec_per_job = $self->avg_msec_per_job;

        if ($avg_msec_per_job) {    # further estimations from collected stats
            # runtimes below 100 msec are treated as 100 msec
            my $effective_msec = ( $avg_msec_per_job < 100 ) ? 100 : $avg_msec_per_job;

            $batch_size = POSIX::ceil( $self->min_batch_time / $effective_msec );
        }
        else {    # first estimation when no stats are available (take -$batch_size as first guess, if not zero)
            $batch_size = -$batch_size || 1;
        }
    }

    # TailTrimming correction aims at meeting the requirement half way:
    my $num_of_workers = POSIX::ceil(
        ( $self->num_running_workers + $self->estimate_num_required_workers($remaining_job_count) ) / 2
    );

    if ($num_of_workers) {
        my $jobs_to_do      = $self->ready_job_count + $remaining_job_count;
        my $jobs_per_worker = $jobs_to_do / $num_of_workers;
        my $tt_batch_size   = POSIX::floor($jobs_per_worker);

        if ( !$tt_batch_size ) {
            $batch_size = POSIX::ceil($jobs_per_worker);    # essentially, 0 or 1
        }
        elsif ( ( $tt_batch_size > 0 ) && ( $tt_batch_size < $batch_size ) ) {
            $batch_size = $tt_batch_size;
        }
    }

    return $batch_size;
}
244 
245 
# Estimates how many workers this analysis could use.
# This 'max allowed' total includes the ones that are currently running.
#   $remaining_job_count (optional, defaults to 0) : extra jobs to count on top of the READY ones.
#       FIXME: a better estimate would be $self->claimed_job_count when it is introduced
#
# The estimate starts as (READY jobs + remaining jobs) and is then capped by whatever
# is attainable under hive_capacity and under analysis_capacity. Each cap applies only
# when it is defined and non-negative.
sub estimate_num_required_workers {
    my ($self, $remaining_job_count) = @_;
    $remaining_job_count ||= 0;

    # this 'max' estimation can still be zero
    my $num_required_workers = $self->ready_job_count + $remaining_job_count;

    # what is the currently attainable maximum defined via hive_capacity?
    my $hive_capacity = $self->analysis->hive_capacity;
    if ( defined($hive_capacity) and $hive_capacity >= 0 ) {
        my $hive_current_load = $self->hive_pipeline->get_cached_hive_current_load();
        my $hive_limit        = $self->num_running_workers
                              + POSIX::floor( $hive_capacity * ( 1.0 - $hive_current_load ) );

        $num_required_workers = $hive_limit if $hive_limit < $num_required_workers;
    }

    # what is the currently attainable maximum defined via analysis_capacity?
    my $analysis_limit = $self->analysis->analysis_capacity;
    if ( defined($analysis_limit) and $analysis_limit >= 0 ) {
        $num_required_workers = $analysis_limit if $analysis_limit < $num_required_workers;
    }

    return $num_required_workers;
}
269 
270 
# Number of jobs that are neither semaphored, ready, done nor failed (includes CLAIMED).
# Derived as the total job count minus those four counters.
sub inprogress_job_count {
    my ($self) = @_;

    my $in_progress = $self->total_job_count;

    foreach my $counter (qw(semaphored_job_count ready_job_count done_job_count failed_job_count)) {
        $in_progress = $in_progress - $self->$counter();
    }

    return $in_progress;
}
279 
# Term::ANSIColor attribute string used to render each meta-status.
my %meta_status_2_color = (
    'DONE'    => 'bright_cyan',
    'RUNNING' => 'bright_yellow',
    'READY'   => 'bright_green',
    'BLOCKED' => 'black on_white',
    'EMPTY'   => 'clear',
    'FAILED'  => 'red',
);

# "Support for colors 8 through 15 (the bright_ variants) was added in
# Term::ANSIColor 3.00, included in Perl 5.13.3."
# http://perldoc.perl.org/Term/ANSIColor.html#COMPATIBILITY
# With an older module the 'bright_' prefix is stripped from every colour name.
if ($Term::ANSIColor::VERSION < '3.00') {
    foreach my $colour (values %meta_status_2_color) {    # values() aliases the hash entries
        $colour =~ s/bright_//;
    }
}
299 
# Translation of analysis statuses into the meta-status whose colour they are shown in.
# toString() falls back to the status itself when it has no entry here.
my %analysis_status_2_meta_status = (
    'LOADING'     => 'READY',
    'SYNCHING'    => 'READY',
    'ALL_CLAIMED' => 'BLOCKED',
    'EXCLUDED'    => 'FAILED',
    'WORKING'     => 'RUNNING',
);
307 
# Meta-status (and therefore colour) under which each job counter is displayed
# by job_count_breakout(); keys are the names of the counter methods.
my %count_method_2_meta_status = (
    'semaphored_job_count' => 'BLOCKED',
    'ready_job_count'      => 'READY',
    'inprogress_job_count' => 'RUNNING',
    'done_job_count'       => 'DONE',
    'failed_job_count'     => 'FAILED',
);
315 
# Plain function (not a method): right-aligns a piece of text and optionally colours it.
#   $field_size    : minimum visible width; false means no padding
#   $color_enabled : when true, the text is wrapped in ANSI colour codes
#   $text          : the text to show
#   $meta_status   : key into %meta_status_2_color selecting the colour
# The padding is computed from the bare text and placed in front of the colour
# codes, so the escape sequences do not count towards the width.
sub _text_with_status_color {
    my ($field_size, $color_enabled, $text, $meta_status) = @_;

    my $missing = $field_size ? $field_size - length($text) : 0;
    my $padding = ( $missing > 0 ) ? ' ' x $missing : '';

    if ($color_enabled) {
        return $padding . color( $meta_status_2_color{$meta_status} ) . $text . color('reset');
    }

    return $padding . $text;
}
323 
324 
# Builds a compact label that breaks the job total down by state, e.g. "3r+2i+10d=15".
#   $field_size    : minimum visible width of the label; false means no padding
#   $color_enabled : when true, each count is wrapped in ANSI colour codes
# Only non-zero counters appear in the label, each followed by the first letter of
# its counter method name. The total is appended unless exactly one counter is shown.
# Returns a list: (label, total job count, hashref of counter method name => count).
sub job_count_breakout {
    my ($self, $field_size, $color_enabled) = @_;

    my $total_job_count = $self->total_job_count();

    my %count_hash     = ();
    my @shown_counts   = ();
    my $visible_length = 0;    # label width not counting the colour escape sequences

    for my $count_method (qw(semaphored_job_count ready_job_count inprogress_job_count done_job_count failed_job_count)) {
        my $count = $self->$count_method();
        $count_hash{$count_method} = $count;
        next unless $count;

        my $shown_count = _text_with_status_color( undef, $color_enabled, $count, $count_method_2_meta_status{$count_method} );
        push @shown_counts, $shown_count . substr( $count_method, 0, 1 );
        $visible_length += length("$count") + 1;    # the digits plus the state letter
    }

    my $breakout_label = join( '+', @shown_counts );
    $visible_length += $#shown_counts if @shown_counts;    # the '+' separators

    # only provide a total if multiple or no categories available
    if ( scalar(@shown_counts) != 1 ) {
        $breakout_label .= '=' . $total_job_count;
        $visible_length += 1 + length("$total_job_count");
    }

    if ( $field_size and $visible_length < $field_size ) {
        $breakout_label = ( ' ' x ( $field_size - $visible_length ) ) . $breakout_label;
    }

    return ( $breakout_label, $total_job_count, \%count_hash );
}
349 
# Expresses the average job runtime in the largest unit in which it amounts to at least 1.
# Returns a list (value, unit), the unit being one of 'day', 'hr', 'min', 'sec';
# runtimes below one second are returned unscaled with the unit 'ms'.
sub friendly_avg_job_runtime {
    my ($self) = @_;

    my $avg_msec = $self->avg_msec_per_job;

    # units are tried from the largest to the smallest
    foreach my $unit ( [ 24*3600*1000, 'day' ], [ 3600*1000, 'hr' ], [ 60*1000, 'min' ], [ 1000, 'sec' ] ) {
        my ($msec_per_unit, $unit_name) = @$unit;
        my $scaled = $avg_msec / $msec_per_unit;

        return ( $scaled, $unit_name ) if $scaled >= 1.;
    }

    return ( $avg_msec, 'ms' );
}
364 
# Produces the one-line, human-readable summary of this analysis' stats.
#   $max_logic_name_length (optional) : width of the logic_name column; defaults to the
#                                       length of this analysis' own logic_name.
# The summary is coloured only when STDOUT is attached to a terminal.
# An excluded analysis is reported with the status 'EXCLUDED' whatever its stored status.
sub toString {
    my ($self, $max_logic_name_length) = @_;

    my $can_do_colour = ( -t STDOUT ) ? 1 : 0;
    my ($breakout_label, $total_job_count, $count_hash) = $self->job_count_breakout( 24, $can_do_colour );
    my $analysis = $self->analysis;
    my ($avg_runtime, $avg_runtime_unit) = $self->friendly_avg_job_runtime;
    $max_logic_name_length ||= length( $analysis->logic_name );

    my $status_text = $self->status;
    $status_text = 'EXCLUDED' if $self->is_excluded;

    # statuses without a dedicated meta-status are coloured under their own name
    my $coloured_status = _text_with_status_color(
        11, $can_do_colour, $status_text,
        $analysis_status_2_meta_status{$status_text} || $status_text,
    );

    my $summary = sprintf(
        "%-${max_logic_name_length}s(%3d) %s, jobs( %s ), avg:%5.1f %-3s, workers(Running:%d, Est.Required:%d) ",
        $analysis->logic_name,
        $self->analysis_id // 0,
        $coloured_status,
        $breakout_label,
        $avg_runtime, $avg_runtime_unit,
        $self->num_running_workers,
        $self->estimate_num_required_workers,
    );

    # undefined capacities are shown as '-', an unknown sync age as 0
    return join( '',
        $summary,
        ' h.cap:',    ( $analysis->hive_capacity // '-' ),
        ' a.cap:',    ( $analysis->analysis_capacity // '-' ),
        " (sync'd ",  ( $self->seconds_since_when_updated // 0 ),
        " sec ago)",
    );
}
397 
398 
# Evaluates the control rules of this analysis and (un)blocks it accordingly.
#   $no_die (optional) : passed through as second argument to each rule's condition_analysis().
# A rule is satisfied when its condition analysis has the status 'DONE', or is allowed to be
# empty and has no jobs. A rule whose condition analysis or stats object cannot be obtained
# counts as unsatisfied and ends the scan.
# When there are rules: the status becomes 'BLOCKED' if any rule is unsatisfied, and a
# 'BLOCKED' status is replaced by 'LOADING' if all are satisfied. Without rules the status
# is left alone.
# Returns 1 when all conditions are satisfied (or there are no rules), 0 otherwise.
sub check_blocking_control_rules {
    my $self   = shift;
    my $no_die = shift;

    my $ctrl_rules = $self->analysis->control_rules_collection();

    return 1 unless scalar @$ctrl_rules;    # no blocking ctrl_rules to check

    my $all_conditions_satisfied = 1;

    RULE:
    foreach my $ctrl_rule (@$ctrl_rules) {

        my $condition_analysis = $ctrl_rule->condition_analysis( undef, $no_die );
        my $condition_stats    = $condition_analysis && $condition_analysis->stats;

        if ( !$condition_stats ) {    # either the analysis or its stats could not be obtained
            $all_conditions_satisfied = 0;
            last RULE;
        }

        # Make sure we use fresh properties of the AnalysisStats object
        # (especially relevant in the case of foreign pipelines, since
        # local objects are periodically refreshed)
        $condition_stats->refresh();

        my $condition_status = $condition_stats->status;
        my $condition_cbe    = $condition_analysis->can_be_empty;
        my $condition_tjc    = $condition_stats->total_job_count;

        next RULE if $condition_status eq 'DONE';

        # probably safer than saying ($condition_status eq 'EMPTY') because of the sync order
        next RULE if $condition_cbe && !$condition_tjc;

        # unsatisfied; the scan continues, so the remaining conditions still get refreshed
        $all_conditions_satisfied = 0;
    }

    if ( !$all_conditions_satisfied ) {    # (re)block
        $self->status('BLOCKED');
    }
    elsif ( $self->status eq 'BLOCKED' ) {    # unblock, since all conditions are met
        $self->status('LOADING');    # anything that is not 'BLOCKED' will do, it will be redefined in the following subroutine
    }

    return $all_conditions_satisfied;
}
450 
451 
# Derives the status of the analysis from its job counters, unless it is 'BLOCKED'
# (a 'BLOCKED' status is never touched here).
#   no jobs at all                                      -> 'EMPTY'
#   every job done or failed, failures above tolerance  -> 'FAILED'
#   every job done or failed, failures within tolerance -> 'DONE'
#   no claimable (READY) jobs                           -> 'ALL_CLAIMED'
#   claimable jobs and some jobs in progress            -> 'WORKING'
#   claimable jobs and nothing in progress              -> 'READY'
# The tolerance is the analysis' failed_job_tolerance, taken as a percentage of the total.
sub determine_status {
    my ($self) = @_;

    if ( $self->status ne 'BLOCKED' ) {
        my $total_jobs = $self->total_job_count;

        if ( !$total_jobs ) {
            $self->status('EMPTY');
        }
        elsif ( $total_jobs == $self->done_job_count + $self->failed_job_count ) {    # all jobs of the analysis have been finished
            my $absolute_tolerance = $self->analysis->failed_job_tolerance * $total_jobs / 100.0;

            $self->status( ( $self->failed_job_count > $absolute_tolerance ) ? 'FAILED' : 'DONE' );
        }
        elsif ( !$self->ready_job_count ) {    # there are no claimable jobs, possibly because some are semaphored
            $self->status('ALL_CLAIMED');
        }
        elsif ( $self->inprogress_job_count ) {    # claimable jobs exist and some jobs are being worked on
            $self->status('WORKING');
        }
        else {    # there are claimable jobs, but nothing actually running
            $self->status('READY');
        }
    }
}
482 
483 
# Refreshes the counters from a per-state job count and then re-derives the status.
#   $job_counts (optional) : hashref mapping a job state name to the number of jobs in it.
#                            The counters are only updated when it is given.
# In all cases the control rules are checked and the status is determined afterwards.
sub recalculate_from_job_counts {
    my $self       = shift;
    my $job_counts = shift;

    # only update job_counts if given the hash:
    if ($job_counts) {
        my @setter_and_state = (
            [ 'semaphored_job_count', 'SEMAPHORED' ],
            [ 'ready_job_count',      'READY'      ],
            [ 'failed_job_count',     'FAILED'     ],
        );
        foreach my $pair (@setter_and_state) {
            my ($setter, $state) = @$pair;
            $self->$setter( $job_counts->{$state} || 0 );
        }

        # done here or potentially done elsewhere
        my $done_here      = $job_counts->{'DONE'}      // 0;
        my $done_elsewhere = $job_counts->{'PASSED_ON'} // 0;
        $self->done_job_count( $done_here + $done_elsewhere );

        # sum() of an empty list is undef, hence the fallback to 0
        my $grand_total = sum( values %$job_counts );
        $self->total_job_count( $grand_total || 0 );
    }

    $self->check_blocking_control_rules();

    $self->determine_status();
}
500 
501 
502 1;