This is the 'LSF' implementation of Meadow
=item LSF being temporarily unavailable
We should probably implement a method using IPC::Open3 (see Bio::EnsEMBL::Compara::Utils::RunCommand)
that captures stderr and can parse stdout on the fly.
Depending on the Meadow method, we should either retry, say, 1 minute later, or
return something like undef to tell the caller that no operation was done.
Beekeeper : loop #15 ======================================================
GarbageCollector: Checking for lost Workers...
GarbageCollector: [Queen:] out of 20 Workers that haven't checked in during the last 5 seconds...
GarbageCollector: [LSF/EBI Meadow:] LOST:20
GarbageCollector: Discovered 20 lost LSF Workers
LSF::parse_report_source_line( "bacct -f - -l '4126850[15]' '4126850[6]' '4126835[24]' '4126850[33]' '4126835[10]' '4126835[39]' '4126850[23]' '4126835[3]' '4126835[19]' '4126835[31]' '4126835[40]' '4126835[41]' '4126850[5]' '4126850[41]' '4126850[2]' '4126850[3]' '4126835[5]' '4126835[33]' '4126850[7]' '4126850[42]'" )
ls_getclustername(): Slave LIM configuration is not ready yet. Please give file name.
Could not read from 'bacct -f - -l '4126850[15]' '4126850[6]' '4126835[24]' '4126850[33]' '4126835[10]' '4126835[39]' '4126850[23]' '4126835[3]' '4126835[19]' '4126835[31]' '4126835[40]' '4126835[41]' '4126850[5]' '4126850[41]' '4126850[2]' '4126850[3]' '4126835[5]' '4126835[33]' '4126850[7]' '4126850[42]''. Received the error 255
36 See the NOTICE file distributed with this work for additional information
37 regarding copyright ownership.
39 Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
40 You may obtain a copy of the License at
42 http://www.apache.org/licenses/LICENSE-2.0
44 Unless required by applicable law or agreed to in writing, software distributed under the License
45 is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
46 See the License for the specific language governing permissions and limitations under the License.
50 Please subscribe to the Hive mailing list: http://listserver.ebi.ac.uk/mailman/listinfo/ehive-users to discuss Hive-related questions or to be notified of our updates
55 package Bio::EnsEMBL::Hive::Meadow::LSF;
62 use Bio::EnsEMBL::Hive::Utils ('split_for_bash
', 'whoami
');
# Semantic version of the Meadow interface:
#   change the Major version whenever an incompatible change is introduced,
#   change the Minor version whenever the interface is extended, but compatibility is retained.
our $VERSION = '5.2';
74 Description : Determine the LSF cluster_name, if an LSF meadow is available.
80 my $re_lsf_names = qr/(IBM Spectrum LSF|Platform LSF|openlava project)/;
81 my $re_cluster_name = qr/^My cluster name is\s+(\S+)/;
82 my @lsid_out = `lsid 2>/dev/null`;
85 foreach my $lsid_line (@lsid_out) {
86 if ($lsid_line =~ $re_lsf_names) {
88 } elsif ($lsid_line =~ $re_cluster_name) {
# Works out the LSF identifier of the job this process is running in, using the
# LSB_JOBID and LSB_JOBINDEX environment variables.
#
# Returns "<jobid>[<index>]" when LSB_JOBINDEX is greater than zero (a job-array element).
# Dies with "Could not establish the process_id" if either variable is not set.
#
# NOTE(review): for a zero index the bare job id is returned, mirroring the ids that
# submit_workers_return_meadow_pids() hands out for non-array submissions -- confirm.
sub get_current_worker_process_id {
    my ($self) = @_;

    my $lsb_jobid    = $ENV{'LSB_JOBID'};
    my $lsb_jobindex = $ENV{'LSB_JOBINDEX'};

    if(defined($lsb_jobid) and defined($lsb_jobindex)) {
        if($lsb_jobindex>0) {
            return "$lsb_jobid\[$lsb_jobindex\]";
        } else {
            return $lsb_jobid;
        }
    } else {
        die "Could not establish the process_id";
    }
}
# Removes LSB_JOBID and LSB_JOBINDEX from the environment of the current process, so that
# get_current_worker_process_id() can no longer derive an LSF id from them.
sub deregister_local_process {
    my ($class) = @_;

    delete $ENV{'LSB_JOBID'};
    delete $ENV{'LSB_JOBINDEX'};
}
# Lists the LSF jobs of the given users by parsing the output of `bjobs -w -u <user>`.
#
# Args:    $meadow_users_of_interest - arrayref of user names; when empty or missing, [ 'all' ] is used.
# Returns: arrayref of [ $worker_pid, $user, $status ] triples.
#
# Skipped lines: the header, jobs in DONE or EXIT state, and jobs whose name contains "Hive-"
# but does not start with this meadow's job_name_prefix().
sub status_of_all_our_workers { # returns an arrayref
    my $self                     = shift @_;
    my $meadow_users_of_interest = shift @_;

    $meadow_users_of_interest = [ 'all' ] unless ($meadow_users_of_interest && scalar(@$meadow_users_of_interest));

    my $jnp = $self->job_name_prefix();

    my @status_list = ();

    foreach my $meadow_user (@$meadow_users_of_interest) {
        my $cmd = "bjobs -w -u $meadow_user 2>/dev/null";

        # warn "LSF::status_of_all_our_workers() running cmd:\n\t$cmd\n";

        foreach my $line (`$cmd`) {
            my ($group_pid, $user, $status, $queue, $submission_host, $running_host, $job_name) = split(/\s+/, $line);

            # a line with fewer than 7 columns cannot describe a job; without this guard it
            # would end up in the result as an all-undef entry
            next unless defined($job_name);

            # skip the header line and jobs that are done
            next if(($group_pid eq 'JOBID') or ($status eq 'DONE') or ($status eq 'EXIT'));

            # skip the hive jobs that belong to another pipeline
            next if (($job_name =~ /Hive-/) and (index($job_name, $jnp) != 0));

            my $worker_pid = $group_pid;
            if($job_name=~/(\[\d+\])$/ and $worker_pid!~/\[\d+\]$/) {   # account for the difference in LSF 9.1.1.1 vs LSF 9.1.2.0 bjobs' output
                $worker_pid .= $1;  # borrow the "[index]" suffix from the job name
            }
            push @status_list, [$worker_pid, $user, $status];
        }
    }

    return \@status_list;
}
# Asks LSF (`bjobs -u <current user> <pid>`) whether the given worker's job still exists.
#
# Args:    $worker - object whose process_id() yields the LSF job id.
# Returns: 1 if at least one output line mentions none of JOBID, DONE or EXIT; 0 otherwise.
#
# *SUSP, UNKWN, and ZOMBI are "alive" for the purposes of this call,
# which is typically used to see if the process can be killed.
# Can't search for line containing the job id, since it may be
# formatted differently in bjob output than in $worker->process_id()
# (e.g. for array jobs), so we exclude the header by excluding "JOBID".
#
# NOTE(review): stderr is merged into the parsed output, so any diagnostic line that lacks
# those three words also counts as "alive" -- confirm this is intended.
sub check_worker_is_alive_and_mine {
    my ($self, $worker) = @_;

    my $wpid      = $worker->process_id();
    my $this_user = whoami();

    # the pid is single-quoted (as in get_report_entries_for_process_ids) so that the shell
    # does not treat the "[index]" part of an array-job id as a glob pattern
    my $cmd = qq{bjobs -u $this_user '$wpid' 2>&1};

    my @bjobs_out = qx/$cmd/;
    # warn "LSF::check_worker_is_alive_and_mine() running cmd:\n\t$cmd\n";

    my $is_alive_and_mine = 0;
    foreach my $bjobs_line (@bjobs_out) {
        unless ($bjobs_line =~ /JOBID|DONE|EXIT/) {
            $is_alive_and_mine = 1;
        }
    }

    return $is_alive_and_mine;
}
182 my ($self, $worker, $fast) = @_;
186 $exec_status = system(
'bkill',
'-r', $worker->process_id());
188 $exec_status = system(
'bkill', $worker->process_id());
191 return ( $exec_status >> 8 );
# A private subroutine that can recover a missing year from an incomplete date and then
# transforms it into SQL's datetime for storage.
#
# Args:    $weekday   - abbreviated weekday name, compared against Time::Piece's wdayname
#          $yearless  - date and time without the year, parsed with '%b %d %T'
#          $real_year - the year, if known (optional)
# Returns: 'YYYY-MM-DD hh:mm:ss', or nothing when no year could be guessed.
#
# When the year is not given, the most recent year (starting from the current one) on which
# the date falls on $weekday is used.
sub _convert_to_datetime {
    my ($weekday, $yearless, $real_year) = @_;

    if($real_year) {
        my $datetime = Time::Piece->strptime("$yearless $real_year", '%b %d %T %Y');
        return $datetime->date.' '.$datetime->hms;
    }

    my $curr_year = Time::Piece->new->year();

    for my $years_back (0 .. 27) {  # The Gregorian calendar repeats every 28 years
        my $candidate_year = $curr_year - $years_back;
        my $datetime = Time::Piece->strptime("$yearless $candidate_year", '%b %d %T %Y');
        if($datetime->wdayname eq $weekday) {
            return $datetime->date.' '.$datetime->hms;
        }
    }

    return; # could not guess the year
}
# Runs the given accounting command (a `bacct ... -l ...` command line) and parses its output.
#
# Args:    $bacct_source_line - the command line; it is opened as a pipe and read record by record,
#                               records being separated by a line of dashes followed by an empty line.
# Returns: hashref  process_id => { meadow_host, when_born, when_died, cause_of_death,
#                                   exit_status, exception_status, mem_megs, swap_megs,
#                                   pending_sec, cpu_sec, lifespan_sec }
# Dies if the pipe cannot be opened or if the command finishes with a non-zero exit code.
#
# NOTE(review): the unit table, the detection of the resource-usage header line and the
# computation of the exit code could not be taken verbatim from the listing this was
# restored from -- verify them against real bacct output.
sub parse_report_source_line {
    my ($self, $bacct_source_line) = @_;

    print "LSF::parse_report_source_line( \"$bacct_source_line\" )\n";

    # Complete list of exit codes is available at
    # https://www.ibm.com/support/knowledgecenter/SSETD4_9.1.3/lsf_admin/termination_reasons_lsf.html
    my %status_2_cod = (
        'TERM_MEMLIMIT'     => 'MEMLIMIT',
        'TERM_RUNLIMIT'     => 'RUNLIMIT',
        'TERM_OWNER'        => 'KILLED_BY_USER',    # bkill     (wait until it dies)
        'TERM_FORCE_OWNER'  => 'KILLED_BY_USER',    # bkill -r  (quick remove)
        'TERM_BUCKET_KILL'  => 'KILLED_BY_USER',    # bkill -b  (kills large numbers of jobs as soon as possible)
        'TERM_REQUEUE_OWNER'=> 'KILLED_BY_USER',    # Job killed and requeued by owner
    );

    # NOTE(review): assumes binary multiples (1G = 1024M) -- confirm
    my %units_2_megs = (
        'K' => 1.0/1024,
        'M' => 1,
        'G' => 1024,
        'T' => 1024*1024,
    );

    # turns a value such as "12.5G" into megabytes; anything unparseable counts as 0
    my $to_megs = sub {
        my ($amount) = @_;
        return 0 unless defined($amount);
        my ($in_units, $unit) = $amount =~ /^([\d\.]+)([KMGT])$/;
        return defined($unit) ? $in_units * $units_2_megs{$unit} : 0;
    };

    local $/ = "------------------------------------------------------------------------------\n\n";
    open(my $bacct_fh, '-|', $bacct_source_line)
        or die "Could not run '$bacct_source_line': $!\n";
    my $record = <$bacct_fh>; # skip the header

    my %report_entry = ();

    # read one record at a time instead of slurping the whole output into a list
    while (defined($record = <$bacct_fh>)) {
        chomp $record;  # strips the record separator

        # warn "RECORD:\n$record";

        my @lines = split(/\n/, $record);
        next unless @lines;

        if( my ($process_id) = $lines[0]=~/^Job <(\d+(?:\[\d+\])?)>/) {

            # list repetition: both default to the empty string
            # (the string repetition '' x 2 would leave the second one undefined)
            my ($exit_status, $exception_status) = ('') x 2;
            my ($when_born, $meadow_host);
            my ($when_died, $cause_of_death);
            my (@keys, @values);
            my $line_has_key_values = 0;

            foreach my $line (@lines) {
                if( my ($wday, $stamp, $year, $host) = $line =~ /^(\w+)\s+(\w+\s+\d+\s+\d+:\d+:\d+)(?:\s+(\d{4}))?:\s+(?:\[\d+\]\s+)?[Dd]ispatched to\s<([\w\-\.]+)>/ ) {
                    $when_born      = _convert_to_datetime($wday, $stamp, $year);
                    $meadow_host    = $host;
                }
                elsif( my ($wday_c, $stamp_c, $year_c, $state, $reason) = $line =~ /^(\w+)\s+(\w+\s+\d+\s+\d+:\d+:\d+)(?:\s+(\d{4}))?:\s+Completed\s<(\w+)>(?:\.|;\s+(\w+))/ ) {
                    $when_died      = _convert_to_datetime($wday_c, $stamp_c, $year_c);
                    $cause_of_death = $reason && ($status_2_cod{$reason} || 'SEE_EXIT_STATUS');
                    $exit_status    = $state . ($reason ? "/$reason" : '');
                }
                elsif($line =~ /^\s*EXCEPTION STATUS:\s*(.*?)\s*$/) {
                    $exception_status = $1;
                    $exception_status =~s/\s+/;/g;
                }
                elsif($line =~ /^\s*CPU_T/) {   # header of the resource usage table
                    @keys = split(/\s+/, ' '.$line);
                    $line_has_key_values = 1;
                }
                elsif($line_has_key_values) {   # the line right after the header carries the values
                    @values = split(/\s+/, ' '.$line);
                    $line_has_key_values = 0;
                }
            }

            my %usage;  @usage{@keys} = @values;

            #warn join(', ', map {sprintf('%s=%s', $_, $usage{$_})} (sort keys %usage)), "\n";

            $report_entry{ $process_id } = {
                    # entries for 'worker' table:
                'meadow_host'       => $meadow_host,
                'when_born'         => $when_born,
                'when_died'         => $when_died,
                'cause_of_death'    => $cause_of_death,

                    # entries for 'worker_resource_usage' table:
                'exit_status'       => $exit_status,
                'exception_status'  => $exception_status,
                'mem_megs'          => $to_megs->( $usage{'MEM'} ),
                'swap_megs'         => $to_megs->( $usage{'SWAP'} ),
                'pending_sec'       => $usage{'WAIT'},
                'cpu_sec'           => $usage{'CPU_T'},
                'lifespan_sec'      => $usage{'TURNAROUND'},
            };
        }
    }
    close $bacct_fh;    # sets $? to the status of the command
    my $exit = $? >> 8;
    die "Could not read from '$bacct_source_line'. Received the error $exit\n" if $exit;

    return \%report_entry;
}
# Collects accounting report entries for the given LSF process ids.
#
# Args:    list of process ids (after the invocant).
# Returns: hashref with the merged results of parse_report_source_line() over all batches;
#          an empty hashref when the 'AccountingDisabled' config option is set.
#
# The ids are passed to bacct in batches of 20, each id single-quoted for the shell.
sub get_report_entries_for_process_ids {
    my $self = shift @_;    # make sure we get it out of the way before splicing

    my %combined_report_entries = ();

    unless ($self->config_get('AccountingDisabled')) {
        my $bacct_opts = $self->config_get('BacctExtraOptions') || "";

        while (my @pid_batch = splice(@_, 0, 20)) { # can't fit too many pids on one shell cmdline
            my $quoted_pids = join(' ', map { "'$_'" } @pid_batch);
            my $cmd         = "bacct $bacct_opts -l $quoted_pids";

            # warn "LSF::get_report_entries_for_process_ids() running cmd:\n\t$cmd\n";

            my $batch_of_report_entries = $self->parse_report_source_line( $cmd );

            # add the new entries in place rather than rebuilding the whole hash for every batch
            @combined_report_entries{ keys %$batch_of_report_entries } = values %$batch_of_report_entries;
        }
    }

    return \%combined_report_entries;
}
# Collects accounting report entries for the jobs that completed within a time interval.
#
# Args:    $from_time, $to_time - timestamps in '%Y-%m-%d %H:%M:%S' format
#          $username            - optional; when given, passed to bacct as "-u <username>"
# Returns: the hashref produced by parse_report_source_line(), or an empty hashref when
#          the 'AccountingDisabled' config option is set.
#
# Both timestamps are reformatted to '%Y/%m/%d/%H:%M' for bacct's -C option, and the end
# of the interval is pushed forward by two minutes.
sub get_report_entries_for_time_interval {
    my ($self, $from_time, $to_time, $username) = @_;

    my $batch_of_report_entries = {};

    unless ($self->config_get('AccountingDisabled')) {
        my $from_timepiece = Time::Piece->strptime($from_time, '%Y-%m-%d %H:%M:%S');
        $from_time = $from_timepiece->strftime('%Y/%m/%d/%H:%M');

        my $to_timepiece = Time::Piece->strptime($to_time, '%Y-%m-%d %H:%M:%S') + 2*ONE_MINUTE;
        $to_time = $to_timepiece->strftime('%Y/%m/%d/%H:%M');

        my $bacct_opts = $self->config_get('BacctExtraOptions') || "";
        my $cmd = "bacct $bacct_opts -l -C $from_time,$to_time ".($username ? "-u $username" : '');

        # warn "LSF::get_report_entries_for_time_interval() running cmd:\n\t$cmd\n";

        $batch_of_report_entries = $self->parse_report_source_line( $cmd );
    }

    return $batch_of_report_entries;
}
# Submits one worker, or a job array of workers, to LSF and reports the resulting job ids.
#
# Args:    $worker_cmd                       - the command the submitted job(s) should run
#          $required_worker_count            - number of workers; more than one makes it a job array
#          $iteration, $rc_name              - passed to job_array_common_name() to build the job name
#          $rc_specific_submission_cmd_args  - extra submission options, split with split_for_bash()
#          $submit_log_subdir                - directory for the per-job .out/.err files;
#                                              when false, both go to /dev/null
# Returns: arrayref of ids: "<jobid>[1]" .. "<jobid>[N]" for an array, [ <jobid> ] otherwise.
# Dies if the submission command cannot be started or if it reports no job id.
#
# NOTE(review): the head ('bsub') and tail ($worker_cmd) of @cmd were not visible in the
# listing this was restored from -- confirm.
sub submit_workers_return_meadow_pids {
    my ($self, $worker_cmd, $required_worker_count, $iteration, $rc_name, $rc_specific_submission_cmd_args, $submit_log_subdir) = @_;

    my $job_array_common_name               = $self->job_array_common_name($rc_name, $iteration);
    my $array_required                      = $required_worker_count > 1;
    my $job_array_name_with_indices         = $job_array_common_name . ($array_required ? "[1-${required_worker_count}]" : '');
    my $meadow_specific_submission_cmd_args = $self->config_get('SubmissionOptions');

    my ($submit_stdout_file, $submit_stderr_file);

    if($submit_log_subdir) {
        $submit_stdout_file = $submit_log_subdir . "/log_${rc_name}_%J_%I.out";
        $submit_stderr_file = $submit_log_subdir . "/log_${rc_name}_%J_%I.err";
    } else {
        $submit_stdout_file = '/dev/null';
        $submit_stderr_file = '/dev/null';
    }

    $ENV{'LSB_STDOUT_DIRECT'} = 'y';  # unbuffer the output of the bsub command

    my @cmd = ('bsub',
        '-o', $submit_stdout_file,
        '-e', $submit_stderr_file,
        '-J', $job_array_name_with_indices,
        split_for_bash($rc_specific_submission_cmd_args),
        split_for_bash($meadow_specific_submission_cmd_args),
        $worker_cmd,
    );

    print "Executing [ ".$self->signature." ] \t\t".join(' ', @cmd)."\n";

    my $lsf_jobid;

    open(my $bsub_output_fh, "-|", @cmd) || die "Could not submit job(s): $!, $?";  # let's abort the beekeeper and let the user check the syntax
    while(my $line = <$bsub_output_fh>) {
        if($line=~/^Job <(\d+)> is submitted to/) {
            $lsf_jobid = $1;
        } else {
            warn $line;     # assuming it is a temporary blockage that might resolve itself with time
        }
    }
    close $bsub_output_fh;

    if($lsf_jobid) {
        return ($array_required ? [ map { $lsf_jobid.'['.$_.']' } (1..$required_worker_count) ] : [ $lsf_jobid ]);
    } else {
        die "Submission unsuccessful\n";
    }
}