ensembl-hive  2.7.0
SLURM.pm
Go to the documentation of this file.
1 =pod
2 
3 =head1 NAME
4 
5  Bio::EnsEMBL::Hive::Meadow::SLURM
6 
7 =head1 DESCRIPTION
8 
9  This is the 'SLURM' implementation of an EnsEMBL eHive Meadow.
10 
11 =head1 Compatibility
12 
13  Module version 5.5 is compatible with Slurm version 23.02.7
14 
15 =head1 LICENSE
16 
17  See the NOTICE file distributed with this work for additional information
18  regarding copyright ownership.
19 
20  Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
21  You may obtain a copy of the License at
22 
23  http://www.apache.org/licenses/LICENSE-2.0
24 
25  Unless required by applicable law or agreed to in writing, software distributed under the License
26  is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
27  See the License for the specific language governing permissions and limitations under the License.
28 
29 =head1 CONTACT
30 
31  Please subscribe to the Hive mailing list:
32  http://listserver.ebi.ac.uk/mailman/listinfo/ehive-users
33  to discuss Hive-related questions or to be notified of our updates
34 
35 =cut
36 
37 package Bio::EnsEMBL::Hive::Meadow::SLURM;
38 
39 use strict;
40 use warnings;
41 
42 use Bio::EnsEMBL::Hive::Utils qw(split_for_bash timeout);
43 use DateTime::Format::ISO8601;
44 use File::Temp qw(tempdir);
45 use Scalar::Util qw(looks_like_number);
46 use Time::Piece;
47 use Time::Seconds;
48 use IPC::Cmd qw(run);
49 
50 use parent 'Bio::EnsEMBL::Hive::Meadow';
51 
52 # Semantic version of the Meadow interface:
53 # change the Major version whenever an incompatible change is introduced,
54 # change the Minor version whenever the interface is extended, but compatibility is retained.
55 # Module version 5.5 is compatible with Slurm 23.02.7
56 our $VERSION = '5.5';
57 
=head2 name

 Args        : None
 Description : Returns the meadow (cluster) name. Also used as the availability
               check for this meadow.
 Returntype  : String: "slurm" when sinfo reports Slurm 23 or later and a
               non-zero node count; undef (empty list in list context) otherwise.

=cut

# For SLURM, a cluster name will only be available when running in a federation,
# so we always return "slurm".
# Since this is also called to check for availability, assume Slurm is available
# if sinfo gives a version and a non-zero node count.
sub name {
    # List the slurm version and the cluster node count like "23.02.7:197".
    # sinfo may print more than one line; the first well-formed line is used.
    my $sinfo = `sinfo -ho "%v:%D" 2>/dev/null` // '';

    # Read the captures only from a successful match: after a failed match
    # $1/$2 would still hold values from an earlier, unrelated match.
    my ($slurm_version, $node_count) = $sinfo =~ /^(\d+)(?:\.\d+)*:(\d+)$/m;
    return unless defined $node_count;

    return 'slurm' if $slurm_version >= 23 and $node_count > 0;
    return;
}
82 
# Override method
# Initialises the meadow via the parent class, then forces one config setting
# that does not make sense on Slurm.
sub _init_meadow {
    my $self   = shift;
    my $config = shift;

    $self->SUPER::_init_meadow($config);

    # Whatever the user configured, switch off CleanupTempDirectoryKilledWorkers:
    # the Slurm tmp dirs are always cleaned up anyway, and the beekeeper would
    # otherwise try to SSH into every node to clean them, which does not work
    # and only produces error messages.
    $config->set('CleanupTempDirectoryKilledWorkers', 0) if $config;
}
93 
94 
=head2 count_pending_workers_by_rc_name

 Args        : None
 Description : Counts this pipeline's pending workers, grouped by resource class name.
 Exception   : Dies if the squeue command fails.
 Returntype  : List ( Hashref { rc_name => count }, Integer total )

=cut

