ensembl-hive  2.6
Scheduler.pm
Go to the documentation of this file.
1 =pod
2 
3 =head1 NAME
4 
5  Bio::EnsEMBL::Hive::Scheduler
6 
7 =head1 DESCRIPTION
8 
9  Scheduler starts with the numbers of required workers for unblocked analyses,
10  then goes through several kinds of restrictions (submit_limit, meadow_limits, hive_capacity, etc)
11  that act as limiters and may cap the original numbers in several ways.
12  The capped numbers are then grouped by meadow_type and rc_name and returned in a two-level hash.
13 
14 =head1 LICENSE
15 
16  Copyright [1999-2015] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute
17  Copyright [2016-2024] EMBL-European Bioinformatics Institute
18 
19  Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
20  You may obtain a copy of the License at
21 
22  http://www.apache.org/licenses/LICENSE-2.0
23 
24  Unless required by applicable law or agreed to in writing, software distributed under the License
25  is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
26  See the License for the specific language governing permissions and limitations under the License.
27 
28 =head1 CONTACT
29 
30  Please subscribe to the Hive mailing list: http://listserver.ebi.ac.uk/mailman/listinfo/ehive-users to discuss Hive-related questions or to be notified of our updates
31 
32 =cut
33 
34 
35 package Bio::EnsEMBL::Hive::Scheduler;
36 
37 use strict;
38 use warnings;
39 
40 use List::Util ('shuffle');
41 
44 
45 
# Prints Scheduler messages to the currently selected output handle,
# one per line, each prefixed with "Scheduler : ".
#
#   $msgs : either a single message or a reference to an array of messages
sub scheduler_say {
    my ($msgs) = @_;

    # a lone message is treated as a one-element list
    my @lines = ( ref($msgs) eq 'ARRAY' ) ? @$msgs : ( $msgs );

    for my $msg (@lines) {
        print "Scheduler : $msg\n";
    }
}
55 
56 
# Runs a scheduling pass and, if that pass finds that no extra Workers are required,
# checks for dead Workers, re-synchronizes the Hive (re-balancing semaphores when the
# pipeline has asked for it) and then runs a second scheduling pass.
# The resulting numbers are finally reduced by the Workers that the current user
# already has submitted in the same meadow_type/rc_name group.
#
#   $queen            : used for synchronization, dead Worker checks and pending Worker counts
#   $valley           : source of Worker statuses, the 'SubmitWorkersMax' setting, the default Meadow and the limiters
#   $list_of_analyses : reference to the list of Analyses to schedule for
#
# Returns a reference to a two-level hash { meadow_type => { rc_name => number_of_workers_to_submit } }
# from which empty groups and empty meadows have been removed.
sub schedule_workers_resync_if_necessary {
    my ($queen, $valley, $list_of_analyses) = @_;

    my $reconciled_worker_statuses = $valley->query_worker_statuses( $queen->registered_workers_attributes );
    my $submit_capacity            = $valley->config_get('SubmitWorkersMax');
    my $default_meadow_type        = $valley->get_default_meadow()->type;
    my ($valley_running_worker_count, $meadow_capacity_limiter_hashed_by_type)
        = $valley->generate_limiters( $reconciled_worker_statuses );

    # both scheduling passes are run with exactly the same arguments
    my @scheduling_args = ( $queen, $submit_capacity, $default_meadow_type, $list_of_analyses, $meadow_capacity_limiter_hashed_by_type );

    my ($workers_to_submit_by_analysis, $workers_to_submit_by_meadow_type_rc_name, $total_extra_workers_required, $log_buffer)
        = schedule_workers( @scheduling_args );

    scheduler_say( $log_buffer );

    if( !$total_extra_workers_required ) {
        scheduler_say( "According to analysis_stats no workers are required... let's see if anything went out of sync." );
        $queen->check_for_dead_workers($valley, 1);

        scheduler_say( "re-synchronizing..." );
        $queen->synchronize_hive( $list_of_analyses );

        if( !$queen->check_nothing_to_run_but_semaphored( $list_of_analyses ) ) {   # double-check that we are really stuck
            scheduler_say( "some READY jobs still in the queue. No need to consider re-balancing at this time." );
        }
        else {
            scheduler_say( "looks like we may need re-balancing semaphore_counts..." );

            if( !$queen->db->hive_pipeline->hive_auto_rebalance_semaphores ) {      # rebalancing only ever happens for the pipelines that asked for it
                scheduler_say([ "automatic re-balancing of semaphore_counts is off by default.",
                                "If you think your pipeline might benefit from it, set hive_auto_rebalance_semaphores => 1 in the PipeConfig's hive_meta_table.",
                                "You can also manually rebalance semaphores this time by running beekeeper with the '--balance_semaphores' option",
                ]);
            }
            elsif( my $rebalanced_jobs_counter = $queen->db->get_AnalysisJobAdaptor->balance_semaphores( $list_of_analyses ) ) {
                scheduler_say( "re-balanced $rebalanced_jobs_counter jobs, going through another re-synchronization..." );
                $queen->synchronize_hive( $list_of_analyses );
            }
            else {
                scheduler_say( "hmmm... managed to re-balance 0 jobs, you may need to investigate further." );
            }
        }

        # second attempt, now that the stats have been refreshed
        ($workers_to_submit_by_analysis, $workers_to_submit_by_meadow_type_rc_name, $total_extra_workers_required, $log_buffer)
            = schedule_workers( @scheduling_args );

        scheduler_say( $log_buffer );
    }

    # adjustment for pending workers:
    my $pending_worker_counts_by_meadow_type_rc_name = $queen->get_submitted_worker_counts_by_meadow_type_rc_name_for_meadow_user(Bio::EnsEMBL::Hive::Utils::whoami());

    foreach my $this_meadow_type (keys %$workers_to_submit_by_meadow_type_rc_name) {
        my $partial_workers_to_submit_by_rc_name = $workers_to_submit_by_meadow_type_rc_name->{$this_meadow_type};

        foreach my $this_rc_name (keys %$partial_workers_to_submit_by_rc_name) {
            my $workers_to_submit_this_group = $partial_workers_to_submit_by_rc_name->{$this_rc_name};
            my $pending_this_group           = $pending_worker_counts_by_meadow_type_rc_name->{ $this_meadow_type }{ $this_rc_name };

            # nothing pending in this group: the plan stands as it is
            unless( $pending_this_group ) {
                scheduler_say( "I recommend submitting $workers_to_submit_this_group x $this_meadow_type:$this_rc_name workers" );
                next;
            }

            my $msg_intro = "The plan was to submit $workers_to_submit_this_group x $this_meadow_type:$this_rc_name workers when the Scheduler detected $pending_this_group pending in this group, ";

            if( $workers_to_submit_this_group > $pending_this_group) {
                $partial_workers_to_submit_by_rc_name->{$this_rc_name} -= $pending_this_group;     # adjust the hashed value
                scheduler_say( $msg_intro . "so I recommend submitting only ".$partial_workers_to_submit_by_rc_name->{$this_rc_name}." extra" );
            }
            else {
                delete $partial_workers_to_submit_by_rc_name->{$this_rc_name};                     # avoid leaving an empty group in the hash
                scheduler_say( $msg_intro . "so I don't recommend submitting any extra" );
            }
        }

        # if nothing has been scheduled for a meadow, do not mention the meadow in the hash
        delete $workers_to_submit_by_meadow_type_rc_name->{$this_meadow_type}
            unless keys %$partial_workers_to_submit_by_rc_name;
    }

    return $workers_to_submit_by_meadow_type_rc_name;
}
131 
132 
# Picks an Analysis that a given Worker could specialize into.
#
#   $worker                    : the Worker looking for an Analysis
#   $analyses_matching_pattern : reference to the list of Analyses that matched $analyses_pattern
#   $analyses_pattern          : the pattern itself (only used in messages)
#
# Returns the first Analysis of the scheduling "plan" when the plan is not empty;
# otherwise returns a message: either one of the two "Could not find..." strings
# or the last line of the scheduling log.
sub suggest_analysis_to_specialize_a_worker {
    my ( $worker, $analyses_matching_pattern, $analyses_pattern ) = @_;

    my $queen              = $worker->adaptor;
    my $worker_rc_id       = $worker->resource_class_id;
    my $worker_meadow_type = $worker->meadow_type;

    my $matching_count     = scalar(@$analyses_matching_pattern);

    return "Could not find any Analyses matching '$analyses_pattern' pattern" unless $matching_count;

    $worker->worker_say( "Found ".$matching_count." analyses matching '$analyses_pattern' pattern" );

    # first pass: drop the Analyses that are bound to a different meadow_type than the Worker's
    my @analyses_in_suitable_meadow = grep {
        !( $worker_meadow_type and $_->meadow_type and ($worker_meadow_type ne $_->meadow_type) )
    } @$analyses_matching_pattern;

    # second pass: drop the Analyses that need a different resource class than the Worker's
    # (if any other attributes of the worker are specifically constrained in the analysis (such as meadow_name),
    #  the corresponding checks should be added here)
    my @analyses_matching_worker = grep {
        !( $worker_rc_id and ($worker_rc_id != $_->resource_class_id) )
    } @analyses_in_suitable_meadow;

    unless( @analyses_matching_worker ) {
        return "Could not find any of the ".$matching_count." '$analyses_pattern' Analyses that would suit this Worker";
    }

    # schedule for exactly one Worker; no meadow limiters are passed in during specialization
    my ($workers_to_submit_by_analysis, $workers_to_submit_by_meadow_type_rc_name, $total_extra_workers_required, $log_buffer)
        = schedule_workers( $queen, 1, $worker_meadow_type, \@analyses_matching_worker );

    if( $worker->debug ) {
        foreach my $log_line (@$log_buffer) {
            $worker->worker_say( $log_line );
        }
    }

    # an empty "plan" means there is nothing to suggest, so the last line of the scheduling log is returned instead
    return pop @$log_buffer unless scalar(@$workers_to_submit_by_analysis);

    return $workers_to_submit_by_analysis->[0][0];
}
175 
176 
# Builds a scheduling "plan": how many extra Workers to start for which Analysis.
# Each Analysis starts from its own estimate of required Workers, which is then
# negotiated down by a chain of limiters (submission capacity, hive load, and
# optionally meadow capacity and analysis_capacity).
#
#   $queen                                   : used for safe-synching the stats and for reading the current hive load
#   $submit_capacity                         : max number of Workers to schedule in this call
#   $default_meadow_type                     : meadow_type to assume for Analyses that do not define their own
#   $list_of_analyses                        : reference to the list of Analyses to consider
#   $meadow_capacity_limiter_hashed_by_type  : (optional) hash of limiters keyed by meadow_type;
#                                              it is left out when the call is made for a Worker's specialization
#
# Returns a list of four elements:
#   1. reference to a list of [ $analysis, $number_of_workers ] pairs,
#   2. reference to a two-level hash { meadow_type => { rc_name => number_of_workers } } (only filled when the meadow limiters were given),
#   3. the total number of Workers required before the limiters were applied,
#   4. reference to the list of log messages.
sub schedule_workers {
    my ($queen, $submit_capacity, $default_meadow_type, $list_of_analyses, $meadow_capacity_limiter_hashed_by_type) = @_;

    my @workers_to_submit_by_analysis             = ();   # The down-to-analysis "plan" that may completely change by the time the Workers are born and specialized
    my %workers_to_submit_by_meadow_type_rc_name  = ();   # Pre-pending-adjusted per-resource breakout
    my $total_extra_workers_required              = 0;
    my ($pairs_sorted_by_suitability, $log_buffer) = Bio::EnsEMBL::Hive::Scheduler::sort_pairs_by_suitability( $list_of_analyses );

    # no candidates at all: make sure the log says something and return the empty plan
    if( !@$pairs_sorted_by_suitability ) {
        push @$log_buffer, "Could not find any suitable analyses to start scheduling." unless @$log_buffer;

        return (\@workers_to_submit_by_analysis, \%workers_to_submit_by_meadow_type_rc_name, $total_extra_workers_required, $log_buffer);
    }

    my $submit_capacity_limiter = Bio::EnsEMBL::Hive::Limiter->new( 'Max number of Workers scheduled this time', $submit_capacity );
    my $queen_capacity_limiter  = Bio::EnsEMBL::Hive::Limiter->new( 'Total reciprocal capacity of the Hive', 1.0 - $queen->db->hive_pipeline->get_cached_hive_current_load() );

    ANALYSIS: foreach my $pair (@$pairs_sorted_by_suitability) {

        if( $submit_capacity_limiter->reached ) {
            # only add this message when scheduling and not during a Worker's specialization
            push @$log_buffer, "Submission capacity (=".$submit_capacity_limiter->original_capacity.") has been reached."
                if $meadow_capacity_limiter_hashed_by_type;

            last ANALYSIS;
        }

        my ($analysis, $analysis_stats) = @$pair;

        my $logic_name       = $analysis->logic_name;
        my $this_meadow_type = $analysis->meadow_type || $default_meadow_type;

        # the limiter of this Analysis' meadow (stays undefined when no meadow limiters were given)
        my $meadow_capacity_limiter;

        if( $meadow_capacity_limiter_hashed_by_type ) {
            $meadow_capacity_limiter = $meadow_capacity_limiter_hashed_by_type->{$this_meadow_type};

            if( !$meadow_capacity_limiter ) {
                push @$log_buffer, "The Meadow '$this_meadow_type' is not reachable from here, skipping Analysis '$logic_name'.";
                next ANALYSIS;
            }
            if( $meadow_capacity_limiter->reached ) {
                push @$log_buffer, "Available capacity of '$this_meadow_type' Meadow (=".$meadow_capacity_limiter->original_capacity.") has been reached, skipping Analysis '$logic_name'.";
                next ANALYSIS;
            }
        }

        # the status is read before the sync attempt, the outcome of the attempt is appended to the same log line
        my $sync_report = "Analysis '$logic_name' is ".$analysis_stats->status.", safe-synching it...";

        # Do a (safe) sync to get up-to-date job-counts and status
        if( $queen->safe_synchronize_AnalysisStats($analysis_stats) ) {
            push @$log_buffer, $sync_report." succeeded", $analysis_stats->toString;
        }
        elsif( scalar(@{ $analysis->control_rules_collection() }) ) {
            # The analysis is blockable and we haven't managed to sync it, so the status is unreliable
            push @$log_buffer, $sync_report." failed: cannot tell whether it is BLOCKED or not, skipping it.";
            next ANALYSIS;
        }
        else {
            push @$log_buffer, $sync_report." failed: will use old stats.";
        }

        my $still_blocked = ($analysis_stats->status eq 'BLOCKED')
                         || ( $analysis_stats->sync_lock && scalar(@{ $analysis->control_rules_collection() }) );
        if( $still_blocked ) {
            push @$log_buffer, "Analysis '$logic_name' is still ".$analysis_stats->status.", skipping it.";
            next ANALYSIS;
        }

        # getting the initial worker requirement for this analysis (may be off if $analysis_stats has not been sync'ed recently)
        my $extra_workers_this_analysis = $analysis_stats->estimate_num_required_workers;

        if( $extra_workers_this_analysis <= 0 ) {
            push @$log_buffer, "Analysis '$logic_name' doesn't require extra workers, skipping it.";
            next ANALYSIS;
        }

        # also keep the total number required so far (if nothing required we may need a resync later)
        $total_extra_workers_required += $extra_workers_this_analysis;

        # setting up all negotiating limiters:
        $queen_capacity_limiter->multiplier( $analysis->hive_capacity );

        my @limiters = ( $submit_capacity_limiter, $queen_capacity_limiter );

        push @limiters, $meadow_capacity_limiter if $meadow_capacity_limiter_hashed_by_type;

        if( defined($analysis->analysis_capacity) ) {
            push @limiters, Bio::EnsEMBL::Hive::Limiter->new( "Number of Workers working at '$logic_name' analysis",
                                                              $analysis->analysis_capacity - $analysis_stats->num_running_workers );
        }

        # negotiations:
        NEGOTIATION: foreach my $limiter (@limiters) {
            my $hit_the_limit;
            ($extra_workers_this_analysis, $hit_the_limit) = $limiter->preliminary_offer( $extra_workers_this_analysis );

            next NEGOTIATION unless $hit_the_limit;

            if( $extra_workers_this_analysis>0 ) {
                push @$log_buffer, "Hit the limit of *** ".$limiter->description." ***, settling for $extra_workers_this_analysis Workers.";
            }
            else {
                push @$log_buffer, "Hit the limit of *** ".$limiter->description." ***, skipping this Analysis.";
                next ANALYSIS;
            }
        }

        # let all parties know the final decision of negotiations:
        foreach my $party (@limiters) {
            $party->final_decision( $extra_workers_this_analysis );
        }

        push @workers_to_submit_by_analysis, [ $analysis, $extra_workers_this_analysis];

        # the per-resource breakout is only kept when scheduling, not during a Worker's specialization
        next ANALYSIS unless $meadow_capacity_limiter_hashed_by_type;

        my $this_rc_name = $analysis->resource_class->name;
        $workers_to_submit_by_meadow_type_rc_name{ $this_meadow_type }{ $this_rc_name } += $extra_workers_this_analysis;
        push @$log_buffer, sprintf("The Scheduler allocated $extra_workers_this_analysis x $this_meadow_type:$this_rc_name extra workers for '%s' [%.4f hive_load remaining]",
                                    $logic_name,
                                    $queen_capacity_limiter->available_capacity,
        );

    }   # /ANALYSIS : foreach my $pair (@$pairs_sorted_by_suitability)

    return (\@workers_to_submit_by_analysis, \%workers_to_submit_by_meadow_type_rc_name, $total_extra_workers_required, $log_buffer);
}
298 
299 
# Pairs every given Analysis with its stats object and orders the pairs for scheduling.
#
#   the only argument is a reference to the list of Analyses
#
# Returns a list of two elements:
#   1. reference to a list of [ $analysis, $stats ] pairs: first the ones that are READY or WORKING
#      and estimate that they need Workers, then the LOADING, BLOCKED, ALL_CLAIMED and SYNCHING ones;
#      within each group higher priority comes first,
#   2. reference to a list of log messages (reports how many Analyses have been discarded, if any).
sub sort_pairs_by_suitability {
    my ($list_of_analyses) = @_;

    my @mixed_analyses   = shuffle @$list_of_analyses;                                  # 1. make sure analyses are well mixed within the same priority level
    my @ranked_analyses  = sort { $b->priority <=> $a->priority } @mixed_analyses;      # 2. but ordered according to their priority levels
    my @sorted_stats     = map { [ $_, $_->stats] } @ranked_analyses;                   # 3. pair analyses with their stats objects

    my (@primary_candidates, @secondary_candidates, @log_buffer);
    my $discarded_count  = 0;

    foreach my $pair ( @sorted_stats ) {
        my $stats = $pair->[1];

        # assuming sync() is expensive, so first trying analyses that have already been sunk:
        my $destination =
              ( ($stats->estimate_num_required_workers > 0) and ($stats->status =~/^(READY|WORKING)$/) )    ? \@primary_candidates
            : ( $stats->status =~ /^(LOADING|BLOCKED|ALL_CLAIMED|SYNCHING)$/ )                              ? \@secondary_candidates
            :                                                                                                 undef;

        if( $destination ) {
            push @$destination, $pair;
        }
        else {
            $discarded_count++;
        }
    }

    push @log_buffer, "Discarded $discarded_count analyses because they do not need any Workers." if $discarded_count;

    return ( [@primary_candidates, @secondary_candidates], \@log_buffer );
}
333 
334 1;
335 
Bio::EnsEMBL::Hive::Utils
Definition: Collection.pm:4
Bio::EnsEMBL::Hive::Limiter::new
public new()
map
public map()
Bio::EnsEMBL::Hive::Scheduler::sort_pairs_by_suitability
public sort_pairs_by_suitability()
Bio::EnsEMBL::Hive::Version
Definition: Version.pm:19
Bio::EnsEMBL::Hive::Limiter
Definition: Limiter.pm:10
Bio::EnsEMBL::Hive::Utils::whoami
public whoami()
Bio::EnsEMBL::Hive
Definition: Hive.pm:38
Bio::EnsEMBL::Hive::Scheduler
Definition: Scheduler.pm:15