ensembl-hive  2.8.1
SLURM.pm
Go to the documentation of this file.
1 =pod
2 
3 =head1 NAME
4 
5  Bio::EnsEMBL::Hive::Meadow::SLURM
6 
7 =head1 DESCRIPTION
8 
9  This is the 'SLURM' implementation of an EnsEMBL eHive Meadow.
10 
11 =head1 Compatibility
12 
13  Module version 5.5 is compatible with SLURM version slurm 23.02.7
14 
15 =head1 LICENSE
16 
17  See the NOTICE file distributed with this work for additional information
18  regarding copyright ownership.
19 
20  Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
21  You may obtain a copy of the License at
22 
23  http://www.apache.org/licenses/LICENSE-2.0
24 
25  Unless required by applicable law or agreed to in writing, software distributed under the License
26  is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
27  See the License for the specific language governing permissions and limitations under the License.
28 
29 =head1 CONTACT
30 
31  Please subscribe to the Hive mailing list:
32  http://listserver.ebi.ac.uk/mailman/listinfo/ehive-users
33  to discuss Hive-related questions or to be notified of our updates
34 
35 =cut
36 
37 package Bio::EnsEMBL::Hive::Meadow::SLURM;
38 
39 use strict;
40 use warnings;
41 
42 use Bio::EnsEMBL::Hive::Utils qw(split_for_bash timeout);
43 use DateTime::Format::ISO8601;
44 use File::Temp qw(tempdir);
45 use Scalar::Util qw(looks_like_number);
46 use Time::Piece;
47 use Time::Seconds;
48 use IPC::Cmd qw(run);
49 
50 use parent 'Bio::EnsEMBL::Hive::Meadow';
51 
52 # Semantic version of the Meadow interface:
53 # change the Major version whenever an incompatible change is introduced,
54 # change the Minor version whenever the interface is extended, but compatibility is retained.
55 # Module version 5.5 is compatible with Slurm 23.02.7
56 our $VERSION = '5.5';
57 
=head2 name

    Args        : None
    Description : Returns the name of this meadow. Because the result is also
                  used as an availability check, the sub probes the local
                  Slurm installation with "sinfo -V" and "sinfo -ho %D".
    Returntype  : String "slurm" when sinfo reports a major version of at
                  least 23 and a positive node count; undef otherwise.

=cut

# Slurm only exposes a cluster name when running in a federation, so the
# meadow name is always the literal "slurm".
# Since this is also called to check for availability, Slurm is assumed to be
# available if sinfo gives a supported version and a non-zero node count.
sub name {
    # "sinfo -V" prints a line such as "slurm 23.02.7".
    my $version_output = `sinfo -V 2>/dev/null` // '';

    # Capture in list context: on a failed match the variable is undef,
    # instead of silently inheriting $1 from an earlier, unrelated match.
    my ($major_version) = $version_output =~ /\A\s*slurm\s+(\d+)(?:\.\d+)*/i;

    # "sinfo -ho %D" prints node counts, possibly on more than one line;
    # add up every line that consists of a number only.
    my $nodes_output = `sinfo -ho "%D" 2>/dev/null` // '';
    my $node_count   = 0;
    $node_count += $_ for $nodes_output =~ /^\s*(\d+)\s*$/mg;

    if (defined $major_version and $major_version >= 23 and $node_count > 0) {
        return "slurm";
    }

    return;
}
85 
# Overrides the parent's meadow initialisation hook.
sub _init_meadow {
    my ($self, $config) = @_;

    $self->SUPER::_init_meadow($config);

    # Nothing to override when no configuration object was supplied.
    return $config unless $config;

    # Force 'CleanupTempDirectoryKilledWorkers' off, whatever the user
    # configured. Per the original author's note: Slurm always cleans up the
    # temporary directories itself, and the beekeeper's own clean-up would try
    # to SSH into each node, which does not work and only produces error
    # messages.
    return $config->set('CleanupTempDirectoryKilledWorkers', 0);
}
96 
97 
=head2 count_pending_workers_by_rc_name

    Args        : None
    Description : Counts this user's pending Slurm jobs whose name starts with
                  this meadow's job name prefix, grouped by the resource class
                  name embedded in the job name.
    Exception   : Dies if the squeue command fails.
    Returntype  : List of two elements: a hashref { rc_name => count } and the
                  total number of pending workers.