sub count_pending_workers_by_rc_name {
    my ($self) = @_;

    my $jnp = $self->job_name_prefix();

    # Prefix for job is not implemented in Slurm, so need to get all
    # pending job names of the user and parse them here.
    my $cmd = "squeue --array --noheader --me --states='PENDING' --format='%j'";

    my @output = execute_command($cmd);

    my %pending_this_meadow_by_rc_name = ();
    my $total_pending_this_meadow = 0;

    foreach my $line (@output) {
        # Job names look like "<prefix><rc_name>-<iteration>". Anchor on the
        # prefix (as count_running_workers does) so that names which merely
        # contain it somewhere are not counted.
        if ($line =~ /^\s*\Q$jnp\E(\S+)-\d+(?:\[\d+\])?\b/) {
            $pending_this_meadow_by_rc_name{$1}++;
            $total_pending_this_meadow++;
        }
    }

    return (\%pending_this_meadow_by_rc_name, $total_pending_this_meadow);
} ## end sub count_pending_workers_by_rc_name
127 
=head2 count_running_workers

 Args        : None
 Description : Counts the running workers of this pipeline, i.e. the user's
               running Slurm jobs (array tasks expanded) whose name starts with
               the job name prefix.
 Exception   : Dies if the squeue command fails.
 Returntype  : Integer

=cut

sub count_running_workers {
    my $self = shift;

    my $jnp = $self->job_name_prefix();

    # Filter in Perl rather than piping through "grep ^$jnp | wc -l": the
    # prefix is then matched literally (no shell or regex interpretation),
    # and there is no $1 to read from a possibly failed match.
    my $cmd = "squeue --array --noheader --me --states='RUNNING' --format='%j'";
    my @job_names = execute_command($cmd);

    my $running_count = grep { index($_, $jnp) == 0 } @job_names;

    return $running_count;
}
149 
=head2 status_of_all_our_workers

 Args        : None
 Description : Lists the current user's Slurm jobs (array tasks expanded) with
               their state. COMPLETED/FAILED jobs and jobs belonging to other
               eHive pipelines are left out.
 Exception   : Dies if the squeue command fails.
 Returntype  : Arrayref of [ worker_pid, user, status ]

=cut

sub status_of_all_our_workers {
    my $self = shift;

    my $jnp = $self->job_name_prefix(); # e.g. reederj_ehive_rosaprd_278-Hive-
    my @status_list = ();

    # PENDING, RUNNING, SUSPENDED, CANCELLED, COMPLETING, COMPLETED, CONFIGURING, FAILED,
    # TIMEOUT, PREEMPTED, NODE_FAIL, REVOKED and SPECIAL_EXIT
    # The job name is requested last so that, with the split limit below,
    # a '|' inside a job name cannot shift the other columns.
    my $cmd = "squeue --array --noheader --me --format='%i|%T|%u|%j'";

    my @output = execute_command($cmd);

    foreach my $line (@output) {
        my ($worker_pid, $status, $user, $job_name) = split(/\|/, $line, 4);
        next unless defined $status;
        $job_name //= '';

        # TODO: not exactly sure what these are used for in the external code -
        # this is based on the LSF status codes that were ignored
        # Do not count COMPLETED or FAILED jobs.
        next if ($status eq 'COMPLETED') or ($status eq 'FAILED');

        # Skip eHive jobs that were submitted by another pipeline
        # (they carry a different job name prefix).
        next if $job_name =~ /Hive-/ and index($job_name, $jnp) != 0;

        push @status_list, [$worker_pid, $user, $status];
    }
    return \@status_list;
} ## end sub status_of_all_our_workers
184 
=head2 check_worker_is_alive_and_mine

 Args        : $worker - the worker to check
 Description : Checks whether the worker's Slurm job is listed by squeue for the
               current user (--me).
 Exception   : None. Failures of the squeue command are treated as "not alive".
 Returntype  : Integer: 1 if squeue lists the job, 0 otherwise

=cut

sub check_worker_is_alive_and_mine {
    my ($self, $worker) = @_;

    my $wpid = $worker->process_id();
    return 0 unless defined $wpid and length $wpid;

    # --me restricts the listing to the current user's jobs. The list form
    # passes the process id to squeue without going through a shell.
    my $cmd = ['squeue', '--noheader', '--me', "--job=$wpid"];

    # Deliberately best-effort: any failure of squeue counts as "not alive".
    my $output = eval { execute_command($cmd) };

    return $output ? 1 : 0;
}
206 
=head2 kill_worker

 Args        : $worker - the worker whose Slurm job is cancelled
               $fast   - accepted for interface compatibility; unused here
 Description : Cancels the worker's Slurm job with scancel.
 Exception   : Dies if the scancel command fails.
 Returntype  : Output of scancel, as returned by execute_command

=cut

sub kill_worker {
    my $self   = shift;
    my $worker = shift;
    my $fast   = shift;    # not used by this implementation

    return execute_command('scancel ' . $worker->process_id());
}
222 
=head2 get_report_entries_for_process_ids

 Args        : @process_ids - Slurm job ids of the workers to report on
 Description : Gathers stats for a specific set of workers via sacct, querying
               at most 20 job ids per sacct call.
 Exception   : Dies if an sacct command fails.
 Returntype  : Hashref { job_id => report entry } (see parse_report_source_line)

