ensembl-hive  2.8.1
LSF.pm
Go to the documentation of this file.
1 =pod
2 
3 =head1 NAME
4 
6 
7 =head1 DESCRIPTION
8 
9  This is the 'LSF' implementation of Meadow
10 
11 =head1 TODO
12 
13 =over
14 
15 =item LSF being temporarily unavailable
16 
17 We should probably implement a method using IPC::Open3 (see Bio::EnsEMBL::Compara::Utils::RunCommand)
18 that captures stderr and can parse stdout on the fly.
19 Depending on the Meadow method, we should either retry say 1 minute later, or
20 return something like undef to tell the caller that no operation was done.
21 
22  Beekeeper : loop #15 ======================================================
23  GarbageCollector: Checking for lost Workers...
24  GarbageCollector: [Queen:] out of 20 Workers that haven't checked in during the last 5 seconds...
25  GarbageCollector: [LSF/EBI Meadow:] LOST:20
26 
27  GarbageCollector: Discovered 20 lost LSF Workers
28  LSF::parse_report_source_line( "bacct -f - -l '4126850[15]' '4126850[6]' '4126835[24]' '4126850[33]' '4126835[10]' '4126835[39]' '4126850[23]' '4126835[3]' '4126835[19]' '4126835[31]' '4126835[40]' '4126835[41]' '4126850[5]' '4126850[41]' '4126850[2]' '4126850[3]' '4126835[5]' '4126835[33]' '4126850[7]' '4126850[42]'" )
29  ls_getclustername(): Slave LIM configuration is not ready yet. Please give file name.
30  Could not read from 'bacct -f - -l '4126850[15]' '4126850[6]' '4126835[24]' '4126850[33]' '4126835[10]' '4126835[39]' '4126850[23]' '4126835[3]' '4126835[19]' '4126835[31]' '4126835[40]' '4126835[41]' '4126850[5]' '4126850[41]' '4126850[2]' '4126850[3]' '4126835[5]' '4126835[33]' '4126850[7]' '4126850[42]''. Received the error 255
31 
32 =back
33 
34 =head1 LICENSE
35 
36  See the NOTICE file distributed with this work for additional information
37  regarding copyright ownership.
38 
39  Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
40  You may obtain a copy of the License at
41 
42  http://www.apache.org/licenses/LICENSE-2.0
43 
44  Unless required by applicable law or agreed to in writing, software distributed under the License
45  is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
46  See the License for the specific language governing permissions and limitations under the License.
47 
48 =head1 CONTACT
49 
50  Please subscribe to the Hive mailing list: http://listserver.ebi.ac.uk/mailman/listinfo/ehive-users to discuss Hive-related questions or to be notified of our updates
51 
52 =cut
53 
54 
55 package Bio::EnsEMBL::Hive::Meadow::LSF;
56 
57 use strict;
58 use warnings;
59 use Time::Piece;
60 use Time::Seconds;
61 
62 use Bio::EnsEMBL::Hive::Utils ('split_for_bash', 'whoami');
63 
64 use base ('Bio::EnsEMBL::Hive::Meadow');
65 
66 
67 our $VERSION = '5.2'; # Semantic version of the Meadow interface:
68  # change the Major version whenever an incompatible change is introduced,
69  # change the Minor version whenever the interface is extended, but compatibility is retained.
70 
71 =head2 name
72 
73  Args: : None
74  Description : Determine the LSF cluster_name, if an LSF meadow is available.
75  Returntype : String
76 
77 =cut
78 
79 sub name {
80  my $re_lsf_names = qr/(IBM Spectrum LSF|Platform LSF|openlava project)/;
81  my $re_cluster_name = qr/^My cluster name is\s+(\S+)/;
82  my @lsid_out = `lsid 2>/dev/null`;
83 
84  my $is_lsf = 0;
85  foreach my $lsid_line (@lsid_out) {
86  if ($lsid_line =~ $re_lsf_names) {
87  $is_lsf = 1;
88  } elsif ($lsid_line =~ $re_cluster_name) {
89  return $1 if $is_lsf;
90  }
91  }
92 }
93 
94 
95 sub get_current_worker_process_id {
96  my ($self) = @_;
97 
98  my $lsb_jobid = $ENV{'LSB_JOBID'};
99  my $lsb_jobindex = $ENV{'LSB_JOBINDEX'};
100 
101  if(defined($lsb_jobid) and defined($lsb_jobindex)) {
102  if($lsb_jobindex>0) {
103  return "$lsb_jobid\[$lsb_jobindex\]";
104  } else {
105  return $lsb_jobid;
106  }
107  } else {
108  die "Could not establish the process_id";
109  }
110 }
111 
112 
113 sub deregister_local_process {
114  my ($self) = @_;
115 
116  delete $ENV{'LSB_JOBID'};
117  delete $ENV{'LSB_JOBINDEX'};
118 }
119 
120 
121 sub status_of_all_our_workers { # returns an arrayref
122  my $self = shift @_;
123  my $meadow_users_of_interest = shift @_;
124 
125  $meadow_users_of_interest = [ 'all' ] unless ($meadow_users_of_interest && scalar(@$meadow_users_of_interest));
126 
127  my $jnp = $self->job_name_prefix();
128 
129  my @status_list = ();
130 
131  foreach my $meadow_user (@$meadow_users_of_interest) {
132  my $cmd = "bjobs -w -u $meadow_user 2>/dev/null";
133 
134 # warn "LSF::status_of_all_our_workers() running cmd:\n\t$cmd\n";
135 
136  foreach my $line (`$cmd`) {
137  my ($group_pid, $user, $status, $queue, $submission_host, $running_host, $job_name) = split(/\s+/, $line);
138 
139  # skip the header line and jobs that are done
140  next if(($group_pid eq 'JOBID') or ($status eq 'DONE') or ($status eq 'EXIT'));
141 
142  # skip the hive jobs that belong to another pipeline
143  next if (($job_name =~ /Hive-/) and (index($job_name, $jnp) != 0));
144 
145  my $worker_pid = $group_pid;
146  if($job_name=~/(\[\d+\])$/ and $worker_pid!~/\[\d+\]$/) { # account for the difference in LSF 9.1.1.1 vs LSF 9.1.2.0 bjobs' output
147  $worker_pid .= $1;
148  }
149  push @status_list, [$worker_pid, $user, $status];
150  }
151  }
152 
153  return \@status_list;
154 }
155 
156 
157 sub check_worker_is_alive_and_mine {
158  my ($self, $worker) = @_;
159 
160  my $wpid = $worker->process_id();
161  my $this_user = whoami();
162  my $cmd = qq{bjobs -u $this_user $wpid 2>&1};
163 
164  my @bjobs_out = qx/$cmd/;
165 # warn "LSF::check_worker_is_alive_and_mine() running cmd:\n\t$cmd\n";
166 
167  my $is_alive_and_mine = 0;
168  foreach my $bjobs_line (@bjobs_out) {
169  unless ($bjobs_line =~ /JOBID|DONE|EXIT/) { # *SUSP, UNKWN, and ZOMBI are "alive" for the purposes of this call
170  # which is typically used to see if the process can be killed.
171  # Can't search for line containing the job id, since it may be
172  # formatted differently in bjob output than in $worker->process_id()
173  # (e.g. for array jobs), so we exclude the header by excluding "JOBID"
174  $is_alive_and_mine = 1;
175  }
176  }
177  return $is_alive_and_mine;
178 }
179 
180 
181 sub kill_worker {
182  my ($self, $worker, $fast) = @_;
183 
184  my $exec_status;
185  if ($fast) {
186  $exec_status = system('bkill', '-r', $worker->process_id());
187  } else {
188  $exec_status = system('bkill', $worker->process_id());
189  }
190 
191  return ( $exec_status >> 8 );
192 }
193 
194 
195 sub _convert_to_datetime { # a private subroutine that can recover missing year from an incomplete date and then transforms it into SQL's datetime for storage
196  my ($weekday, $yearless, $real_year) = @_;
197 
198  if($real_year) {
199  my $datetime = Time::Piece->strptime("$yearless $real_year", '%b %d %T %Y');
200  return $datetime->date.' '.$datetime->hms;
201  } else {
202  my $curr_year = Time::Piece->new->year();
203 
204  my $years_back = 0;
205  while ($years_back < 28) { # The Gregorian calendar repeats every 28 years
206  my $candidate_year = $curr_year - $years_back;
207  my $datetime = Time::Piece->strptime("$yearless $candidate_year", '%b %d %T %Y');
208  if($datetime->wdayname eq $weekday) {
209  return $datetime->date.' '.$datetime->hms;
210  }
211  $years_back++;
212  }
213  }
214 
215  return; # could not guess the year
216 }
217 
218 
219 sub parse_report_source_line {
220  my ($self, $bacct_source_line) = @_;
221 
222  print "LSF::parse_report_source_line( \"$bacct_source_line\" )\n";
223 
224  # Conplete list of exit codes is available at
225  # https://www.ibm.com/support/knowledgecenter/SSETD4_9.1.3/lsf_admin/termination_reasons_lsf.html
226  my %status_2_cod = (
227  'TERM_MEMLIMIT' => 'MEMLIMIT',
228  'TERM_RUNLIMIT' => 'RUNLIMIT',
229  'TERM_OWNER' => 'KILLED_BY_USER', # bkill (wait until it dies)
230  'TERM_FORCE_OWNER' => 'KILLED_BY_USER', # bkill -r (quick remove)
231  'TERM_BUCKET_KILL' => 'KILLED_BY_USER', # bkill -b (kills large numbers of jobs as soon as possible)
232  'TERM_REQUEUE_OWNER'=> 'KILLED_BY_USER', # Job killed and requeued by owner
233  );
234 
235  my %units_2_megs = (
236  'K' => 1.0/1024,
237  'M' => 1,
238  'G' => 1024,
239  'T' => 1024*1024,
240  );
241 
242  local $/ = "------------------------------------------------------------------------------\n\n";
243  open(my $bacct_fh, '-|', $bacct_source_line);
244  my $record = <$bacct_fh>; # skip the header
245 
246  my %report_entry = ();
247 
248  for my $record (<$bacct_fh>) {
249  chomp $record;
250 
251  # warn "RECORD:\n$record";
252 
253  my @lines = split(/\n/, $record);
254  if( my ($process_id) = $lines[0]=~/^Job <(\d+(?:\[\d+\])?)>/) {
255 
256  my ($exit_status, $exception_status) = ('' x 2);
257  my ($when_born, $meadow_host);
258  my ($when_died, $cause_of_death);
259  my (@keys, @values);
260  my $line_has_key_values = 0;
261  foreach (@lines) {
262  if( /^(\w+)\s+(\w+\s+\d+\s+\d+:\d+:\d+)(?:\s+(\d{4}))?:\s+(?:\[\d+\]\s+)?[Dd]ispatched to\s<([\w\-\.]+)>/ ) {
263  $when_born = _convert_to_datetime($1, $2, $3);
264  $meadow_host = $4;
265  }
266  elsif( /^(\w+)\s+(\w+\s+\d+\s+\d+:\d+:\d+)(?:\s+(\d{4}))?:\s+Completed\s<(\w+)>(?:\.|;\s+(\w+))/ ) {
267  $when_died = _convert_to_datetime($1, $2, $3);
268  $cause_of_death = $5 && ($status_2_cod{$5} || 'SEE_EXIT_STATUS');
269  $exit_status = $4 . ($5 ? "/$5" : '');
270  }
271  elsif(/^\s*EXCEPTION STATUS:\s*(.*?)\s*$/) {
272  $exception_status = $1;
273  $exception_status =~s/\s+/;/g;
274  }
275  elsif(/^\s*CPU_T/) {
276  @keys = split(/\s+/, ' '.$_);
277  $line_has_key_values = 1;
278  }
279  elsif($line_has_key_values) {
280  @values = split(/\s+/, ' '.$_);
281  $line_has_key_values = 0;
282  }
283  }
284 
285  my %usage; @usage{@keys} = @values;
286 
287  #warn join(', ', map {sprintf('%s=%s', $_, $usage{$_})} (sort keys %usage)), "\n";
288 
289  my ($mem_in_units, $mem_unit) = $usage{'MEM'} =~ /^([\d\.]+)([KMGT])$/;
290  my ($swap_in_units, $swap_unit) = $usage{'SWAP'} =~ /^([\d\.]+)([KMGT])$/;
291 
292  $report_entry{ $process_id } = {
293  # entries for 'worker' table:
294  'meadow_host' => $meadow_host,
295  'when_born' => $when_born,
296  'when_died' => $when_died,
297  'cause_of_death' => $cause_of_death,
298 
299  # entries for 'worker_resource_usage' table:
300  'exit_status' => $exit_status,
301  'exception_status' => $exception_status,
302  'mem_megs' => $mem_in_units * $units_2_megs{$mem_unit},
303  'swap_megs' => $swap_in_units * $units_2_megs{$swap_unit},
304  'pending_sec' => $usage{'WAIT'},
305  'cpu_sec' => $usage{'CPU_T'},
306  'lifespan_sec' => $usage{'TURNAROUND'},
307  };
308  }
309  }
310  close $bacct_fh;
311  my $exit = $? >> 8;
312  die "Could not read from '$bacct_source_line'. Received the error $exit\n" if $exit;
313 
314  return \%report_entry;
315 }
316 
317 
318 sub get_report_entries_for_process_ids {
319  my $self = shift @_; # make sure we get if off the way before splicing
320 
321  my %combined_report_entries = ();
322 
323  unless ($self->config_get('AccountingDisabled')) {
324  while (my $pid_batch = join(' ', map { "'$_'" } splice(@_, 0, 20))) { # can't fit too many pids on one shell cmdline
325  my $bacct_opts = $self->config_get('BacctExtraOptions') || "";
326  my $cmd = "bacct $bacct_opts -l $pid_batch";
327 
328 # warn "LSF::get_report_entries_for_process_ids() running cmd:\n\t$cmd\n";
329 
330  my $batch_of_report_entries = $self->parse_report_source_line( $cmd );
331 
332  %combined_report_entries = (%combined_report_entries, %$batch_of_report_entries);
333  }
334  }
335 
336  return \%combined_report_entries;
337 }
338 
339 
340 sub get_report_entries_for_time_interval {
341  my ($self, $from_time, $to_time, $username) = @_;
342 
343  my $batch_of_report_entries = {};
344 
345  unless ($self->config_get('AccountingDisabled')) {
346  my $from_timepiece = Time::Piece->strptime($from_time, '%Y-%m-%d %H:%M:%S');
347  $from_time = $from_timepiece->strftime('%Y/%m/%d/%H:%M');
348 
349  my $to_timepiece = Time::Piece->strptime($to_time, '%Y-%m-%d %H:%M:%S') + 2*ONE_MINUTE;
350  $to_time = $to_timepiece->strftime('%Y/%m/%d/%H:%M');
351 
352  my $bacct_opts = $self->config_get('BacctExtraOptions') || "";
353  my $cmd = "bacct $bacct_opts -l -C $from_time,$to_time ".($username ? "-u $username" : '');
354 
355 # warn "LSF::get_report_entries_for_time_interval() running cmd:\n\t$cmd\n";
356 
357  $batch_of_report_entries = $self->parse_report_source_line( $cmd );
358  }
359 
360  return $batch_of_report_entries;
361 }
362 
363 
364 sub submit_workers_return_meadow_pids {
365  my ($self, $worker_cmd, $required_worker_count, $iteration, $rc_name, $rc_specific_submission_cmd_args, $submit_log_subdir) = @_;
366 
367  my $job_array_common_name = $self->job_array_common_name($rc_name, $iteration);
368  my $array_required = $required_worker_count > 1;
369  my $job_array_name_with_indices = $job_array_common_name . ($array_required ? "[1-${required_worker_count}]" : '');
370  my $meadow_specific_submission_cmd_args = $self->config_get('SubmissionOptions');
371 
372  my ($submit_stdout_file, $submit_stderr_file);
373 
374  if($submit_log_subdir) {
375  $submit_stdout_file = $submit_log_subdir . "/log_${rc_name}_%J_%I.out";
376  $submit_stderr_file = $submit_log_subdir . "/log_${rc_name}_%J_%I.err";
377  } else {
378  $submit_stdout_file = '/dev/null';
379  $submit_stderr_file = '/dev/null';
380  }
381 
382  $ENV{'LSB_STDOUT_DIRECT'} = 'y'; # unbuffer the output of the bsub command
383 
384  my @cmd = ('bsub',
385  '-o', $submit_stdout_file,
386  '-e', $submit_stderr_file,
387  '-J', $job_array_name_with_indices,
388  split_for_bash($rc_specific_submission_cmd_args),
389  split_for_bash($meadow_specific_submission_cmd_args),
390  $worker_cmd
391  );
392 
393  print "Executing [ ".$self->signature." ] \t\t".join(' ', @cmd)."\n";
394 
395  my $lsf_jobid;
396 
397  open(my $bsub_output_fh, "-|", @cmd) || die "Could not submit job(s): $!, $?"; # let's abort the beekeeper and let the user check the syntax
398  while(my $line = <$bsub_output_fh>) {
399  if($line=~/^Job <(\d+)> is submitted to/) {
400  $lsf_jobid = $1;
401  } else {
402  warn $line; # assuming it is a temporary blockage that might resolve itself with time
403  }
404  }
405  close $bsub_output_fh;
406 
407  if($lsf_jobid) {
408  return ($array_required ? [ map { $lsf_jobid.'['.$_.']' } (1..$required_worker_count) ] : [ $lsf_jobid ]);
409  } else {
410  die "Submission unsuccessful\n";
411  }
412 }
413 
414 1;
Bio::EnsEMBL::Hive::Meadow::LSF
Definition: LSF.pm:13
Bio::EnsEMBL::Hive::Meadow
Definition: DockerSwarm.pm:6
Bio::EnsEMBL::Hive::Beekeeper
Definition: Beekeeper.pm:13
Bio::EnsEMBL::Hive::Meadow
Definition: Meadow.pm:12
Bio::EnsEMBL::Hive::Queen
Definition: Queen.pm:47