my $self = shift;
my $matrix = shift;
unless ($matrix and
$matrix->isa('Bio::EnsEMBL::IdMapping::ScoredMappingMatrix')) {
throw('Need a Bio::EnsEMBL::IdMapping::ScoredMappingMatrix.');
}
# serialise SyntenyFramework to disk
$self->logger->debug("Serialising SyntenyFramework...\n", 0, 'stamped');
$self->logger->debug("Done.\n", 0, 'stamped');
# split the ScoredMappingMatrix into chunks and write to disk
my $matrix_size = $matrix->size;
$self->logger->debug("Scores before rescoring: $matrix_size.\n");
my $num_jobs = $self->conf->param('synteny_rescore_jobs') || 20;
$num_jobs++;
my $dump_path = path_append($self->conf->param('basedir'),
'matrix/synteny_rescore');
$self->logger->debug("Creating sub-matrices...\n", 0, 'stamped');
foreach my $i (1..$num_jobs) {
my $start = (int($matrix_size/($num_jobs-1)) * ($i - 1)) + 1;
my $end = int($matrix_size/($num_jobs-1)) * $i;
$self->logger->debug("$start-$end\n", 1);
my $sub_matrix = $matrix->sub_matrix($start, $end);
$sub_matrix->cache_file_name("gene_matrix_synteny$i.ser");
$sub_matrix->dump_path($dump_path);
$sub_matrix->write_to_file;
}
$self->logger->debug("Done.\n", 0, 'stamped');
# create an empty lsf log directory
my $logpath = path_append($self->logger->logpath, 'synteny_rescore');
system("rm -rf $logpath") == 0 or
$self->logger->error("Unable to delete lsf log dir $logpath: $!\n");
system("mkdir -p $logpath") == 0 or
$self->logger->error("Can't create lsf log dir $logpath: $!\n");
# build lsf command
my $lsf_name = 'idmapping_synteny_rescore_'.time;
my $options = $self->conf->create_commandline_options(
logauto => 1,
logautobase => "synteny_rescore",
logpath => $logpath,
interactive => 0,
is_component => 1,
);
my $cmd = qq{$Bin/synteny_rescore.pl $options --index \$LSB_JOBINDEX};
my $bsub_cmd =
sprintf( "|bsub -J '%s[1-%d]' "
. "-o %s/synteny_rescore.%%I.out "
. "-e %s/synteny_rescore.%%I.err %s",
$lsf_name, $num_jobs, $logpath, $logpath,
$self->conf()->param('lsf_opt_synteny_rescore') );
# run lsf job array
$self->logger->info("Submitting $num_jobs jobs to lsf.\n");
$self->logger->debug("$cmd\n\n");
local *BSUB;
open( BSUB, $bsub_cmd ) ## no critic
or $self->logger->error("Could not open open pipe to bsub: $!\n");
print BSUB $cmd;
$self->logger->error("Error submitting synteny rescoring jobs: $!\n")
unless ($? == 0);
close BSUB;
# submit dependent job to monitor finishing of jobs
$self->logger->info("Waiting for jobs to finish...\n", 0, 'stamped');
my $dependent_job =
qq{bsub -K -w "ended($lsf_name)" -q production } .
qq{-M 1000 -R 'select[mem>1000]' -R 'rusage[mem=1000]' } .
qq{-o $logpath/synteny_rescore_depend.out /bin/true};
system($dependent_job) == 0 or
$self->logger->error("Error submitting dependent job: $!\n");
$self->logger->info("All jobs finished.\n", 0, 'stamped');
# check for lsf errors
sleep(5);
my $err;
foreach my $i (1..$num_jobs) {
$err++ unless (-e "$logpath/synteny_rescore.$i.success");
}
if ($err) {
$self->logger->error("At least one of your jobs failed.\nPlease check the logfiles at $logpath for errors.\n");
}
# merge and return matrix
$self->logger->debug("Merging rescored matrices...\n");
$matrix->flush;
foreach my $i (1..$num_jobs) {
# read partial matrix created by lsf job from file
-DUMP_PATH => $dump_path,
-CACHE_FILE => "gene_matrix_synteny$i.ser",
);
# merge with main matrix
$matrix->merge($sub_matrix);
}
$self->logger->debug("Done.\n");
$self->logger->debug("Scores after rescoring: ".$matrix->size.".\n");
return $matrix;
}