=cut

sub count_pending_workers_by_rc_name {
    my ($self) = @_;

    my $prefix = $self->job_name_prefix();

    # squeue has no way to filter on a job-name prefix, so fetch the names of
    # all pending jobs of this user and pick ours out below.
    my @job_names = execute_command(
        "squeue --array --noheader --me --states='PENDING' --format='%j'"
    );

    my %count_for_rc_name;
    for my $job_name (@job_names) {
        # <prefix><rc_name>-<digits>, optionally followed by [<digits>]
        next unless $job_name =~ /\b\Q$prefix\E(\S+)\-\d+(\[\d+\])?\b/;
        $count_for_rc_name{$1}++;
    }

    # Every matching job was counted exactly once above, so the grand total
    # is the sum of the per-resource-class counts.
    my $total_pending = 0;
    $total_pending += $_ for values %count_for_rc_name;

    return (\%count_for_rc_name, $total_pending);
}
130 
=head2 count_running_workers

    Args        : None
    Description : Counts this user's running Slurm jobs whose name starts with
                  this meadow's job name prefix.
    Exception   : Dies if the squeue command fails.
    Returntype  : Integer

=cut

sub count_running_workers {
    my ($self) = @_;

    my $jnp = $self->job_name_prefix();

    # Fetch the names of all running jobs of this user and filter them here.
    # The prefix is deliberately not interpolated into the shell command: it
    # may contain characters that are special to the shell or to grep.
    my @job_names = execute_command(
        "squeue --array --noheader --me --states='RUNNING' --format='%j'"
    );

    # grep in scalar context yields the number of matching names.
    my $running_count = grep { /\A\Q$jnp\E/ } @job_names;

    return $running_count;
}
152 
=head2 status_of_all_our_workers

    Args        : None
    Description : Lists the Slurm jobs of the current user as reported by
                  squeue, leaving out jobs in state COMPLETED or FAILED.
    Exception   : Dies if the squeue command fails.
    Returntype  : Arrayref of [ worker_pid, user, status ] triples

=cut

sub status_of_all_our_workers {
    my ($self) = @_;

    # Job states named in the original notes:
    # PENDING, RUNNING, SUSPENDED, CANCELLED, COMPLETING, COMPLETED, CONFIGURING, FAILED,
    # TIMEOUT, PREEMPTED, NODE_FAIL, REVOKED and SPECIAL_EXIT
    my @rows = execute_command("squeue --array --noheader --me --format='%i|%T|%u'");

    my @status_list;
    for my $row (@rows) {
        my ($worker_pid, $status, $user) = split(/\|/, $row);

        # Skip blank or truncated rows instead of recording undefined values.
        next unless defined $worker_pid and length $worker_pid;
        next unless defined $status and defined $user;

        # TODO (from the original author): not exactly sure what these are used
        # for in the external code - this is based on the LSF status codes that
        # were ignored. Do not count COMPLETED or FAILED jobs.
        next if $status eq 'COMPLETED' or $status eq 'FAILED';

        push @status_list, [$worker_pid, $user, $status];
    }

    return \@status_list;
}
187 
=head2 check_worker_is_alive_and_mine

    Args        : Worker object (must provide process_id())
    Description : Asks squeue (restricted to the current user via --me) about
                  the worker's job id.
    Exception   : None; a failing squeue command counts as "not alive".
    Returntype  : 1 if squeue printed anything for the job, 0 otherwise

=cut

