ensembl-hive  2.8.1
ProcessMappings.pm
Go to the documentation of this file.
1 =head1 LICENSE
2 
3 See the NOTICE file distributed with this work for additional information
4 regarding copyright ownership.
5 
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
9 
10  http://www.apache.org/licenses/LICENSE-2.0
11 
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 
18 =cut
19 
20 package XrefMapper::ProcessMappings;
21 use strict;
22 
23 use vars '@ISA';
24 @ISA = qw{ XrefMapper::BasicMapper };
25 
26 use warnings;
28 
29 use Cwd;
30 use DBI;
31 use File::Basename;
32 use IPC::Open3;
33 
34 ##################################################################
35 # JOB 2
36 ##################################################################
37 
38 # process exonerate mapping file (include checks)
39 
40 # Save all data to object_xref. Plus remember to add dependents
41 
42 # process priority xrefs to leave those excluded flagged as such
43 
44 # Do tests on xref database wrt to core before saving data.
45 
46 sub new {
47  my($class, $mapper) = @_;
48 
49  my $self ={};
50  bless $self,$class;
51  $self->core($mapper->core);
52  $self->xref($mapper->xref);
53  $self->verbose($mapper->verbose);
54  return $self;
55 }
56 
57 sub process_mappings {
58  my ($self) = @_;
59 
60  # get the jobs from the mapping table
61  # for i =1 i < jobnum{
62  # if( Not parsed already see mapping_jobs){
63  # check the .err, .out and .map files in that order.
64  # put data into object_xref, identity_xref etc...
65  # add data to mapping_job table
66  # }
67  # }
68 
69 
70 
71  my %query_cutoff;
72  my %target_cutoff;
73  my ($job_id, $percent_query_cutoff, $percent_target_cutoff);
74  my $dbi = $self->xref->dbc();
75  my $sth = $dbi->prepare("select job_id, percent_query_cutoff, percent_target_cutoff from mapping");
76  $sth->execute();
77  $sth->bind_columns(\$job_id, \$percent_query_cutoff, \$percent_target_cutoff);
78  my $object_xref_id;
79 
80  while($sth->fetch){
81  $query_cutoff{$job_id} = $percent_query_cutoff;
82  $target_cutoff{$job_id} = $percent_target_cutoff;
83  }
84  $sth->finish;
85 
86  my ($root_dir, $map, $status, $out, $err, $array_number);
87  my ($map_file, $out_file, $err_file);
88  my $map_sth = $dbi->prepare("select root_dir, map_file, status, out_file, err_file, array_number, job_id from mapping_jobs");
89  $map_sth->execute();
90  $map_sth->bind_columns(\$root_dir, \$map, \$status, \$out, \$err, \$array_number, \$job_id);
91  my $already_processed_count = 0;
92  my $processed_count = 0;
93  my $error_count = 0;
94  my $empty_count = 0;
95 
96  my $stat_sth = $dbi->prepare("update mapping_jobs set status = ? where job_id = ? and array_number = ?");
97 
98  while($map_sth->fetch()){
99  my $err_file = $root_dir."/".$err;
100  my $out_file = $root_dir."/".$out;
101  my $map_file = $root_dir."/".$map;
102  if($status eq "SUCCESS"){
103  $already_processed_count++;
104  }
105  else{
106  if(-s $err_file){
107  $error_count++;
108  print STDERR "Problem $err_file is non zero\n";
109  if(open my $eh ,"<", $err_file){
110  while(<$eh>){
111  print STDERR "#".$_;
112  }
113  close $eh;
114  }
115  else{
116  print STDERR "No file exists $err_file???\n Resubmit this job\n";
117  }
118  if($status eq "SUBMITTED"){
119  $stat_sth->execute('FAILED',$job_id, $array_number);
120  }
121  }
122  else{ #err file checks out so process the mapping file.
123  if(-e $map_file){
124  my $count = $self->process_map_file($map_file, $query_cutoff{$job_id}, $target_cutoff{$job_id}, $job_id, $array_number, $dbi);
125  if( $count > 0){
126  $processed_count++;
127  $stat_sth->execute('SUCCESS',$job_id, $array_number);
128  }
129  elsif($count ==0){
130  print STDERR "WARNING $map_file was empty could be okay but if there are alot of these it is probably a problem\n";
131  $processed_count++;
132  $empty_count++;
133  $stat_sth->execute('SUCCESS',$job_id, $array_number);
134  }
135  else{
136  $error_count++;
137  $stat_sth->execute('FAILED',$job_id, $array_number);
138  }
139  }
140  else{
141  $error_count++;
142  print STDERR "Could not open file $map_file???\n Resubmit this job\n";
143  $stat_sth->execute('FAILED',$job_id, $array_number);
144  }
145  }
146  }
147  }
148  $map_sth->finish;
149  $stat_sth->finish;
150 
151  print "already processed = $already_processed_count, processed = $processed_count, errors = $error_count, empty = $empty_count\n" if($self->verbose);
152 
153  if(!$error_count){
154  my $sth = $dbi->prepare("insert into process_status (status, date) values('mapping_processed',now())");
155  $sth->execute();
156  $sth->finish;
157  }
158 
159 }
160 
161 
162 
163 #return number of lines parsed if succesfull. -1 for fail
164 sub process_map_file{
165  my ($self, $map_file, $query_cutoff, $target_cutoff, $job_id, $array_number, $dbi) = @_;
166  my $ret = 1;
167 
168 
169 
170  my $ensembl_type = "Translation";
171  if($map_file =~ /dna_/){
172  $ensembl_type = "Transcript";
173  }
174 
175  my $mh;
176  if(!(open $mh ,"<",$map_file) ){
177  print STDERR "Could not open file $map_file\n Resubmit this job??\n";
178  return -1;
179  }
180 
181  my $total_lines = 0;
182  my $root_dir = $self->core->dir;
183 
184  my $dep_sth = $dbi->prepare("select dependent_xref_id, linkage_annotation from dependent_xref where master_xref_id = ?");
185  my $start_sth = $dbi->prepare("update mapping_jobs set object_xref_start = ? where job_id = ? and array_number = ?");
186  my $end_sth = $dbi->prepare("update mapping_jobs set object_xref_end = ? where job_id = ? and array_number = ?");
187 # my $update_dependent_xref_sth = $dbi->prepare("update dependent_xref set object_xref_id = ? where master_xref_id = ? and dependent_xref_id =?");
188 
189  my $object_xref_id;
190  my $sth = $dbi->prepare("select max(object_xref_id) from object_xref");
191  $sth->execute();
192  $sth->bind_columns(\$object_xref_id);
193  $sth->fetch();
194  $sth->finish;
195  if(!defined($object_xref_id)){
196  $object_xref_id = 0;
197  }
198 
199 
200  my $object_xref_sth = $dbi->prepare("insert into object_xref (ensembl_id,ensembl_object_type, xref_id, linkage_type, ox_status ) values (?, ?, ?, ?, ?)");
201  my $object_xref_sth2 = $dbi->prepare("insert into object_xref (ensembl_id,ensembl_object_type, xref_id, linkage_type, ox_status, master_xref_id ) values (?, ?, ?, ?, ?, ?)");
202  my $get_object_xref_id_sth = $dbi->prepare("select object_xref_id from object_xref where ensembl_id = ? and ensembl_object_type = ? and xref_id = ? and linkage_type = ? and ox_status = ?");
203  my $get_object_xref_id_master_sth = $dbi->prepare("select object_xref_id from object_xref where ensembl_id = ? and ensembl_object_type = ? and xref_id = ? and linkage_type = ? and ox_status = ? and master_xref_id = ?");
204  local $object_xref_sth->{RaiseError}; #catch duplicates
205  local $object_xref_sth->{PrintError}; # cut down on error messages
206  local $object_xref_sth2->{RaiseError}; #catch duplicates
207  local $object_xref_sth2->{PrintError}; # cut down on error messages
208 
209  my $identity_xref_sth = $dbi->prepare("insert ignore into identity_xref (object_xref_id, query_identity, target_identity, hit_start, hit_end, translation_start, translation_end, cigar_line, score ) values (?, ?, ?, ?, ?, ?, ?, ?, ?)");
210 
211  my $ins_dep_ix_sth = $dbi->prepare("insert ignore into identity_xref (object_xref_id, query_identity, target_identity) values(?, ?, ?)");
212 
213  my $source_name_sth = $dbi->prepare("select s.name from xref x join source s using(source_id) where x.xref_id = ?");
214  my $biotype_sth = $dbi->prepare("select biotype from transcript_stable_id where internal_id = ?");
215 
216  my $last_query_id = 0;
217  my $best_match_found = 0;
218  my $best_identity = 0;
219  my $best_score = 0;
220 
221  my %mRNA_biotypes = ( 'protein_coding' => 1,
222  'TR_C_gene' => 1,
223  'IG_V_gene' => 1,
224  'nonsense_mediated_decay' => 1,
225  'polymorphic_pseudogene' => 1);
226 
227  my $first = 1;
228  while(<$mh>){
229  my $load_object_xref = 0;
230  $total_lines++;
231  chomp();
232  my ($label, $query_id, $target_id, $identity, $query_length, $target_length, $query_start, $query_end, $target_start, $target_end, $cigar_line, $score) = split(/:/, $_);
233 
234  if ($last_query_id != $query_id) {
235  $best_match_found = 0;
236  $best_score = 0;
237  $best_identity = 0;
238  } else {
239 
240  #ignore mappings with worse identity or score if we already found a good mapping
241  if ( ($identity < $best_identity || $score < $best_score) && $best_match_found) {
242  next;
243  }
244 
245  }
246 
247  if ($ensembl_type eq "Translation"){
248  $load_object_xref = 1;
249 
250  } else {
251 
252  #check if source name is RefSeq_ncRNA or RefSeq_mRNA
253  #if yes check biotype, if ok store object xref
254  $source_name_sth->execute($query_id);
255  my ($source_name) = $source_name_sth->fetchrow_array;
256 
257  if ($source_name && ($source_name =~ /^RefSeq_(m|nc)RNA/ || $source_name =~ /^miRBase/ || $source_name =~ /^RFAM/)) {
258 
259  #make sure mRNA xrefs are matched to protein_coding biotype only
260  $biotype_sth->execute($target_id);
261  my ($biotype) = $biotype_sth->fetchrow_array;
262 
263  if ($source_name =~ /^RefSeq_mRNA/ && exists($mRNA_biotypes{$biotype})) {
264  $load_object_xref = 1;
265  }
266  if ($source_name =~ /^RefSeq_ncRNA/ && !exists($mRNA_biotypes{$biotype}) ) {
267  $load_object_xref = 1;
268  }
269  if (($source_name =~ /miRBase/ || $source_name =~ /^RFAM/) && $biotype =~ /RNA/) {
270  $load_object_xref = 1;
271  }
272  } else {
273  $load_object_xref = 1;
274  }
275 
276  }
277 
278  $last_query_id = $query_id;
279 
280  if ($score > $best_score || $identity > $best_identity) {
281  $best_score = $score;
282  $best_identity = $identity;
283  }
284 
285  if (!$load_object_xref) {
286  next;
287  } else {
288  $best_match_found = 1;
289  }
290 
291  if(!defined($score)){
292  $end_sth->execute(($object_xref_id),$job_id, $array_number);
293  die "No score on line. Possible file corruption\n$_\n";
294  }
295 
296  # calculate percentage identities
297  my $query_identity = int (100 * $identity / $query_length);
298  my $target_identity = int (100 * $identity / $target_length);
299 
300  my $status = "DUMP_OUT";
301 # Only keep alignments where both sequences match cutoff
302  if($query_identity < $query_cutoff or $target_identity < $target_cutoff){
303  $status = "FAILED_CUTOFF";
304  }
305 
306  $object_xref_sth->execute($target_id, $ensembl_type, $query_id, 'SEQUENCE_MATCH', $status) ;
307  $get_object_xref_id_sth->execute($target_id, $ensembl_type, $query_id, 'SEQUENCE_MATCH', $status);
308  $object_xref_id = ($get_object_xref_id_sth->fetchrow_array())[0];
309  if($object_xref_sth->err){
310  my $err = $object_xref_sth->errstr;
311  if($err =~ /Duplicate/){
312  # can get match from same xref and ensembl entity e.g.
313  # ensembl/ExonerateGappedBest1_dna_569.map:xref:934818:155760:54:1617:9648:73:12:3456:3517: M 61:242
314  # ensembl/ExonerateGappedBest1_dna_569.map:xref:934818:151735:58:1617:10624:73:6:5329:5397: M 48 D 1 M 19:242
315  next;
316  }
317  else{
318  $end_sth->execute(($object_xref_id),$job_id, $array_number);
319  die "Problem loading error is $err\n";
320  }
321  }
322  if($first){
323  $start_sth->execute($object_xref_id,$job_id, $array_number);
324  $first = 0;
325  }
326 
327 
328 
329  $cigar_line =~ s/ //g;
330  $cigar_line =~ s/([MDI])(\d+)/$2$1/ig;
331 
332 
333  if(!$identity_xref_sth->execute($object_xref_id, $query_identity, $target_identity, $query_start+1, $query_end, $target_start+1, $target_end, $cigar_line, $score)){
334  $end_sth->execute(($object_xref_id),$job_id, $array_number);
335  die "Problem loading identity_xref";
336  }
337 
338  my @master_xref_ids;
339  push @master_xref_ids, $query_id;
340  while(my $master_xref_id = pop(@master_xref_ids)){
341  my ($dep_xref_id, $link);
342  $dep_sth->execute($master_xref_id);
343  $dep_sth->bind_columns(\$dep_xref_id, \$link);
344  while($dep_sth->fetch){
345  $object_xref_sth2->execute($target_id, $ensembl_type, $dep_xref_id, 'DEPENDENT', $status, $master_xref_id);
346  $get_object_xref_id_master_sth->execute($target_id, $ensembl_type, $dep_xref_id, 'DEPENDENT', $status, $master_xref_id);
347  $object_xref_id = ($get_object_xref_id_master_sth->fetchrow_array())[0];
348  if($object_xref_sth2->err){
349  my $err = $object_xref_sth->errstr;
350  if($err =~ /Duplicate/){
351 # $duplicate_dependent_count++;
352  next;
353  }
354  else{
355  $end_sth->execute($object_xref_id,$job_id, $array_number);
356  die "Problem loading error is $err\n";
357  }
358  }
359  if($object_xref_sth2->err){
360  print STDERR "WARNING: Should not reach here??? object_xref_id = $object_xref_id\n";
361  }
362 
363  $ins_dep_ix_sth->execute($object_xref_id, $query_identity, $target_identity);
364 # now store in object_xref $update_dependent_xref_sth->execute($object_xref_id, $master_xref_id, $dep_xref_id);
365 
366  push @master_xref_ids, $dep_xref_id; # get the dependent, dependents just in case
367  }
368  }
369 
370  }
371  close $mh;
372  $end_sth->execute($object_xref_id,$job_id, $array_number);
373  $start_sth->finish;
374  $end_sth->finish;
375  $dep_sth->finish;
376  $object_xref_sth->finish;
377  $identity_xref_sth->finish;
378  $ins_dep_ix_sth->finish;
379  $source_name_sth->finish;
380  $biotype_sth->finish;
381 
382  return $total_lines;
383 }
384 
385 
386 
387 
388 1;
XrefMapper::BasicMapper
Definition: BasicMapper.pm:8
XrefMapper::BasicMapper::core
public XrefMapper::db core()