ensembl-hive  2.6
AnalysisStats.pm
Go to the documentation of this file.
1 =pod
2 
3 =head1 NAME
4 
5  Bio::EnsEMBL::Hive::AnalysisStats
6 
7 =head1 DESCRIPTION
8 
9  An object that maintains counters for jobs in different states. This data is used by the Scheduler.
10 
11 =head1 LICENSE
12 
13  Copyright [1999-2015] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute
14  Copyright [2016-2024] EMBL-European Bioinformatics Institute
15 
16  Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
17  You may obtain a copy of the License at
18 
19  http://www.apache.org/licenses/LICENSE-2.0
20 
21  Unless required by applicable law or agreed to in writing, software distributed under the License
22  is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
23  See the License for the specific language governing permissions and limitations under the License.
24 
25 =head1 CONTACT
26 
27  Please subscribe to the Hive mailing list: http://listserver.ebi.ac.uk/mailman/listinfo/ehive-users to discuss Hive-related questions or to be notified of our updates
28 
29 =head1 APPENDIX
30 
31  The rest of the documentation details each of the object methods.
32  Internal methods are usually preceded with a _
33 
34 =cut
35 
36 
37 package Bio::EnsEMBL::Hive::AnalysisStats;
38 
39 use strict;
40 use warnings;
41 use List::Util 'sum';
42 use POSIX;
43 use Term::ANSIColor;
44 
45 use base ( 'Bio::EnsEMBL::Hive::Storable' );
46 
47 # How to map the job statuses to the counters
# Keys are job statuses, values are the names of the counter accessors of this class.
# Note that both 'DONE' and 'PASSED_ON' map onto done_job_count.
our %status2counter = (
    'FAILED'     => 'failed_job_count',
    'READY'      => 'ready_job_count',
    'DONE'       => 'done_job_count',
    'PASSED_ON'  => 'done_job_count',
    'SEMAPHORED' => 'semaphored_job_count',
);
49 
50 
# Overrides the default from the Cacheable parent.
# Returns a reference to a freshly built list holding the single key name 'analysis'.
sub unikey {
    my @key_fields = ('analysis');

    return \@key_fields;
}
54 
55 
56  ## Minimum amount of time in msec that a worker should run before reporting
57  ## back to the hive. This is used when setting the batch_size automatically.
sub min_batch_time {
    # two minutes, expressed in milliseconds
    my $minutes = 2;

    return $minutes * 60 * 1000;
}
61 
62 
63 =head1 AUTOLOADED
64 
65  analysis_id / analysis
66 
67 =cut
68 
69 
# Alias for analysis_id(): every argument is forwarded to it and its result is returned.
sub dbID {
    my ($self, @args) = @_;

    return $self->analysis_id(@args);
}
75 
76 
# Getter/setter for the '_status' attribute.
# When called with an argument (even undef) that argument is stored first; the current value is returned.
sub status {
    my ($self, @new_value) = @_;

    if (@new_value) {
        $self->{'_status'} = $new_value[0];
    }
    return $self->{'_status'};
}
82 
# Getter/setter for the '_is_excluded' attribute.
# When called with an argument that argument is stored first; the current value is returned.
sub is_excluded {
    my ($self, @new_value) = @_;

    if (@new_value) {
        $self->{'_is_excluded'} = $new_value[0];
    }
    return $self->{'_is_excluded'};
}
88 
89 ## counters of jobs in different states:
90 
91 
# Getter/setter for the '_total_job_count' attribute.
# When called with an argument that argument is stored first; the current value is returned.
sub total_job_count {
    my ($self, @new_value) = @_;

    if (@new_value) {
        $self->{'_total_job_count'} = $new_value[0];
    }
    return $self->{'_total_job_count'};
}
97 
# Getter/setter for the '_semaphored_job_count' attribute.
# When called with an argument that argument is stored first; the current value is returned.
sub semaphored_job_count {
    my ($self, @new_value) = @_;

    if (@new_value) {
        $self->{'_semaphored_job_count'} = $new_value[0];
    }
    return $self->{'_semaphored_job_count'};
}
103 
# Getter/setter for the '_ready_job_count' attribute.
# When called with an argument that argument is stored first; the current value is returned.
sub ready_job_count {
    my ($self, @new_value) = @_;

    if (@new_value) {
        $self->{'_ready_job_count'} = $new_value[0];
    }
    return $self->{'_ready_job_count'};
}
109 
# Getter/setter for the '_done_job_count' attribute.
# When called with an argument that argument is stored first; the current value is returned.
sub done_job_count {
    my ($self, @new_value) = @_;

    if (@new_value) {
        $self->{'_done_job_count'} = $new_value[0];
    }
    return $self->{'_done_job_count'};
}
115 
# Getter/setter for the '_failed_job_count' attribute.
# Unlike the other job counters, an undefined value is replaced by (and stored as) 0,
# so this accessor never returns undef.
sub failed_job_count {
    my ($self, @new_value) = @_;

    if (@new_value) {
        $self->{'_failed_job_count'} = $new_value[0];
    }
    $self->{'_failed_job_count'} //= 0;

    return $self->{'_failed_job_count'};
}
122 
# Getter/setter for the '_num_running_workers' attribute.
# When called with an argument that argument is stored first; the current value is returned.
sub num_running_workers {
    my ($self, @new_value) = @_;

    if (@new_value) {
        $self->{'_num_running_workers'} = $new_value[0];
    }
    return $self->{'_num_running_workers'};
}
128 
129 
130 ## runtime stats:
131 
132 
# Getter/setter for the '_avg_msec_per_job' attribute.
# Any false value (undef, '', 0) is normalized to, and stored as, 0.
sub avg_msec_per_job {
    my ($self, @new_value) = @_;

    if (@new_value) {
        $self->{'_avg_msec_per_job'} = $new_value[0];
    }
    $self->{'_avg_msec_per_job'} ||= 0;

    return $self->{'_avg_msec_per_job'};
}
139 
# Getter/setter for the '_avg_input_msec_per_job' attribute.
# Any false value (undef, '', 0) is normalized to, and stored as, 0.
sub avg_input_msec_per_job {
    my ($self, @new_value) = @_;

    if (@new_value) {
        $self->{'_avg_input_msec_per_job'} = $new_value[0];
    }
    $self->{'_avg_input_msec_per_job'} ||= 0;

    return $self->{'_avg_input_msec_per_job'};
}
146 
# Getter/setter for the '_avg_run_msec_per_job' attribute.
# Any false value (undef, '', 0) is normalized to, and stored as, 0.
sub avg_run_msec_per_job {
    my ($self, @new_value) = @_;

    if (@new_value) {
        $self->{'_avg_run_msec_per_job'} = $new_value[0];
    }
    $self->{'_avg_run_msec_per_job'} ||= 0;

    return $self->{'_avg_run_msec_per_job'};
}
153 
# Getter/setter for the '_avg_output_msec_per_job' attribute.
# Any false value (undef, '', 0) is normalized to, and stored as, 0.
sub avg_output_msec_per_job {
    my ($self, @new_value) = @_;

    if (@new_value) {
        $self->{'_avg_output_msec_per_job'} = $new_value[0];
    }
    $self->{'_avg_output_msec_per_job'} ||= 0;

    return $self->{'_avg_output_msec_per_job'};
}
160 
161 
162 ## other storable attributes:
163 
# Getter/setter for the '_when_updated' attribute.
# This method is called by the initial store() [at which point it returns undef].
# The same '_when_updated' slot is also written by seconds_since_when_updated().
sub when_updated {
    my ($self, @new_value) = @_;

    if (@new_value) {
        $self->{'_when_updated'} = $new_value[0];
    }
    return $self->{'_when_updated'};
}
169 
# We fetch the server-side difference, store the corresponding local time in the in-memory object,
# and from then on report the difference against the local clock.
#   $value (optional) : age in seconds; when defined, '_when_updated' is set to time() - $value
# Returns the number of seconds elapsed since '_when_updated', or undef if that was never set.
sub seconds_since_when_updated {
    my ($self, $value) = @_;

    if (defined $value) {
        $self->{'_when_updated'} = time() - $value;
    }

    my $updated_at = $self->{'_when_updated'};

    return defined($updated_at) ? time() - $updated_at : undef;
}
175 
# Tracks the freshness of the object (stores a local time, reports the difference against the local clock).
#   $value (optional) : age in seconds; when defined, '_last_fetch' is set to time() - $value
# Returns the number of seconds elapsed since '_last_fetch', or undef if that was never set.
sub seconds_since_last_fetch {
    my ($self, $value) = @_;

    if (defined $value) {
        $self->{'_last_fetch'} = time() - $value;
    }

    my $fetched_at = $self->{'_last_fetch'};

    return defined($fetched_at) ? time() - $fetched_at : undef;
}
181 
# Getter/setter for the '_sync_lock' attribute.
# When called with an argument that argument is stored first; the current value is returned.
sub sync_lock {
    my ($self, @new_value) = @_;

    if (@new_value) {
        $self->{'_sync_lock'} = $new_value[0];
    }
    return $self->{'_sync_lock'};
}
187 
188 
189 # non-storable attributes and other helper-methods:
190 
191 
# Asks the adaptor (if there is one) to refresh this object, unless it is still fresh enough.
#   $seconds_fresh (optional) : maximum acceptable age in seconds; when omitted a refresh is always requested
# A refresh is also requested when the age of the object is unknown.
# Returns whatever the adaptor's refresh() returns when a refresh takes place.
sub refresh {
    my ($self, $seconds_fresh) = @_;

    my $age     = $self->seconds_since_last_fetch;
    my $adaptor = $self->adaptor;

    if( $adaptor
    and ( !defined($seconds_fresh) || !defined($age) || $age > $seconds_fresh ) ) {
        return $adaptor->refresh($self);
    }
}
201 
202 
# Hands this object to the adaptor's update_stats_and_monitor(), provided an adaptor is attached;
# without an adaptor nothing happens.
sub update {
    my $self = shift;

    my $adaptor = $self->adaptor;

    return $adaptor && $adaptor->update_stats_and_monitor($self);
}
210 
211 
212 # Only used by workers
# Returns the batch size to use: either the one configured in the Analysis, or a dynamic estimate.
#   $remaining_job_count (optional, defaults to 0) : extra jobs to count on top of ready_job_count
#       FIXME: a better estimate would be $self->claimed_job_count when it is introduced
sub get_or_estimate_batch_size {
    my ($self, $remaining_job_count) = @_;
    $remaining_job_count ||= 0;

    my $batch_size = $self->analysis->batch_size;

    # A positive batch_size is taken as is; anything else is a request for dynamic estimation:
    unless( $batch_size > 0 ) {
        my $avg_msec_per_job = $self->avg_msec_per_job;

        if( $avg_msec_per_job ) {   # further estimations from collected stats
            if( $avg_msec_per_job < 100 ) {
                $avg_msec_per_job = 100;
            }
            $batch_size = POSIX::ceil( $self->min_batch_time / $avg_msec_per_job );

        } else {    # first estimation when no stats are available (take -$batch_size as first guess, if not zero)
            $batch_size = -$batch_size || 1;
        }
    }

    # TailTrimming correction aims at meeting the requirement half way:
    my $num_of_workers = POSIX::ceil( ($self->num_running_workers + $self->estimate_num_required_workers($remaining_job_count))/2 );

    if( $num_of_workers ) {
        my $jobs_to_do    = $self->ready_job_count + $remaining_job_count;
        my $tt_batch_size = POSIX::floor( $jobs_to_do / $num_of_workers );

        if( !$tt_batch_size ) {
            # Fewer jobs than workers
            $batch_size = POSIX::ceil( $jobs_to_do / $num_of_workers );    # essentially, 0 or 1

        } elsif( (0 < $tt_batch_size) && ($tt_batch_size < $batch_size) ) {
            # More jobs to do than workers and default batch size too large
            $batch_size = $tt_batch_size;
        }
    }

    return $batch_size;
}
250 
251 
# Estimates how many workers this analysis needs; this 'max allowed' total includes the ones that are currently running.
#   $remaining_job_count (optional, defaults to 0) : extra jobs to count on top of ready_job_count
#       FIXME: a better estimate would be $self->claimed_job_count when it is introduced
# The estimate starts as the number of jobs to do and is then capped by hive_capacity and analysis_capacity
# (each cap only applies when it is defined and non-negative).
sub estimate_num_required_workers {
    my ($self, $remaining_job_count) = @_;
    $remaining_job_count ||= 0;

    my $estimate = $self->ready_job_count + $remaining_job_count;   # this 'max' estimation can still be zero

    # what is the currently attainable maximum defined via hive_capacity?
    my $hive_capacity = $self->analysis->hive_capacity;
    if( defined($hive_capacity) and $hive_capacity>=0 ) {
        my $current_load = $self->hive_pipeline->get_cached_hive_current_load();
        my $hive_limit   = $self->num_running_workers + POSIX::floor( $hive_capacity * ( 1.0 - $current_load ) );

        $estimate = $hive_limit if( $hive_limit < $estimate );
    }

    # what is the currently attainable maximum defined via analysis_capacity?
    my $analysis_limit = $self->analysis->analysis_capacity;
    if( defined($analysis_limit) and $analysis_limit>=0 and $analysis_limit < $estimate ) {
        $estimate = $analysis_limit;
    }

    return $estimate;
}
275 
276 
# Number of jobs that are in none of the separately counted states (this includes CLAIMED ones):
# total_job_count minus the semaphored, ready, done and failed counters.
sub inprogress_job_count {
    my $self = shift;

    my $in_progress = $self->total_job_count;

    foreach my $counter (qw(semaphored_job_count ready_job_count done_job_count failed_job_count)) {
        $in_progress -= $self->$counter();
    }

    return $in_progress;
}
285 
286 
287 ##---------------------------- [stringification] -----------------------------
288 
# Terminal colour (a Term::ANSIColor attribute string) used to print each "meta status".
my %meta_status_2_color = (
    'DONE'    => 'bright_cyan',
    'RUNNING' => 'bright_yellow',
    'READY'   => 'bright_green',
    'BLOCKED' => 'black on_white',
    'EMPTY'   => 'clear',
    'FAILED'  => 'red',
);