sub check_worker_is_alive_and_mine {
    my ($self, $worker) = @_;

    my $job_id = $worker->process_id();

    # Any failure inside execute_command is swallowed by the eval, leaving
    # $listing undefined.
    my $listing = eval { execute_command("squeue --noheader --me --job=$job_id") };

    return 0 unless $listing;
    return 1;
}
209 
=head2 kill_worker

    Args        : Worker object (must provide process_id()), and a $fast flag
                  that this implementation does not use
    Description : Cancels the worker's Slurm job with scancel.
    Exception   : Dies if the scancel command fails.
    Returntype  : Whatever execute_command() returns for the scancel call

=cut

sub kill_worker {
    my ($self, $worker, $fast) = @_;

    my $job_id = $worker->process_id();

    return execute_command("scancel $job_id");
}
225 
=head2 get_report_entries_for_process_ids

    Args        : List of Slurm job ids (meadow process ids)
    Description : Gathers resource usage for the given jobs by running sacct
                  on batches of at most 20 ids and merging the parsed results.
    Exception   : Dies if a sacct command fails.
    Returntype  : Hashref { job_id => report entry }, as produced by
                  parse_report_source_line()

=cut

sub get_report_entries_for_process_ids {
    my ($self, @process_ids) = @_;

    my %combined_report_entries;

    # Can't fit too many pids on one shell command line, so work in batches.
    # The loop condition is the size of the batch, not the truthiness of the
    # joined string.
    while (my @batch = splice(@process_ids, 0, 20)) {
        my $pid_batch = join(',', @batch);

        # e.g. sacct -j 19661,19662,19663
        # --units=M  display values in the given unit [KMGTP]
        my $cmd =
            "sacct -n -p --units=M --format JobName,JobID,ExitCode,MaxRSS," .
            "MaxDiskRead,CPUTimeRAW,ElapsedRAW,State,DerivedExitCode,End " .
            "-j $pid_batch";

        my $batch_of_report_entries = $self->parse_report_source_line($cmd) // {};

        # Slice assignment adds only the new entries rather than rebuilding
        # the whole accumulated hash for every batch.
        @combined_report_entries{ keys %$batch_of_report_entries } = values %$batch_of_report_entries;
    }

    return \%combined_report_entries;
}
258 
=head2 get_report_entries_for_time_interval

    Args        : $from_time, $to_time - timestamps in 'YYYY-MM-DD HH:MM:SS' format
                  $username            - accepted but currently not used
    Description : Gathers resource usage for jobs in states CA, CD, F, OOM or
                  TO within a time interval by running sacct.
    Exception   : Dies if a timestamp cannot be parsed or the sacct command fails.
    Returntype  : Hashref { job_id => report entry }, as produced by
                  parse_report_source_line()
    Caller      : load_resource_usage.pl (according to the original notes)

=cut

