An object that maintains counters for jobs in different states. This data is used by the Scheduler.
13 Copyright [1999-2015] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute
14 Copyright [2016-2024] EMBL-European Bioinformatics Institute
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0
21 Unless required by applicable law or agreed to in writing, software distributed under the License
22 is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
23 See the License
for the specific language governing permissions and limitations under the License.
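Please subscribe to the Hive mailing list:  http://listserver.ebi.ac.uk/mailman/listinfo/ehive-users  to discuss Hive-related questions or to be notified of our updates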
31 The rest of the documentation details each of the
object methods.
32 Internal methods are usually preceded with a _
37 package Bio::EnsEMBL::Hive::AnalysisStats;
45 use base (
'Bio::EnsEMBL::Hive::Storable' );
47 # How to map the job statuses to the counters
# Job status => name of the counter accessor that aggregates jobs in that status.
# Note that both 'DONE' and 'PASSED_ON' are folded into the same counter.
our %status2counter = (
    'FAILED'     => 'failed_job_count',
    'READY'      => 'ready_job_count',
    'DONE'       => 'done_job_count',
    'PASSED_ON'  => 'done_job_count',
    'SEMAPHORED' => 'semaphored_job_count',
);
# Override the default from the Cacheable parent: the key is the single
# field 'analysis'. Returns a reference to a one-element array.
sub unikey {
    return [ 'analysis' ];
}
56 ## Minimum amount of time in msec that a worker should run before reporting
57 ## back to the hive. This is used when setting the batch_size automatically.
65 analysis_id / analysis
73 return $self->analysis_id(@_);
79 $self->{
'_status'} = shift
if(@_);
80 return $self->{
'_status'};
85 $self->{
'_is_excluded'} = shift
if (@_);
86 return $self->{
'_is_excluded'};
89 ## counters of jobs in different states:
94 $self->{
'_total_job_count'} = shift
if(@_);
95 return $self->{
'_total_job_count'};
# Getter/setter for the '_semaphored_job_count' slot.
#   Arg [1] : (optional) new value; any supplied argument (even 0 or undef) is stored
#   Returns : the value currently held in the slot (undef if never set)
sub semaphored_job_count {
    my $self = shift;

    $self->{'_semaphored_job_count'} = shift if @_;

    return $self->{'_semaphored_job_count'};
}
# Getter/setter for the '_ready_job_count' slot.
#   Arg [1] : (optional) new value; any supplied argument (even 0 or undef) is stored
#   Returns : the value currently held in the slot (undef if never set)
sub ready_job_count {
    my $self = shift;

    $self->{'_ready_job_count'} = shift if @_;

    return $self->{'_ready_job_count'};
}
112 $self->{
'_done_job_count'} = shift
if(@_);
113 return $self->{
'_done_job_count'};
# Getter/setter for the '_failed_job_count' slot.
# Unlike the other job counters in this file, this one never returns undef:
# an undefined slot is initialised to 0 on access.
#   Arg [1] : (optional) new value
#   Returns : the stored value, or 0 if it was undefined
sub failed_job_count {
    my $self = shift;

    $self->{'_failed_job_count'} = shift if @_;
    $self->{'_failed_job_count'} //= 0;     # default only when undefined

    return $self->{'_failed_job_count'};
}
# Getter/setter for the '_num_running_workers' slot.
#   Arg [1] : (optional) new value; any supplied argument is stored
#   Returns : the value currently held in the slot (undef if never set)
sub num_running_workers {
    my $self = shift;

    $self->{'_num_running_workers'} = shift if @_;

    return $self->{'_num_running_workers'};
}
# Getter/setter for the '_avg_msec_per_job' slot.
# Any false value (undef, '', 0) is normalised to 0, so callers always get a number.
#   Arg [1] : (optional) new value
#   Returns : the stored value, or 0 if it was false
sub avg_msec_per_job {
    my $self = shift;

    $self->{'_avg_msec_per_job'} = shift if @_;
    $self->{'_avg_msec_per_job'} ||= 0;

    return $self->{'_avg_msec_per_job'};
}
# Getter/setter for the '_avg_input_msec_per_job' slot.
# Any false value (undef, '', 0) is normalised to 0.
#   Arg [1] : (optional) new value
#   Returns : the stored value, or 0 if it was false
sub avg_input_msec_per_job {
    my $self = shift;

    $self->{'_avg_input_msec_per_job'} = shift if @_;
    $self->{'_avg_input_msec_per_job'} ||= 0;

    return $self->{'_avg_input_msec_per_job'};
}
# Getter/setter for the '_avg_run_msec_per_job' slot.
# Any false value (undef, '', 0) is normalised to 0.
#   Arg [1] : (optional) new value
#   Returns : the stored value, or 0 if it was false
sub avg_run_msec_per_job {
    my $self = shift;

    $self->{'_avg_run_msec_per_job'} = shift if @_;
    $self->{'_avg_run_msec_per_job'} ||= 0;

    return $self->{'_avg_run_msec_per_job'};
}
# Getter/setter for the '_avg_output_msec_per_job' slot.
# Any false value (undef, '', 0) is normalised to 0.
#   Arg [1] : (optional) new value
#   Returns : the stored value, or 0 if it was false
sub avg_output_msec_per_job {
    my $self = shift;

    $self->{'_avg_output_msec_per_job'} = shift if @_;
    $self->{'_avg_output_msec_per_job'} ||= 0;

    return $self->{'_avg_output_msec_per_job'};
}
162 ## other storable attributes:
# Getter/setter for the '_when_updated' slot.
# This method is called by the initial store() [at which point it returns undef].
# The same slot is also written by seconds_since_when_updated().
#   Arg [1] : (optional) new value
#   Returns : the value currently held in the slot
sub when_updated {
    my $self = shift;

    $self->{'_when_updated'} = shift if @_;

    return $self->{'_when_updated'};
}
# We fetch the server difference, store local time in the memory object,
# and use the local difference.
#   Arg [1] : (optional) age in seconds; when defined, '_when_updated' is set to
#             the local time() minus that age
#   Returns : seconds elapsed between the stored moment and now (local clock),
#             or undef if '_when_updated' is not defined
sub seconds_since_when_updated {
    my ($self, $value) = @_;

    if (defined $value) {
        $self->{'_when_updated'} = time() - $value;
    }

    return defined($self->{'_when_updated'})
        ? time() - $self->{'_when_updated'}
        : undef;
}
# Track the freshness of the object (store local time, use the local difference).
#   Arg [1] : (optional) age in seconds; when defined, '_last_fetch' is set to
#             the local time() minus that age
#   Returns : seconds elapsed between the stored moment and now (local clock),
#             or undef if '_last_fetch' is not defined
sub seconds_since_last_fetch {
    my ($self, $value) = @_;

    if (defined $value) {
        $self->{'_last_fetch'} = time() - $value;
    }

    return defined($self->{'_last_fetch'})
        ? time() - $self->{'_last_fetch'}
        : undef;
}
184 $self->{
'_sync_lock'} = shift
if(@_);
185 return $self->{
'_sync_lock'};
189 # non-storable attributes and other helper-methods:
193 my ($self, $seconds_fresh) = @_;
194 my $seconds_since_last_fetch = $self->seconds_since_last_fetch;
197 and (!defined($seconds_fresh) or !defined($seconds_since_last_fetch) or $seconds_fresh < $seconds_since_last_fetch) ) {
198 return $self->adaptor->refresh($self);
207 $self->adaptor->update_stats_and_monitor($self);
212 # Only used by workers
# Work out how many jobs should be taken in one batch.
#   Arg [1] : (optional) number of jobs still remaining; defaults to 0
#   Returns : the batch size
#
# The size configured on the analysis is used when positive. Otherwise it is
# estimated from avg_msec_per_job and min_batch_time, or, when no stats exist
# yet, from the negated configured value (falling back to 1). A final
# "TailTrimming" step may shrink the batch.
sub get_or_estimate_batch_size {
    my $self                = shift @_;
    my $remaining_job_count = shift @_ || 0;    # FIXME: a better estimate would be $self->claimed_job_count when it is introduced

    my $batch_size = $self->analysis->batch_size;

    if( $batch_size > 0 ) {
        # set to positive or not set (and auto-initialized within $self->batch_size):
        # use the value as it is

    } elsif( my $avg_msec_per_job = $self->avg_msec_per_job ) {
        # otherwise it is a request for dynamic estimation:
        # further estimations from collected stats

        # averages below 100 msec are treated as 100 msec
        $avg_msec_per_job = 100 if $avg_msec_per_job < 100;

        $batch_size = POSIX::ceil( $self->min_batch_time / $avg_msec_per_job );

    } else {
        # first estimation when no stats are available
        # (take -$batch_size as first guess, if not zero)
        $batch_size = -$batch_size || 1;
    }

    # TailTrimming correction aims at meeting the requirement half way:
    my $num_of_workers = POSIX::ceil(
        ( $self->num_running_workers + $self->estimate_num_required_workers($remaining_job_count) ) / 2
    );
    if( $num_of_workers ) {     # also guards the divisions below against zero

        my $jobs_to_do    = $self->ready_job_count + $remaining_job_count;
        my $tt_batch_size = POSIX::floor( $jobs_to_do / $num_of_workers );

        if( (0 < $tt_batch_size) && ($tt_batch_size < $batch_size) ) {
            # More jobs to do than workers and default batch size too large
            $batch_size = $tt_batch_size;
        } elsif( !$tt_batch_size ) {
            # Fewer jobs than workers
            $batch_size = POSIX::ceil( $jobs_to_do / $num_of_workers );  # essentially, 0 or 1
        }
    }

    return $batch_size;
}
# Estimate how many workers this analysis needs.
# This 'max allowed' total includes the ones that are currently running.
#   Arg [1] : (optional) number of jobs still remaining; defaults to 0
#   Returns : ready_job_count + remaining jobs, capped by the limit derived from
#             hive_capacity and by analysis_capacity (each applied only when
#             defined and non-negative)
sub estimate_num_required_workers {
    my $self                = shift @_;
    my $remaining_job_count = shift @_ || 0;    # FIXME: a better estimate would be $self->claimed_job_count when it is introduced

    # this 'max' estimation can still be zero
    my $num_required_workers = $self->ready_job_count + $remaining_job_count;

    # what is the currently attainable maximum defined via hive_capacity?
    my $h_cap = $self->analysis->hive_capacity;
    if( defined($h_cap) and $h_cap >= 0 ) {
        my $hive_current_load = $self->hive_pipeline->get_cached_hive_current_load();
        my $h_max = $self->num_running_workers + POSIX::floor( $h_cap * ( 1.0 - $hive_current_load ) );
        if( $h_max < $num_required_workers ) {
            $num_required_workers = $h_max;
        }
    }

    # what is the currently attainable maximum defined via analysis_capacity?
    my $a_max = $self->analysis->analysis_capacity;
    if( defined($a_max) and $a_max >= 0 ) {
        if( $a_max < $num_required_workers ) {
            $num_required_workers = $a_max;
        }
    }

    return $num_required_workers;
}
# Number of jobs that are neither semaphored, ready, done nor failed
# (includes CLAIMED). Derived on each call; nothing is stored.
#   Returns : total_job_count minus the four other counters
sub inprogress_job_count {
    my $self = shift;

    return $self->total_job_count
            - $self->semaphored_job_count
            - $self->ready_job_count
            - $self->done_job_count
            - $self->failed_job_count;
}
287 ##---------------------------- [stringification] -----------------------------
289 my %meta_status_2_color = (
290 'DONE' =>
'bright_cyan',
291 'RUNNING' =>
'bright_yellow',
292 'READY' =>
'bright_green',
293 'BLOCKED' =>
'black on_white',
298 # "Support for colors 8 through 15 (the bright_ variants) was added in
299 # Term::ANSIColor 3.00, included in Perl 5.13.3."
300 # http://perldoc.perl.org/Term/ANSIColor.html#COMPATIBILITY
301 if ($Term::ANSIColor::VERSION <
'3.00') {
302 foreach my $s (keys %meta_status_2_color) {
303 my $c = $meta_status_2_color{$s};
305 $meta_status_2_color{$s} = $c;
# Analysis status => meta-status used for colouring. Statuses missing from this
# table are used as their own meta-status by _toString_fields().
my %analysis_status_2_meta_status = (
    'LOADING'     => 'READY',
    'SYNCHING'    => 'READY',
    'ALL_CLAIMED' => 'BLOCKED',
    'EXCLUDED'    => 'FAILED',
    'WORKING'     => 'RUNNING',
);
# Counter accessor name => meta-status used to colour that counter in
# job_count_breakout().
my %count_method_2_meta_status = (
    'semaphored_job_count' => 'BLOCKED',
    'ready_job_count'      => 'READY',
    'inprogress_job_count' => 'RUNNING',
    'done_job_count'       => 'DONE',
    'failed_job_count'     => 'FAILED',
);
# Wrap a piece of text in the ANSI colour associated with a meta-status.
# Plain function (not a method).
#   Arg [1] : boolean, whether colouring is enabled
#   Arg [2] : the text to print
#   Arg [3] : meta-status, looked up in %meta_status_2_color
#   Returns : the coloured text followed by a colour reset, or the text
#             unchanged when colouring is disabled or no colour is mapped
sub _text_with_status_color {
    my ($color_enabled, $text, $meta_status) = @_;

    return $text unless $color_enabled;

    # An undefined or unmapped meta-status must not reach color(): fall back to plain text.
    my $color_name = defined($meta_status) ? $meta_status_2_color{$meta_status} : undef;
    return $text unless defined $color_name;

    return color($color_name) . $text . color('reset');
}
# Build a compact label showing the job counters of this analysis.
#   Arg [1] : boolean, whether the counts should be coloured
#   Returns : a 3-element list:
#               - the label: non-zero counts, each suffixed with the first letter
#                 of its counter name, joined by '+'; '=<total>' is appended when
#                 more than one category is present; '0' when none is
#               - total_job_count
#               - hash ref: counter method name => value, for all five counters
sub job_count_breakout {
    my $self          = shift;
    my $color_enabled = shift;

    my @count_list;
    my %count_hash;

    my $total_job_count = $self->total_job_count();
    foreach my $count_method (qw(semaphored_job_count ready_job_count inprogress_job_count done_job_count failed_job_count)) {
        # every counter is recorded in the hash, but only non-zero ones are displayed
        if( my $count = $count_hash{$count_method} = $self->$count_method() ) {
            push @count_list, _text_with_status_color($color_enabled, $count, $count_method_2_meta_status{$count_method}).substr($count_method,0,1);
        }
    }

    my $breakout_label = join('+', @count_list);
    $breakout_label .= '='.$total_job_count if(scalar(@count_list)>1);   # only provide a total if multiple categories available
    $breakout_label  = '0'                  if(scalar(@count_list)==0);

    return ($breakout_label, $total_job_count, \%count_hash);
}
351 sub friendly_avg_job_runtime {
354 my $avg = $self->avg_msec_per_job;
355 my @units = ([24*3600*1000,
'day'], [3600*1000,
'hr'], [60*1000,
'min'], [1000,
'sec']);
357 while (my $unit_description = shift @units) {
358 my $x = $avg / $unit_description->[0];
360 return ($x, $unit_description->[1]);
367 # Very simple interpolation that doesn't need to align the fields
371 my $fields = $self->_toString_fields;
372 my $s = $self->_toString_template;
373 # Replace each named field with its value
374 $s =~ s/%\((-?)([a-zA-Z_]\w*)\)/$fields->{$2}/ge;
# Template for the one-line description of this object.
# Each %(name) placeholder names a key of the hash built by _toString_fields().
sub _toString_template {
    return q{%(-logic_name)(%(analysis_id)) %(status), %(breakout_label) jobs, avg: %(avg_runtime)%(avg_runtime_unit) %(num_running_workers) worker%(worker_plural) (%(num_estimated_workers) required), h.cap:%(hive_capacity) a.cap:%(analysis_capacity) (sync'd %(seconds_since_when_updated) sec ago)};
}
# Compute the values substituted into the template returned by _toString_template().
#   Returns : hash ref, placeholder name => display value
# Colouring is enabled only when STDOUT is a terminal.
sub _toString_fields {
    my $self = shift;

    my $can_do_colour                                   = (-t STDOUT ? 1 : 0);
    my ($breakout_label, $total_job_count, $count_hash) = $self->job_count_breakout($can_do_colour);
    my $analysis                                        = $self->analysis;
    my ($avg_runtime, $avg_runtime_unit)                = $self->friendly_avg_job_runtime;
    my $status_text                                     = $self->status;
    if ($self->is_excluded) {
        # exclusion takes precedence over the stored status
        $status_text = 'EXCLUDED';
    }

    return {
        'logic_name'                    => $analysis->logic_name,
        'analysis_id'                   => $self->analysis_id // 0,
        'status'                        => _text_with_status_color($can_do_colour, $status_text, $analysis_status_2_meta_status{$status_text} || $status_text),
        'breakout_label'                => $breakout_label,
        $avg_runtime_unit ? (   ## With trailing characters to have everything look nicely aligned
            'avg_runtime'               => sprintf('%.1f ', $avg_runtime),  # Notice the trailing space
            'avg_runtime_unit'          => $avg_runtime_unit . ',',         # Notice the trailing comma
        ) : (
            'avg_runtime'               => 'N/A,',                          # Notice the trailing comma
            'avg_runtime_unit'          => '',
        ),
        'num_running_workers'           => $self->num_running_workers,
        'worker_plural'                 => $self->num_running_workers != 1 ? 's' : ' ',
        'num_estimated_workers'         => $self->estimate_num_required_workers,
        'hive_capacity'                 => $analysis->hive_capacity // '-',
        'analysis_capacity'             => $analysis->analysis_capacity // '-',
        'seconds_since_when_updated'    => $self->seconds_since_when_updated // 0,
    };
}
419 ##------------------------- [status synchronization] --------------------------
422 sub check_blocking_control_rules {
423 my ($self, $no_die) = @_;
425 my $ctrl_rules = $self->analysis->control_rules_collection();
427 my $all_conditions_satisfied = 1;
429 if(scalar @$ctrl_rules) { # there are blocking ctrl_rules to check
431 foreach my $ctrl_rule (@$ctrl_rules) {
433 my $condition_analysis = $ctrl_rule->condition_analysis(undef, $no_die);
434 unless ($condition_analysis) {
435 $all_conditions_satisfied = 0;
439 my $condition_stats = $condition_analysis->stats;
440 unless ($condition_stats) {
441 $all_conditions_satisfied = 0;
445 # Make sure we use fresh properties of the AnalysisStats object
446 # (especially relevant in the case of foreign pipelines, since
447 # local objects are periodically refreshed)
448 $condition_stats->refresh();
450 my $condition_status = $condition_stats->status;
451 my $condition_cbe = $condition_analysis->can_be_empty;
452 my $condition_tjc = $condition_stats->total_job_count;
454 my $this_condition_satisfied = ($condition_status eq 'DONE
')
455 || ($condition_cbe && !$condition_tjc); # probably safer than saying ($condition_status eq 'EMPTY
') because of the sync order
457 unless( $this_condition_satisfied ) {
458 $all_conditions_satisfied = 0;
462 if($all_conditions_satisfied) {
463 if($self->status eq 'BLOCKED
') { # unblock, since all conditions are met
464 $self->status('LOADING
'); # anything that is not 'BLOCKED
' will do, it will be redefined in the following subroutine
467 $self->status('BLOCKED
');
471 return $all_conditions_satisfied;
# Derive the status of the analysis from its job counters and store it via status().
# A 'BLOCKED' analysis is left untouched. Otherwise the first matching rule wins:
#   EMPTY       - no jobs at all
#   FAILED/DONE - every job is done or failed; FAILED when the failed count
#                 exceeds failed_job_tolerance (a percentage of the total)
#   READY       - there are claimable jobs, but nothing actually running
#   ALL_CLAIMED - there are no claimable jobs, possibly because some are semaphored
#   WORKING     - there are claimable jobs and some in progress
sub determine_status {
    my $self = shift;

    if( $self->status ne 'BLOCKED' ) {
        if( !$self->total_job_count ) {

            $self->status('EMPTY');

        } elsif( $self->total_job_count == $self->done_job_count + $self->failed_job_count ) {   # all jobs of the analysis have been finished
            my $analysis           = $self->analysis;
            my $absolute_tolerance = $analysis->failed_job_tolerance * $self->total_job_count / 100.0;
            if( $self->failed_job_count > $absolute_tolerance ) {
                $self->status('FAILED');
            } else {
                $self->status('DONE');
            }
        } elsif( $self->ready_job_count && !$self->inprogress_job_count ) {   # there are claimable jobs, but nothing actually running

            $self->status('READY');

        } elsif( !$self->ready_job_count ) {   # there are no claimable jobs, possibly because some are semaphored

            $self->status('ALL_CLAIMED');

        } elsif( $self->inprogress_job_count ) {

            $self->status('WORKING');
        }
    }
}
# Refresh the counters from a per-status job count, then re-evaluate the
# blocking rules and the status.
#   Arg [1] : (optional) hash ref, job status => number of jobs
sub recalculate_from_job_counts {
    my ($self, $job_counts) = @_;

    # only update job_counts if given the hash:
    if( $job_counts ) {
        foreach my $counter ('semaphored_job_count', 'ready_job_count', 'failed_job_count', 'done_job_count') {
            # add up all the statuses that %status2counter folds into this counter
            my $value = sum( map { $job_counts->{$_} // 0 } grep { $status2counter{$_} eq $counter } keys %status2counter );
            $self->$counter( $value );
        }
        # sum() yields undef on an empty list, hence the fallback to 0
        $self->total_job_count( sum( values %$job_counts ) || 0 );
    }

    # check_blocking_control_rules() may flip the status to or from 'BLOCKED',
    # which determine_status() then takes into account
    $self->check_blocking_control_rules();

    $self->determine_status();
}