ensembl-hive  2.5
Accumulator.pm
Go to the documentation of this file.
1 =pod
2 
3 =head1 NAME
4 
6 
7 =head1 DESCRIPTION
8 
9  A data container object that defines parameters for accumulated dataflow.
10  This object is generated from specially designed datalow URLs.
11 
12 =head1 LICENSE
13 
14  Copyright [1999-2015] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute
15  Copyright [2016-2022] EMBL-European Bioinformatics Institute
16 
17  Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
18  You may obtain a copy of the License at
19 
20  http://www.apache.org/licenses/LICENSE-2.0
21 
22  Unless required by applicable law or agreed to in writing, software distributed under the License
23  is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
24  See the License for the specific language governing permissions and limitations under the License.
25 
26 =head1 CONTACT
27 
28  Please subscribe to the Hive mailing list: http://listserver.ebi.ac.uk/mailman/listinfo/ehive-users to discuss Hive-related questions or to be notified of our updates
29 
30 =cut
31 
32 
33 package Bio::EnsEMBL::Hive::Accumulator;
34 
35 use strict;
36 use warnings;
37 
38 use Bio::EnsEMBL::Hive::Utils ('stringify');
39 
40 use base ( 'Bio::EnsEMBL::Hive::Storable' );
41 
42 
43 sub unikey { # override the default from Cacheable parent
44  return [ 'accu_name', 'accu_address', 'accu_input_variable' ];
45 }
46 
47 
48 sub accu_name {
49  my $self = shift @_;
50 
51  if(@_) {
52  $self->{'_accu_name'} = shift @_;
53  }
54  return $self->{'_accu_name'};
55 }
56 
57 
58 sub accu_address {
59  my $self = shift @_;
60 
61  if(@_) {
62  $self->{'_accu_address'} = shift @_;
63  }
64  return ( $self->{'_accu_address'} // '' );
65 }
66 
67 
68 sub accu_input_variable {
69  my $self = shift @_;
70 
71  if(@_) {
72  $self->{'_accu_input_variable'} = shift @_;
73  }
74  return ( $self->{'_accu_input_variable'} // $self->accu_name );
75 }
76 
77 
78 sub url_query_params {
79  my ($self) = @_;
80 
81  return { # direct access to the actual (possibly missing) values
82  'accu_name' => $self->accu_name,
83  'accu_address' => $self->{'_accu_address'},
84  'accu_input_variable' => $self->{'_accu_input_variable'},
85  };
86 }
87 
88 
89 sub display_name {
90  my ($self) = @_;
91  return $self->accu_name
92  . $self->accu_address
93  . ':='
94  . $self->accu_input_variable;
95 }
96 
97 
98 sub dataflow {
99  my ( $self, $output_ids, $emitting_job ) = @_;
100 
101  if(my $receiving_semaphore = $emitting_job->controlled_semaphore) {
102 
103  my $sending_job_id = $emitting_job->dbID;
104  my $receiving_semaphore_id = $receiving_semaphore->dbID;
105  my $accu_adaptor = $receiving_semaphore->adaptor->db->get_AccumulatorAdaptor;
106 
107  my $accu_name = $self->accu_name;
108  my $accu_address = $self->accu_address;
109  my $accu_input_variable = $self->accu_input_variable;
110 
111  my @rows = ();
112 
113  foreach my $output_id (@$output_ids) {
114 
115  my $key_signature = $accu_address;
116  $key_signature=~s/(\w+)/$emitting_job->_param_possibly_overridden($1,$output_id)/eg;
117 
118  push @rows, {
119  'sending_job_id' => $sending_job_id,
120  'receiving_semaphore_id' => $receiving_semaphore_id,
121  'struct_name' => $accu_name,
122  'key_signature' => $key_signature,
123  'value' => stringify( $emitting_job->_param_possibly_overridden($accu_input_variable, $output_id) ),
124  };
125  }
126 
127  $accu_adaptor->store( \@rows );
128 
129  } else {
130  die "No controlled semaphore, cannot perform accumulated dataflow";
131  }
132 }
133 
134 
135 sub toString {
136  my $self = shift @_;
137 
138  return 'Accumulator(' . $self->display_name . ')';
139 }
140 
141 1;
142