ensembl-hive  2.6
Semaphore.pm
Go to the documentation of this file.
1 =pod
2 
3 =head1 NAME
4 
5  Bio::EnsEMBL::Hive::Semaphore
6 
7 =head1 DESCRIPTION
8 
9  A Semaphore object is our main instrument of fine-grained job control.
10  It is controlled (blocked) by a group of "fan" jobs and remote semaphores, and it has
11  either a dependent local job or a dependent remote semaphore
12  that will be unblocked when both local_jobs_counter and remote_jobs_counter reach zero.
13 
14 =head1 LICENSE
15 
16  Copyright [1999-2015] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute
17  Copyright [2016-2024] EMBL-European Bioinformatics Institute
18 
19  Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
20  You may obtain a copy of the License at
21 
22  http://www.apache.org/licenses/LICENSE-2.0
23 
24  Unless required by applicable law or agreed to in writing, software distributed under the License
25  is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
26  See the License for the specific language governing permissions and limitations under the License.
27 
28 =head1 CONTACT
29 
30  Please subscribe to the Hive mailing list: http://listserver.ebi.ac.uk/mailman/listinfo/ehive-users to discuss Hive-related questions or to be notified of our updates
31 
32 =head1 APPENDIX
33 
34  The rest of the documentation details each of the object methods.
35  Internal methods are usually prefixed with an underscore (_)
36 
37 =cut
38 
39 
40 package Bio::EnsEMBL::Hive::Semaphore;
41 
42 use strict;
43 use warnings;
44 
46 
47 use base ( 'Bio::EnsEMBL::Hive::Storable' );
48 
49 
50 =head1 AUTOLOADED
51 
52  dependent_job_id / dependent_job
53 
54 =cut
55 
56 # ---------------------------------------------------------------------------
57 
# Getter/setter for the '_local_jobs_counter' field.
#   Arg [1] : (optional) new value; stored whenever an argument is passed, even if it is undef
#   Returns : the value currently held in the field
sub local_jobs_counter {
    my ($self, @new_value) = @_;

    if (@new_value) {
        $self->{'_local_jobs_counter'} = $new_value[0];
    }

    return $self->{'_local_jobs_counter'};
}
63 
64 
# Getter/setter for the '_remote_jobs_counter' field.
#   Arg [1] : (optional) new value; stored whenever an argument is passed, even if it is undef
#   Returns : the value currently held in the field
sub remote_jobs_counter {
    my ($self, @new_value) = @_;

    if (@new_value) {
        $self->{'_remote_jobs_counter'} = $new_value[0];
    }

    return $self->{'_remote_jobs_counter'};
}
70 
71 
# Getter/setter for the '_dependent_semaphore_url' field.
#   Arg [1] : (optional) new value; stored whenever an argument is passed, even if it is undef
#   Returns : the value currently held in the field
sub dependent_semaphore_url {
    my ($self, @new_value) = @_;

    if (@new_value) {
        $self->{'_dependent_semaphore_url'} = $new_value[0];
    }

    return $self->{'_dependent_semaphore_url'};
}
77 
78 # ---------------------------------------------------------------------------
79 
# Resolves dependent_semaphore_url() into an object via TheApiary.
#   Returns : whatever Bio::EnsEMBL::Hive::TheApiary->find_by_url() returns for the URL,
#             or undef when no (true) URL is set on this semaphore
sub dependent_semaphore {
    my ($self) = @_;

    my $url = $self->dependent_semaphore_url;

    return $url
        ? Bio::EnsEMBL::Hive::TheApiary->find_by_url( $url )
        : undef;
}
89 
90 
# Follows the chain of dependent semaphores until one with a dependent job is found.
#   Returns : the dependent job of this semaphore if it has one; otherwise the
#             ultimate dependent job of its dependent semaphore; undef when the
#             chain ends without a job (a semaphore that is not blocking anything).
sub ultimate_dependent_job {
    my ($self) = @_;

    if (my $dependent_job = $self->dependent_job) {
        return $dependent_job;
    }

    # A semaphore may legitimately block nothing (release_if_ripe() only warns in
    # that case), so do not call a method on an undefined dependent semaphore.
    my $dependent_semaphore = $self->dependent_semaphore
        or return undef;

    return $dependent_semaphore->ultimate_dependent_job;     # recursion
}
96 
97 
# Builds the query parameters that identify this semaphore within a URL.
#   Returns : hashref with a single key, 'semaphore_id', mapped to this object's dbID
sub url_query_params {
    my $self = shift @_;

    my %params = ( 'semaphore_id' => $self->dbID );

    return \%params;
}
105 
106 # ---------------------------------------------------------------------------
107 
# Tells whether the semaphore has no blockers left.
# The object is first refreshed through its adaptor, so the counters examined are
# the ones the adaptor has just loaded, not stale in-memory values.
#   Returns : true when local_jobs_counter + remote_jobs_counter is zero or negative
sub check_if_ripe {
    my ($self) = @_;

    $self->adaptor->refresh( $self );

    my $remaining_blockers = $self->local_jobs_counter + $self->remote_jobs_counter;

    return ($remaining_blockers <= 0);
}
115 
116 
# Adds to the stored counters of this semaphore through its adaptor.
#   Arg [1] : either a reference (handed to count_local_and_remote_objects() to obtain
#             the local and remote deltas) or a plain number (used as the local delta,
#             with a remote delta of 0)
#   Arg [2] : (optional) multiplier applied to both deltas; any false value means 1
#   Throws  : if the object has no adaptor
# A counter whose delta is zero is not touched at all.
sub increase_by {
    my ($self, $blocking_objects_or_local_delta, $sign) = @_;

    $sign ||= 1;

    if (my $adaptor = $self->adaptor) {

        my ($local_delta, $remote_delta);
        if (ref $blocking_objects_or_local_delta) {
            ($local_delta, $remote_delta) = $self->count_local_and_remote_objects( $blocking_objects_or_local_delta );
        } else {
            ($local_delta, $remote_delta) = ($blocking_objects_or_local_delta, 0);
        }
        my $id = $self->dbID;

        $adaptor->increment_column_by_inc_and_id( 'local_jobs_counter',  $sign * $local_delta,  $id ) if $local_delta;
        $adaptor->increment_column_by_inc_and_id( 'remote_jobs_counter', $sign * $remote_delta, $id ) if $remote_delta;

    } else {
        die "Local semaphore objects are not yet supported";    # but they could be, eventually!
    }
}
140 
141 
# Raises the counters of this semaphore and, if it was ripe before that, re-blocks
# whatever depends on it.
#   Arg [1] : blocking objects (a reference) or a numeric local delta, passed on to increase_by()
#   Throws  : if the dependent job has no adaptor
# When the semaphore was ripe:
#   - a dependent job is put back behind the semaphore via its adaptor's semaphore_job_by_id();
#   - otherwise a dependent semaphore is itself re-blocked by this semaphore (recursively);
#   - otherwise a warning is issued, since nothing depends on this semaphore.
sub reblock_by {
    my $self                            = shift @_;
    my $blocking_objects_or_local_delta = shift @_;

    # Ripeness has to be sampled before the counters are raised
    my $was_ripe = $self->check_if_ripe;

    $self->increase_by( $blocking_objects_or_local_delta );

    if( $was_ripe ) {

        if(my $dependent_job = $self->dependent_job) {

            if(my $dependent_job_adaptor = $dependent_job->adaptor) {
                $dependent_job_adaptor->semaphore_job_by_id( $dependent_job->dbID );
            } else {
                die "Dependent job is expected to have a working JobAdaptor";
            }

        } elsif(my $dependent_semaphore = $self->dependent_semaphore) {

            # The method is called reblock_by(); there is no reblock() in this class
            $dependent_semaphore->reblock_by( [ $self ] );  # recursion

        } else {
            warn "The semaphore is not blocking anything, possibly the end of execution.\n";
        }
    }
}
169 
170 
# Fetches the accumulator entries addressed to this semaphore.
#   Returns : the result of fetch_all_by_receiving_semaphore_id() called on the
#             AccumulatorAdaptor of this object's database, with this object's dbID
sub fetch_my_raw_accu_data {
    my ($self) = @_;

    my $accu_adaptor = $self->adaptor->db->get_AccumulatorAdaptor;

    return $accu_adaptor->fetch_all_by_receiving_semaphore_id( $self->dbID );
}
176 
177 
# Fetches the local jobs that control this semaphore.
#   Returns : the result of fetch_all_by_controlled_semaphore_id() called on the
#             AnalysisJobAdaptor of this object's database, with this object's dbID
sub fetch_my_local_controlling_jobs {
    my ($self) = @_;

    my $job_adaptor = $self->adaptor->db->get_AnalysisJobAdaptor;

    return $job_adaptor->fetch_all_by_controlled_semaphore_id( $self->dbID );
}
183 
184 
# Unblocks whatever depends on this semaphore, provided check_if_ripe() says it is ripe.
#   Throws  : if the dependent job has no adaptor
# When the semaphore is ripe:
#   - a dependent job is released via its adaptor's unsemaphore_job_by_id();
#   - otherwise, for a dependent semaphore: the accumulator entries of this semaphore are
#     re-addressed to it and stored through its database, its remote_jobs_counter is
#     decremented by one, and it is then asked to release itself if ripe (recursion);
#   - otherwise a warning is issued, since nothing depends on this semaphore.
sub release_if_ripe {
    my ($self) = @_;

    if ($self->check_if_ripe) {

        if (my $job = $self->dependent_job) {

            if (my $job_adaptor = $job->adaptor) {
                $job_adaptor->unsemaphore_job_by_id( $job->dbID );
            } else {
                die "Dependent job is expected to have a working JobAdaptor";
            }

        } elsif (my $next_semaphore = $self->dependent_semaphore) {

            my $next_adaptor = $next_semaphore->adaptor;

            # true when the two semaphores are served by different database objects
            my $crosses_databases = $next_adaptor->db ne $self->adaptor->db;

            # hand the accumulated data over to the next semaphore in the chain:
            if (my $accu_rows = $self->fetch_my_raw_accu_data) {
                for my $row (@{ $accu_rows }) {
                    $row->{'receiving_semaphore_id'} = $next_semaphore->dbID;     # the new consumer
                    $row->{'sending_job_id'} = undef if $crosses_databases;       # the sending job was local, so drop the link
                }
                $next_adaptor->db->get_AccumulatorAdaptor->store( $accu_rows );
            }

            $next_adaptor->increment_column_by_inc_and_id( 'remote_jobs_counter', -1, $next_semaphore->dbID );

            $next_semaphore->release_if_ripe();     # recursion

        } else {
            warn "The semaphore is not blocking anything, possibly the end of execution.\n";
        }
    }
}
221 
222 
# Lowers the counters of this semaphore and then releases its dependants if it became ripe.
#   Arg [1] : blocking objects (a reference) or a numeric local delta; passed to
#             increase_by() together with a sign of -1
sub decrease_by {
    my ($self, $blocking_objects_or_local_delta) = @_;

    $self->increase_by( $blocking_objects_or_local_delta, -1 );
    $self->release_if_ripe();
}
231 
232 1;
Bio::EnsEMBL::Hive::Version
Definition: Version.pm:19
Bio::EnsEMBL::Hive::Semaphore
Definition: Semaphore.pm:13
main
public main()
Bio::EnsEMBL::Hive::TheApiary::find_by_url
public find_by_url()
Bio::EnsEMBL::Hive
Definition: Hive.pm:38
Bio::EnsEMBL::Hive::TheApiary
Definition: TheApiary.pm:16