9 A Semaphore
object is our
main instrument of fine-grained job control.
10 It is controlled (blocked) by a group of
"fan" jobs and remote semaphores and has
11 either a dependent local job or a dependent remote semaphore
12 that will be unblocked when both local_jobs_counter and remote_jobs_counter reach zeros.
16 Copyright [1999-2015] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute
17 Copyright [2016-2024] EMBL-European Bioinformatics Institute
19 Licensed under the Apache License,
Version 2.0 (the
"License"); you may not use
this file except in compliance with the License.
20 You may obtain a copy of the License at
24 Unless required by applicable law or agreed to in writing, software distributed under the License
25 is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
26 See the License
for the specific language governing permissions and limitations under the License.
30 Please subscribe to the
Hive mailing list: http:
34 The rest of the documentation details each of the
object methods.
35 Internal methods are usually preceded with a _
40 package Bio::EnsEMBL::Hive::Semaphore;
47 use base (
'Bio::EnsEMBL::Hive::Storable' );
52 dependent_job_id / dependent_job
56 # ---------------------------------------------------------------------------
58 sub local_jobs_counter {
60 $self->{
'_local_jobs_counter'} = shift
if(@_);
61 return $self->{
'_local_jobs_counter'};
65 sub remote_jobs_counter {
67 $self->{
'_remote_jobs_counter'} = shift
if(@_);
68 return $self->{
'_remote_jobs_counter'};
72 sub dependent_semaphore_url {
74 $self->{
'_dependent_semaphore_url'} = shift
if(@_);
75 return $self->{
'_dependent_semaphore_url'};
78 # ---------------------------------------------------------------------------
80 sub dependent_semaphore {
83 if(my $dependent_semaphore_url = $self->dependent_semaphore_url) {
91 sub ultimate_dependent_job {
94 return $self->dependent_job || $self->dependent_semaphore->ultimate_dependent_job;
98 sub url_query_params {
102 'semaphore_id' => $self->dbID,
106 # ---------------------------------------------------------------------------
111 $self->adaptor->refresh( $self );
113 return ($self->local_jobs_counter + $self->remote_jobs_counter <= 0);
119 my $blocking_objects_or_local_delta = shift @_;
120 my $sign = shift @_ || 1;
122 if(my $semaphore_adaptor = $self->adaptor) {
124 my ($local_delta, $remote_delta) = ref($blocking_objects_or_local_delta)
125 ? $self->count_local_and_remote_objects( $blocking_objects_or_local_delta )
126 : ($blocking_objects_or_local_delta,0);
127 my $semaphore_id = $self->dbID;
130 $semaphore_adaptor->increment_column_by_inc_and_id(
'local_jobs_counter', $sign * $local_delta, $semaphore_id );
133 $semaphore_adaptor->increment_column_by_inc_and_id(
'remote_jobs_counter', $sign * $remote_delta, $semaphore_id );
137 die
"Local semaphore objects are not yet supported"; # but they could be, eventually!
144 my $blocking_objects_or_local_delta = shift @_;
146 my $was_ripe = $self->check_if_ripe;
148 $self->increase_by( $blocking_objects_or_local_delta );
152 if(my $dependent_job = $self->dependent_job) {
154 if(my $dependent_job_adaptor = $dependent_job->adaptor) {
155 $dependent_job_adaptor->semaphore_job_by_id( $dependent_job->dbID );
157 die
"Dependent job is expected to have a working JobAdaptor";
160 } elsif(my $dependent_semaphore = $self->dependent_semaphore) {
162 $dependent_semaphore->reblock( [ $self ] ); # recursion
165 warn
"The semaphore is not blocking anything, possibly the end of execution.\n";
171 sub fetch_my_raw_accu_data {
174 return $self->adaptor->db->get_AccumulatorAdaptor->fetch_all_by_receiving_semaphore_id( $self->dbID );
178 sub fetch_my_local_controlling_jobs {
181 return $self->adaptor->db->get_AnalysisJobAdaptor->fetch_all_by_controlled_semaphore_id( $self->dbID );
185 sub release_if_ripe {
188 if( $self->check_if_ripe ) {
190 if(my $dependent_job = $self->dependent_job) {
192 if(my $dependent_job_adaptor = $dependent_job->adaptor) {
193 $dependent_job_adaptor->unsemaphore_job_by_id( $dependent_job->dbID );
195 die
"Dependent job is expected to have a working JobAdaptor";
198 } elsif(my $dependent_semaphore = $self->dependent_semaphore) {
200 my $dependent_semaphore_adaptor = $dependent_semaphore->adaptor;
201 my $ocean_separated = $dependent_semaphore_adaptor->db ne $self->adaptor->db;
203 # pass the accumulated data here:
204 if(my $raw_accu_data = $self->fetch_my_raw_accu_data) {
205 foreach my $vector ( @$raw_accu_data ) {
206 $vector->{
'receiving_semaphore_id'} = $dependent_semaphore->dbID; # set the
new consumer
207 $vector->{
'sending_job_id'} = undef
if($ocean_separated); # dissociate from the sending job as it was local
209 $dependent_semaphore_adaptor->db->get_AccumulatorAdaptor->store( $raw_accu_data );
212 $dependent_semaphore_adaptor->increment_column_by_inc_and_id(
'remote_jobs_counter', -1, $dependent_semaphore->dbID );
214 $dependent_semaphore->release_if_ripe(); # recursion
217 warn
"The semaphore is not blocking anything, possibly the end of execution.\n";
225 my $blocking_objects_or_local_delta = shift @_;
227 $self->increase_by( $blocking_objects_or_local_delta, -1);
229 $self->release_if_ripe();