# NOTE(review): this is the body of a method whose "sub ... {" header lies above
# this excerpt (the "$amh" used further down is presumably declared there too).
# It materialises an in-memory Hive pipeline from the PipeConfig callbacks:
# hive_meta entries, pipeline-wide parameters, resource classes/descriptions,
# analyses (+stats and seed jobs), then control/dataflow rules.
my $self = shift @_;
my $pipeline = shift @_;

# --- 1) hive_meta table: key/value pairs from the config's hive_meta_table() ---
$self->print_debug( "Adding hive_meta table entries ...\n" );
my $new_meta_entries = $self->hive_meta_table();
while( my ($meta_key, $meta_value) = each %$new_meta_entries ) {
$pipeline->add_new_or_update( 'MetaParameters', $self->o('hive_debug_init'),
'meta_key' => $meta_key,
'meta_value' => $meta_value,
);
}
$self->print_debug( "Done.\n\n" );

# --- 2) pipeline-wide parameters; values are stringified before storage ---
$self->print_debug( "Adding pipeline-wide parameters ...\n" );
my $new_pwp_entries = $self->pipeline_wide_parameters();
while( my ($param_name, $param_value) = each %$new_pwp_entries ) {
$pipeline->add_new_or_update( 'PipelineWideParameters', $self->o('hive_debug_init'),
'param_name' => $param_name,
'param_value' => stringify($param_value),
);
}
$self->print_debug( "Done.\n\n" );

# --- 3) resource classes: ensure 'default' exists, then create each class
#        together with its per-meadow resource descriptions ---
$self->print_debug( "Adding Resources ...\n" );
my $resource_classes_hash = $self->resource_classes;
unless( exists $resource_classes_hash->{'default'} ) {
warn "\tNB:'default' resource class is not in the database (did you forget to inherit from SUPER::resource_classes ?) - creating it for you\n";
$resource_classes_hash->{'default'} = {};
}
# Custom sort: boolean comparisons force 'default' to sort first, everything
# else falls through to plain string order.
my @resource_classes_order = sort { ($b eq 'default') or -($a eq 'default') or ($a cmp $b) } keys %$resource_classes_hash; # put 'default' to the front
# Cache of already-known ResourceClass objects, keyed by name, so analyses
# created below can look them up without another collection scan.
my %cached_resource_classes = map {$_->name => $_} $pipeline->collection_of('ResourceClass')->list();
foreach my $rc_name (@resource_classes_order) {
# Purely-numeric names would collide with the retired -rc_id notation.
if($rc_name=~/^\d+$/) {
die "-rc_id syntax is no longer supported, please use the new resource notation (-rc_name)";
}
my ($resource_class) = $pipeline->add_new_or_update( 'ResourceClass', # NB: add_new_or_update returns a list
'name' => $rc_name,
);
$cached_resource_classes{$rc_name} = $resource_class;
# Each resource class maps meadow_type => submission args; a plain scalar is
# promoted to a 1-element list (element [1], worker_cmd_args, then undef).
while( my($meadow_type, $resource_param_list) = each %{ $resource_classes_hash->{$rc_name} } ) {
$resource_param_list = [ $resource_param_list ] unless(ref($resource_param_list)); # expecting either a scalar or a 2-element array
my ($resource_description) = $pipeline->add_new_or_update( 'ResourceDescription', $self->o('hive_debug_init'), # NB: add_new_or_update returns a list
'resource_class' => $resource_class,
'meadow_type' => $meadow_type,
'submission_cmd_args' => $resource_param_list->[0],
'worker_cmd_args' => $resource_param_list->[1],
);
}
}
$self->print_debug( "Done.\n\n" );