sub get_report_entries_for_time_interval {
    my ($self, $from_time, $to_time, $username) = @_;

    # NOTE(review): $username is ignored; sacct is run without a user filter.
    # Confirm whether callers expect it to be honoured.

    my $input_format = '%Y-%m-%d %H:%M:%S';
    my $sacct_format = '%Y-%m-%dT%H:%M';

    my $sacct_from = Time::Piece->strptime($from_time, $input_format)->strftime($sacct_format);

    # The seconds are dropped when formatting for sacct; the end of the window
    # is pushed out by two minutes (presumably to compensate for that).
    my $sacct_to =
        (Time::Piece->strptime($to_time, $input_format) + 2 * ONE_MINUTE)->strftime($sacct_format);

    # e.g. sacct -s CA,CD,F,OOM,TO -S 2018-02-27T16:48 -E 2018-02-27T16:50
    # TO (TIMEOUT) is included so that jobs killed for exceeding their time
    # limit are reported too; get_cause_of_death() maps them to RUNLIMIT.
    my $cmd = "sacct -n -p --units=M -s CA,CD,F,OOM,TO --format JobName,JobID,ExitCode,MaxRSS,MaxDiskRead," .
              "CPUTimeRAW,ElapsedRAW,State,DerivedExitCode,End -S $sacct_from -E $sacct_to";

    return $self->parse_report_source_line($cmd);
}
288 
=head2 parse_report_source_line

    Args        : sacct command line that prints the columns
                  JobName|JobID|ExitCode|MaxRSS|MaxDiskRead|CPUTimeRAW|
                  ElapsedRAW|State|DerivedExitCode|End   (sacct -n -p --units=M)
    Description : Runs the command and turns every ".batch" step row into a
                  report entry keyed by the job id without the ".batch" suffix.
                  The cause of death is derived from the state of the job's
                  own row (the row whose name contains this meadow's job name
                  prefix), not from the state of the step row.
                  Rows without a parsable End time are skipped.
    Exception   : Dies if the command fails.
    Returntype  : Hashref { job_id => { when_died, cause_of_death, exit_status,
                  exception_status, mem_megs, swap_megs, pending_sec, cpu_sec,
                  lifespan_sec } }

=cut

