9 A Valley represents a collection of available Meadows.
11 Certain methods fit better with the concept of Valley -
12 such as identifying all dead workers, or killing a particular one given worker_id.
16 Copyright [1999-2015] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute
17 Copyright [2016-2024] EMBL-European Bioinformatics Institute
19 Licensed under the Apache License,
Version 2.0 (the
"License"); you may not use
this file except in compliance with the License.
20 You may obtain a copy of the License at
24 Unless required by applicable law or agreed to in writing, software distributed under the License
25 is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
26 See the License
for the specific language governing permissions and limitations under the License.
30 Please subscribe to the
Hive mailing list: http:
35 package Bio::EnsEMBL::Hive::Valley;
39 use List::Util (
'sum');
40 use Sys::Hostname (
'hostname');
44 use base (
'Bio::EnsEMBL::Hive::Configurable');
47 sub meadow_class_path {
49 return 'Bio::EnsEMBL::Hive::Meadow';
53 our $_loaded_meadow_drivers;
55 sub loaded_meadow_drivers {
57 unless( $_loaded_meadow_drivers ) {
59 eval
"require $meadow_class";
60 die $@
if($@); # Even
if the
Meadow is unavailable, we still expect all the drivers that are in the path to compile correctly.
63 return $_loaded_meadow_drivers;
68 my ($class, $config, $default_meadow_type, $pipeline_name) = @_;
70 my $self = bless {}, $class;
73 $self->context( [
'Valley' ] );
75 my $amh = $self->available_meadow_hash( {} );
77 # make sure modules are loaded and available ones are checked prior to setting the current one:
78 foreach my $meadow_class (@{ $self->loaded_meadow_drivers }) {
80 if( $meadow_class->check_version_compatibility
81 and (my $name = $meadow_class->name)) { # the assumption is
if we can get a name, it is available
83 my $meadow_object = $meadow_class->new( $config, $name );
85 $meadow_object->pipeline_name( $pipeline_name )
if($pipeline_name);
87 $amh->{$meadow_class->type} = $meadow_object;
91 $self->set_default_meadow_type($default_meadow_type); #
run this method even
if $default_meadow_type was not specified
97 sub available_meadow_hash {
101 $self->{_available_meadow_hash} = shift @_;
103 return $self->{_available_meadow_hash};
107 sub get_available_meadow_list { #
this beautiful one-liner pushes $local to the bottom of the list
110 my $local = $self->meadow_class_path .
'::LOCAL';
112 return [ sort { (ref($a) eq $local) or -(ref($b) eq $local) } values %{ $self->available_meadow_hash } ];
116 sub set_default_meadow_type {
117 my ($self, $default_meadow_type) = @_;
119 if($default_meadow_type) {
120 if( my $default_meadow = $self->available_meadow_hash->{$default_meadow_type} ) { # store
if available
121 $self->{_default_meadow} = $default_meadow;
123 die
"Meadow '$default_meadow_type' does not seem to be available on this machine, please investigate";
126 $self->{_default_meadow} = $self->get_available_meadow_list->[0]; # take the first from preference list
131 sub get_default_meadow {
134 return $self->{_default_meadow};
138 sub find_available_meadow_responsible_for_worker {
139 my ($self, $worker) = @_;
141 if( my $meadow = $self->available_meadow_hash->{$worker->meadow_type} ) {
142 if($meadow->cached_name eq $worker->meadow_name) {
155 foreach my $meadow (@{ $self->get_available_meadow_list }) {
159 # get_current_worker_process_id() is expected to die if the pid
160 # cannot be determined. With the eval{} and the unless{} it will
161 # skip the meadow and try the next one.
162 $pid = $meadow->get_current_worker_process_id();
163 $meadow_host = $meadow->get_current_hostname();
166 return ($meadow, $pid, $meadow_host, $meadow_user);
169 die
"Could not determine the Meadow, please investigate";
173 sub generate_limiters {
174 my ($self, $reconciled_worker_statuses) = @_;
176 my $valley_running_worker_count = 0;
177 my %meadow_capacity_limiter_hashed_by_type = ();
179 foreach my $meadow (@{ $self->get_available_meadow_list }) {
180 my $this_worker_count = scalar( @{ $reconciled_worker_statuses->{ $meadow->signature }{
'RUN' } || [] } );
182 $valley_running_worker_count += $this_worker_count;
184 my $available_worker_slots = defined($meadow->config_get(
'TotalRunningWorkersMax'))
185 ? $meadow->config_get(
'TotalRunningWorkersMax') - $this_worker_count
188 # so the hash will contain limiters for every meadow_type, but not all of them active:
189 $meadow_capacity_limiter_hashed_by_type{ $meadow->type } =
Bio::EnsEMBL::Hive::Limiter->
new(
"Number of workers in '".$meadow->signature.
"' meadow", $available_worker_slots );
192 return ($valley_running_worker_count, \%meadow_capacity_limiter_hashed_by_type);
196 =head2 query_worker_statuses
198 Arg[1] : Hashref {meadow_type}{meadow_name}{meadow_user}{process_id} => $db_status
199 Output : Hashref {meadow_signature}{meadow_status} => [process_ids]
201 Description : Queries the available meadows to get the (meadow) status of the given workers
205 sub query_worker_statuses {
206 my ($self, $db_registered_workers_from_all_meadows_deemed_alive) = @_;
208 my %reconciled_worker_statuses = ();
210 foreach my $meadow (@{ $self->get_available_meadow_list }) { # only go through the available meadows
211 my $db_registered_workers_this_meadow = $db_registered_workers_from_all_meadows_deemed_alive->{$meadow->type}{$meadow->cached_name};
212 my $involved_users = [keys %$db_registered_workers_this_meadow];
214 next unless @$involved_users;
216 my %meadow_seen_worker_status =
map { ( $_->[0] => $_->[2] ) } @{ $meadow->status_of_all_our_workers( $involved_users ) };
218 my $worker_statuses_of_this_meadow = $reconciled_worker_statuses{ $meadow->signature } = {}; # manually vivify every Meadow
's subhash
220 while(my ($meadow_user, $db_user_subhash) = each %$db_registered_workers_this_meadow) { # start the reconciliation from the DB view and check it against Meadow view
221 while(my ($worker_pid, $db_worker_status) = each %$db_user_subhash) {
222 my $combined_status = $meadow_seen_worker_status{$worker_pid}
223 // ( ($db_worker_status=~/^(?:SUBMITTED|DEAD)$/) ? $db_worker_status : 'LOST
' );
225 push @{ $worker_statuses_of_this_meadow->{ $combined_status } }, $worker_pid;
229 return \%reconciled_worker_statuses;
233 sub status_of_all_our_workers_by_meadow_signature {
234 my ($self, $reconciled_worker_statuses) = @_;
236 my %signature_and_pid_to_worker_status = ();
237 foreach my $meadow (@{ $self->get_available_meadow_list }) {
238 my $meadow_signature = $meadow->signature;
239 $signature_and_pid_to_worker_status{ $meadow_signature } = {};
241 my $status_2_pid_list = $reconciled_worker_statuses->{ $meadow_signature };
242 while(my ($status, $pid_list) = each %$status_2_pid_list) {
243 $signature_and_pid_to_worker_status{$meadow_signature}{$_} = $status for @$pid_list;
246 return \%signature_and_pid_to_worker_status;