# --- 4) analyses: validate each -logic_name, create Analysis + AnalysisStats,
#        and (if requested) seed initial jobs ---
my %seen_logic_name = ();
# Analyses that may already exist (e.g. when extending a live pipeline).
my %analyses_by_logic_name = map {$_->logic_name => $_} $pipeline->collection_of('Analysis')->list();
$self->print_debug( "Adding Analyses ...\n" );
foreach my $aha (@{$self->pipeline_analyses}) {
# Work on a shallow copy so the delete-slice below can detect leftovers
# (unrecognised keys) without mutating the config's own structure.
my %aha_copy = %$aha;
my ($logic_name, $module, $parameters_hash, $comment, $tags, $input_ids, $blocked, $batch_size, $hive_capacity, $failed_job_tolerance,
$max_retry_count, $can_be_empty, $rc_id, $rc_name, $priority, $meadow_type, $analysis_capacity, $language, $wait_for, $flow_into)
= delete @aha_copy{qw(-logic_name -module -parameters -comment -tags -input_ids -blocked -batch_size -hive_capacity -failed_job_tolerance
-max_retry_count -can_be_empty -rc_id -rc_name -priority -meadow_type -analysis_capacity -language -wait_for -flow_into)}; # slicing a hash reference
# Anything left in the copy was a key we do not understand - likely a typo.
my @unparsed_attribs = keys %aha_copy;
if(@unparsed_attribs) {
die "Could not parse the following analysis attributes: ".join(', ',@unparsed_attribs);
}
# logic_name validation: required, no chars reserved by dataflow syntax,
# and not purely numeric (would be ambiguous with analysis dbIDs).
if( not $logic_name ) {
die "'-logic_name' must be defined in every analysis";
} elsif( $logic_name =~ /[+\-\%\.,]/ ) {
die "Characters + - % . , are no longer allowed to be a part of an Analysis name. Please rename Analysis '$logic_name' and try again.\n";
} elsif( looks_like_number($logic_name) ) {
die "Numeric Analysis names are not allowed because they may clash with dbIDs. Please rename Analysis '$logic_name' and try again.\n";
}
# Post-increment: first sighting passes, a duplicate in the same config dies.
if($seen_logic_name{$logic_name}++) {
die "an entry with -logic_name '$logic_name' appears at least twice in the same configuration file, probably a typo";
}
if($rc_id) {
die "(-rc_id => $rc_id) syntax is deprecated, please use (-rc_name => 'your_resource_class_name')";
}
my $analysis = $analyses_by_logic_name{$logic_name}; # the analysis with this logic_name may have already been stored in the db
my $stats;
if( $analysis ) {
# Pre-existing analysis: leave it untouched and skip job seeding entirely.
warn "Skipping creation of already existing analysis '$logic_name'.\n";
next;
} else {
$rc_name ||= 'default';
my $resource_class = $cached_resource_classes{$rc_name}
or die "Could not find local resource with name '$rc_name', please check that resource_classes() method of your PipeConfig either contains or inherits it from the parent class";
# NOTE(review): $amh is not declared in this excerpt - presumably a meadow
# registry hash set up earlier in this sub; confirm against the full file.
if ($meadow_type and not exists $amh->{$meadow_type}) {
warn "The meadow '$meadow_type' is currently not registered (analysis '$logic_name')\n";
}
$parameters_hash ||= {}; # in case nothing was given
die "'-parameters' has to be a hash" unless(ref($parameters_hash) eq 'HASH');
($analysis) = $pipeline->add_new_or_update( 'Analysis', $self->o('hive_debug_init'), # NB: add_new_or_update returns a list
'logic_name' => $logic_name,
'module' => $module,
'language' => $language,
'parameters' => $parameters_hash,
'comment' => $comment,
'tags' => ( (ref($tags) eq 'ARRAY') ? join(',', @$tags) : $tags ),
'resource_class' => $resource_class,
'failed_job_tolerance' => $failed_job_tolerance,
'max_retry_count' => $max_retry_count,
'can_be_empty' => $can_be_empty,
'priority' => $priority,
'meadow_type' => $meadow_type,
'analysis_capacity' => $analysis_capacity,
'hive_capacity' => $hive_capacity,
'batch_size' => $batch_size,
);
$analysis->get_compiled_module_name(); # check if it compiles and is named correctly
# Fresh stats row: all counters zeroed; status depends on the -blocked flag.
($stats) = $pipeline->add_new_or_update( 'AnalysisStats', $self->o('hive_debug_init'), # NB: add_new_or_update returns a list
'analysis' => $analysis,
'status' => $blocked ? 'BLOCKED' : 'EMPTY', # be careful, as this "soft" way of blocking may be accidentally unblocked by deep sync
'total_job_count' => 0,
'semaphored_job_count' => 0,
'ready_job_count' => 0,
'done_job_count' => 0,
'failed_job_count' => 0,
'num_running_workers' => 0,
'sync_lock' => 0,
);
}
# Keep a link to the analysis object to speed up the creation of control and dataflow rules
$analyses_by_logic_name{$logic_name} = $analysis;
# now create the corresponding jobs (if there are any):
if($input_ids) {
# FIXME(review): lines appear to be missing here - these key=>value pairs
# and the closing ") } @$input_ids;" look like the argument list of a
# per-input_id job constructor inside a map, whose opening
# "map { ...Job->new(" line is absent. As written this is a syntax
# error; restore from the upstream HiveGeneric_conf.pm.
'prev_job' => undef, # these jobs are created by the initialization script, not by another job
'analysis' => $analysis,
'input_id' => $_, # input_ids are now centrally stringified in the AnalysisJob itself
) } @$input_ids;
unless( $pipeline->hive_use_triggers() ) {
# Without DB triggers the READY counter must be bumped by hand to
# account for the freshly seeded jobs.
$stats->recalculate_from_job_counts( { 'READY' => scalar(@$input_ids) } );
}
}
}
$self->print_debug( "Done.\n\n" );

# --- 5) control (-wait_for) and dataflow (-flow_into) rules, second pass so
#        every target analysis already exists in %analyses_by_logic_name ---
$self->print_debug( "Adding Control and Dataflow Rules ...\n" );
foreach my $aha (@{$self->pipeline_analyses}) {
my ($logic_name, $wait_for, $flow_into)
= @{$aha}{qw(-logic_name -wait_for -flow_into)}; # slicing a hash reference
my $analysis = $analyses_by_logic_name{$logic_name};
if($wait_for) {
# FIXME(review): empty body - the control-rule creation code that should
# live here appears to have been lost; compare with the upstream source.
}
if($flow_into) {
# FIXME(review): empty body - the dataflow-rule creation code that should
# live here appears to have been lost; compare with the upstream source.
}
}
$self->print_debug( "Done.\n\n" );

# Block the analyses that should be blocked
$self->print_debug( "Blocking the analyses that should be ...\n" );
foreach my $stats ($pipeline->collection_of('AnalysisStats')->list()) {
# 'no_die' keeps initialization going even if a rule references a
# not-yet-satisfiable condition; status is then recomputed.
$stats->check_blocking_control_rules('no_die');
$stats->determine_status();
}
$self->print_debug( "Done.\n\n" );
}