=cut

sub get_report_entries_for_process_ids {
    my $self = shift @_;    # take $self off @_ so that only process ids are spliced

    my %combined_report_entries = ();

    # Batches of 20: too many pids do not fit on one shell command line.
    # Testing the batch size (not the joined string) means an id such as "0"
    # cannot end the loop early.
    while (my @pid_batch = splice(@_, 0, 20)) {
        # sacct -j 19661,19662,19663
        # --units=M   Display values in specified unit type. [KMGTP]
        my $cmd =
            "sacct -n -p --units=M --format JobName,JobID,ExitCode,MaxRSS," .
            "MaxDiskRead,CPUTimeRAW,ElapsedRAW,State,DerivedExitCode,End " .
            "-j " . join(',', @pid_batch);

        my $batch_of_report_entries = $self->parse_report_source_line($cmd);

        # Merge in place instead of rebuilding the whole hash for every batch.
        @combined_report_entries{ keys %$batch_of_report_entries } =
            values %$batch_of_report_entries;
    }

    return \%combined_report_entries;
}
255 
=head2 get_report_entries_for_time_interval

 Args        : $from_time, $to_time - 'YYYY-MM-DD HH:MM:SS' strings
               $username            - optional; restricts sacct to that user's jobs
 Description : Gathers statistics for finished jobs in a time interval by running sacct.
 Exception   : Dies if a time cannot be parsed or the sacct command fails.
 Returntype  : Hashref { job_id => report entry } (see parse_report_source_line)
 Caller      : Gets called from load_resource_usage.pl

=cut

sub get_report_entries_for_time_interval {
    my ($self, $from_time, $to_time, $username) = @_;

    # sacct is given minute-resolution timestamps (seconds are dropped).
    my $from_timepiece = Time::Piece->strptime($from_time, '%Y-%m-%d %H:%M:%S');
    $from_time = $from_timepiece->strftime('%Y-%m-%dT%H:%M');

    # Pad the end of the window by two minutes to make up for the dropped seconds.
    my $to_timepiece = Time::Piece->strptime($to_time, '%Y-%m-%d %H:%M:%S') + 2 * ONE_MINUTE;
    $to_time = $to_timepiece->strftime('%Y-%m-%dT%H:%M');

    # Finished states only: CAncelled, CompleteD, Failed, Out Of Memory and TimeOut.
    # TO is needed so that jobs hitting their time limit are reported
    # (get_cause_of_death maps TIMEOUT to RUNLIMIT).
    # e.g. sacct -s CA,CD,F,OOM,TO -S 2018-02-27T16:48 -E 2018-02-27T16:50
    my $cmd = "sacct -n -p --units=M -s CA,CD,F,OOM,TO --format JobName,JobID,ExitCode,MaxRSS,MaxDiskRead," .
              "CPUTimeRAW,ElapsedRAW,State,DerivedExitCode,End -S $from_time -E $to_time";
    $cmd .= " -u $username" if defined $username and length $username;

    return $self->parse_report_source_line($cmd);
}
285 
=head2 parse_report_source_line

 Args        : $bacct_source_line - the sacct command to run. Its output must be
               parsable (-p) with the columns JobName,JobID,ExitCode,MaxRSS,
               MaxDiskRead,CPUTimeRAW,ElapsedRAW,State,DerivedExitCode,End
               in this order, sizes in megabytes (--units=M).
 Description : Runs the sacct command and builds one cause-of-death and
               resource-usage report entry per job, keyed by job id.
 Exception   : Dies if the sacct command fails.
 Returntype  : Hashref { job_id => { when_died, cause_of_death, exit_status,
               exception_status, mem_megs, swap_megs, pending_sec, cpu_sec,
               lifespan_sec } }

=cut