# "Support for colors 8 through 15 (the bright_ variants) was added in
# Term::ANSIColor 3.00, included in Perl 5.13.3."
# http://perldoc.perl.org/Term/ANSIColor.html#COMPATIBILITY
# With an older module, fall back onto the plain variants of the same colours.
if ($Term::ANSIColor::VERSION < '3.00') {
    s/bright_// foreach (values %meta_status_2_color);     # the values are aliased, so the hash is edited in place
}

# Analysis statuses that are displayed with the colour of another ("meta") status.
my %analysis_status_2_meta_status = (
    'LOADING'     => 'READY',
    'SYNCHING'    => 'READY',
    'ALL_CLAIMED' => 'BLOCKED',
    'EXCLUDED'    => 'FAILED',
    'WORKING'     => 'RUNNING',
);

# The "meta status" (hence the colour) with which each job counter is displayed.
my %count_method_2_meta_status = (
    'semaphored_job_count' => 'BLOCKED',
    'ready_job_count'      => 'READY',
    'inprogress_job_count' => 'RUNNING',
    'done_job_count'       => 'DONE',
    'failed_job_count'     => 'FAILED',
);
324 
# Plain function (not a method) that optionally wraps a piece of text into ANSI colour codes.
#   $color_enabled : when false, the text is returned untouched
#   $text          : the text to print
#   $meta_status   : key into %meta_status_2_color that selects the colour
sub _text_with_status_color {
    my ($color_enabled, $text, $meta_status) = @_;

    return $text unless $color_enabled;

    return color($meta_status_2_color{$meta_status}) . $text . color('reset');
}
330 
331 
# Summarizes the job counters of this analysis.
#   $color_enabled : whether each count in the label should be wrapped into ANSI colour codes
# Returns a 3-element list:
#   - a label made of the non-zero counts, each followed by the first letter of its counter name
#     and joined by '+' (followed by '=total' when there are several of them), or '0' if all counts are zero
#   - the total job count
#   - a hashref mapping each counter name onto its value
sub job_count_breakout {
    my ($self, $color_enabled) = @_;

    my @label_parts     = ();
    my %count_of        = ();
    my $total_job_count = $self->total_job_count();

    foreach my $count_method (qw(semaphored_job_count ready_job_count inprogress_job_count done_job_count failed_job_count)) {
        my $count = $count_of{$count_method} = $self->$count_method();
        next unless $count;

        my $suffix = substr($count_method, 0, 1);
        push @label_parts, _text_with_status_color($color_enabled, $count, $count_method_2_meta_status{$count_method}).$suffix;
    }

    my $breakout_label = scalar(@label_parts) ? join('+', @label_parts) : '0';
    if( scalar(@label_parts) > 1 ) {    # only provide a total if multiple categories available
        $breakout_label .= '='.$total_job_count;
    }

    return ($breakout_label, $total_job_count, \%count_of);
}
350 
# Expresses avg_msec_per_job in the largest unit (day, hr, min, sec) of which it holds at least one.
# Returns a 2-element list (value, unit name); runtimes below one second are returned unscaled with the unit 'ms'.
sub friendly_avg_job_runtime {
    my $self = shift;

    my $avg_msec = $self->avg_msec_per_job;

    # [ unit name, length of the unit in msec ], largest unit first
    my @units = (
        [ 'day', 24*3600*1000 ],
        [ 'hr',     3600*1000 ],
        [ 'min',      60*1000 ],
        [ 'sec',         1000 ],
    );

    foreach my $unit (@units) {
        my ($unit_name, $msec_in_unit) = @$unit;
        my $scaled = $avg_msec / $msec_in_unit;

        return ($scaled, $unit_name) if( $scaled >= 1. );
    }

    return ($avg_msec, 'ms');
}
365 
366 
367 # Very simple interpolation that doesn't need to align the fields
# Renders this object as a string: takes the template from _toString_template() and substitutes
# every %(name) / %(-name) placeholder with the value found under 'name' in the hash from _toString_fields().
sub toString {
    my ($self) = @_;

    my $value_of = $self->_toString_fields;

    # Replace each named field with its value
    (my $rendered = $self->_toString_template) =~ s/%\((-?)([a-zA-Z_]\w*)\)/$value_of->{$2}/ge;

    return $rendered;
}
377 
# Returns the template used by toString(); each %(name) or %(-name) placeholder stands for a named field.
sub _toString_template {
    my ($self) = @_;

    my $template = q{%(-logic_name)(%(analysis_id)) %(status), %(breakout_label) jobs, avg: %(avg_runtime)%(avg_runtime_unit) %(num_running_workers) worker%(worker_plural) (%(num_estimated_workers) required), h.cap:%(hive_capacity) a.cap:%(analysis_capacity) (sync'd %(seconds_since_when_updated) sec ago)};

    return $template;
}
383 
384 
# Builds the hash of named values that toString() substitutes into the template.
# Colour codes are only emitted when STDOUT is a terminal.
# Returns a hashref with the keys: logic_name, analysis_id, status, breakout_label, avg_runtime,
# avg_runtime_unit, num_running_workers, worker_plural, num_estimated_workers, hive_capacity,
# analysis_capacity and seconds_since_when_updated.
sub _toString_fields {
    my $self = shift @_;

    my $can_do_colour                       = (-t STDOUT ? 1 : 0);
    my ($breakout_label)                    = $self->job_count_breakout($can_do_colour);   # the total and the per-counter hash are not needed here
    my $analysis                            = $self->analysis;
    my ($avg_runtime, $avg_runtime_unit)    = $self->friendly_avg_job_runtime;

    # an excluded analysis is shown as 'EXCLUDED' whatever its stored status is
    my $status_text                         = $self->is_excluded ? 'EXCLUDED' : $self->status;

    # friendly_avg_job_runtime() always names a unit ('ms' at the very least), so the unit alone cannot tell
    # whether any runtime has been recorded: a zero average has to be checked for as well.
    my $runtime_is_known                    = ($avg_runtime_unit and $avg_runtime);

    return {
        'logic_name'                    => $analysis->logic_name,
        'analysis_id'                   => $self->analysis_id // 0,
        'status'                        => _text_with_status_color($can_do_colour, $status_text, $analysis_status_2_meta_status{$status_text} || $status_text),
        'breakout_label'                => $breakout_label,
        $runtime_is_known ? (   ## With trailing characters to have everything look nicely aligned
            'avg_runtime'               => sprintf('%.1f ', $avg_runtime),  # Notice the trailing space
            'avg_runtime_unit'          => $avg_runtime_unit . ',',         # Notice the trailing comma
        ) : (
            'avg_runtime'               => 'N/A,',                          # Notice the trailing comma
            'avg_runtime_unit'          => '',
        ),
        'num_running_workers'           => $self->num_running_workers,
        'worker_plural'                 => $self->num_running_workers != 1 ? 's' : ' ',
        'num_estimated_workers'         => $self->estimate_num_required_workers,
        'hive_capacity'                 => $analysis->hive_capacity // '-',
        'analysis_capacity'             => $analysis->analysis_capacity // '-',
        'seconds_since_when_updated'    => $self->seconds_since_when_updated // 0,
    };
}
417 
418 
419 ##------------------------- [status synchronization] --------------------------
420 
421 
# Checks the control rules of this analysis and (un)blocks it accordingly.
#   $no_die : passed on as the second argument of each rule's condition_analysis()
# A condition is satisfied when the condition analysis is 'DONE', or when it can be empty and has no jobs.
# A condition whose analysis or stats cannot be obtained counts as not satisfied.
# When there are rules: the status becomes 'BLOCKED' if any condition is not satisfied; otherwise
# a 'BLOCKED' status is changed to 'LOADING'. Without rules the status is left alone.
# Returns 1 if all conditions are satisfied (or if there are no rules), 0 otherwise.
sub check_blocking_control_rules {
    my ($self, $no_die) = @_;

    my $ctrl_rules = $self->analysis->control_rules_collection();

    my $all_conditions_satisfied = 1;

    if(scalar @$ctrl_rules) {    # there are blocking ctrl_rules to check

        foreach my $ctrl_rule (@$ctrl_rules) {

            my $condition_analysis  = $ctrl_rule->condition_analysis(undef, $no_die);
            my $condition_stats     = $condition_analysis && $condition_analysis->stats;
            unless ($condition_stats) {
                $all_conditions_satisfied = 0;
                last;
            }

            # Make sure we use fresh properties of the AnalysisStats object
            # (especially relevant in the case of foreign pipelines, since
            # local objects are periodically refreshed)
            $condition_stats->refresh();

            my $condition_status    = $condition_stats->status // '';   # an unset status is simply not 'DONE'
            my $condition_cbe       = $condition_analysis->can_be_empty;
            my $condition_tjc       = $condition_stats->total_job_count;

            my $this_condition_satisfied = ($condition_status eq 'DONE')
                        || ($condition_cbe && !$condition_tjc);    # probably safer than saying ($condition_status eq 'EMPTY') because of the sync order

            unless( $this_condition_satisfied ) {
                $all_conditions_satisfied = 0;
                last;   # one unsatisfied condition settles the outcome: do not refresh the remaining ones
            }
        }

        if($all_conditions_satisfied) {
            if(($self->status // '') eq 'BLOCKED') {    # unblock, since all conditions are met
                $self->status('LOADING');               # anything that is not 'BLOCKED' will do, it will be redefined in the following subroutine
            }
        } else {    # (re)block
            $self->status('BLOCKED');
        }
    }

    return $all_conditions_satisfied;
}
473 
474 
# Derives the status of the analysis from its job counters, unless the analysis is 'BLOCKED'
# (in which case nothing is changed):
#   EMPTY        - there are no jobs at all
#   FAILED/DONE  - every job is either done or failed; FAILED if the failed ones exceed the
#                  analysis' failed_job_tolerance (a percentage of the total), DONE otherwise
#   READY        - there are claimable jobs, but nothing in progress
#   ALL_CLAIMED  - there are no claimable jobs
#   WORKING      - there are both claimable jobs and jobs in progress
sub determine_status {
    my $self = shift;

    if($self->status ne 'BLOCKED') {
        my $total = $self->total_job_count;

        if( !$total ) {

            $self->status('EMPTY');

        } elsif( $total == $self->done_job_count + $self->failed_job_count ) {    # all jobs of the analysis have been finished

            my $tolerated_failures = $self->analysis->failed_job_tolerance * $total / 100.0;

            $self->status( ($self->failed_job_count > $tolerated_failures) ? 'FAILED' : 'DONE' );

        } elsif( $self->ready_job_count && !$self->inprogress_job_count ) {  # there are claimable jobs, but nothing actually running

            $self->status('READY');

        } elsif( !$self->ready_job_count ) {    # there are no claimable jobs, possibly because some are semaphored

            $self->status('ALL_CLAIMED');

        } elsif( $self->inprogress_job_count ) {

            $self->status('WORKING');
        }
    }
}
505 
506 
# Brings the counters and the status of this object up to date.
#   $job_counts (optional) : hashref mapping job statuses onto numbers of jobs. When given, the semaphored,
#       ready, failed and done counters are recomputed from it via %status2counter, and total_job_count
#       becomes the sum of all its values. When omitted, the counters are left as they are.
# In both cases the control rules are then checked and the status is determined afresh.
sub recalculate_from_job_counts {
    my ($self, $job_counts) = @_;

    # only update job_counts if given the hash:
    if($job_counts) {
        foreach my $counter ('semaphored_job_count', 'ready_job_count', 'failed_job_count', 'done_job_count') {

            # several job statuses may feed the same counter
            my @contributing_statuses   = grep { $status2counter{$_} eq $counter } keys %status2counter;
            my @contributing_counts     = map { $job_counts->{$_} // 0 } @contributing_statuses;

            $self->$counter( sum( @contributing_counts ) );
        }

        my $grand_total = sum( values %$job_counts ) || 0;     # sum() of an empty list is undef
        $self->total_job_count( $grand_total );
    }

    $self->check_blocking_control_rules();

    return $self->determine_status();
}
523 
524 
525 1;
Bio::EnsEMBL::Hive::Version
Definition: Version.pm:19
Bio::EnsEMBL::Hive::AnalysisStats
Definition: AnalysisStats.pm:12
Bio::EnsEMBL::Hive::Cacheable
Definition: Cacheable.pm:6
Bio::EnsEMBL::Hive
Definition: Hive.pm:38