An object that maintains counters for jobs in different states. This data is used by the Scheduler.
Copyright [1999-2015] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute
Copyright [2016-2022] EMBL-European Bioinformatics Institute
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License
is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and limitations under the License.
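Please subscribe to the Hive mailing list: http: [the mailing-list URL is truncated in this copy; TODO restore the full address from the upstream source]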
The rest of the documentation details each of the object methods.
Internal methods are usually preceded with an underscore (_).
37 package Bio::EnsEMBL::Hive::AnalysisStats;
45 use base (
'Bio::EnsEMBL::Hive::Storable' );
# Overrides the default unique key inherited from the Cacheable parent:
# an AnalysisStats object is identified by its 'analysis' field alone.
# Returns a freshly built array reference on every call.
sub unikey {
    my @unique_fields = ( 'analysis' );
    return \@unique_fields;
}
# NOTE(review): this span is damaged in this copy. The next line fuses several
# original lines into one comment: the description of the batch-time minimum
# (whose accessor sub is not present here) and a POD fragment naming the
# autoloaded 'analysis_id / analysis' attributes.
53 ## Minimum amount of time in msec that a worker should run before reporting 54 ## back to the hive. This is used when setting the batch_size automatically. 62 analysis_id / analysis
# Delegates to analysis_id(), forwarding any arguments unchanged.
# NOTE(review): the enclosing `sub` header and the line that sets up $self are
# not present in this copy -- TODO restore from upstream.
70 return $self->analysis_id(@_);
# Getter/setter body for $self->{'_status'}: if an argument is supplied it is
# stored, and the current value is always returned.
# NOTE(review): the enclosing `sub` header, the `my $self` line and the closing
# brace are not present in this copy -- TODO restore from upstream.
76 $self->{
'_status'} = shift
if(@_);
77 return $self->{
'_status'};
# Getter/setter body for $self->{'_is_excluded'}: if an argument is supplied it
# is stored, and the current value is always returned.
# NOTE(review): the enclosing `sub` header, the `my $self` line and the closing
# brace are not present in this copy -- TODO restore from upstream.
82 $self->{
'_is_excluded'} = shift
if (@_);
83 return $self->{
'_is_excluded'};
# Section: counters of jobs in different states.
# Getter/setter body for $self->{'_total_job_count'}: if an argument is supplied
# it is stored, and the current value is always returned.
# NOTE(review): in this copy the section comment and the first code line are
# fused, so the start of the assignment sits inside the `##` comment. The
# `sub` header and `my $self` line are also not present -- TODO restore from
# upstream.
86 ## counters of jobs in different states: 91 $self->{
'_total_job_count'} = shift
if(@_);
92 return $self->{
'_total_job_count'};
# Getter/setter for the number of semaphored jobs of this analysis.
# With an argument the count is stored; the current value is always returned.
sub semaphored_job_count {
    my $self = shift @_;
    if (@_) {
        $self->{'_semaphored_job_count'} = $_[0];
    }
    return $self->{'_semaphored_job_count'};
}
# Getter/setter for the number of READY (claimable) jobs of this analysis.
# With an argument the count is stored; the current value is always returned.
sub ready_job_count {
    my ($self, @new_value) = @_;
    $self->{'_ready_job_count'} = $new_value[0] if @new_value;
    return $self->{'_ready_job_count'};
}
# Getter/setter body for $self->{'_done_job_count'}: if an argument is supplied
# it is stored, and the current value is always returned.
# NOTE(review): the enclosing `sub` header, the `my $self` line and the closing
# brace are not present in this copy -- TODO restore from upstream.
109 $self->{
'_done_job_count'} = shift
if(@_);
110 return $self->{
'_done_job_count'};
# Getter/setter for the number of FAILED jobs of this analysis.
# With an argument the count is stored. An undefined count is normalised to 0
# (and written back), so the getter never returns undef.
sub failed_job_count {
    my $self = shift @_;
    if (@_) {
        $self->{'_failed_job_count'} = $_[0];
    }
    if (!defined $self->{'_failed_job_count'}) {
        $self->{'_failed_job_count'} = 0;
    }
    return $self->{'_failed_job_count'};
}
# Getter/setter for the number of running workers recorded for this analysis.
# With an argument the value is stored; the current value is always returned.
sub num_running_workers {
    my ($self, @args) = @_;
    if (scalar @args) {
        $self->{'_num_running_workers'} = shift @args;
    }
    return $self->{'_num_running_workers'};
}
# Getter/setter for the average time spent per job, in milliseconds.
# With an argument the value is stored. Any false value (undef, '' or 0) is
# normalised to 0 and written back before returning.
sub avg_msec_per_job {
    my ($self, @value) = @_;
    $self->{'_avg_msec_per_job'} = $value[0] if scalar(@value);
    $self->{'_avg_msec_per_job'} ||= 0;
    return $self->{'_avg_msec_per_job'};
}
# Getter/setter for the average input-phase time per job, in milliseconds.
# With an argument the value is stored. Any false value (undef, '' or 0) is
# normalised to 0 and written back before returning.
sub avg_input_msec_per_job {
    my ($self, @value) = @_;
    $self->{'_avg_input_msec_per_job'} = $value[0] if scalar(@value);
    $self->{'_avg_input_msec_per_job'} ||= 0;
    return $self->{'_avg_input_msec_per_job'};
}
# Getter/setter for the average run-phase time per job, in milliseconds.
# With an argument the value is stored. Any false value (undef, '' or 0) is
# normalised to 0 and written back before returning.
sub avg_run_msec_per_job {
    my ($self, @value) = @_;
    $self->{'_avg_run_msec_per_job'} = $value[0] if scalar(@value);
    $self->{'_avg_run_msec_per_job'} ||= 0;
    return $self->{'_avg_run_msec_per_job'};
}
# Getter/setter for the average output-phase time per job, in milliseconds.
# With an argument the value is stored. Any false value (undef, '' or 0) is
# normalised to 0 and written back before returning.
sub avg_output_msec_per_job {
    my ($self, @value) = @_;
    $self->{'_avg_output_msec_per_job'} = $value[0] if scalar(@value);
    $self->{'_avg_output_msec_per_job'} ||= 0;
    return $self->{'_avg_output_msec_per_job'};
}
## other storable attributes:

# Getter/setter for the 'when_updated' attribute. This method is called by the
# initial store(), at which point it returns undef.
sub when_updated {
    my ($self, @args) = @_;
    $self->{'_when_updated'} = $args[0] if @args;
    return $self->{'_when_updated'};
}
# Age, in seconds, of the last update of this object.
# When given a value (the server-side "seconds since update"), it converts it
# into a local epoch timestamp stored in $self->{'_when_updated'}. The age is
# then always computed against the local clock. Returns undef if no timestamp
# is known.
sub seconds_since_when_updated {
    my ($self, $seconds_ago) = @_;

    if (defined $seconds_ago) {
        $self->{'_when_updated'} = time() - $seconds_ago;
    }

    my $updated_at = $self->{'_when_updated'};
    if (defined $updated_at) {
        return time() - $updated_at;
    }
    return undef;    # explicit undef: a one-element list in list context, as before
}
# Tracks the freshness of this in-memory object.
# When given a value (seconds since the fetch), it records the corresponding
# local epoch timestamp in $self->{'_last_fetch'}. It returns the number of
# seconds elapsed since that timestamp by the local clock, or undef if the
# object has no recorded fetch.
sub seconds_since_last_fetch {
    my ($self, $seconds_ago) = @_;

    if (defined $seconds_ago) {
        $self->{'_last_fetch'} = time() - $seconds_ago;
    }

    my $fetched_at = $self->{'_last_fetch'};
    if (defined $fetched_at) {
        return time() - $fetched_at;
    }
    return undef;    # explicit undef: a one-element list in list context, as before
}
# Getter/setter body for $self->{'_sync_lock'}: if an argument is supplied it is
# stored, and the current value is always returned.
# NOTE(review): the enclosing `sub` header, the `my $self` line and the closing
# brace are not present in this copy -- TODO restore from upstream.
181 $self->{
'_sync_lock'} = shift
if(@_);
182 return $self->{
'_sync_lock'};
# Section: non-storable attributes and other helper-methods.
# Re-fetches this object through $self->adaptor->refresh($self). This happens
# when $seconds_fresh is undef, when no last-fetch time is known, or when the
# last fetch is older than $seconds_fresh seconds.
# NOTE(review): this fragment is damaged. The section comment and the argument
# unpacking are fused into a single comment line below. The `sub` header and
# the opening part of the `if(` condition (everything before `and`) are not
# present, and neither are the closing braces -- TODO restore from upstream.
186 # non-storable attributes and other helper-methods: 190 my ($self, $seconds_fresh) = @_;
191 my $seconds_since_last_fetch = $self->seconds_since_last_fetch;
194 and (!defined($seconds_fresh) or !defined($seconds_since_last_fetch) or $seconds_fresh < $seconds_since_last_fetch) ) {
195 return $self->adaptor->refresh($self);
# Hands this object to the adaptor's update_stats_and_monitor() method.
# NOTE(review): the enclosing `sub` header and any guard lines around this call
# are not present in this copy -- TODO restore from upstream.
204 $self->adaptor->update_stats_and_monitor($self);
# Chooses a batch size for this analysis.
#   * A positive $self->analysis->batch_size is used as-is.
#   * Otherwise, if avg_msec_per_job is non-zero, the result is
#     ceil(min_batch_time / avg_msec_per_job), with the average clamped to at
#     least 100 msec.
#   * Otherwise the result is -batch_size, or 1 when batch_size is 0.
# "Tail trimming" then adjusts the value. The number of workers is taken
# half-way between the running and the estimated-required worker counts. If
# floor(jobs_to_do / workers) is positive and smaller, it becomes the batch
# size. If it is 0, the batch size becomes ceil(jobs_to_do / workers), i.e. 0
# or 1.
# Param: optional $remaining_job_count (defaults to 0), added to ready_job_count.
# NOTE(review): this copy is damaged. The `my $self` line, blank/closing-brace
# lines and the final return statement are not present. Presumably the sub
# ends with `return $batch_size;` -- confirm against upstream. Several inline
# comments are also split across lines, and the tail-trimming `if(` is fused
# into a comment line.
209 sub get_or_estimate_batch_size {
211 my $remaining_job_count = shift @_ || 0; # FIXME: a better estimate would be $self->claimed_job_count when it is introduced
213 my $batch_size = $self->analysis->batch_size;
215 if( $batch_size > 0 ) { #
set to positive or not
set (and
auto-initialized within $self->batch_size)
217 # otherwise it is a request
for dynamic estimation:
# Dynamic estimate from collected runtime statistics (average clamped at 100 msec).
218 } elsif( my $avg_msec_per_job = $self->avg_msec_per_job ) { # further estimations from collected stats
220 $avg_msec_per_job = 100
if($avg_msec_per_job<100);
222 $batch_size = POSIX::ceil( $self->min_batch_time / $avg_msec_per_job );
224 }
else { # first estimation when no stats are available (take -$batch_size as first guess,
if not zero)
225 $batch_size = -$batch_size || 1;
# Tail trimming: spread the outstanding jobs over the averaged worker count.
228 # TailTrimming correction aims at meeting the requirement half way: 229 if( my $num_of_workers = POSIX::ceil( ($self->num_running_workers + $self->estimate_num_required_workers($remaining_job_count))/2 ) ) {
231 my $jobs_to_do = $self->ready_job_count + $remaining_job_count;
233 my $tt_batch_size = POSIX::floor( $jobs_to_do / $num_of_workers );
234 if( (0 < $tt_batch_size) && ($tt_batch_size < $batch_size) ) {
235 $batch_size = $tt_batch_size;
236 } elsif(!$tt_batch_size) {
237 $batch_size = POSIX::ceil( $jobs_to_do / $num_of_workers ); # essentially, 0 or 1
# Estimates how many workers this analysis could use in total. The estimate
# includes the workers that are currently running, so it is a "max allowed"
# figure.
# The starting point is one worker per ready job (plus the optional
# $remaining_job_count, default 0). This may be zero.
# It is then capped by:
#   * hive_capacity (if defined and >= 0): running workers plus
#     floor(hive_capacity * (1 - current hive load));
#   * analysis_capacity (if defined and >= 0).
# Returns the (possibly capped) worker count.
sub estimate_num_required_workers {
    my ($self, $remaining_job_count) = @_;
    $remaining_job_count ||= 0;    # FIXME: a better estimate would be $self->claimed_job_count when it is introduced

    my $required = $self->ready_job_count + $remaining_job_count;

    my $hive_capacity = $self->analysis->hive_capacity;
    if (defined $hive_capacity and $hive_capacity >= 0) {
        my $current_load = $self->hive_pipeline->get_cached_hive_current_load();
        my $hive_limit   = $self->num_running_workers + POSIX::floor( $hive_capacity * ( 1.0 - $current_load ) );
        $required = $hive_limit if $hive_limit < $required;
    }

    my $analysis_limit = $self->analysis->analysis_capacity;
    if (defined $analysis_limit and $analysis_limit >= 0 and $analysis_limit < $required) {
        $required = $analysis_limit;
    }

    return $required;
}
# Number of jobs that are in progress, including CLAIMED ones. It is computed as
# the total minus the semaphored, ready, done and failed counts, subtracted in
# that order.
sub inprogress_job_count {
    my $self = shift @_;

    my $remaining = $self->total_job_count;
    foreach my $counter (qw(semaphored_job_count ready_job_count done_job_count failed_job_count)) {
        $remaining -= $self->$counter();
    }
    return $remaining;
}
# Maps a meta-status name (DONE, RUNNING, READY, BLOCKED, ...) to the
# Term::ANSIColor attribute string used to colour it.
# NOTE(review): further entries (original lines 285-288, e.g. whatever FAILED
# maps to) and the closing `);` are not present in this copy -- TODO restore
# from upstream.
280 my %meta_status_2_color = (
281 'DONE' =>
'bright_cyan',
282 'RUNNING' =>
'bright_yellow',
283 'READY' =>
'bright_green',
284 'BLOCKED' =>
'black on_white',
# Compatibility shim: the bright_ colour variants only exist from
# Term::ANSIColor 3.00 onwards, so on older versions each colour value is
# rewritten in place.
# NOTE(review): in this copy the version check `if (...)` is fused into the
# comment line below. The line that actually transforms $c (original line 295)
# is also missing, so as shown each value would be written back unchanged --
# TODO restore from upstream.
289 # "Support for colors 8 through 15 (the bright_ variants) was added in 290 # Term::ANSIColor 3.00, included in Perl 5.13.3." 291 # http://perldoc.perl.org/Term/ANSIColor.html#COMPATIBILITY 292 if ($Term::ANSIColor::VERSION <
'3.00') {
293 foreach my $s (keys %meta_status_2_color) {
294 my $c = $meta_status_2_color{$s};
296 $meta_status_2_color{$s} = $c;
# Maps AnalysisStats status values onto the meta-statuses that carry a colour
# (e.g. LOADING/SYNCHING are shown as READY, EXCLUDED as FAILED).
# NOTE(review): the remaining entries (original lines 306-307) and the closing
# `);` are not present in this copy -- TODO restore from upstream.
300 my %analysis_status_2_meta_status = (
301 'LOADING' =>
'READY',
302 'SYNCHING' =>
'READY',
303 'ALL_CLAIMED' =>
'BLOCKED',
304 'EXCLUDED' =>
'FAILED',
305 'WORKING' =>
'RUNNING',
# Maps each job-count accessor name to the meta-status whose colour is used when
# that count is printed.
# NOTE(review): the closing `);` is not present in this copy -- TODO restore.
308 my %count_method_2_meta_status = (
309 'semaphored_job_count' =>
'BLOCKED',
310 'ready_job_count' =>
'READY',
311 'inprogress_job_count' =>
'RUNNING',
312 'done_job_count' =>
'DONE',
313 'failed_job_count' =>
'FAILED',
# Formats $text for display. If $field_size is set and the text is shorter, it
# is left-padded with spaces to that width; padding is measured on the
# uncoloured text. When $color_enabled is true, the text is wrapped in the ANSI
# colour mapped to the meta-status $status, followed by a colour reset.
# Params: ($field_size, $color_enabled, $text, $status). Returns the formatted string.
sub _text_with_status_color {
    my ($field_size, $color_enabled, $text, $status) = @_;

    my $padding = '';
    if ($field_size and length($text) < $field_size) {
        $padding = ' ' x ($field_size - length($text));
    }

    return $padding . $text unless $color_enabled;
    return $padding . color($meta_status_2_color{$status}) . $text . color('reset');
}
# Builds a compact label breaking the job total down by state, e.g. "5r+3d=8".
# Each non-zero count is followed by the first letter of its counter name
# (s/r/i/d/f) and optionally coloured. The "=total" suffix is appended unless
# exactly one category is non-zero. If $field_size is given, the label is
# left-padded to that visible width; colour codes are not counted.
# Params: ($field_size, $color_enabled).
# Returns: ($breakout_label, $total_job_count, \%count_hash), where %count_hash
#          maps every counter method name to its value.
sub job_count_breakout {
    my ($self, $field_size, $color_enabled) = @_;

    my (@labelled_counts, %count_hash);
    my $visible_length  = 0;
    my $total_job_count = $self->total_job_count();

    foreach my $count_method (qw(semaphored_job_count ready_job_count inprogress_job_count done_job_count failed_job_count)) {
        my $count = $self->$count_method();
        $count_hash{$count_method} = $count;
        next unless $count;

        my $meta_status = $count_method_2_meta_status{$count_method};
        push @labelled_counts, _text_with_status_color(undef, $color_enabled, $count, $meta_status) . substr($count_method, 0, 1);
        $visible_length += length("$count") + 1;    # the digits plus the one-letter suffix
    }

    my $breakout_label = join('+', @labelled_counts);
    $visible_length += scalar(@labelled_counts) - 1 if @labelled_counts;    # the '+' separators

    # Only show a total when there are several (or no) categories.
    if (scalar(@labelled_counts) != 1) {
        $breakout_label .= '=' . $total_job_count;
        $visible_length += 1 + length("$total_job_count");
    }

    if ($field_size and $visible_length < $field_size) {
        $breakout_label = ' ' x ($field_size - $visible_length) . $breakout_label;
    }

    return ($breakout_label, $total_job_count, \%count_hash);
}
# Expresses avg_msec_per_job in a human-friendly unit and returns a
# ($value, $unit_name) pair. Each @units entry is [milliseconds per unit, name].
# The units are tried from largest (day) to smallest (sec).
# NOTE(review): this copy is damaged. The `my $self` line, the condition that
# decides when to return (original line 358), the closing braces and the
# fallback after the loop are not present. Presumably it returns for the first
# unit where $x >= 1 and otherwise falls back to milliseconds -- confirm
# against upstream.
350 sub friendly_avg_job_runtime {
353 my $avg = $self->avg_msec_per_job;
354 my @units = ([24*3600*1000,
'day'], [3600*1000,
'hr'], [60*1000,
'min'], [1000,
'sec']);
356 while (my $unit_description = shift @units) {
357 my $x = $avg / $unit_description->[0];
359 return ($x, $unit_description->[1]);
# Builds a one-line, human-readable summary of this analysis' stats: logic name,
# coloured status, job breakdown, average runtime, running and estimated
# required workers, then capacities and sync age.
# Colour is enabled only when STDOUT is a terminal (-t STDOUT).
# The next argument, if true, sets the width of the logic-name column; it
# defaults to the length of this analysis' logic_name.
# The status is shown as 'EXCLUDED' whenever is_excluded is true.
# NOTE(review): this fragment is damaged. The `sub` header and `my $self` line
# are missing, as are several sprintf arguments (e.g. those for "(%3d)" and
# "jobs( %s )"). The tails of the h.cap / a.cap / sync'd expressions, the
# closing braces and the return are also not present -- TODO restore from
# upstream.
368 my $can_do_colour = (-t STDOUT ? 1 : 0);
369 my ($breakout_label, $total_job_count, $count_hash) = $self->job_count_breakout(24, $can_do_colour);
370 my $analysis = $self->analysis;
371 my ($avg_runtime, $avg_runtime_unit) = $self->friendly_avg_job_runtime;
372 my $max_logic_name_length = shift @_ || length($analysis->logic_name);
373 my $status_text = $self->status;
374 if ($self->is_excluded) {
375 $status_text =
'EXCLUDED';
378 my $output .= sprintf(
"%-${max_logic_name_length}s(%3d) %s, jobs( %s ), avg:%5.1f %-3s, workers(Running:%d, Est.Required:%d) ",
379 $analysis->logic_name,
382 _text_with_status_color(11, $can_do_colour, $status_text, $analysis_status_2_meta_status{$status_text} || $status_text),
386 $avg_runtime, $avg_runtime_unit,
388 $self->num_running_workers,
389 $self->estimate_num_required_workers,
391 $output .=
' h.cap:' .( $analysis->hive_capacity
392 .
' a.cap:' .( $analysis->analysis_capacity
393 .
" (sync'd " .($self->seconds_since_when_updated
# Evaluates this analysis' blocking control rules. A rule's condition counts as
# satisfied when its condition analysis has status 'DONE', or when that
# analysis can_be_empty and has no jobs. If the condition analysis or its stats
# cannot be obtained, the condition is treated as not satisfied. $no_die is
# passed through to condition_analysis().
# Side effect on $self->status:
#   * all conditions met and status is 'BLOCKED' -> set to 'LOADING', to be
#     redefined by the status determination that follows;
#   * otherwise -> set to 'BLOCKED'. The enclosing `else` is not visible here.
# Returns 1 if all conditions are satisfied (or there are no rules), else 0.
# NOTE(review): this copy is damaged. The lines after each
# `$all_conditions_satisfied = 0;` (original 413-415, 419-421, 436-438), the
# `else` branch (442-443) and the closing braces are missing. Whether the loop
# exits early (`last`) or continues (`next`) cannot be told from here -- TODO
# restore from upstream. The refresh() call is also fused into a comment line.
399 sub check_blocking_control_rules {
400 my ($self, $no_die) = @_;
402 my $ctrl_rules = $self->analysis->control_rules_collection();
404 my $all_conditions_satisfied = 1;
406 if(scalar @$ctrl_rules) { # there are blocking ctrl_rules to check
408 foreach my $ctrl_rule (@$ctrl_rules) {
410 my $condition_analysis = $ctrl_rule->condition_analysis(undef, $no_die);
411 unless ($condition_analysis) {
412 $all_conditions_satisfied = 0;
416 my $condition_stats = $condition_analysis->stats;
417 unless ($condition_stats) {
418 $all_conditions_satisfied = 0;
422 # Make sure we use fresh properties of the AnalysisStats object 423 # (especially relevant in the case of foreign pipelines, since 424 # local objects are periodically refreshed) 425 $condition_stats->refresh();
427 my $condition_status = $condition_stats->status;
428 my $condition_cbe = $condition_analysis->can_be_empty;
429 my $condition_tjc = $condition_stats->total_job_count;
431 my $this_condition_satisfied = ($condition_status eq
'DONE')
432 || ($condition_cbe && !$condition_tjc); # probably safer than saying ($condition_status eq
'EMPTY') because of the sync order
434 unless( $this_condition_satisfied ) {
435 $all_conditions_satisfied = 0;
439 if($all_conditions_satisfied) {
440 if($self->status eq
'BLOCKED') { # unblock, since all conditions are met
441 $self->status(
'LOADING'); # anything that is not
'BLOCKED' will
do, it will be redefined in the following subroutine
444 $self->status(
'BLOCKED');
448 return $all_conditions_satisfied;
# Derives $self->status from the job counters, unless the analysis is 'BLOCKED'.
#   no jobs                                  -> 'EMPTY'
#   every job done or failed                 -> 'FAILED' if failed_job_count
#                                               exceeds failed_job_tolerance
#                                               percent of total_job_count,
#                                               else 'DONE'
#   ready jobs but none in progress          -> 'READY'
#   no ready jobs                            -> 'ALL_CLAIMED'
#   ready jobs and some in progress          -> 'WORKING'
# NOTE(review): the `my $self` line, the closing braces and some interior lines
# are not present in this copy -- TODO restore from upstream.
452 sub determine_status {
455 if($self->status ne
'BLOCKED') {
456 if( !$self->total_job_count ) {
458 $self->status(
'EMPTY');
460 } elsif( $self->total_job_count == $self->done_job_count + $self->failed_job_count ) { # all jobs of the analysis have been finished
461 my $analysis = $self->analysis;
# failed_job_tolerance is treated as a percentage of total_job_count.
462 my $absolute_tolerance = $analysis->failed_job_tolerance * $self->total_job_count / 100.0;
463 if ($self->failed_job_count > $absolute_tolerance) {
464 $self->status(
'FAILED');
# NOTE(review): the `} else {` separating the FAILED and DONE branches
# (original line 465) is missing from this copy. As shown, 'DONE' would
# overwrite 'FAILED' -- TODO restore from upstream.
466 $self->status(
'DONE');
468 } elsif( $self->ready_job_count && !$self->inprogress_job_count ) { # there are claimable jobs, but nothing actually running
470 $self->status(
'READY');
472 } elsif( !$self->ready_job_count ) { # there are no claimable jobs, possibly because some are semaphored
474 $self->status(
'ALL_CLAIMED');
476 } elsif( $self->inprogress_job_count ) {
478 $self->status(
'WORKING');
# Refreshes the per-state job counters from $job_counts, a hashref whose keys
# include 'SEMAPHORED', 'READY', 'FAILED' and 'DONE'; missing keys count as 0.
# total_job_count is set to the sum of all values in the hash. Control rules
# and status are then re-evaluated.
# NOTE(review): this copy is damaged. The guard implied by "only update
# job_counts if given the hash" (presumably an `if($job_counts) {` at original
# line 488) is not present. The done_job_count expression is also truncated
# after 'DONE' -- TODO restore from upstream.
# `sum` is presumably List::Util::sum, imported elsewhere in the file -- confirm.
484 sub recalculate_from_job_counts {
485 my ($self, $job_counts) = @_;
487 # only update job_counts if given the hash: 489 $self->semaphored_job_count( $job_counts->{
'SEMAPHORED'} || 0 );
490 $self->ready_job_count( $job_counts->{
'READY'} || 0 );
491 $self->failed_job_count( $job_counts->{
'FAILED'} || 0 );
492 $self->done_job_count( ( $job_counts->{
'DONE'}
493 $self->total_job_count( sum( values %$job_counts ) || 0 );
496 $self->check_blocking_control_rules();
498 $self->determine_status();