my $self = shift;
my $source_file = $self->exon_fasta_file('source');
my $target_file = $self->exon_fasta_file('target');
my $source_size = -s $source_file;
my $target_size = -s $target_file;
# check if fasta files exist and are not empty
unless ($source_size and $target_size) {
throw("Can't find exon fasta files.");
}
# create an empty lsf log directory
my $logpath = path_append($self->logger->logpath, 'exonerate');
system("rm -rf $logpath") == 0 or
$self->logger->error("Unable to delete lsf log dir $logpath: $!\n");
system("mkdir -p $logpath") == 0 or
$self->logger->error("Can't create lsf log dir $logpath: $!\n");
# delete exonerate output from previous runs
my $dump_path = $self->cache->dump_path;
opendir(my $dumpdir, $dump_path) or
$self->logger->error("Can't open $dump_path for reading: $!");
while (defined(my $file = readdir($dumpdir))) {
next unless /exonerate_map\.\d+/;
unlink("$dump_path/$file") or
$self->
logger->error("Can't delete $dump_path/$file: $!");
}
closedir($dumpdir);
# determine number of jobs to split task into
my $bytes_per_job = $self->conf->param('exonerate_bytes_per_job')
|| 250000;
my $num_jobs = $self->conf->param('exonerate_jobs');
$num_jobs ||= int( $source_size/$bytes_per_job + 1 );
my $percent =
int( ( $self->conf->param('exonerate_threshold') || 0.5 )*100 );
my $lsf_name = 'idmapping_exonerate_' . time;
my $exonerate_path = $self->conf->param('exonerate_path');
my $exonerate_extra_params =
$self->conf->param('exonerate_extra_params');
#
# run exonerate jobs using lsf
#
my $exonerate_job =
qq{$exonerate_path } .
qq{--query $source_file --target $target_file } .
q{--querychunkid $LSB_JOBINDEX } .
qq{--querychunktotal $num_jobs } .
q{--model ungapped -M 1000 -D 100 } .
q{--showalignment FALSE --subopt no } . qq{--percent $percent } .
$self->conf->param('exonerate_extra_params') . " " .
q{--ryo 'myinfo: %qi %ti %et %ql %tl\n' } .
qq{| grep '^myinfo:' > $dump_path/exonerate_map.\$LSB_JOBINDEX} .
"\n";
$self->logger->info("Submitting $num_jobs exonerate jobs to lsf.\n");
$self->logger->debug("$exonerate_job\n\n");
my $bsub_cmd = sprintf(
"|bsub -J '%s[1-%d]%%%d' -o %s/exonerate.%%I.out %s",
$lsf_name,
$num_jobs,
$self->conf()->param('exonerate_concurrent_jobs') || 200,
$logpath,
$self->conf()->param('lsf_opt_exonerate') );
local *BSUB;
open( BSUB, $bsub_cmd ) ## no critic
or $self->logger->error("Could not open open pipe to bsub: $!\n");
print BSUB $exonerate_job;
$self->logger->error("Error submitting exonerate jobs: $!\n")
unless ($? == 0);
close BSUB;
# submit dependent job to monitor finishing of exonerate jobs
$self->logger->info("Waiting for exonerate jobs to finish...\n", 0, 'stamped');
my $dependent_job =
qq{bsub -K -w "ended($lsf_name)" -q production } .
qq{-M 1000 -R 'select[mem>1000]' -R 'rusage[mem=1000]' } .
qq{-o $logpath/exonerate_depend.out /bin/true};
system($dependent_job) == 0 or
$self->logger->error("Error submitting dependent job: $!\n");
$self->logger->info("All exonerate jobs finished.\n", 0, 'stamped');
#
# check results
#
my @missing;
my @error;
for (my $i = 1; $i <= $num_jobs; $i++) {
# check that output file exists
my $outfile = "$dump_path/exonerate_map.$i";
push @missing, $outfile unless (-f "$outfile");
# check no errors occurred
my $errfile = "$logpath/exonerate.$i.err";
push @error, $errfile if (-s "$errfile");
}
if (@missing) {
$self->logger->info("Couldn't find all exonerate output files. These are missing:\n");
foreach (@missing) {
$self->logger->info("$_\n", 1);
}
exit(1);
}
if (@error) {
$self->logger->info("One or more exonerate jobs failed. Check these error files:\n");
foreach (@error) {
$self->logger->info("$_\n", 1);
}
exit(1);
}
}