9 Scheduler starts with the numbers of required workers
for unblocked analyses,
10 then goes through several kinds of restrictions (submit_limit, meadow_limits, hive_capacity, etc)
11 that act as limiters and may cap the original numbers in several ways.
12 The capped numbers are then grouped by meadow_type and rc_name and returned in a two-level hash.
16 Copyright [1999-2015] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute
17 Copyright [2016-2022] EMBL-European Bioinformatics Institute
19 Licensed under the Apache License,
Version 2.0 (the
"License"); you may not use
this file except in compliance with the License.
20 You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
24 Unless required by applicable law or agreed to in writing, software distributed under the License
25 is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
26 See the License
for the specific language governing permissions and limitations under the License.
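30 Please subscribe to the
Hive mailing list: http://listserver.ebi.ac.uk/mailman/listinfo/ehive-users to discuss Hive-related questions or to be notified of our updates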
35 package Bio::EnsEMBL::Hive::Scheduler;
40 use List::Util (
'shuffle');
49 $msgs = [ $msgs ] unless( ref($msgs) eq
'ARRAY' );
51 foreach my $msg (@$msgs) {
52 print
"Scheduler : $msg\n";
# schedule_workers_resync_if_necessary( $queen, $valley, $list_of_analyses )
#
# Builds a worker submission plan by calling schedule_workers() and prints the
# scheduling log. When the plan reports that no extra workers are required, it
# checks for dead workers, re-synchronizes the hive, may re-balance semaphore
# counts, and calls schedule_workers() a second time. Finally the per-group
# numbers are reduced by the number of workers already pending in each
# meadow_type / rc_name group for the current user.
#
# Arguments:
#   $queen            - object used for worker bookkeeping and hive synchronization
#   $valley           - object giving access to the meadows, their config and limiters
#   $list_of_analyses - the analyses to be considered for scheduling
#
# Returns:
#   the two-level hashref { meadow_type => { rc_name => number_of_workers } }
#   after the pending-worker adjustment; groups and meadows left with nothing
#   to submit are removed from it.
#
# NOTE(review): in this listing the closing braces and the `else` lines of this
# sub are not visible, so the exact nesting of the branches below is inferred
# from the order of the statements and the message texts - confirm against the
# full file.
# NOTE(review): both hashes are walked with each() while entries are deleted;
# only the entry most recently returned by each() is deleted, which perlfunc
# documents as safe.
57 sub schedule_workers_resync_if_necessary {
58 my ($queen, $valley, $list_of_analyses) = @_;
# Collect the inputs for scheduling: reconciled worker statuses, the
# 'SubmitWorkersMax' setting, the default meadow type and the per-meadow limiters.
60 my $reconciled_worker_statuses = $valley->query_worker_statuses( $queen->registered_workers_attributes );
61 my $submit_capacity = $valley->config_get(
'SubmitWorkersMax');
62 my $default_meadow_type = $valley->get_default_meadow()->type;
63 my ($valley_running_worker_count,
64 $meadow_capacity_limiter_hashed_by_type)= $valley->generate_limiters( $reconciled_worker_statuses );
# First scheduling attempt, based on the analysis stats as they currently are.
66 my ($workers_to_submit_by_analysis, $workers_to_submit_by_meadow_type_rc_name, $total_extra_workers_required, $log_buffer)
67 = schedule_workers($queen, $submit_capacity, $default_meadow_type, $list_of_analyses, $meadow_capacity_limiter_hashed_by_type);
69 scheduler_say( $log_buffer );
# A zero total means no analysis asked for workers: the stats may be stale,
# so clean up dead workers and re-synchronize before concluding anything.
71 unless( $total_extra_workers_required ) {
72 scheduler_say(
"According to analysis_stats no workers are required... let's see if anything went out of sync." );
73 $queen->check_for_dead_workers($valley, 1);
75 scheduler_say(
"re-synchronizing..." );
76 $queen->synchronize_hive( $list_of_analyses );
78 if( $queen->check_nothing_to_run_but_semaphored( $list_of_analyses ) ) { #
double-check that we are really stuck
79 scheduler_say(
"looks like we may need re-balancing semaphore_counts..." );
80 if( $queen->db->hive_pipeline->hive_auto_rebalance_semaphores ) { # make sure rebalancing only ever happens
for the pipelines that asked
for it
81 if( my $rebalanced_jobs_counter = $queen->db->get_AnalysisJobAdaptor->balance_semaphores( $list_of_analyses ) ) {
82 scheduler_say(
"re-balanced $rebalanced_jobs_counter jobs, going through another re-synchronization..." );
83 $queen->synchronize_hive( $list_of_analyses );
85 scheduler_say(
"hmmm... managed to re-balance 0 jobs, you may need to investigate further." );
88 scheduler_say([
"automatic re-balancing of semaphore_counts is off by default.",
89 "If you think your pipeline might benefit from it, set hive_auto_rebalance_semaphores => 1 in the PipeConfig's hive_meta_table.",
90 "You can also manually rebalance semaphores this time by running beekeeper with the '--balance_semaphores' option",
94 scheduler_say(
"some READY jobs still in the queue. No need to consider re-balancing at this time." );
# Second scheduling attempt, now that the hive has been re-synchronized.
97 ($workers_to_submit_by_analysis, $workers_to_submit_by_meadow_type_rc_name, $total_extra_workers_required, $log_buffer)
98 = schedule_workers($queen, $submit_capacity, $default_meadow_type, $list_of_analyses, $meadow_capacity_limiter_hashed_by_type);
100 scheduler_say( $log_buffer );
103 # adjustment for pending workers: 104 my $pending_worker_counts_by_meadow_type_rc_name = $queen->get_submitted_worker_counts_by_meadow_type_rc_name_for_meadow_user(
Bio::EnsEMBL::Hive::Utils::whoami());
106 while( my ($this_meadow_type, $partial_workers_to_submit_by_rc_name) = each %$workers_to_submit_by_meadow_type_rc_name) {
107 while( my ($this_rc_name, $workers_to_submit_this_group) = each %$partial_workers_to_submit_by_rc_name) {
108 if(my $pending_this_group = $pending_worker_counts_by_meadow_type_rc_name->{ $this_meadow_type }{ $this_rc_name }) {
110 scheduler_say(
"The plan was to submit $workers_to_submit_this_group x $this_meadow_type:$this_rc_name workers when the Scheduler detected $pending_this_group pending in this group, " );
112 if( $workers_to_submit_this_group > $pending_this_group) {
113 $workers_to_submit_by_meadow_type_rc_name->{$this_meadow_type}{$this_rc_name} -= $pending_this_group; # adjust the hashed value
114 scheduler_say(
"so I recommend submitting only ".$workers_to_submit_by_meadow_type_rc_name->{$this_meadow_type}{$this_rc_name}.
" extra" );
116 delete $workers_to_submit_by_meadow_type_rc_name->{$this_meadow_type}{$this_rc_name}; # avoid leaving an empty group in the hash
117 scheduler_say(
"so I don't recommend submitting any extra" );
120 scheduler_say(
"I recommend submitting $workers_to_submit_this_group x $this_meadow_type:$this_rc_name workers" );
124 unless(keys %{ $workers_to_submit_by_meadow_type_rc_name->{$this_meadow_type} }) { #
if nothing has been scheduled
for a meadow,
125 delete $workers_to_submit_by_meadow_type_rc_name->{$this_meadow_type}; #
do not mention the meadow in the hash
129 return $workers_to_submit_by_meadow_type_rc_name;
# suggest_analysis_to_specialize_a_worker( $worker, $analyses_matching_pattern, $analyses_pattern )
#
# Picks an analysis for an already running Worker to specialize into.
#
# Arguments:
#   $worker                    - the Worker; its adaptor is used as the queen, and its
#                                resource_class_id and meadow_type restrict the choice
#   $analyses_matching_pattern - arrayref of candidate analyses
#   $analyses_pattern          - the pattern text, used only in messages
#
# Returns:
#   on success, the first analysis of the plan produced by schedule_workers();
#   otherwise a message string: either one of the two "Could not find ..."
#   messages or the last line of the scheduling log (which is popped off it).
#
# schedule_workers() is called here with a submit capacity of 1 and without
# meadow limiters.
#
# NOTE(review): the closing braces of this sub are not visible in this listing.
133 sub suggest_analysis_to_specialize_a_worker {
134 my ( $worker, $analyses_matching_pattern, $analyses_pattern ) = @_;
136 my $queen = $worker->adaptor;
137 my $worker_rc_id = $worker->resource_class_id;
138 my $worker_meadow_type = $worker->meadow_type;
# Nothing matched the pattern at all: report that back as a message.
140 if( ! @$analyses_matching_pattern ) {
142 return "Could not find any Analyses matching '$analyses_pattern' pattern";
146 $worker->worker_say(
"Found ".scalar(@$analyses_matching_pattern).
" analyses matching '$analyses_pattern' pattern" );
# Keep only analyses compatible with this Worker: an unset resource class or
# meadow type on the Worker (or an unset meadow type on the analysis) matches anything.
148 my @analyses_matching_worker = grep { !$worker_rc_id or $worker_rc_id==$_->resource_class_id}
149 grep { !$worker_meadow_type or !$_->meadow_type or ($worker_meadow_type eq $_->meadow_type) }
150 # if any other attributes of the worker are specifically constrained in the analysis (such as meadow_name), 151 # the corresponding checks should be added here 152 @$analyses_matching_pattern;
154 if( !@analyses_matching_worker ) {
156 return "Could not find any of the ".scalar(@$analyses_matching_pattern).
" '$analyses_pattern' Analyses that would suit this Worker";
160 my ($workers_to_submit_by_analysis, $workers_to_submit_by_meadow_type_rc_name, $total_extra_workers_required, $log_buffer)
161 = schedule_workers( $queen, 1, $worker_meadow_type, \@analyses_matching_worker );
# In debug mode the whole scheduling log is echoed through the Worker.
163 if( $worker->debug ) {
164 foreach my $msg (@$log_buffer) {
165 $worker->worker_say( $msg );
169 return scalar(@$workers_to_submit_by_analysis)
170 ? $workers_to_submit_by_analysis->[0][0] # take the first analysis from the
"plan" if the
"plan" was not empty
171 : pop @$log_buffer; # or
return the last line of the scheduling log
# schedule_workers( $queen, $submit_capacity, $default_meadow_type, $list_of_analyses, $meadow_capacity_limiter_hashed_by_type )
#
# Core of the scheduler: walks over (analysis, stats) pairs, estimates how many
# extra workers each analysis needs and lets a set of limiters cap that number.
#
# Arguments:
#   $queen                                  - used for safe-syncing analysis stats and
#                                             for reaching the pipeline's current hive load
#   $submit_capacity                        - submission limit for this round
#   $default_meadow_type                    - used when an analysis has no meadow_type
#   $list_of_analyses                       - the analyses to consider
#   $meadow_capacity_limiter_hashed_by_type - optional hashref of per-meadow limiters;
#                                             when absent (as in the call made during a
#                                             Worker's specialization) the meadow checks
#                                             and the per-resource breakout are skipped
#
# Returns a four-element list:
#   1. arrayref of [ $analysis, $number_of_workers ] pairs (the per-analysis plan)
#   2. hashref { meadow_type => { rc_name => number_of_workers } }, before any
#      adjustment for pending workers
#   3. total number of extra workers required, summed before the limiters are applied
#   4. arrayref of log messages
#
# NOTE(review): this listing is missing several lines of the sub. In particular
# the declarations of $pairs_sorted_by_suitability, $log_buffer,
# $submit_capacity_limiter, @limiters and $hit_the_limit are not visible, nor are
# the closing braces, `else` lines and any loop-control statements. Presumably
# the first two come from sort_pairs_by_suitability() - confirm against the full file.
177 sub schedule_workers {
178 my ($queen, $submit_capacity, $default_meadow_type, $list_of_analyses, $meadow_capacity_limiter_hashed_by_type) = @_;
180 my @workers_to_submit_by_analysis = (); # The down-to-analysis
"plan" that may completely change by the time the Workers are born and specialized
181 my %workers_to_submit_by_meadow_type_rc_name = (); # Pre-pending-adjusted per-resource breakout
182 my $total_extra_workers_required = 0;
185 unless( @$pairs_sorted_by_suitability ) {
187 unless( @$log_buffer ) {
188 push @$log_buffer,
"Could not find any suitable analyses to start scheduling.";
# Hive-wide limiter: its capacity is 1.0 minus the cached current hive load.
194 my $queen_capacity_limiter =
Bio::EnsEMBL::Hive::Limiter->
new(
'Total reciprocal capacity of the Hive', 1.0 - $queen->db->hive_pipeline->get_cached_hive_current_load() );
196 ANALYSIS:
foreach my $pair (@$pairs_sorted_by_suitability) {
197 if( $submit_capacity_limiter->reached ) {
198 if( $meadow_capacity_limiter_hashed_by_type ) { # only add
this message when scheduling and not during a
Worker's specialization 199 push @$log_buffer, "Submission capacity (=".$submit_capacity_limiter->original_capacity.") has been reached."; 204 my ($analysis, $analysis_stats) = @$pair; 206 my $logic_name = $analysis->logic_name; 207 my $this_meadow_type = $analysis->meadow_type || $default_meadow_type; 209 if( $meadow_capacity_limiter_hashed_by_type && !$meadow_capacity_limiter_hashed_by_type->{$this_meadow_type} ) { 210 push @$log_buffer, "The Meadow '$this_meadow_type
' is not reachable from here, skipping Analysis '$logic_name
'."; 213 if( $meadow_capacity_limiter_hashed_by_type && $meadow_capacity_limiter_hashed_by_type->{$this_meadow_type}->reached ) { 214 push @$log_buffer, "Available capacity of '$this_meadow_type
' Meadow (=".$meadow_capacity_limiter_hashed_by_type->{$this_meadow_type}->original_capacity.") has been reached, skipping Analysis '$logic_name
'."; 218 push @$log_buffer, "Analysis '$logic_name
' is ".$analysis_stats->status.", safe-synching it..."; 220 # Do a (safe) sync to get up-to-date job-counts and status 221 if( $queen->safe_synchronize_AnalysisStats($analysis_stats) ) { 222 push @$log_buffer, "Safe-sync of Analysis '$logic_name
' succeeded."; 223 } elsif (scalar(@{ $analysis->control_rules_collection() })) { 224 # The analysis is blockable and we haven't managed to sync it, so the status is unreliable
225 push @$log_buffer,
"Safe-sync of Analysis '$logic_name' could not be run at this moment, cannot tell whether it is BLOCKED or not, skipping it.";
228 push @$log_buffer,
"Safe-sync of Analysis '$logic_name' could not be run at this moment, will use old stats.";
231 if( ($analysis_stats->status eq
'BLOCKED') or (($analysis_stats->sync_lock) and scalar(@{ $analysis->control_rules_collection() }))) {
232 push @$log_buffer,
"Analysis '$logic_name' is still ".$analysis_stats->
status.
", skipping it.";
236 # getting the initial worker requirement for this analysis (may be off if $analysis_stats has not been sync'ed recently) 237 my $extra_workers_this_analysis = $analysis_stats->estimate_num_required_workers;
239 if ($extra_workers_this_analysis <= 0) {
240 push @$log_buffer,
"Analysis '$logic_name' doesn't require extra workers, skipping it.";
244 $total_extra_workers_required += $extra_workers_this_analysis; # also keep the total number required so far (
if nothing required we may need a resync later)
246 # setting up all negotiating limiters: 247 $queen_capacity_limiter->multiplier( $analysis->hive_capacity );
249 $submit_capacity_limiter,
250 $queen_capacity_limiter,
251 $meadow_capacity_limiter_hashed_by_type
252 ? $meadow_capacity_limiter_hashed_by_type->{$this_meadow_type}
254 defined($analysis->analysis_capacity)
256 $analysis->analysis_capacity - $analysis_stats->num_running_workers )
263 foreach my $limiter (@limiters) {
264 ($extra_workers_this_analysis, $hit_the_limit) = $limiter->preliminary_offer( $extra_workers_this_analysis );
267 if($extra_workers_this_analysis>0) {
268 push @$log_buffer,
"Hit the limit of *** ".$limiter->description.
" ***, settling for $extra_workers_this_analysis Workers.";
270 push @$log_buffer,
"Hit the limit of *** ".$limiter->description.
" ***, skipping this Analysis.";
276 # let all parties know the final decision of negotiations: 277 foreach my $limiter (@limiters) {
278 $limiter->final_decision( $extra_workers_this_analysis );
281 push @workers_to_submit_by_analysis, [ $analysis, $extra_workers_this_analysis];
282 push @$log_buffer, $analysis_stats->toString;
284 if($meadow_capacity_limiter_hashed_by_type) {
285 my $this_rc_name = $analysis->resource_class->name;
286 $workers_to_submit_by_meadow_type_rc_name{ $this_meadow_type }{ $this_rc_name } += $extra_workers_this_analysis;
287 push @$log_buffer, sprintf(
"Before checking the Valley for pending jobs, the Scheduler allocated $extra_workers_this_analysis x $this_meadow_type:$this_rc_name extra workers for '%s' [%.4f hive_load remaining]",
289 $queen_capacity_limiter->available_capacity,
293 } # /ANALYSIS :
foreach my $pair (@$pairs_sorted_by_suitability)
296 return (\@workers_to_submit_by_analysis, \%workers_to_submit_by_meadow_type_rc_name, $total_extra_workers_required, $log_buffer);
# sort_pairs_by_suitability( ... )
#
# Orders analyses for scheduling. The analyses are shuffled, sorted by
# descending priority and paired with their stats objects (the map/sort/shuffle
# chain runs bottom-up, as numbered in its comments). The pairs are then split
# into two groups:
#   - primary candidates:   need at least one worker and are READY or WORKING
#   - secondary candidates: status is LOADING, BLOCKED, ALL_CLAIMED or SYNCHING
#
# Returns a two-element list:
#   1. arrayref of [ $analysis, $stats ] pairs, primary candidates first
#   2. arrayref of log messages (holds the "Discarded ..." line when
#      $discarded_count is non-zero)
#
# NOTE(review): the argument unpacking, the list fed into shuffle and the branch
# that increments $discarded_count are not visible in this listing; presumably
# the input is the list of analyses and every pair that fits neither group is
# counted as discarded - confirm against the full file.
300 sub sort_pairs_by_suitability {
302 my @sorted_stats = map { [ $_, $_->stats] } # 3. pair analyses with their stats objects
303 sort { $b->priority <=> $a->priority } # 2. but ordered according to their priority levels
304 shuffle # 1. make sure analyses are well mixed within the same priority level
307 my (@primary_candidates, @secondary_candidates, $discarded_count, @log_buffer);
309 foreach my $pair ( @sorted_stats ) {
310 my ($analysis, $stats) = @$pair;
312 # assuming sync() is expensive, so first trying analyses that have already been sunk: 313 if( ($stats->estimate_num_required_workers > 0) and ($stats->status =~/^(READY|WORKING)$/) ) {
315 push @primary_candidates, $pair;
317 } elsif( $stats->status =~ /^(LOADING|BLOCKED|ALL_CLAIMED|SYNCHING)$/ ) {
319 push @secondary_candidates, $pair;
327 if( $discarded_count ) {
328 push @log_buffer,
"Discarded $discarded_count analyses because they do not need any Workers.";
331 return ( [@primary_candidates, @secondary_candidates], \@log_buffer );
public sort_pairs_by_suitability()