sub parse_report_source_line {
    my ($self, $bacct_source_line) = @_;

    my $jnp = $self->job_name_prefix();

    my @lines = execute_command($bacct_source_line);

    my %report_entry;
    my %job_id_to_state;

    for my $row (@lines) {
        # Column order follows the --format list of the sacct command.
        # DerivedExitCode (9th column) is not used: 'exception_status' is
        # filled with the cause of death instead.
        my ($job_name, $job_id, $exit_code, $mem_used, $max_disk_read,
            $total_cpu, $elapsed, $state, undef, $endtime) = split(/\|/, $row);

        # Skip blank or truncated rows.
        next unless defined $job_name and defined $job_id;

        # Remember the state of the job's own row; the step rows that follow
        # may report a different state for the same job.
        if ($job_name =~ m/\Q$jnp\E/) {
            $job_id_to_state{$job_id} = $state;
        }

        # Only the ".batch" step carries the resource figures. It is
        # recognised by its JobID suffix, so a pipeline whose name happens to
        # contain "batch" is not mistaken for a step row.
        next unless $job_id =~ s/\.batch\z//;

        # A job that has not ended reports a non-date End value; such a row
        # cannot describe a dead worker, so it is skipped instead of aborting
        # the whole report.
        my $datetime = eval { DateTime::Format::ISO8601->parse_datetime($endtime) }
            or next;

        # Results are reported in Megabytes ("--units=M"); strip the unit.
        for ($mem_used, $max_disk_read) {
            $_ //= '';
            s/M$//;
        }

        # Use the previously recorded state of the job's own row.
        my $cause_of_death = get_cause_of_death($job_id_to_state{$job_id} // '');

        $report_entry{$job_id} = {
            # entries for 'worker' table:
            'when_died'        => $datetime->date . ' ' . $datetime->hms,
            'cause_of_death'   => $cause_of_death,

            # entries for 'worker_resource_usage' table:
            'exit_status'      => $exit_code,
            'exception_status' => $cause_of_death,
            'mem_megs'         => $mem_used,        # MaxRSS, in megabytes
            'swap_megs'        => $max_disk_read,   # MaxDiskRead, in megabytes
            'pending_sec'      => 0,                # pending time is not requested from sacct
            'cpu_sec'          => $total_cpu,
            'lifespan_sec'     => $elapsed,
        };
    }

    return \%report_entry;
}
368 
=head2 submit_workers_return_meadow_pids

    Args        : $worker_cmd                      - command line each worker runs
                  $required_worker_count           - number of workers to submit
                  $iteration, $rc_name             - used to build the job name
                  $rc_specific_submission_cmd_args - extra sbatch options (string)
                  $submit_log_subdir               - directory for stdout/stderr
                                                     files; /dev/null if not set
    Description : Submits the workers with "sbatch --parsable --wrap". More
                  than one worker is submitted as a job array ("-a 1-N").
    Exception   : Dies if sbatch fails or its output does not start with a
                  job id.
    Returntype  : Arrayref of meadow process ids:
                  job array  : [ '54729315_1', '54729315_2', ... ]
                  single job : [ '54728975' ]

=cut

sub submit_workers_return_meadow_pids {
    my ($self, $worker_cmd, $required_worker_count, $iteration, $rc_name,
        $rc_specific_submission_cmd_args, $submit_log_subdir) = @_;

    my $job_array_common_name = $self->job_array_common_name($rc_name, $iteration);

    # More than one worker is submitted as a job array.
    my $array_required = $required_worker_count > 1;

    my $meadow_specific_submission_cmd_args = $self->config_get('SubmissionOptions');

    my ($submit_stdout_file, $submit_stderr_file) = ('/dev/null', '/dev/null');
    if ($submit_log_subdir) {
        # %A and %a are sbatch filename patterns (array job id / task index).
        $submit_stdout_file = $submit_log_subdir . "/log_${rc_name}_%A_%a.out";
        $submit_stderr_file = $submit_log_subdir . "/log_${rc_name}_%A_%a.err";
    }

    # Jobs submitted with 'sbatch -a' are listed by 'squeue --array' under
    # ids of the form <job_id>_<task>, hence the different return values below.
    my @array_options = $array_required ? ('-a', "1-${required_worker_count}") : ();

    my @cmd = (
        'sbatch',
        '--parsable',
        '-o', $submit_stdout_file,
        '-e', $submit_stderr_file,
        @array_options,
        '-J', $job_array_common_name,
        split_for_bash($rc_specific_submission_cmd_args),
        split_for_bash($meadow_specific_submission_cmd_args),
        '--wrap',
        $worker_cmd,
    );

    print "Executing [ " . $self->name . " ]: " . join(' ', @cmd) . "\n";

    my @output = execute_command(\@cmd);

    my $first_line = $output[0] // '';
    unless ($first_line =~ m/(\d+)/) {
        die "Could not find job id in sbatch output";
    }

    # With --parsable sbatch prints the job id, optionally followed by
    # ";<cluster name>".
    my ($slurm_job_id) = $first_line =~ /\A\s*(\d+)(?:;\S*)?\s*\z/
        or die "Unexpected output for sbatch command. Command: '@cmd'; STDOUT: '" . (join "\n", @output) . "'";

    return [$slurm_job_id] unless $array_required;

    return [map { $slurm_job_id . '_' . $_ } (1 .. $required_worker_count)];
}
467 
=head2 execute_command

    Args        : $cmd     - command as a string or as an arrayref of words
                  $timeout - optional timeout in seconds (default: 120)
    Description : Runs the command through IPC::Cmd::run and captures its
                  standard output.
    Exception   : Dies, quoting the command, the error message and the
                  captured STDOUT and STDERR, if the command does not succeed.
    Returntype  : List context  : the output split into lines (no newlines)
                  Scalar context: the output as one string

=cut

sub execute_command {
    my ($cmd, $timeout) = @_;

    # Default timeout for Slurm commands.
    $timeout //= 120;

    my ($success, $error_message, undef, $stdout_buf, $stderr_buf) =
        run(command => $cmd, verbose => 0, timeout => $timeout);

    # Guard against missing buffers so that error reporting itself cannot fail.
    $stdout_buf //= [];
    $stderr_buf //= [];

    unless ($success) {
        # Show the words of an arrayref command, not its "ARRAY(0x...)" address.
        my $printable_cmd = ref($cmd) eq 'ARRAY' ? join(' ', @$cmd) : $cmd;
        $error_message //= '';

        die "Failed to execute command '$printable_cmd'. Error: '$error_message', " .
            "STDOUT: '@$stdout_buf', STDERR: '@$stderr_buf'";
    }

    my $output = join("", @$stdout_buf);

    return wantarray ? split(/\n/, $output) : $output;
}
487 
=head2 get_current_worker_process_id

    Args        : None
    Description : Determines the meadow process id of the current process from
                  the SLURM_JOB_ID, SLURM_ARRAY_JOB_ID and SLURM_ARRAY_TASK_ID
                  environment variables.
    Exception   : Dies if SLURM_JOB_ID is not set.
    Returntype  : String "<array job id>_<task id>" when both array variables
                  are set, otherwise the value of SLURM_JOB_ID

=cut

sub get_current_worker_process_id {
    my ($self) = @_;

    my ($job_id, $array_job_id, $array_task_id) =
        @ENV{qw(SLURM_JOB_ID SLURM_ARRAY_JOB_ID SLURM_ARRAY_TASK_ID)};

    # Without SLURM_JOB_ID we are not inside a Slurm job at all.
    die "Could not establish the process_id" unless defined $job_id;

    # Plain (non-array) job.
    return $job_id unless defined $array_job_id and defined $array_task_id;

    # Member of a job array.
    return join('_', $array_job_id, $array_task_id);
}
507 
=head2 deregister_local_process

    Args        : None
    Description : Removes SLURM_JOB_ID, SLURM_ARRAY_JOB_ID and
                  SLURM_ARRAY_TASK_ID from the environment of this process.
    Returntype  : The removed value of SLURM_ARRAY_TASK_ID (undef if unset)

=cut

sub deregister_local_process {
    my ($self) = @_;

    my @slurm_id_variables = qw(SLURM_JOB_ID SLURM_ARRAY_JOB_ID SLURM_ARRAY_TASK_ID);

    # Keep the value removed last, so the sub returns what the final delete
    # yields.
    my $last_removed;
    $last_removed = delete $ENV{$_} for @slurm_id_variables;

    return $last_removed;
}
515 
=head2 get_cause_of_death

    Args        : Slurm job state as printed by sacct (may be undef)
    Description : Translates a Slurm job state into an eHive cause of death.
                  Only the leading keyword of the state is looked at, so
                  decorated values such as "CANCELLED by 1234" are recognised.
    Exception   : None
    Returntype  : String: 'RUNLIMIT', 'CONTAMINATED', 'MEMLIMIT' or
                  'KILLED_BY_USER'; 'UNKNOWN' for any other (or no) state

=cut

sub get_cause_of_death {
    my ($state) = @_;

    my %slurm_status_2_cod = (
        'TIMEOUT'       => 'RUNLIMIT',
        'FAILED'        => 'CONTAMINATED',
        'OUT_OF_MEMORY' => 'MEMLIMIT',
        'CANCELLED'     => 'KILLED_BY_USER',
    );

    # Reduce the state to its leading keyword; an undefined or empty state
    # yields no keyword at all.
    my ($state_keyword) = ($state // '') =~ /\A\s*([A-Za-z_]+)/;

    return 'UNKNOWN' unless defined $state_keyword;

    return $slurm_status_2_cod{ uc $state_keyword } // 'UNKNOWN';
}
540 
541 1;
Bio::EnsEMBL::Hive::Utils
Definition: Collection.pm:4
EnsEMBL
Definition: Filter.pm:1
map
public map()
Bio::EnsEMBL::Hive::Version
Definition: Version.pm:19
Bio::EnsEMBL::Hive::Meadow::SLURM
Definition: SLURM.pm:17
Bio::EnsEMBL::Hive::Worker
Definition: Worker.pm:53
Bio::EnsEMBL::Hive::Meadow::job_array_common_name
public job_array_common_name()
run
public run()
Bio::EnsEMBL::Hive
Definition: Hive.pm:38
Bio::EnsEMBL::Hive::Worker::process_id
public process_id()