my ($queen, $submit_capacity, $default_meadow_type, $list_of_analyses, $meadow_capacity_limiter_hashed_by_type) = @_;
my @workers_to_submit_by_analysis = (); # The down-to-analysis "plan" that may completely change by the time the Workers are born and specialized
my %workers_to_submit_by_meadow_type_rc_name = (); # Pre-pending-adjusted per-resource breakout
my $total_extra_workers_required = 0;
unless( @$pairs_sorted_by_suitability ) {
unless( @$log_buffer ) {
push @$log_buffer, "Could not find any suitable analyses to start scheduling.";
}
} else {
my $queen_capacity_limiter =
Bio::EnsEMBL::Hive::Limiter->
new(
'Total reciprocal capacity of the Hive', 1.0 - $queen->db->hive_pipeline->get_cached_hive_current_load() );
ANALYSIS: foreach my $pair (@$pairs_sorted_by_suitability) {
if( $submit_capacity_limiter->reached ) {
if( $meadow_capacity_limiter_hashed_by_type ) { # only add this message when scheduling and not during a Worker's specialization
push @$log_buffer, "Submission capacity (=".$submit_capacity_limiter->original_capacity.") has been reached.";
}
last ANALYSIS;
}
my ($analysis, $analysis_stats) = @$pair;
my $logic_name = $analysis->logic_name;
my $this_meadow_type = $analysis->meadow_type || $default_meadow_type;
if( $meadow_capacity_limiter_hashed_by_type && !$meadow_capacity_limiter_hashed_by_type->{$this_meadow_type} ) {
push @$log_buffer, "The Meadow '$this_meadow_type' is not reachable from here, skipping Analysis '$logic_name'.";
next ANALYSIS;
}
if( $meadow_capacity_limiter_hashed_by_type && $meadow_capacity_limiter_hashed_by_type->{$this_meadow_type}->reached ) {
push @$log_buffer, "Available capacity of '$this_meadow_type' Meadow (=".$meadow_capacity_limiter_hashed_by_type->{$this_meadow_type}->original_capacity.") has been reached, skipping Analysis '$logic_name'.";
next ANALYSIS;
}
push @$log_buffer, "Analysis '$logic_name' is ".$analysis_stats->status.", safe-synching it...";
# Do a (safe) sync to get up-to-date job-counts and status
if( $queen->safe_synchronize_AnalysisStats($analysis_stats) ) {
push @$log_buffer, "Safe-sync of Analysis '$logic_name' succeeded.";
} elsif (scalar(@{ $analysis->control_rules_collection() })) {
# The analysis is blockable and we haven't managed to sync it, so the status is unreliable
push @$log_buffer, "Safe-sync of Analysis '$logic_name' could not be run at this moment, cannot tell whether it is BLOCKED or not, skipping it.";
next ANALYSIS;
} else {
push @$log_buffer, "Safe-sync of Analysis '$logic_name' could not be run at this moment, will use old stats.";
}
if( ($analysis_stats->status eq 'BLOCKED') or (($analysis_stats->sync_lock) and scalar(@{ $analysis->control_rules_collection() }))) {
push @$log_buffer, "Analysis '$logic_name' is still ".$analysis_stats->status.", skipping it.";
next ANALYSIS;
}
# getting the initial worker requirement for this analysis (may be off if $analysis_stats has not been sync'ed recently)
my $extra_workers_this_analysis = $analysis_stats->estimate_num_required_workers;
if ($extra_workers_this_analysis <= 0) {
push @$log_buffer, "Analysis '$logic_name' doesn't require extra workers, skipping it.";
next ANALYSIS;
}
$total_extra_workers_required += $extra_workers_this_analysis; # also keep the total number required so far (if nothing required we may need a resync later)
# setting up all negotiating limiters:
$queen_capacity_limiter->multiplier( $analysis->hive_capacity );
my @limiters = (
$submit_capacity_limiter,
$queen_capacity_limiter,
$meadow_capacity_limiter_hashed_by_type
? $meadow_capacity_limiter_hashed_by_type->{$this_meadow_type}
: (),
defined($analysis->analysis_capacity)
$analysis->analysis_capacity - $analysis_stats->num_running_workers )
: (),
);
my $hit_the_limit;
# negotiations:
foreach my $limiter (@limiters) {
($extra_workers_this_analysis, $hit_the_limit) = $limiter->preliminary_offer( $extra_workers_this_analysis );
if($hit_the_limit) {
if($extra_workers_this_analysis>0) {
push @$log_buffer, "Hit the limit of *** ".$limiter->description." ***, settling for $extra_workers_this_analysis Workers.";
} else {
push @$log_buffer, "Hit the limit of *** ".$limiter->description." ***, skipping this Analysis.";
next ANALYSIS;
}
}
}
# let all parties know the final decision of negotiations:
foreach my $limiter (@limiters) {
$limiter->final_decision( $extra_workers_this_analysis );
}
push @workers_to_submit_by_analysis, [ $analysis, $extra_workers_this_analysis];
push @$log_buffer, $analysis_stats->toString;
if($meadow_capacity_limiter_hashed_by_type) {
my $this_rc_name = $analysis->resource_class->name;
$workers_to_submit_by_meadow_type_rc_name{ $this_meadow_type }{ $this_rc_name } += $extra_workers_this_analysis;
push @$log_buffer, sprintf("Before checking the Valley for pending jobs, the Scheduler allocated $extra_workers_this_analysis x $this_meadow_type:$this_rc_name extra workers for '%s' [%.4f hive_load remaining]",
$logic_name,
$queen_capacity_limiter->available_capacity,
);
}
} # /ANALYSIS : foreach my $pair (@$pairs_sorted_by_suitability)
}
return (\@workers_to_submit_by_analysis, \%workers_to_submit_by_meadow_type_rc_name, $total_extra_workers_required, $log_buffer);
}