9 This is the
'SLURM' implementation of an
EnsEMBL eHive Meadow.
13 Module version 5.5 is compatible with SLURM version 23.02.7
17 See the NOTICE file distributed with
this work
for additional information
18 regarding copyright ownership.
20 Licensed under the Apache License,
Version 2.0 (the
"License"); you may not use
this file except in compliance with the License.
21 You may obtain a copy of the License at
25 Unless required by applicable law or agreed to in writing, software distributed under the License
26 is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
27 See the License
for the specific language governing permissions and limitations under the License.
31 Please subscribe to the
Hive mailing list:
33 to discuss
Hive-related questions or to be notified of our updates
37 package Bio::EnsEMBL::Hive::Meadow::SLURM;
43 use DateTime::Format::ISO8601;
44 use File::Temp qw(tempdir);
45 use Scalar::Util qw(looks_like_number);
50 use parent
'Bio::EnsEMBL::Hive::Meadow';
52 # Semantic version of the Meadow interface:
53 # change the Major version whenever an incompatible change is introduced,
54 # change the Minor version whenever the interface is extended, but compatibility is retained.
55 # Module version 5.5 is compatible with Slurm 23.02.7
61 Description : Return cluster name
62 Return :
"slurm" if available,
else undef
67 # For SLURM, a cluster name will only be available when running in a federation,
68 # so we always return "slurm".
69 # Since this is also called to check for availability, assume Slurm is available
70 # if sinfo gives a version and a non-zero node count.
72 # List the slurm version and the cluster node count like "23.02.7:197"
73 my $sversion = `sinfo -V 2>/dev/
null`;
74 $sversion =~ /^slurm (\d+)(?:\.\d+)*$/i;
75 my $slurm_version = $1;
77 my $sinfo = `sinfo -ho
"%D" 2>/dev/
null`;
81 if ($slurm_version and $node_count and $slurm_version >= 23 and $node_count > 0) {
88 my ($self, $config) = @_;
89 $self->SUPER::_init_meadow($config);
91 # Override user config. On Slurm, the tmp dirs are always cleaned up. It is
92 # not necessary to set this. To clean up, beekeeper will try an SSH log in
93 # to each node. This does not work and only produces error messages.
94 $config and $config->set(
'CleanupTempDirectoryKilledWorkers', 0);
98 =head count_pending_workers_by_rc_name
101 Description : Counts the number of pending workers of the user
102 Exception : Dies
if command to retrieve pending workers fails.
107 sub count_pending_workers_by_rc_name {
110 my $jnp = $self->job_name_prefix();
112 # Prefix for job is not implemented in Slurm, so need to get all
114 my $cmd =
"squeue --array --noheader --me --states='PENDING' --format='%j'";
116 my @output = execute_command($cmd);
118 my %pending_this_meadow_by_rc_name = ();
119 my $total_pending_this_meadow = 0;
121 foreach my $line (@output) {
122 if ($line =~ /\b\Q$jnp\E(\S+)\-\d+(\[\d+\])?\b/) {
123 $pending_this_meadow_by_rc_name{$1}++;
124 $total_pending_this_meadow++;
128 return (\%pending_this_meadow_by_rc_name, $total_pending_this_meadow);
129 } ## end sub count_pending_workers_by_rc_name
131 =head count_running_workers
134 Description : Counts the number of running workers of the user
135 Exception : Dies if the command to retrieve running workers fails.
140 sub count_running_workers {
143 my $jnp = $self->job_name_prefix();
145 my $cmd =
"squeue --array --noheader --me --states='RUNNING' --format='%j' | grep ^$jnp | wc -l";
146 my $output = execute_command($cmd);
153 =head status_of_all_our_workers()
156 Description : Lists the status of all jobs of the user reported by squeue, skipping COMPLETED and FAILED ones
157 Exception : Dies if the squeue command fails.
158 Returntype : Arrayref of [ worker_pid, user, status ]
162 sub status_of_all_our_workers {
165 my $jnp = $self->job_name_prefix(); # reederj_ehive_rosaprd_278-
Hive-
166 my @status_list = ();
168 # PENDING, RUNNING, SUSPENDED, CANCELLED, COMPLETING, COMPLETED, CONFIGURING, FAILED,
169 # TIMEOUT, PREEMPTED, NODE_FAIL, REVOKED and SPECIAL_EXIT
171 my $cmd =
"squeue --array --noheader --me --format='%i|%T|%u'";
173 my @output = execute_command($cmd);
175 foreach my $line (@output) {
176 my ($worker_pid, $status, $user) = split(/\|/, $line);
178 # TODO: not exactly sure what these are used for in the external code -
179 # this is based on the LSF status codes that were ignored
180 # Do not count COMPLETED or FAILED jobs.
181 next
if (($status eq
'COMPLETED') or ($status eq
'FAILED'));
183 push @status_list, [$worker_pid, $user, $status];
185 return \@status_list;
186 } ## end sub status_of_all_our_workers
188 =head check_worker_is_alive_and_mine()
191 Description : Checks
if a worker is registered under the current logged in user.
192 Exception : None; a failing squeue command is caught and reported as not alive.
193 Returntype : Boolean (1 if squeue returns output for the job, 0 otherwise)
197 sub check_worker_is_alive_and_mine {
198 my ($self, $worker) = @_;
201 my $this_user = $ENV{
'USER'};
203 my $cmd =
"squeue --noheader --me --job=$wpid";
205 my $output = eval{execute_command($cmd)};
207 return $output ? 1 : 0;
213 Description : Cancels the job of the given worker with scancel
214 Exception : Dies if the scancel command fails.
215 Returntype : Array [ worker_pid, user, status ]
220 my ($self, $worker, $fast) = @_;
222 my $cmd =
'scancel ' . $worker->process_id();
223 execute_command($cmd);
226 =head get_report_entries_for_process_ids()
229 Description : Gathers stats
for a specific set of workers via sacct.
230 Exception : Dies if the sacct command fails.
231 Returntype : Hashref of report entries keyed by job id.
235 sub get_report_entries_for_process_ids {
236 my $self = shift @_; # make sure we get
if off the way before splicing
238 my %combined_report_entries = ();
240 while (my $pid_batch = join(
',', splice(@_, 0, 20))) {
241 # can't fit too many pids on one shell cmdline
243 # sacct -j 19661,19662,19663
244 # --units=M Display values in specified unit type. [KMGTP]
246 "sacct -n -p --units=M --format JobName,JobID,ExitCode,MaxRSS," .
247 "MaxDiskRead,CPUTimeRAW,ElapsedRAW,State,DerivedExitCode,End " .
250 # print "DEBUG get_report_entries_for_process_ids() running cmd:\n\t$cmd\n";
251 my $batch_of_report_entries = $self->parse_report_source_line($cmd);
253 %combined_report_entries = (%combined_report_entries, %$batch_of_report_entries);
256 return \%combined_report_entries;
259 =head get_report_entries_for_time_interval()
262 Description : Gathers statistics
for jobs
for a time interval by running sacct.
263 Exception : Dies
if command to retrieve stats fails.
264 Returntype : Complex.
265 Caller : Gets called from load_resource_usage.pl
269 sub get_report_entries_for_time_interval {
270 my ($self, $from_time, $to_time, $username) = @_;
272 my $from_timepiece = Time::Piece->strptime($from_time,
'%Y-%m-%d %H:%M:%S');
273 $from_time = $from_timepiece->strftime(
'%Y-%m-%dT%H:%M');
275 my $to_timepiece = Time::Piece->strptime($to_time,
'%Y-%m-%d %H:%M:%S') + 2 * ONE_MINUTE;
276 $to_time = $to_timepiece->strftime(
'%Y-%m-%dT%H:%M');
278 # sacct -s CA,CD,CG,F -S 2018-02-27T16:48 -E 2018-02-27T16:50
279 my $cmd =
"sacct -n -p --units=M -s CA,CD,F,OOM --format JobName,JobID,ExitCode,MaxRSS,MaxDiskRead," .
280 "CPUTimeRAW,ElapsedRAW,State,DerivedExitCode,End -S $from_time -E $to_time";
282 # print "DEBUG get_report_entries_for_time_interval() running cmd:\n\t$cmd\n";
284 my $batch_of_report_entries = $self->parse_report_source_line($cmd);
286 return $batch_of_report_entries;
289 =head parse_report_source_line()
291 Args : Command ( sacct )
292 Description : Runs the given sacct command and parses its output into report entries keyed by job id
293 Exception : Dies if the sacct command fails.
298 sub parse_report_source_line {
299 my ($self, $bacct_source_line) = @_;
301 my $jnp = $self->job_name_prefix(); # reederj_ehive_rosaprd_278-
Hive-
303 my @lines = execute_command($bacct_source_line);
305 my %report_entry = ();
308 for my $row (@lines) {
309 my @col = split(/\|/, $row);
311 my $job_name = $col[0]; # JobName -
for explanation please look at sacct command
312 my $job_id = $col[1]; # JobID
313 my $exit_code = $col[2]; # ExitCode
314 my $mem_used = $col[3]; # MaxRSS in units=M
315 #my $reserved_time = $col[4]; # Reserved / pending time ...
316 my $reserved_time = 0;
317 my $max_disk_read = $col[4]; # MaxDiskRead
318 my $total_cpu = $col[5]; # CPUTimeRAW
319 my $elapsed = $col[6]; # ElapsedRAW
320 my $state = $col[7]; # State
321 my $exception_status = $col[8]; # DerivedExitCode
322 my $endtime = $col[9];
324 my $datetime = DateTime::Format::ISO8601->parse_datetime($endtime);
326 # print "DEBUG: $job_id\t$state\n";
328 # parse the correct state
329 if ($job_name =~ m/$jnp/) {
330 $job_id_to_state{$job_id} = $state;
333 next unless $job_name =~ m/batch/;
337 $max_disk_read =~ s/M$
339 if ($reserved_time =~ /:/) {
340 my $reserved_timepiece = Time::Piece->strptime($reserved_time,
'%H:%M:%S');
342 $reserved_timepiece->hour*3600 +
343 $reserved_timepiece->minute*60 +
344 $reserved_timepiece->second;
347 # get previously parsed status ( slurm returns 3 rows of statuses which are different ! )
348 my $cause_of_death = get_cause_of_death($job_id_to_state{$job_id});
349 $exception_status = $cause_of_death;
351 $report_entry{$job_id} = {
352 # entries for 'worker' table:
353 'when_died' => $datetime->date .
' ' . $datetime->hms,
354 'cause_of_death' => $cause_of_death,
356 # entries for 'worker_resource_usage' table:
357 'exit_status' => $exit_code,
358 'exception_status' => $exception_status,
359 'mem_megs' => $mem_used, # mem_in_units, returnd by sacct with --units=M
360 'swap_megs' => $max_disk_read, # swap_in_units
361 'pending_sec' => $reserved_time || 0,
362 'cpu_sec' => $total_cpu,
363 'lifespan_sec' => $elapsed,
365 } ## end
for my $row (@$lines)
366 return \%report_entry;
367 } ## end sub parse_report_source_line
369 =head submit_workers_return_meadow_pids()
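371 Args : Worker command, required worker count, iteration, rc_name, rc-specific submission arguments, submit log subdirectory
372 Comment : # Works with Slurm 17.02.9
373 Description : Runs sbatch to submit workers to SLURM
374 Exception : Dies if the sbatch command fails or its output contains no job id.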
379 sub submit_workers_return_meadow_pids {
380 my ($self, $worker_cmd, $required_worker_count, $iteration, $rc_name,
381 $rc_specific_submission_cmd_args, $submit_log_subdir) = @_;
385 # Flag if we should submit a job array or not
386 my $array_required = $required_worker_count > 1;
388 my $job_array_spec =
"1-${required_worker_count}";
389 my $meadow_specific_submission_cmd_args = $self->config_get(
'SubmissionOptions');
391 my ($submit_stdout_file, $submit_stderr_file);
393 if ($submit_log_subdir) {
394 $submit_stdout_file = $submit_log_subdir .
"/log_${rc_name}_%A_%a.out";
395 $submit_stderr_file = $submit_log_subdir .
"/log_${rc_name}_%A_%a.err";
397 $submit_stdout_file =
'/dev/null';
398 $submit_stderr_file =
'/dev/null';
401 #No equivalent in sbatch, but can be accomplished with stdbuf -oL -eL
402 #$ENV{'LSB_STDOUT_DIRECT'} = 'y'; # unbuffer the output of the bsub command
404 #Note: job arrays share the same name in slurm and are 0-based, but this may still work
406 if ($array_required eq
"1") {
407 # We have to submit a job array - this will change the job ids in slurm to <job_id>_ARRAY
412 '-o', $submit_stdout_file,
413 '-e', $submit_stderr_file,
414 # inform SLURM we submit an ARRAY
415 # jobs submitted with 'sbatch -a ' get different ids back when executing
416 # 'squeue --array -h -u <user> -o '%i|%T|%u|%A' later
417 '-a', $job_array_spec,
418 '-J', $job_array_common_name,
419 split_for_bash($rc_specific_submission_cmd_args),
420 split_for_bash($meadow_specific_submission_cmd_args),
428 '-o', $submit_stdout_file,
429 '-e', $submit_stderr_file,
430 '-J', $job_array_common_name,
431 split_for_bash($rc_specific_submission_cmd_args),
432 split_for_bash($meadow_specific_submission_cmd_args),
439 print
"Executing [ " . $self->name .
" ]: " . join(
' ', @cmd) .
"\n";
441 my @output = execute_command(\@cmd);
443 unless ($output[0] =~ m/(\d+)/) {
444 die
"Could not find job id in sbatch output";
447 my $slurm_job_id = $output[0];
449 if (looks_like_number($slurm_job_id)) {
452 # Modify the return values depending on the submission type: single job vs job array
453 # as these have different indexes:
454 # array jobs: [54729315_1, 54729315_2]
455 # single job: [54728975]
456 if ($array_required) {
457 $return_value = [
map {$slurm_job_id .
'_' . $_ .
''} (1 .. $required_worker_count)];
459 $return_value = [$slurm_job_id];
462 return $return_value;
464 die
"Unexpected output for sbatch command. Command: '@cmd'; STDOUT: '" . (join
"\n", @output) .
"'";
466 } ## end sub submit_workers_return_meadow_pids
468 sub execute_command {
469 # command can be an arrayref or a string
472 # Timeout for slurm commands
475 my ($success, $error_message, undef, $stdout_buf, $stderr_buf) =
476 run(command => $cmd, verbose => 0, timeout => $timeout);
480 $output = join(
"", @$stdout_buf);
482 die
"Failed to execute command '$cmd'. Error: '$error_message', " .
483 "STDOUT: '@$stdout_buf', STDERR: '@$stderr_buf'";
485 return wantarray ? split(/\n/, $output
488 sub get_current_worker_process_id {
491 my $slurm_jobid = $ENV{
'SLURM_JOB_ID'};
492 my $slurm_array_job_id = $ENV{
'SLURM_ARRAY_JOB_ID'};
493 my $slurm_array_task_id = $ENV{
'SLURM_ARRAY_TASK_ID'};
495 # We have a slurm job
496 if (defined($slurm_jobid)) {
497 # We have an array job
498 if (defined($slurm_array_job_id) and defined($slurm_array_task_id)) {
499 return "$slurm_array_job_id\_$slurm_array_task_id";
504 die
"Could not establish the process_id";
508 sub deregister_local_process {
511 delete $ENV{
'SLURM_JOB_ID'};
512 delete $ENV{
'SLURM_ARRAY_JOB_ID'};
513 delete $ENV{
'SLURM_ARRAY_TASK_ID'};
516 =head get_cause_of_death()
518 Args: : Slurm job state
519 Description : Translates a
SLURM job state into an eHive cause-of-death
525 sub get_cause_of_death {
528 my %slurm_status_2_cod = (
529 'TIMEOUT' =>
'RUNLIMIT',
530 'FAILED' =>
'CONTAMINATED',
531 'OUT_OF_MEMORY' =>
'MEMLIMIT',
532 'CANCELLED' =>
'KILLED_BY_USER',
534 my $cod = $slurm_status_2_cod{$state};