ensembl-hive  2.7.0
SubmitMapper.pm
Go to the documentation of this file.
1 =head1 LICENSE
2 
3 See the NOTICE file distributed with this work for additional information
4 regarding copyright ownership.
5 
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
9 
10  http://www.apache.org/licenses/LICENSE-2.0
11 
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 
18 =cut
19 
20 package XrefMapper::SubmitMapper;
21 use strict;
22 
23 use vars '@ISA';
24 @ISA = qw{ XrefMapper::BasicMapper };
25 
26 use warnings;
28 
29 use Cwd;
30 use DBI;
31 use File::Basename;
32 use IPC::Open3;
33 
35 
36 ##################################################################
37 # JOB 1 Do exonerate jobs and get core info
38 ##################################################################
39 
40 # Also load database with connection details of the core database for use later in JOBS
41 
42 # One get all info from core that is needed. :- stable_id's etc.
43 
44 # Process the direct xrefs by putting them in the object_xref table
45 
46 # dump the fasta files
47 
48 # submit exonerate jobs (fill tables mapping, mapping_jobs) if not already done
49 
50 # submit coordinate xrefs if not processed
51 
52 
53 sub new {
54  my($class, $mapper) = @_;
55 
56  my $self ={};
57  bless $self,$class;
58  $self->core($mapper->core);
59  $self->xref($mapper->xref);
60  $self->mapper($mapper);
61  $self->verbose($mapper->verbose);
62  $self->nofarm($mapper->nofarm);
63  return $self;
64 }
65 
66 
67 sub mapper{
68  my ($self, $arg) = @_;
69 
70  (defined $arg) &&
71  ($self->{_mapper} = $arg );
72  return $self->{_mapper};
73 }
74 
75 
76 
77 
78 ##############################################################################
79 # dump fasta files code
80 ##############################################################################
81 =head2 dump_seqs
82 
83  Arg[1]: xref object which holds info needed for the dump of xref
84 
85  Description: Dumps out the files for the mapping. Xref object should hold
86  the value of the databases and source to be used.
87  Returntype : none
88  Exceptions : will die if species not known or an error occurs while
89  : trying to write to files.
90  Caller : general
91 
92 =cut
93 
94 
95 
96 sub dump_seqs{
97 
98  my ($self, $location) = @_;
99 
100  $self->core->dbc->disconnect_if_idle(1);
101  $self->core->dbc->disconnect_when_inactive(1);
102 
103  $self->dump_xref();
104 
105  $self->core->dbc->disconnect_if_idle(0);
106  $self->core->dbc->disconnect_when_inactive(0);
107 
108 
109 
110 
111  $self->xref->dbc->disconnect_when_inactive(1);
112  $self->xref->dbc->disconnect_if_idle(1);
113 
114  $self->dump_ensembl($location);
115 
116  $self->xref->dbc->disconnect_if_idle(0);
117  $self->xref->dbc->disconnect_when_inactive(0);
118 
119 }
120 
121 sub no_dump_xref {
122  my ($self) = @_;
123 
124  my @method= @{$self->get_stored_methods()};
125  $self->method(\@method);
126 
127  $self->core->dna_file($self->core->dir."/".$self->core->species."_dna.fasta");
128  $self->core->protein_file($self->core->dir."/".$self->core->species."_protein.fasta");
129 }
130 
131 =head2 dump_xref
132 
133  Arg[1]: xref object which holds info on method and files.
134 
135  Description: Dumps the Xref data as fasta file(s)
136  Returntype : none
137  Exceptions : none
138  Caller : dump_seqs
139 
140 =cut
141 
142 
143 
144 =head2 get_stored_methods
145 
146  Description: Retrieves exonerate methods stored in the source_mapping_method table in the xref database
147  Returntype : arrayref
148  Exceptions : none
149  Caller : no_dump_xref
150 
151 =cut
152 
153 
154 sub get_stored_methods {
155  my ($self) = @_;
156 
157  my $sth = $self->xref()->dbc->prepare("select distinct method from source_mapping_method order by method");
158  $sth->execute();
159 
160  my $method;
161  my @methods;
162  $sth->bind_columns(\$method);
163  while($sth->fetch()){
164  push(@methods,$method);
165  }
166  $sth->finish;
167  return \@methods;
168 
169 }
170 
171 sub dump_xref{
172  my ($self) = @_;
173 
174  my $xref =$self->xref();
175  if(!defined($xref->dir())){
176  if(defined($self->dir)){
177  $xref->species($self->dir);
178  $self->species_id($self->get_id_from_species_name($self->species));
179  }
180  else{
181  $xref->dir(".");
182  }
183  }
184 
185  #we will populate @method with all exonerate methods which will be used to perform sequence mapping
186  my @method;
187 
188  my ($default_method, $override_method_for_source);
189 
190  if($self->mapper->can('set_methods')){
191  ($default_method, $override_method_for_source) = $self->mapper->set_methods();
192  }
193  else {
194  ($default_method, $override_method_for_source) = $self->set_methods();
195  }
196 
197  push(@method, $default_method);
198  print "Default exonerate method is $default_method\n" if($self->verbose);
199 
200  # %override_method_for_source is keyed on exonerate methods; values are arrays of xref sources
201  my %override_method_for_source= %{$override_method_for_source};
202 
203  #array which holds all source ids, for which the default exonerate method was overriden
204  my @all_source_ids;
205 
206  #similar hash to %override_method_for_source but instead of array of source names as values, it contains array of
207  #corresponding source ids as values
208  my %methods_and_source_ids;
209 
210  my %source_mapping_method;
211 
212  #only sources which have xrefs in the primary_xref table (which stores sequences) are relevant here
213  #multiple sources can have the same name but different priority_descriptions so
214  #one name can map to multiple ids
215 
216  #%source_ids is keyed on source name, values are arrays of source ids - populate it with all
217  #sources which have xrefs in primary_xref table
218 
219  my %source_ids;
220 
221  my $source_sth = $xref->dbc->prepare("select distinct name, source_id from primary_xref join xref using(xref_id) join source using(source_id) order by name;");
222  $source_sth->execute();
223  while ( my ($name, $source_id) = $source_sth->fetchrow_array() ){
224  push (@{$source_ids{$name}}, $source_id);
225  }
226  $source_sth->finish();
227 
228 
229  if (%override_method_for_source) {
230 
231  print "Default exonerate method overridden for some sources\n" if($self->verbose);
232 
233  foreach my $method (keys %override_method_for_source) {
234 
235  #convert source names to source_ids
236 
237  my @source_names = @{$override_method_for_source{$method}};
238 
239  my $a_source_exists = 0;
240 
241  foreach my $source_name (@source_names) {
242  #a source name should be defined only once against one exonerate method only
243  if (exists($source_mapping_method{$source_name})) {
244  die "$source_name source name defined more than once in set_methods method (SubmitMapper method which can be overriden in species.pm module)\n";
245  }
246  $source_mapping_method{$source_name} = $method;
247 
248  my @source_ids = @{$source_ids{$source_name}} if (exists($source_ids{$source_name}));
249 
250  if (@source_ids) {
251  $a_source_exists = 1;
252  foreach my $source_id (@source_ids) {
253  push @{$methods_and_source_ids{$method}}, $source_id;
254  push @all_source_ids, $source_id;
255  }
256 
257  } else {
258  print "WARNING: source id for $source_name not found. Xrefs for this source will not be sequence mapped. There are no xrefs from this source in the primary_xref table.\n" if($self->verbose);
259  }
260  }
261 
262  #if we found at least one source id with xrefs to be mapped using $method, add $method to @method
263  if ($a_source_exists) {
264  push(@method, $method);
265  }
266  }
267 
268  }
269 
270  #store source_ids and mapping methods in source_mapping_method table
271  my $insert_src_method_sth = $xref->dbc->prepare("insert into source_mapping_method values(?,?)");
272 
273 
274  foreach my $source_name (keys %source_ids){
275  my $method;
276  if (exists($source_mapping_method{$source_name})){
277  $method = $source_mapping_method{$source_name};
278  } else {
279  $method = $default_method;
280  }
281  foreach my $source_id (@{$source_ids{$source_name}}){
282 
283  $insert_src_method_sth->execute($source_id,$method);
284  print "Will use $method method for source id $source_id, $source_name\n" if($self->verbose);
285 
286  }
287  }
288  $insert_src_method_sth->finish();
289 
290  $self->method(\@method);
291 
292  if(defined($self->mapper->dumpcheck())){
293  my $skip = 1;
294  foreach my $method (@method){
295  if(!-e $xref->dir()."/xref_".$method."_dna.fasta"){
296  $skip = 0;
297  }
298  if(!-e $xref->dir()."/xref_".$method."_peptide.fasta"){
299  $skip = 0;
300  }
301  }
302  if($skip){
303  print "Xref fasta files found and will be used (No new dumping)\n" if($self->verbose);
304  return;
305  }
306  }
307 
308  print "Dumping Xref fasta files\n" if($self->verbose());
309 
310  foreach my $method (@method){
311  for my $sequence_type ('dna', 'peptide') {
312 
313 
314  my $filename = $xref->dir() . "/xref_".$method."_" . $sequence_type . ".fasta";
315  open( my $DH,">", $filename) || die "Could not open $filename";
316 
317  my $sql = "SELECT p.xref_id, p.sequence, x.species_id , x.source_id ";
318  $sql .= " FROM primary_xref p, xref x ";
319  $sql .= " WHERE p.xref_id = x.xref_id AND ";
320  $sql .= " p.sequence_type ='" . $sequence_type ."' ";
321 
322  #for the default method don't select sources for which the method was overriden
323  if ($method eq $default_method && scalar(@all_source_ids) > 0 ) {
324  $sql .= "AND x.source_id not in (" . join(',',@all_source_ids).")";
325  }
326 
327  #for a non default method only select sources which should have their xrefs mapped using this method
328  if ($method ne $default_method) {
329  $sql .= "AND x.source_id in (" . join(',',@{$methods_and_source_ids{$method}}).")";
330  }
331 
332  my $sth = $xref->dbc->prepare($sql);
333  $sth->execute();
334  while(my @row = $sth->fetchrow_array()){
335  # Ambiguous peptides must be cleaned out to protect Exonerate from J,O and U codes
336  $row[1] = uc($row[1]);
337  $row[1] =~ s/(.{60})/$1\n/g;
338  if ($sequence_type eq 'peptide') { $row[1] =~ tr/JOU/X/ }
339  print $DH ">".$row[0]."\n".$row[1]."\n";
340 
341  }
342 
343  close $DH;
344  $sth->finish();
345 
346  }
347  }
348  my $sth = $xref->dbc->prepare("insert into process_status (status, date) values('xref_fasta_dumped',now())");
349  $sth->execute();
350  $sth->finish;
351 
352 
353  return;
354 
355 }
356 
357 
358 =head2 dump_ensembl
359 
360  Description: Dumps the ensembl data to a file in fasta format.
361  Returntype : none
362  Exceptions : none
363  Caller : dump_seqs
364 
365 =cut
366 
367 sub dump_ensembl{
368  my ($self) = @_;
369 
370 
371  my $num_seqs = 0;
372  my $sth = $self->core->dbc->prepare("select count(distinct (seq_region_id)) from gene");
373  $sth->execute;
374  $sth->bind_columns(\$num_seqs);
375  $sth->fetch;
376  $sth->finish;
377 
378  my $num_genes;
379  $sth = $self->core->dbc->prepare("select count(1) from gene");
380  $sth->execute;
381  $sth->bind_columns(\$num_genes);
382  $sth->fetch;
383  $sth->finish;
384 
385 
386  # my $ignore_genes = $self->fetch_alt_allele_list();
387  # comment out until we know what we want to do with alt alleles.
388  my %temp_hash;
389  my $ignore_genes = \%temp_hash;
390 
391 
392  if ($num_genes < $num_seqs) {
393  $self->fetch_and_dump_seq_via_genes($ignore_genes);
394  }
395  else {
396  $self->fetch_and_dump_seq_via_toplevel($ignore_genes);
397  }
398 
399 }
400 
401 
402 sub fetch_alt_allele_list {
403  my ($self) = @_;
404 
405  my %allele;
406 
407  my $sth = $self->xref->dbc->prepare(" select gene_id from alt_allele where is_reference = 0");
408  $sth->execute();
409  my $gene_id;
410  $sth->bind_columns(\$gene_id);
411  while ($sth->fetch()){
412  $allele{$gene_id} = 1;
413  }
414  return \%allele;
415 
416 }
417 
418 sub fetch_and_dump_seq_via_toplevel{
419  my ($self, $ignore_genes) = @_;
420 
421  my $inc_dupes = 0; # do not include duplicate regions like PARs?
422  my $inc_nonref = 1; # include non-reference regions like haplotypes?
423  my $ensembl = $self->core;
424  $self->add_meta_pair("dump_method","fetch_and_dump_seq_via_toplevel");
425 
426 
427  #
428  # store ensembl dna file name and open it
429  #
430  if(!defined($ensembl->dir())){
431  $ensembl->dir(".");
432  }
433  $ensembl->dna_file($ensembl->dir."/".$ensembl->species."_dna.fasta");
434 
435 
436  #
437  # store ensembl protein file name and open it
438  #
439  $ensembl->protein_file($ensembl->dir."/".$ensembl->species."_protein.fasta");
440 
441  if(defined($self->mapper->dumpcheck()) and -e $ensembl->protein_file() and -e $ensembl->dna_file()){
442  my $sth = $self->xref->dbc->prepare("insert into process_status (status, date) values('core_fasta_dumped',now())");
443  $sth->execute();
444  print "Ensembl Fasta files found (no new dumping)\n" if($self->verbose());
445  return;
446  }
447 
448  print "Dumping Ensembl Fasta files\n" if($self->verbose());
449 
450  open(my $dnah,">", $ensembl->dna_file())
451  || die("Could not open dna file for writing: ".$ensembl->dna_file."\n");
452 
453  open(my $peph, ">", $ensembl->protein_file())
454  || die("Could not open protein file for writing: ".$ensembl->protein_file."\n");
455 
456  my $db = new Bio::EnsEMBL::DBSQL::DBAdaptor(-dbconn => $ensembl->dbc);
457 
458  my $slice_adaptor = $db->get_SliceAdaptor();
459 
460  my $slices = $slice_adaptor->fetch_all( 'toplevel', undef, $inc_nonref, $inc_dupes ) || [];
461  my $script_count = 0;
462  my $lation_count = 0;
463  my $ama = $slice_adaptor->db()->get_AssemblyMapperAdaptor();
464  my $seqa = $slice_adaptor->db()->get_SequenceAdaptor();
465 
466  while(my $slice = shift @$slices){
467 
468  #get genes from this slice
469  my @genes = sort { $a->start() <=> $b->start() }
470  @{$slice->get_all_Genes(undef, undef, 1, undef, undef)};
471 # my $genes = $slice->get_all_Genes(undef,undef,1);
472 
473 # while(my $gene = shift @$genes){
474  while(my $gene = shift @genes){
475  next if $gene->biotype eq 'J_segment';
476  next if $gene->biotype eq 'D_segment';
477  next if $ignore_genes->{$gene->dbID};
478 
479  foreach my $transcript (@{$gene->get_all_Transcripts()}) {
480  my $seq = $transcript->spliced_seq();
481  $seq =~ s/(.{60})/$1\n/g;
482  print $dnah ">" . $transcript->dbID() . "\n" .$seq."\n" || die "Error writing for transcript ".$transcript->dbID."\n$!\n";
483  $script_count++;
484  my $trans = $transcript->translation();
485  my $translation = $transcript->translate();
486 
487  if(defined($translation)){
488  my $pep_seq = $translation->seq();
489  $pep_seq =~ s/(.{60})/$1\n/g;
490  print $peph ">".$trans->dbID()."\n".$pep_seq."\n" || die "Error writing for translation ".$trans->dbID."\n$!\n";
491  $lation_count++;
492  }
493  }
494  }
495 
496  # Zap caches!
497  %{ $slice_adaptor->{'sr_name_cache'} } = ();
498  %{ $slice_adaptor->{'sr_id_cache'} } = ();
499 
500  $ama->delete_cache();
501  $seqa->clear_cache();
502  }
503 
504  close $dnah || die "unable to close dna file\n$!\n";
505  close $peph || die "unable to close peptide file\n$!\n";
506 
507 
508 
509  sleep(10); # give the disks a chance.
510 
511  #####################################################
512  # Sanity check as some of the disks get timeouts etc.
513  #####################################################
514 
515  my $line = 'grep "^>" '.$ensembl->dna_file().' | wc -l';
516  my $out = `$line`;
517  chomp $out;
518  if($out != $script_count){
519  die "Problem writing DNA file as there should be $script_count entries but file has only $out\n";
520  }
521 
522 
523  $line = 'grep "^>" '.$ensembl->protein_file().' | wc -l';
524  $out = `$line`;
525  chomp $out;
526  if($out != $lation_count){
527  die "Problem writing PEPTIDE file as there should be $lation_count entries but file has only $out\n";
528  }
529 
530 
531  print $script_count ." Transcripts dumped ". $lation_count. " Transaltions dumped\n" if($self->verbose);
532 
533  #######################################
534  # Update the database about the status.
535  #######################################
536 
537  my $sth = $self->xref->dbc->prepare("insert into process_status (status, date) values('core_fasta_dumped',now())");
538  $sth->execute();
539  $sth->finish;
540 }
541 
542 
543 
544 =head2 fetch_and_dump_seq_via_genes
545 
546  Description: Dumps the ensembl data to a file in fasta format.
547  Returntype : none
548  Exceptions : wil die if the are errors in db connection or file creation.
549  Caller : dump_ensembl
550 
551 =cut
552 
553 sub fetch_and_dump_seq_via_genes{
554  my ($self,$ignore_genes) = @_;
555 
556  my $ensembl = $self->core;
557  my $db = new Bio::EnsEMBL::DBSQL::DBAdaptor(-dbconn => $ensembl->dbc);
558 
559  my $slice_adaptor = $db->get_SliceAdaptor();
560  my $ama = $db->get_AssemblyMapperAdaptor();
561  my $seqa = $db->get_SequenceAdaptor();
562  my $gene_adaptor = $db->get_GeneAdaptor();
563 
564  #
565  # store ensembl dna file name and open it
566  #
567  if(!defined($ensembl->dir())){
568  $ensembl->dir(".");
569  }
570  $ensembl->dna_file($ensembl->dir."/".$ensembl->species."_dna.fasta");
571 
572 
573  #
574  # store ensembl protein file name and open it
575  #
576  $ensembl->protein_file($ensembl->dir."/".$ensembl->species."_protein.fasta");
577 
578  $self->add_meta_pair("dump_method","fetch_and_dump_seq_via_genes");
579 
580  if(defined($self->mapper->dumpcheck()) and -e $ensembl->protein_file() and -e $ensembl->dna_file()){
581  my $sth = $self->xref->dbc->prepare("insert into process_status (status, date) values('core_fasta_dumped',now())");
582  $sth->execute();
583  print "Ensembl Fasta files found (no new dumping)\n" if($self->verbose());
584  return;
585  }
586 
587  print "Dumping Ensembl Fasta files\n" if($self->verbose());
588 
589  open(my $dnah, ">", $ensembl->dna_file())
590  || die("Could not open dna file for writing: ".$ensembl->dna_file."\n");
591 
592  open(my $peph, ">", $ensembl->protein_file())
593  || die("Could not open protein file for writing: ".$ensembl->protein_file."\n");
594 
595 
596 
597  # fetch by location, or everything if not defined
598 
599  my @genes;
600  my $constraint;
601 
602 
603 # TEST PURPOSES ONLY#################################################
604 #####################################################################
605  @genes = @{$gene_adaptor->fetch_all()};
606 
607 # push @genes, $gene_adaptor->fetch_by_stable_id("ENSG00000139618");
608 #####################################################################
609 # my $max = undef;
610 
611  my $script_count=0;
612  my $lation_count=0;
613  foreach my $gene (@genes){
614  next if $gene->biotype eq 'J_segment';
615  next if $gene->biotype eq 'D_segment';
616  next if $ignore_genes->{$gene->dbID};
617 
618  foreach my $transcript (@{$gene->get_all_Transcripts()}) {
619  my $seq = $transcript->spliced_seq();
620  $seq =~ s/(.{60})/$1\n/g;
621  print $dnah ">" . $transcript->dbID() . "\n" .$seq."\n" || die "Error writing for transcript ".$transcript->dbID."\n$!\n";
622  $script_count++;
623  my $trans = $transcript->translation();
624  my $translation = $transcript->translate();
625 
626  if(defined($translation)){
627  my $pep_seq = $translation->seq();
628  $pep_seq =~ s/(.{60})/$1\n/g;
629  print $peph ">".$trans->dbID()."\n".$pep_seq."\n" || die "Error writing for translation ".$trans->dbID."\n$!\n";
630  $lation_count++;
631  }
632  }
633 
634  # Zap caches!
635  %{ $slice_adaptor->{'sr_name_cache'} } = ();
636  %{ $slice_adaptor->{'sr_id_cache'} } = ();
637 
638  $ama->delete_cache();
639 
640  $seqa->clear_cache();
641  }
642  close $dnah || die "unable to close dna file\n$!\n";
643  close $peph || die "unable to close peptide file\n$!\n";
644 
645 
646 
647  sleep(10);
648 
649  #####################################################
650  # Sanity check as some of the disks get timeouts etc.
651  #####################################################
652 
653  my $line = 'grep "^>" '.$ensembl->dna_file().' | wc -l';
654  my $out = `$line`;
655  chomp $out;
656  if($out != $script_count){
657  die "Problem writing DNA file as there should be $script_count entries but file has only $out\n";
658  }
659 
660 
661  $line = 'grep "^>" '.$ensembl->protein_file().' | wc -l';
662  $out = `$line`;
663  chomp $out;
664  if($out != $lation_count){
665  die "Problem writing PEPTIDE file as there should be $lation_count entries but file has only $out\n";
666  }
667 
668  print $script_count ." Transcripts dumped ". $lation_count. " Transaltions dumped\n" if($self->verbose);
669 
670  #######################################
671  # Update the database about the status.
672  #######################################
673 
674  my $sth = $self->xref->dbc->prepare("insert into process_status (status, date) values('core_fasta_dumped',now())");
675  $sth->execute();
676  $sth->finish;
677 
678 }
679 
680 =head2 set_methods
681 
682  Description: Specifies the default exonerate method and non default methods which should be used for
683  one or more sources.
684  Returns a string containing the default method and a hash refrence.
685  The hash key is an exonerate method, the corresponding value is an array of source names
686  whose xrefs are to be mapped using the exonerate method in the hash key.
687  Multiple sources can be matched but only those with xrefs in the primary_xref table
688  which holds sequence data are considered. If a particular source is not found, a warning
689  will be written out by the caller.
690  This method can be overriden in species.pm
691  Returntype : string, hash of arrays
692  Example : my ($default_method, $override_method_for_source) = $self->set_methods();
693  Exceptions : none
694  Caller : dump_xref
695 
696 =cut
697 
698 sub set_methods{
699 
700  my $default_method = 'ExonerateGappedBest1';
701  my %override_method_for_source = ( ExonerateGappedBest_100_perc_id => ['Uniprot/SWISSPROT', 'Uniprot/SPTREMBL'] ,
702  ExonerateGappedBest5 => ['RefSeq_mRNA','RefSeq_mRNA_predicted', 'RefSeq_ncRNA', 'RefSeq_ncRNA_predicted' ],
703  );
704 
705  return $default_method, \%override_method_for_source;
706 }
707 
708 
709 
710 ###################################################################################################
711 # exonerate subs
712 ###################################################################################################
713 =head2 jobcount
714 
715  Arg [1] : (optional)
716  Example : $mapper->jobcount(1004);
717  Description: Getter / Setter for number of jobs submitted.
718  Returntype : scalar
719  Exceptions : none
720 
721 =cut
722 
723 sub jobcount {
724  my ($self, $arg) = @_;
725 
726  (defined $arg) &&
727  ($self->{_jobcount} = $arg );
728  return $self->{_jobcount};
729 }
730 
731 
732 sub method{
733  my ($self, $arg) = @_;
734 
735  (defined $arg) &&
736  ($self->{_method} = $arg );
737  return $self->{_method};
738 }
739 
740 =head2 build_list_and_map
741 
742  Arg[1]: xref object which holds info on method and files.
743 
744  Description: runs the mapping of the list of files with species methods
745  Returntype : none
746  Exceptions : none
747  Caller : general
748 
749 =cut
750 
751 sub build_list_and_map {
752 
753  my ($self) = @_;
754 
755  my @list=();
756 
757  foreach my $method (@{$self->method()}){
758  my @dna=();
759  my $q_dna_file = $self->xref->dir."/xref_".$method."_dna.fasta";
760  if (-e $q_dna_file and -s $q_dna_file) {
761  push @dna, $method;
762  push @dna, $q_dna_file;
763  push @dna, $self->core->dna_file();
764  push @list, \@dna;
765  }
766 
767  my @pep=();
768  my $q_pep_file = $self->xref->dir."/xref_".$method."_peptide.fasta";
769  if (-e $q_pep_file and -s $q_pep_file) {
770  push @pep, $method;
771  push @pep, $self->xref->dir."/xref_".$method."_peptide.fasta";
772  push @pep, $self->core->protein_file();
773  push @list, \@pep;
774  }
775  }
776  $self->run_mapping(\@list);
777 
778 }
779 
780 
781 =head2 fix_mappings
782 
783  Example : none
784  Description: submit mapping jobs to LSF for those that had problems, and wait for them to finish.
785  Returntype : none
786  Exceptions : none
787  Caller : general
788 
789 =cut
790 
791 sub fix_mappings {
792  my $self = shift;
793 
794  # 1) find the broken jobs
795 
796  # 2) remove the object_xrefs from object_xref_start to object_xref_end
797  # Plus identity xrefs for these
798 
799  my $rm_object_xrefs_sth = $self->xref->dbc->prepare("delete o from object_xref o where object_xref_id >= ? and object_xref_id <= ?");
800  my $rm_identity_xrefs_sth = $self->xref->dbc->prepare("delete i from identity_xref i where object_xref_id >= ? and object_xref_id <= ?");
801  my $reset_object_xref_limits_sth = $self ->xref->dbc->prepare("update mapping_jobs set object_xref_start = null, object_xref_end = null where job_id = ? and array_number = ?");
802 
803  # 3) remove map, err and out files for each
804 
805  # 4) rerun the mapping jobs SETTING NEW DATA IN mapping_jobs
806 
807  my $sql = 'SELECT mapping.job_id, map_file, out_file, err_file, object_xref_start, object_xref_end, array_number, command_line, method, root_dir from mapping_jobs, mapping where mapping.job_id = mapping_jobs.job_id and status = "FAILED"';
808 
809  my $sth = $self->xref->dbc->prepare($sql);
810  $sth->execute();
811  my ($job_id, $map_file, $out_file, $err_file, $object_xref_start, $object_xref_end, $array_number, $command_line, $method, $root_dir);
812 
813  $sth->bind_columns(\$job_id, \$map_file, \$out_file, \$err_file, \$object_xref_start, \$object_xref_end, \$array_number, \$command_line, \$method, \$root_dir);
814 
815  my @running_methods;
816  my @job_names;
817 
818  while ($sth->fetch){
819  my $start = undef;
820  my $end = undef;
821 
822  $command_line =~ s/\$LSB_JOBINDEX/$array_number/g;
823 
824  if(defined($object_xref_start) and $object_xref_start){
825  $start = $object_xref_start;
826  }
827  if(defined($object_xref_end) and $object_xref_end){
828  $end = $object_xref_end;
829  }
830 
831  if(!defined($start) and !defined($end)){
832  }
833  elsif(!defined($start) or !defined($end)){
834  print STDERR "Could not clean up for ".$job_id."[".$array_number."]\n";
835  next;
836  }
837  else{ # REMOVE the entries
838  print "removing object_xref etc from $start to $end\n";
839  $rm_object_xrefs_sth->execute($start, $end);
840  $rm_identity_xrefs_sth->execute($start, $end);
841  $reset_object_xref_limits_sth->execute($job_id, $array_number);
842  }
843 
844  unlink($root_dir."/".$map_file);
845  unlink($root_dir."/".$out_file);
846  unlink($root_dir."/".$err_file);
847 
848  # Run the Mapping.
849  my $obj_name = "XrefMapper::Methods::$method";
850  # check that the appropriate object exists
851  eval "require $obj_name";
852  if($@) {
853 
854  warn("Could not find object $obj_name corresponding to mapping method $method, skipping\n$@");
855 
856  } else {
857 
858  my $obj = $obj_name->new($self->mapper);
859 
860  print "DO resubmit for $array_number\n";
861  my $job_name = $obj->resubmit_exonerate($self->mapper, $command_line, $out_file, $err_file, $job_id, $array_number, $root_dir);
862  print "Job name is $job_name\n";
863  push @job_names, $job_name;
864  push @running_methods, $obj;
865 
866  my $sth = $self->mapper->xref->dbc->prepare('update mapping_jobs set status = "SUBMITTED"'." where job_id = $job_id and array_number = $array_number");
867  $sth->execute();
868  $sth->finish;
869 
870  sleep 1; # make sure unique names really are unique
871 
872  $self->jobcount(1);
873  }
874 
875 
876  }
877  $sth->finish;
878  # submit depend job to wait for all mapping jobs
879  foreach my $method( @running_methods ){
880  # Submit all method-specific depend jobs
881  if( $method->can('submit_depend_job') ){
882  $method->submit_depend_job;
883  }
884  }
885  # Submit generic depend job. Defaults to LSF IF any exist.
886  if(defined($job_names[0])){
887  $self->submit_depend_job($self->core->dir, @job_names);
888  }
889 
890  $self->check_err($self->core->dir);
891 
892 
893 }
894 
895 
896 =head2 run_mapping
897 
898  Arg[1] : List of lists of (method, query, target)
899  Arg[2] :
900  Example : none
901  Description: Create and submit mapping jobs to LSF, and wait for them to finish.
902  Returntype : none
903  Exceptions : none
904  Caller : general
905 
906 =cut
907 
908 sub run_mapping {
909 
910  my ($self, $lists) = @_;
911 
912  # delete old output files in target directory if we're going to produce new ones
913 
914  my $dir = $self->core->dir();
915  print "Deleting out, err and map files from output dir: $dir\n" if($self->verbose());
916  unlink (glob("$dir/*.map"));
917  unlink (glob("$dir/*.out"));
918  unlink (glob("$dir/*.err"));
919 
920  $self->remove_all_old_output_files();
921  #disconnect so that we can then reconnect after the long mapping bit.
922  $self->core->dbc->disconnect_if_idle(1);
923  $self->xref->dbc->disconnect_if_idle(1);
924  $self->core->dbc->disconnect_when_inactive(1);
925  $self->xref->dbc->disconnect_when_inactive(1);
926 
927  # foreach method, submit the appropriate job & keep track of the job name
928  # note we check if use_existing_mappings is set here, not earlier, as we
929  # still need to instantiate the method object in order to fill
930  # method_query_threshold and method_target_threshold
931 
932  my @job_names;
933  my @running_methods;
934  foreach my $list (@$lists){
935 
936  my ($method, $queryfile ,$targetfile) = @$list;
937 
938  my $obj_name = "XrefMapper::Methods::$method";
939  # check that the appropriate object exists
940  eval "require $obj_name";
941  if($@) {
942 
943  warn("Could not find object $obj_name corresponding to mapping method $method, skipping\n$@");
944 
945  } else {
946 
947  my $obj = $obj_name->new($self->mapper);
948 
949  my $job_name = $obj->run($queryfile, $targetfile, $self);
950  push @job_names, $job_name;
951  push @running_methods, $obj;
952  sleep 1; # make sure unique names really are unique
953 
954  $self->jobcount(($self->jobcount||0)+$obj->jobcount);
955  }
956  } # foreach method
957 
958  # submit depend job to wait for all mapping jobs
959  foreach my $method( @running_methods ){
960  # Submit all method-specific depend jobs
961  if( $method->can('submit_depend_job') ){
962  $method->submit_depend_job;
963  }
964  }
965  # Submit generic depend job. Defaults to LSF
966  $self->submit_depend_job($self->core->dir, @job_names);
967  $self->core->dbc->disconnect_if_idle(0);
968  $self->xref->dbc->disconnect_if_idle(0);
969  $self->core->dbc->disconnect_when_inactive(0);
970  $self->xref->dbc->disconnect_when_inactive(0);
971 
972  $self->check_err($self->core->dir);
973 
974 } # run_mapping
975 
976 sub nofarm{
977  my ($self, $arg) = @_;
978 
979  (defined $arg) &&
980  ($self->{_nofarm} = $arg );
981  return $self->{_nofarm};
982 }
983 
984 sub check_err {
985 
986  my ($self, $dir) = @_;
987 
988  foreach my $err (glob("$dir/*.err")) {
989 
990  print STDERR "\n\n*** Warning: $err has non-zero size; may indicate".
991  " problems with exonerate run\n\n\n" if (-s $err);
992 
993  }
994 }
995 
996 
997 =head2 submit_depend_job
998 
999  Arg[1] : List of job names.
1000  Arg[2] :
1001  Example : none
1002  Description: Submit an LSF job that waits for other jobs to finish.
1003  Returntype : none
1004  Exceptions : none
1005  Caller : general
1006 
1007 =cut
1008 
1009 sub submit_depend_job {
1010 
1011  my ($self, $root_dir, @job_names) = @_;
1012 
1013 
1014  if(defined($self->nofarm)){
1015  my $sth = $self->xref->dbc->prepare("insert into process_status (status, date) values('mapping_finished',now())");
1016  $sth->execute();
1017  $sth->finish;
1018  return;
1019  }
1020 
1021  # Submit a job that does nothing but wait on the main jobs to
1022  # finish. This job is submitted interactively so the exec does not
1023  # return until everything is finished.
1024 
1025  # build up the bsub command; first part
1026 # my @depend_bsub = ('bsub', '-K');
1027 
1028  # build -w 'ended(job1) && ended(job2)' clause
1029  my $ended_str = '-w "';
1030  my $i = 0;
1031  foreach my $job (@job_names) {
1032  $ended_str .= "ended($job)";
1033  $ended_str .= " && " if ($i < $#job_names);
1034  $i++;
1035  }
1036  $ended_str .= '"';
1037 
1038 # push @depend_bsub, $ended_str;
1039 
1040  # rest of command
1041 
1042  my $queue = $self->mapper->farm_queue || 'production-rh74';
1043 # push @depend_bsub, ('-q', $queue, '-o', "$root_dir/depend.out", '-e', "$root_dir/depend.err");
1044 
1045  my $jobid = 0;
1046  my $memory_resources = q{-M 5 -R"select[mem>5] rusage[mem=5]"};
1047  my $com = sprintf "bsub -K %s $memory_resources -o $root_dir/depend.out -e $root_dir/depend.err $ended_str /bin/true", $queue?"-q $queue":'';
1048 
1049 
1050  my $line = `$com`;
1051 
1052  if ($line =~ /^Job <(\d+)> is submitted/) {
1053  $jobid = $1;
1054  print "LSF job ID for Depend job: $jobid (job array with 1 job)\n";
1055  }
1056 
1057 
1058  if (!$jobid) {
1059  # Something went wrong
1060  warn("Job submission failed:\n$@\n");
1061  }
1062  else{
1063  my $sth = $self->xref->dbc->prepare("insert into process_status (status, date) values('mapping_finished',now())");
1064  $sth->execute();
1065  $sth->finish;
1066  }
1067 }
1068 
1069 sub remove_all_old_output_files{
1070  my ($self) =@_;
1071 
1072  my $dir = $self->core->dir();
1073 
1074  print "Deleting txt and sql files from output dir: $dir\n" if($self->verbose);
1075  unlink(glob("$dir/*.txt $dir/*.sql"));
1076 # $self->cleanup_projections_file(); # now to be done when we load core.
1077 }
1078 
1079 
1080 
1081 1;
Bio::EnsEMBL::DBSQL::DBAdaptor
Definition: DBAdaptor.pm:40
run_mapping
public run_mapping()
XrefMapper::BasicMapper
Definition: BasicMapper.pm:8
Xref
Definition: DB.pm:8
info
public info()