sub parse_report_source_line {
    my ($self, $bacct_source_line) = @_;

    my $jnp = $self->job_name_prefix(); # e.g. reederj_ehive_rosaprd_278-Hive-

    my @lines = execute_command($bacct_source_line);

    my %report_entry = ();
    my %job_id_to_state;

    for my $row (@lines) {
        # Columns follow the --format list of the sacct command. DerivedExitCode
        # (9th column) is not reported, hence the undef placeholder.
        my ($job_name, $job_id, $exit_code, $mem_used, $max_disk_read,
            $total_cpu, $elapsed, $state, undef, $endtime) = split(/\|/, $row);
        next unless defined $job_id;

        # sacct returns several rows per job (the job itself and its steps), each
        # with its own state. Remember the state of the job row - the one named
        # after our job - keeping only the state keyword
        # (e.g. "CANCELLED" out of "CANCELLED by 12345").
        if (defined $job_name and index($job_name, $jnp) == 0) {
            ($job_id_to_state{$job_id}) = (defined $state ? $state : '') =~ /^(\S*)/;
        }

        # Resource usage is taken from the batch step ("<job_id>.batch") only.
        next unless $job_id =~ s/\.batch\z//;

        # Rows whose End cannot be parsed (presumably jobs that have not ended)
        # have no death to report; skip them instead of aborting the whole report.
        my $datetime = eval { DateTime::Format::ISO8601->parse_datetime($endtime) };
        next unless $datetime;

        # --units=M: sizes come suffixed with "M"; keep the bare number of megabytes.
        for ($mem_used, $max_disk_read) {
            s/M\z// if defined;
        }

        my $cause_of_death = get_cause_of_death($job_id_to_state{$job_id} // '');

        $report_entry{$job_id} = {
            # entries for 'worker' table:
            'when_died'        => $datetime->date . ' ' . $datetime->hms,
            'cause_of_death'   => $cause_of_death,

            # entries for 'worker_resource_usage' table:
            'exit_status'      => $exit_code,
            'exception_status' => $cause_of_death,
            'mem_megs'         => $mem_used,         # MaxRSS, in megabytes
            'swap_megs'        => $max_disk_read,    # MaxDiskRead, in megabytes
            'pending_sec'      => 0,                 # pending time is not requested from sacct
            'cpu_sec'          => $total_cpu,
            'lifespan_sec'     => $elapsed,
        };
    }
    return \%report_entry;
} ## end sub parse_report_source_line
365 
=head2 submit_workers_return_meadow_pids

 Args        : $worker_cmd, $required_worker_count, $iteration, $rc_name,
               $rc_specific_submission_cmd_args, $submit_log_subdir
 Description : Runs sbatch to submit workers to SLURM: a single job for one
               worker, a job array (-a 1-N) for several workers.
 Exception   : Dies if sbatch fails or no job id can be found in its output.
 Returntype  : Arrayref of process ids: [ "<job_id>" ] for a single job,
               [ "<job_id>_1" .. "<job_id>_N" ] for a job array

=cut

sub submit_workers_return_meadow_pids {
    my ($self, $worker_cmd, $required_worker_count, $iteration, $rc_name,
        $rc_specific_submission_cmd_args, $submit_log_subdir) = @_;

    my $job_array_common_name = $self->job_array_common_name($rc_name, $iteration);

    # Submit a job array only when more than one worker is needed.
    my $array_required = $required_worker_count > 1;

    my $meadow_specific_submission_cmd_args = $self->config_get('SubmissionOptions');

    my ($submit_stdout_file, $submit_stderr_file) = $submit_log_subdir
        ? ("$submit_log_subdir/log_${rc_name}_%A_%a.out", "$submit_log_subdir/log_${rc_name}_%A_%a.err")
        : ('/dev/null', '/dev/null');

    # LSB_STDOUT_DIRECT (LSF) has no sbatch equivalent; unbuffered output could
    # be obtained with stdbuf -oL -eL.
    my @cmd = (
        'sbatch',
        '--parsable',
        '-o', $submit_stdout_file,
        '-e', $submit_stderr_file,
        # A job array of tasks 1..N: Slurm then identifies each task as
        # <job_id>_<index>, which is also how 'squeue --array' reports them.
        ($array_required ? ('-a', "1-${required_worker_count}") : ()),
        '-J', $job_array_common_name,
        split_for_bash($rc_specific_submission_cmd_args),
        split_for_bash($meadow_specific_submission_cmd_args),
        '--wrap',
        $worker_cmd,
    );

    print "Executing [ " . $self->name . " ]: " . join(' ', @cmd) . "\n";

    my @output = execute_command(\@cmd);

    # --parsable prints "<job_id>", or "<job_id>;<cluster>" when a cluster name is present.
    my $first_line = defined $output[0] ? $output[0] : '';
    my ($slurm_job_id) = $first_line =~ /\A\s*(\d+)(?:;\S*)?\s*\z/;

    unless (defined $slurm_job_id) {
        die "Could not find job id in sbatch output. Command: '@cmd'; STDOUT: '" . (join "\n", @output) . "'";
    }

    # Return values depend on the submission type, as the ids differ:
    #   array jobs: [54729315_1, 54729315_2]
    #   single job: [54728975]
    return $array_required
        ? [ map { "${slurm_job_id}_$_" } 1 .. $required_worker_count ]
        : [ $slurm_job_id ];
} ## end sub submit_workers_return_meadow_pids
464 
# Runs a command with IPC::Cmd::run and a 120 second timeout.
#   $cmd : arrayref (program and arguments) or a command string
# Returns the captured STDOUT: one element per line in list context, the
# whole text in scalar context.
# Dies with the command, error, STDOUT and STDERR if the command fails.
sub execute_command {
    my $cmd = shift;

    # Timeout for slurm commands, in seconds
    my $timeout = 120;

    my ($success, $error_message, undef, $stdout_buf, $stderr_buf) =
        run(command => $cmd, verbose => 0, timeout => $timeout);

    # The buffers are undef when IPC::Cmd cannot capture output.
    my @stdout = @{ $stdout_buf || [] };
    my @stderr = @{ $stderr_buf || [] };

    unless ($success) {
        # Show an arrayref command as its words, not as "ARRAY(0x...)".
        my $cmd_text = ref($cmd) eq 'ARRAY' ? join(' ', @$cmd) : $cmd;
        die "Failed to execute command '$cmd_text'. Error: '" . ($error_message // '') . "', " .
            "STDOUT: '@stdout', STDERR: '@stderr'";
    }

    my $output = join('', @stdout);
    return wantarray ? split(/\n/, $output) : $output;
}
484 
# Derives the eHive process id of the current process from the Slurm
# environment: "<array_job_id>_<array_task_id>" for a job array task,
# otherwise the plain job id. Dies if SLURM_JOB_ID is not set.
sub get_current_worker_process_id {
    my $self = shift;

    my $job_id = $ENV{'SLURM_JOB_ID'};

    # Without SLURM_JOB_ID no process id can be derived.
    die "Could not establish the process_id" unless defined $job_id;

    my ($array_job_id, $array_task_id) = @ENV{ qw(SLURM_ARRAY_JOB_ID SLURM_ARRAY_TASK_ID) };

    return (defined $array_job_id && defined $array_task_id)
        ? "${array_job_id}_${array_task_id}"
        : $job_id;
}
504 
# Removes the Slurm job identity variables from %ENV, so that
# get_current_worker_process_id can no longer derive a process id from them.
sub deregister_local_process {
    my $self = shift;

    delete @ENV{ qw(SLURM_JOB_ID SLURM_ARRAY_JOB_ID) };
    return delete $ENV{'SLURM_ARRAY_TASK_ID'};
}
512 
=head2 get_cause_of_death

 Args        : Slurm job state, e.g. "TIMEOUT" or "CANCELLED by 12345" (may be undef)
 Description : Translates a SLURM job state into an eHive cause-of-death.
               Only the leading state keyword is looked up.
 Exception   : None
 Returntype  : String; "UNKNOWN" for unmapped or missing states

=cut

sub get_cause_of_death {
    my ($state) = @_;

    my %slurm_status_2_cod = (
        'TIMEOUT'       => 'RUNLIMIT',
        'FAILED'        => 'CONTAMINATED',
        'OUT_OF_MEMORY' => 'MEMLIMIT',
        'CANCELLED'     => 'KILLED_BY_USER',
    );

    # sacct can decorate a state (e.g. "CANCELLED by 12345"), so look up only
    # the leading keyword. An undefined state is treated like an unknown one.
    my ($base_state) = (defined $state ? $state : '') =~ /^([A-Z_]+)/;
    my $cod = defined $base_state ? $slurm_status_2_cod{$base_state} : undef;

    return $cod || 'UNKNOWN';
}
537 
538 1;
Bio::EnsEMBL::Hive::Utils
Definition: Collection.pm:4
EnsEMBL
Definition: Filter.pm:1
map
public map()
Bio::EnsEMBL::Hive::Version
Definition: Version.pm:19
Bio::EnsEMBL::Hive::Meadow::SLURM
Definition: SLURM.pm:17
Bio::EnsEMBL::Hive::Worker
Definition: Worker.pm:53
Bio::EnsEMBL::Hive::Meadow::job_array_common_name
public job_array_common_name()
run
public run()
Bio::EnsEMBL::Hive
Definition: Hive.pm:38
Bio::EnsEMBL::Hive::Worker::process_id
public process_id()