9 This is the 'SLURM' implementation of an EnsEMBL eHive Meadow.
13 Module version 5.5 is compatible with SLURM version 23.02.7.
17 See the NOTICE file distributed with this work for additional information
18 regarding copyright ownership.
20 Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
21 You may obtain a copy of the License at
23 http://www.apache.org/licenses/LICENSE-2.0
25 Unless required by applicable law or agreed to in writing, software distributed under the License
26 is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
27 See the License for the specific language governing permissions and limitations under the License.
31 Please subscribe to the Hive mailing list:
33 to discuss Hive-related questions or to be notified of our updates.
37 package Bio::EnsEMBL::Hive::Meadow::SLURM;
43 use DateTime::Format::ISO8601;
44 use File::Temp qw(tempdir);
45 use Scalar::Util qw(looks_like_number);
50 use parent
'Bio::EnsEMBL::Hive::Meadow';
52 # Semantic version of the Meadow interface:
53 # change the Major version whenever an incompatible change is introduced,
54 # change the Minor version whenever the interface is extended, but compatibility is retained.
55 # Module version 5.5 is compatible with Slurm 23.02.7
61 Description : Returns the cluster name
62 Return : "slurm" if available, else undef
67 # For SLURM, a cluster name will only be available when running in a federation,
68 # so we always return "slurm".
69 # Since this is also called to check for availability, assume Slurm is available
70 # if sinfo gives a version and a non-zero node count.
72 # List the slurm version and the cluster node count like "23.02.7:197"
73 my $sinfo = `sinfo -ho
"%v:%D" 2>/dev/
null`;
74 $sinfo =~ /^(\d+)(?:\.\d+)*:(\d+)$/;
75 my $slurm_version = $1;
78 if ($slurm_version and $node_count and $slurm_version >= 23 and $node_count > 0) {
85 my ($self, $config) = @_;
86 $self->SUPER::_init_meadow($config);
88 # Override user config. On Slurm, the tmp dirs are always cleaned up. It is
89 # not necessary to set this. To clean up, beekeeper will try an SSH log in
90 # to each node. This does not work and only produces error messages.
91 $config and $config->set(
'CleanupTempDirectoryKilledWorkers', 0);
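95 =head2 count_pending_workers_by_rc_name
98 Description : Counts the current user's pending workers that belong to this meadow (job name carries the meadow's job-name prefix), grouped by resource-class name; returns (hashref of counts keyed by rc_name, total count)
99 Exception : Dies if the command to retrieve pending workers fails.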
104 sub count_pending_workers_by_rc_name {
107 my $jnp = $self->job_name_prefix();
109 # Prefix for job is not implemented in Slurm, so need to get all
111 my $cmd =
"squeue --array --noheader --me --states='PENDING' --format='%j'";
113 my @output = execute_command($cmd);
115 my %pending_this_meadow_by_rc_name = ();
116 my $total_pending_this_meadow = 0;
118 foreach my $line (@output) {
119 if ($line =~ /\b\Q$jnp\E(\S+)\-\d+(\[\d+\])?\b/) {
120 $pending_this_meadow_by_rc_name{$1}++;
121 $total_pending_this_meadow++;
125 return (\%pending_this_meadow_by_rc_name, $total_pending_this_meadow);
126 } ## end sub count_pending_workers_by_rc_name
128 =head2 count_running_workers
131 Description : Counts the current user's running workers whose job name starts with this meadow's job-name prefix
132 Exception : Dies if the command to retrieve running workers fails.
137 sub count_running_workers {
140 my $jnp = $self->job_name_prefix();
142 my $cmd =
"squeue --array --noheader --me --states='RUNNING' --format='%j' | grep ^$jnp | wc -l";
143 my $output = execute_command($cmd);
150 =head2 status_of_all_our_workers()
153 Description : Lists the status of all of the current user's Slurm jobs (not filtered by job-name prefix), skipping COMPLETED and FAILED jobs
154 Exception : Dies if the command to retrieve the job list fails.
155 Returntype : Arrayref of [ worker_pid, user, status ]
159 sub status_of_all_our_workers {
162 my $jnp = $self->job_name_prefix(); # reederj_ehive_rosaprd_278-
Hive-
163 my @status_list = ();
165 # PENDING, RUNNING, SUSPENDED, CANCELLED, COMPLETING, COMPLETED, CONFIGURING, FAILED,
166 # TIMEOUT, PREEMPTED, NODE_FAIL, REVOKED and SPECIAL_EXIT
168 my $cmd =
"squeue --array --noheader --me --format='%i|%T|%u'";
170 my @output = execute_command($cmd);
172 foreach my $line (@output) {
173 my ($worker_pid, $status, $user) = split(/\|/, $line);
175 # TODO: not exactly sure what these are used for in the external code -
176 # this is based on the LSF status codes that were ignored
177 # Do not count COMPLETED or FAILED jobs.
178 next
if (($status eq
'COMPLETED') or ($status eq
'FAILED'));
180 push @status_list, [$worker_pid, $user, $status];
182 return \@status_list;
183 } ## end sub status_of_all_our_workers
185 =head2 check_worker_is_alive_and_mine()
188 Description : Checks whether the worker's Slurm job is still listed by squeue for the current user.
189 Exception : None; a failing squeue command is treated as the worker not being alive.
190 Returntype : Boolean (1 if squeue returned any output for the job, else 0)
194 sub check_worker_is_alive_and_mine {
195 my ($self, $worker) = @_;
198 my $this_user = $ENV{
'USER'};
200 my $cmd =
"squeue --noheader --me --job=$wpid";
202 my $output = eval{execute_command($cmd)};
204 return $output ? 1 : 0;
210 Description : Cancels the worker's Slurm job with scancel.
211 Exception : Dies if the scancel command fails.
212 Returntype : None
217 my ($self, $worker, $fast) = @_;
219 my $cmd =
'scancel ' . $worker->process_id();
220 execute_command($cmd);
223 =head2 get_report_entries_for_process_ids()
226 Description : Gathers stats for a specific set of workers via sacct, querying at most 20 process ids per sacct call.
227 Exception : Dies if the sacct command fails.
228 Returntype : Hashref of report entries keyed by Slurm job id.
232 sub get_report_entries_for_process_ids {
233 my $self = shift @_; # make sure we get
if off the way before splicing
235 my %combined_report_entries = ();
237 while (my $pid_batch = join(
',', splice(@_, 0, 20))) {
238 # can't fit too many pids on one shell cmdline
240 # sacct -j 19661,19662,19663
241 # --units=M Display values in specified unit type. [KMGTP]
243 "sacct -n -p --units=M --format JobName,JobID,ExitCode,MaxRSS," .
244 "MaxDiskRead,CPUTimeRAW,ElapsedRAW,State,DerivedExitCode,End " .
247 # print "DEBUG get_report_entries_for_process_ids() running cmd:\n\t$cmd\n";
248 my $batch_of_report_entries = $self->parse_report_source_line($cmd);
250 %combined_report_entries = (%combined_report_entries, %$batch_of_report_entries);
253 return \%combined_report_entries;
256 =head2 get_report_entries_for_time_interval()
259 Description : Gathers statistics for jobs in a time interval by running sacct (states CANCELLED, COMPLETED, FAILED and OUT_OF_MEMORY only).
260 Exception : Dies if the command to retrieve stats fails.
261 Returntype : Hashref of report entries keyed by Slurm job id.
262 Caller : Gets called from load_resource_usage.pl
266 sub get_report_entries_for_time_interval {
267 my ($self, $from_time, $to_time, $username) = @_;
269 my $from_timepiece = Time::Piece->strptime($from_time,
'%Y-%m-%d %H:%M:%S');
270 $from_time = $from_timepiece->strftime(
'%Y-%m-%dT%H:%M');
272 my $to_timepiece = Time::Piece->strptime($to_time,
'%Y-%m-%d %H:%M:%S') + 2 * ONE_MINUTE;
273 $to_time = $to_timepiece->strftime(
'%Y-%m-%dT%H:%M');
275 # sacct -s CA,CD,CG,F -S 2018-02-27T16:48 -E 2018-02-27T16:50
276 my $cmd =
"sacct -n -p --units=M -s CA,CD,F,OOM --format JobName,JobID,ExitCode,MaxRSS,MaxDiskRead," .
277 "CPUTimeRAW,ElapsedRAW,State,DerivedExitCode,End -S $from_time -E $to_time";
279 # print "DEBUG get_report_entries_for_time_interval() running cmd:\n\t$cmd\n";
281 my $batch_of_report_entries = $self->parse_report_source_line($cmd);
283 return $batch_of_report_entries;
286 =head2 parse_report_source_line()
288 Args : Command ( sacct )
289 Description : Runs the given sacct command and parses its pipe-delimited output into per-job resource-usage report entries; returns a hashref keyed by Slurm job id.
290 Exception : Dies if the sacct command fails.
295 sub parse_report_source_line {
296 my ($self, $bacct_source_line) = @_;
298 my $jnp = $self->job_name_prefix(); # reederj_ehive_rosaprd_278-
Hive-
300 my @lines = execute_command($bacct_source_line);
302 my %report_entry = ();
305 for my $row (@lines) {
306 my @col = split(/\|/, $row);
308 my $job_name = $col[0]; # JobName -
for explanation please look at sacct command
309 my $job_id = $col[1]; # JobID
310 my $exit_code = $col[2]; # ExitCode
311 my $mem_used = $col[3]; # MaxRSS in units=M
312 #my $reserved_time = $col[4]; # Reserved / pending time ...
313 my $reserved_time = 0;
314 my $max_disk_read = $col[4]; # MaxDiskRead
315 my $total_cpu = $col[5]; # CPUTimeRAW
316 my $elapsed = $col[6]; # ElapsedRAW
317 my $state = $col[7]; # State
318 my $exception_status = $col[8]; # DerivedExitCode
319 my $endtime = $col[9];
321 my $datetime = DateTime::Format::ISO8601->parse_datetime($endtime);
323 # print "DEBUG: $job_id\t$state\n";
325 # parse the correct state
326 if ($job_name =~ m/$jnp/) {
327 $job_id_to_state{$job_id} = $state;
330 next unless $job_name =~ m/batch/;
334 $max_disk_read =~ s/M$
336 if ($reserved_time =~ /:/) {
337 my $reserved_timepiece = Time::Piece->strptime($reserved_time,
'%H:%M:%S');
339 $reserved_timepiece->hour*3600 +
340 $reserved_timepiece->minute*60 +
341 $reserved_timepiece->second;
344 # get previously parsed status ( slurm returns 3 rows of statuses which are different ! )
345 my $cause_of_death = get_cause_of_death($job_id_to_state{$job_id});
346 $exception_status = $cause_of_death;
348 $report_entry{$job_id} = {
349 # entries for 'worker' table:
350 'when_died' => $datetime->date .
' ' . $datetime->hms,
351 'cause_of_death' => $cause_of_death,
353 # entries for 'worker_resource_usage' table:
354 'exit_status' => $exit_code,
355 'exception_status' => $exception_status,
356 'mem_megs' => $mem_used, # mem_in_units, returnd by sacct with --units=M
357 'swap_megs' => $max_disk_read, # swap_in_units
358 'pending_sec' => $reserved_time || 0,
359 'cpu_sec' => $total_cpu,
360 'lifespan_sec' => $elapsed,
362 } ## end
for my $row (@$lines)
363 return \%report_entry;
364 } ## end sub parse_report_source_line
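366 =head2 submit_workers_return_meadow_pids()
368 Args : worker command, required worker count, iteration, resource class name, resource-class-specific submission arguments, submission log directory
369 Comment : # Works with Slurm 17.02.9 (note: name() only reports Slurm as available for version >= 23)
370 Description : Runs sbatch to submit workers to SLURM (as a job array when more than one worker is required) and returns an arrayref of meadow pids: [<job_id>] for a single job, [<job_id>_1 .. <job_id>_N] for a job array.
371 Exception : Dies if sbatch fails or its output does not contain a job id.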
376 sub submit_workers_return_meadow_pids {
377 my ($self, $worker_cmd, $required_worker_count, $iteration, $rc_name,
378 $rc_specific_submission_cmd_args, $submit_log_subdir) = @_;
382 # Flag if we should submit a job array or not
383 my $array_required = $required_worker_count > 1;
385 my $job_array_spec =
"1-${required_worker_count}";
386 my $meadow_specific_submission_cmd_args = $self->config_get(
'SubmissionOptions');
388 my ($submit_stdout_file, $submit_stderr_file);
390 if ($submit_log_subdir) {
391 $submit_stdout_file = $submit_log_subdir .
"/log_${rc_name}_%A_%a.out";
392 $submit_stderr_file = $submit_log_subdir .
"/log_${rc_name}_%A_%a.err";
394 $submit_stdout_file =
'/dev/null';
395 $submit_stderr_file =
'/dev/null';
398 #No equivalent in sbatch, but can be accomplished with stdbuf -oL -eL
399 #$ENV{'LSB_STDOUT_DIRECT'} = 'y'; # unbuffer the output of the bsub command
401 #Note: job arrays share the same name in slurm and are 0-based, but this may still work
403 if ($array_required eq
"1") {
404 # We have to submit a job array - this will change the job ids in slurm to <job_id>_ARRAY
409 '-o', $submit_stdout_file,
410 '-e', $submit_stderr_file,
411 # inform SLURM we submit an ARRAY
412 # jobs submitted with 'sbatch -a ' get different ids back when executing
413 # 'squeue --array -h -u <user> -o '%i|%T|%u|%A' later
414 '-a', $job_array_spec,
415 '-J', $job_array_common_name,
416 split_for_bash($rc_specific_submission_cmd_args),
417 split_for_bash($meadow_specific_submission_cmd_args),
425 '-o', $submit_stdout_file,
426 '-e', $submit_stderr_file,
427 '-J', $job_array_common_name,
428 split_for_bash($rc_specific_submission_cmd_args),
429 split_for_bash($meadow_specific_submission_cmd_args),
436 print
"Executing [ " . $self->name .
" ]: " . join(
' ', @cmd) .
"\n";
438 my @output = execute_command(\@cmd);
440 unless ($output[0] =~ m/(\d+)/) {
441 die
"Could not find job id in sbatch output";
444 my $slurm_job_id = $output[0];
446 if (looks_like_number($slurm_job_id)) {
449 # Modify the return values depending on the submission type: single job vs job array
450 # as these have different indexes:
451 # array jobs: [54729315_1, 54729315_2]
452 # single job: [54728975]
453 if ($array_required) {
454 $return_value = [
map {$slurm_job_id .
'_' . $_ .
''} (1 .. $required_worker_count)];
456 $return_value = [$slurm_job_id];
459 return $return_value;
461 die
"Unexpected output for sbatch command. Command: '@cmd'; STDOUT: '" . (join
"\n", @output) .
"'";
463 } ## end sub submit_workers_return_meadow_pids
465 sub execute_command {
466 # command can be an arrayref or a string
469 # Timeout for slurm commands
472 my ($success, $error_message, undef, $stdout_buf, $stderr_buf) =
473 run(command => $cmd, verbose => 0, timeout => $timeout);
477 $output = join(
"", @$stdout_buf);
479 die
"Failed to execute command '$cmd'. Error: '$error_message', " .
480 "STDOUT: '@$stdout_buf', STDERR: '@$stderr_buf'";
482 return wantarray ? split(/\n/, $output
485 sub get_current_worker_process_id {
488 my $slurm_jobid = $ENV{
'SLURM_JOB_ID'};
489 my $slurm_array_job_id = $ENV{
'SLURM_ARRAY_JOB_ID'};
490 my $slurm_array_task_id = $ENV{
'SLURM_ARRAY_TASK_ID'};
492 # We have a slurm job
493 if (defined($slurm_jobid)) {
494 # We have an array job
495 if (defined($slurm_array_job_id) and defined($slurm_array_task_id)) {
496 return "$slurm_array_job_id\_$slurm_array_task_id";
501 die
"Could not establish the process_id";
505 sub deregister_local_process {
508 delete $ENV{
'SLURM_JOB_ID'};
509 delete $ENV{
'SLURM_ARRAY_JOB_ID'};
510 delete $ENV{
'SLURM_ARRAY_TASK_ID'};
513 =head2 get_cause_of_death()
515 Args : Slurm job state
516 Description : Translates a SLURM job state into an eHive cause-of-death
522 sub get_cause_of_death {
525 my %slurm_status_2_cod = (
526 'TIMEOUT' =>
'RUNLIMIT',
527 'FAILED' =>
'CONTAMINATED',
528 'OUT_OF_MEMORY' =>
'MEMLIMIT',
529 'CANCELLED' =>
'KILLED_BY_USER',
531 my $cod = $slurm_status_2_cod{$state};