9 The base
class for all other Object- or NakedTable- adaptors.
10 Performs the low-level SQL needed to retrieve and store data in tables.
12 =head1 EXTERNAL DEPENDENCIES
18 Copyright [1999-2015] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute
19 Copyright [2016-2022] EMBL-European Bioinformatics Institute
21 Licensed under the Apache License,
Version 2.0 (the
"License"); you may not use
this file except in compliance with the License.
22 You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
26 Unless required by applicable law or agreed to in writing, software distributed under the License
27 is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
28 See the License
for the specific language governing permissions and limitations under the License.
32 Please subscribe to the
Hive mailing list: http:
37 package Bio::EnsEMBL::Hive::DBSQL::BaseAdaptor;
41 no strict
'refs'; # needed to allow AUTOLOAD to create new methods
42 use DBI 1.6; # the 1.6 functionality is important for detecting autoincrement fields and other magic.
# Fallback used when no table name has been stored on the adaptor: the table_name
# accessor code returns "$self->{_table_name} || $self->default_table_name()".
# This base version never returns a name; it calls throw() with a message telling the
# author of the concrete adaptor to either set the name via table_name() or to
# redefine default_table_name() in the subclass.
47 sub default_table_name {
48 throw(
"Please define table_name either by setting it via table_name() method or by redefining default_table_name() in your adaptor class");
52 sub default_insertion_method {
57 sub default_overflow_limit {
59 # 'overflow_column1_name' => column1_size, 60 # 'overflow_column2_name' => column2_size, 65 sub default_input_column_mapping {
67 # 'original_column1' => "original_column1*10 AS c1_times_ten", 68 # 'original_column2' => "original_column2+1 AS c2_plus_one", 73 sub do_not_update_columns {
77 # --------------------------------------------------------------------------- 83 my $self = bless {}, $class;
85 if ( !defined $dbobj || !ref $dbobj ) {
86 throw(
"Don't have a db [$dbobj] for new adaptor");
91 } elsif( UNIVERSAL::can($dbobj,
'dbc') ) {
92 $self->dbc( $dbobj->dbc );
95 throw(
"I was given [$dbobj] for a new adaptor");
100 foreach my $option_name (keys %options) {
101 if( UNIVERSAL::can( $self, $option_name ) ) {
102 if(defined(my $option_value =
delete $options{ $option_name })) {
103 $self->$option_name( $option_value );
116 $self->{_db} = shift @_;
126 $self->{_dbc} = shift @_;
128 return $self->{_dbc};
133 my ( $self, $sql ) = @_;
135 # Uncomment next line to cancel caching on the SQL side. 136 # Needed for timing comparisons etc. 137 #$sql =~ s/SELECT/SELECT SQL_NO_CACHE/i; 139 return $self->dbc->prepare($sql);
147 $self->{_overflow_limit} = shift @_;
149 return $self->{_overflow_limit} || $self->default_overflow_limit();
# Accessor for the adaptor's input column mapping, kept in $self->{_input_column_mapping}.
# The visible statements store the next argument taken from @_ and then return the
# stored value, falling back to default_input_column_mapping() when the stored value
# is false. The SELECT-building code uses the result as a hashref keyed by column name
# ("$input_column_mapping->{$_} // "$table_name.$_"").
# NOTE(review): the lines that unpack $self and guard the assignment are missing from
# this listing (153 is followed directly by 157) -- confirm against the full file.
153 sub input_column_mapping {
157 $self->{_input_column_mapping} = shift @_;
159 return $self->{_input_column_mapping} || $self->default_input_column_mapping();
167 $self->{_table_name} = shift @_;
168 $self->_table_info_loader();
170 return $self->{_table_name} || $self->default_table_name();
# Accessor for the insertion method, kept in $self->{_insertion_method}.
# The visible statements store the next argument taken from @_ and then return the
# stored value, falling back to default_insertion_method() when the stored value is
# false. The code that builds the "... INTO $table_name" statement reads this value,
# annotates it as "INSERT, INSERT_IGNORE or REPLACE", and replaces underscores with
# spaces before use.
# NOTE(review): the lines that unpack $self and guard the assignment are missing from
# this listing (174 is followed directly by 178) -- confirm against the full file.
174 sub insertion_method {
178 $self->{_insertion_method} = shift @_;
180 return $self->{_insertion_method} || $self->default_insertion_method();
188 $self->{_column_set} = shift @_;
189 } elsif( !defined( $self->{_column_set} ) ) {
190 $self->_table_info_loader();
192 return $self->{_column_set};
196 sub primary_key { # not necessarily auto-incrementing
200 $self->{_primary_key} = shift @_;
201 } elsif( !defined( $self->{_primary_key} ) ) {
202 $self->_table_info_loader();
204 return $self->{_primary_key};
208 sub updatable_column_list { # it
's just a cashed view, you cannot set it directly 211 unless($self->{_updatable_column_list}) { 212 my %primary_key_set = map { $_ => 1 } @{$self->primary_key}; 213 my %non_updatable_set = map { $_ => 1 } @{$self->do_not_update_columns}; 214 my $column_set = $self->column_set(); 215 $self->{_updatable_column_list} = [ grep { not ($primary_key_set{$_} || $non_updatable_set{$_}) } keys %$column_set ]; 217 return $self->{_updatable_column_list}; 225 $self->{_autoinc_id} = shift @_; 226 } elsif( !defined( $self->{_autoinc_id} ) ) { 227 $self->_table_info_loader(); 229 return $self->{_autoinc_id}; 233 sub _table_info_loader { 236 my $dbc = $self->dbc(); 237 my $driver = $dbc->driver(); 238 my $dbname = $dbc->dbname(); 239 my $table_name = $self->table_name(); 243 my @primary_key = $dbc->primary_key(undef, undef, $table_name); 245 my $sth = $dbc->column_info(undef, undef, $table_name, '%
'); 247 while (my $row = $sth->fetchrow_hashref()) { 248 my ( $column_name, $column_type ) = @$row{'COLUMN_NAME
', 'TYPE_NAME
'}; 250 # warn "ColumnInfo [$table_name/$column_name] = $column_type\n"; 252 $column_set{$column_name} = $column_type; 254 if( ($column_name eq $table_name.'_id
') 255 or ($table_name eq 'analysis_base
' and $column_name eq 'analysis_id
') ) { # a special case (historical) 256 $autoinc_id = $column_name; 261 $self->column_set( \%column_set ); 262 $self->primary_key( \@primary_key ); 263 $self->autoinc_id( $autoinc_id ); 268 my ($self, $constraint, $key_list, @bind_values) = @_; 270 my $table_name = $self->table_name(); 271 my $driver = $self->dbc->driver(); 272 my $count_col_name = $driver eq 'pgsql
' ? 'count
' : 'COUNT(*)
'; 274 my $sql = "SELECT ".($key_list ? join(',
', @$key_list, '') : '')."COUNT(*) FROM $table_name"; 277 # in case $constraint contains any kind of JOIN (regular, LEFT, RIGHT, etc) do not put WHERE in front: 278 $sql .= (($constraint=~/\bJOIN\b/i) ? ' ' : ' WHERE
') . $constraint; 282 $sql .= " GROUP BY ".join(',
', @$key_list); 284 # warn "SQL: $sql\n"; 286 my $sth = $self->prepare($sql); 287 $sth->execute(@bind_values); 289 my $result_struct; # will be autovivified to the correct data structure 291 while(my $hashref = $sth->fetchrow_hashref) { 293 my $pptr = \$result_struct; 295 foreach my $syll (@$key_list) { 296 $pptr = \$$pptr->{$hashref->{$syll}}; # using pointer-to-pointer to enforce same-level vivification 299 $$pptr = $hashref->{$count_col_name}; 302 unless(defined($result_struct)) { 303 if($key_list and scalar(@$key_list)) { 310 return $result_struct; 315 my ($self, $constraint, $one_per_key, $key_list, $value_column) = @_; 317 my $table_name = $self->table_name(); 318 my $input_column_mapping = $self->input_column_mapping(); 320 my $sql = 'SELECT
' . join(',
', map { $input_column_mapping->{$_} // "$table_name.$_" } keys %{$self->column_set()}) . " FROM $table_name"; 323 # in case $constraint contains any kind of JOIN (regular, LEFT, RIGHT, etc) do not put WHERE in front: 324 $sql .= (($constraint=~/\bJOIN\b/i or $constraint=~/^LIMIT|ORDER|GROUP/) ? ' ' : ' WHERE
') . $constraint; 327 # warn "SQL: $sql\n"; 329 my $sth = $self->prepare($sql); 332 my @overflow_columns = keys %{ $self->overflow_limit() }; 333 my $overflow_adaptor = scalar(@overflow_columns) && $self->db->get_AnalysisDataAdaptor(); 335 my $result_struct; # will be autovivified to the correct data structure 337 while(my $hashref = $sth->fetchrow_hashref) { 339 foreach my $overflow_key (@overflow_columns) { 340 if($hashref->{$overflow_key} =~ /^_ext(?:\w+)_data_id (\d+)$/) { 341 $hashref->{$overflow_key} = $overflow_adaptor->fetch_by_analysis_data_id_TO_data($1); 345 my $pptr = \$result_struct; 347 foreach my $syll (@$key_list) { 348 $pptr = \$$pptr->{$hashref->{$syll}}; # using pointer-to-pointer to enforce same-level vivification 351 my $object = $value_column 352 ? ( (ref($value_column) eq 'ARRAY
') 353 ? { map { ($_ => $hashref->{$_}) } @$value_column } # project to a subhash 354 : $hashref->{$value_column} # project to just one field 356 : $self->objectify($hashref); # keep the whole object 359 $$pptr = $object; # just return the one value (either the key_list is unique or override) 361 push @$$pptr, $object; # return a list of values that potentially share the same key_list 366 unless(defined($result_struct)) { 367 if($key_list and scalar(@$key_list)) { 369 } elsif(!$one_per_key) { 374 return $result_struct; # either listref or hashref is returned, depending on the call parameters 378 sub primary_key_constraint { 380 my $sliceref = shift @_; 382 my $primary_key = $self->primary_key(); # Attention: the order of primary_key columns of your call should match the order in the table definition! 385 return join (' AND
', map { $primary_key->[$_]."='".$sliceref->[$_]."'" } (0..scalar(@$primary_key)-1)); 387 my $table_name = $self->table_name(); 388 throw("Table '$table_name
' doesn't have a primary_key
"); 394 my $self = shift @_; # the rest in @_ should be primary_key column values 396 return $self->fetch_all( $self->primary_key_constraint( \@_ ), 1 ); 400 sub remove_all { # remove entries by a constraint 402 my $constraint = shift @_ || 1; 404 my $table_name = $self->table_name(); 406 my $sql = "DELETE FROM $table_name WHERE $constraint
"; 407 my $sth = $self->prepare($sql); 413 sub remove { # remove the object by primary_key 415 my $object = shift @_; 417 # the object hasn't actually been stored yet / in this database 418 return if(UNIVERSAL::can($object, 'adaptor') and (!$object->adaptor or $object->adaptor != $self)); 420 my $primary_key_constraint = $self->primary_key_constraint( $self->slicer($object, $self->primary_key()) ); 422 return $self->remove_all( $primary_key_constraint ); 426 sub update { # update (some or all) non_primary columns from the primary 428 my $object = shift @_; # the rest in @_ should be the column names to be updated 430 my $table_name = $self->table_name(); 431 my $primary_key_constraint = $self->primary_key_constraint( $self->slicer($object, $self->primary_key()) ); 432 my $columns_to_update = scalar(@_) ? \@_ : $self->updatable_column_list(); 433 my $values_to_update = $self->slicer( $object, $columns_to_update ); 435 unless(@$columns_to_update) { 436 throw("There are no dependent columns to update, as everything seems to belong to the primary key
"); 439 my @placeholders = (); 441 foreach my $idx (0..scalar(@$columns_to_update)-1) { 442 my ($column_name, $value) = ($columns_to_update->[$idx], $values_to_update->[$idx]); 444 if($column_name =~ /^when_/ and defined($value) and $value eq 'CURRENT_TIMESTAMP') { 445 push @placeholders, $column_name.'=CURRENT_TIMESTAMP'; 447 push @placeholders, $column_name.'=?'; 448 push @values, $value; 452 my $sql = "UPDATE $table_name SET
".join(', ', @placeholders)." WHERE $primary_key_constraint
"; 453 # warn "SQL: $sql\n
"; 454 my $sth = $self->prepare($sql); 455 # warn "VALUES_TO_UPDATE:
".join(', ', map { "'$_'" } @values)."\n
"; 456 $sth->execute( @values); 462 sub store_or_update_one { 463 my ($self, $object, $filter_columns) = @_; 466 if(UNIVERSAL::can($object, 'adaptor') and $object->adaptor and $object->adaptor==$self) { # looks like it has been previously stored 467 if( @{ $self->primary_key() } and @{ $self->updatable_column_list() } ) { 468 $self->update( $object ); 469 #warn "store_or_update_one: updated [
".(UNIVERSAL::can($object, 'toString') ? $object->toString : Dumper($object))."]\n
"; 471 #warn "store_or_update_one: non-updatable [
".(UNIVERSAL::can($object, 'toString') ? $object->toString : Dumper($object))."]\n
"; 473 } elsif( my $present = $self->check_object_present_in_db_by_content( $object, $filter_columns ) ) { 474 $self->mark_stored($object, $present); 475 #warn "store_or_update_one: found [
".(UNIVERSAL::can($object, 'toString') ? $object->toString : Dumper($object))."] in db by content of (
".join(', ', @$filter_columns).")\n
"; 476 if( @{ $self->primary_key() } and @{ $self->updatable_column_list() } ) { 477 #warn "store_or_update_one: updating the columns (
".join(', ', @{ $self->updatable_column_list() }).")\n
"; 478 $self->update( $object ); 481 $self->store( $object ); 482 #warn "store_or_update_one: stored [
".(UNIVERSAL::can($object, 'toString') ? $object->toString : Dumper($object))."]\n
"; 487 sub check_object_present_in_db_by_content { # return autoinc_id/undef if the table has autoinc_id or just 1/undef if not 488 my ( $self, $object, $filter_columns ) = @_; 490 my $table_name = $self->table_name(); 491 my $column_set = $self->column_set(); 492 my $autoinc_id = $self->autoinc_id(); 494 if($filter_columns) { 495 # make sure all fields exist in the database as columns: 496 $filter_columns = [ map { $column_set->{$_} ? $_ : $_.'_id' } @$filter_columns ]; 498 # we look for identical contents, so must skip the autoinc_id columns when fetching: 499 $filter_columns = [ grep { $_ ne $autoinc_id } keys %$column_set ]; 502 @filter_hash{ @$filter_columns } = @{ $self->slicer( $object, $filter_columns ) }; 504 my @constraints = (); 506 while(my ($column, $value) = each %filter_hash ) { 507 if( defined($value) ) { 508 push @constraints, "$column = ?
"; 509 push @values, $value; 511 push @constraints, "$column IS NULL
"; 515 my $sql = 'SELECT '.($autoinc_id or 1)." FROM $table_name WHERE
". join(' AND ', @constraints); 516 my $sth = $self->prepare( $sql ); 517 $sth->execute( @values ); 519 my ($return_value) = $sth->fetchrow_array(); 520 #warn "check_object_present_in_db_by_content: sql= $sql WITH VALUES (
".join(', ', @values).") ---> return_value=
".($return_value//'undef')."\n
"; 523 return $return_value; 527 sub class_specific_execute { 528 my ($self, $object, $sth, $values) = @_; 530 my $return_code = $sth->execute( @$values ); 537 my ($self, $object_or_list) = @_; 539 my $objects = (ref($object_or_list) eq 'ARRAY') # ensure we get an array of objects to store 541 : [ $object_or_list ]; 542 return ([], 0) unless(scalar(@$objects)); 544 my $table_name = $self->table_name(); 545 my $autoinc_id = $self->autoinc_id(); 546 my $all_storable_columns = [ grep { $_ ne $autoinc_id } keys %{ $self->column_set() } ]; 547 my $driver = $self->dbc->driver(); 548 my $insertion_method = $self->insertion_method; # INSERT, INSERT_IGNORE or REPLACE 549 $insertion_method =~ s/_/ /g; 550 if($driver eq 'sqlite') { 551 $insertion_method =~ s/INSERT IGNORE/INSERT OR IGNORE/ig; 552 } elsif($driver eq 'pgsql') { 553 # Rules have been created to mimic the behaviour INSERT IGNORE / REPLACE 554 # Here we can do fall-back to a standard INSERT 555 $insertion_method = 'INSERT'; 558 my %hashed_sth = (); # do not prepare statements until there is a real need 560 my $stored_this_time = 0; 562 foreach my $object (@$objects) { 563 my ($columns_being_stored, $column_key) = $self->keys_to_columns($object); 564 # warn "COLUMN_KEY=
'$column_key'\n
"; 568 # only prepare (once!) if we get here: 569 unless($this_sth = $hashed_sth{$column_key}) { 570 # By using question marks we can insert true NULLs by setting corresponding values to undefs: 571 my $sql = "$insertion_method INTO $table_name (
".join(', ', @$columns_being_stored).') VALUES ('.join(',', (('?') x scalar(@$columns_being_stored))).')'; 572 # warn "STORE: $sql\n
"; 573 $this_sth = $hashed_sth{$column_key} = $self->prepare( $sql ) or throw("Could not prepare statement: $sql
"); 576 # warn "STORED_COLUMNS:
".stringify($columns_being_stored)."\n
"; 577 my $values_being_stored = $self->slicer( $object, $columns_being_stored ); 578 # warn "STORED_VALUES:
".stringify($values_being_stored)."\n
"; 580 my $return_code = $self->class_specific_execute($object, $this_sth, $values_being_stored ) 581 # using $return_code in boolean context allows to skip the value '0E0' ('no rows affected') that Perl treats as zero but regards as true: 582 or throw("Could not store fields\n\t{$column_key}\nwith data:\n\t(
".join(',', @$values_being_stored).')'); 584 if($return_code > 0) { # <--- for the same reason we have to be explicitly numeric here 585 # FIXME: does this work if the "MySQL server has gone away
" ? 586 my $liid = $autoinc_id && $self->dbc->db_handle->last_insert_id(undef, undef, $table_name, $autoinc_id); 587 $self->mark_stored($object, $liid ); 592 foreach my $sth (values %hashed_sth) { 596 return ($object_or_list, $stored_this_time); 600 sub _multi_column_filter { 601 my ($self, $filter_string, $filter_values, $column_set) = @_; 603 # NB: this filtering happens BEFORE any possible overflow via analysis_data, so will not be done on overflow_columns 604 my $filter_components = $filter_string && [ split(/_AND_/i, $filter_string) ]; 605 if($filter_components) { 606 foreach my $column_name ( @$filter_components ) { 607 unless($column_set->{$column_name}) { 608 throw("unknown column
'$column_name'"); 613 my $filter_sql = $filter_components && join(' AND ', map { defined($filter_values->[$_]) ? "$filter_components->[$_]=
'$filter_values->[$_]'" : $filter_components->[$_].' IS NULL' } 0..scalar(@$filter_components)-1); 619 sub DESTROY { } # to simplify AUTOLOAD 624 if($AUTOLOAD =~ /::fetch(_all)?(?:_by_(\w+?))?(?:_HASHED_FROM_(\w+?))?(?:_TO_(\w+?))?$/) { 626 my $filter_string = $2; 628 my $value_column = $4; 631 my $column_set = $self->column_set(); 633 my $key_components = $key_string && [ split(/_AND_/i, $key_string) ]; 634 if($key_components) { 635 foreach my $column_name ( @$key_components ) { 636 unless($column_set->{$column_name}) { 637 throw("unknown column
'$column_name'"); 642 if($value_column && !$column_set->{$value_column}) { 643 throw("unknown column
'$value_column'"); 646 # warn "Setting up
'$AUTOLOAD' method\n
"; 649 return $self->fetch_all( 650 $self->_multi_column_filter($filter_string, \@_, $column_set), 656 goto &$AUTOLOAD; # restart the new method 658 } elsif($AUTOLOAD =~ /::count_all(?:_by_(\w+?))?(?:_HASHED_FROM_(\w+?))?$/) { 659 my $filter_string = $1; 663 my $column_set = $self->column_set(); 665 my $key_components = $key_string && [ split(/_AND_/i, $key_string) ]; 666 if($key_components) { 667 foreach my $column_name ( @$key_components ) { 668 unless($column_set->{$column_name}) { 669 throw("unknown column
'$column_name'"); 674 # warn "Setting up
'$AUTOLOAD' method\n
"; 677 return $self->count_all( 678 $self->_multi_column_filter($filter_string, \@_, $column_set), 682 goto &$AUTOLOAD; # restart the new method 684 } elsif($AUTOLOAD =~ /::remove_all_by_(\w+)$/) { 685 my $filter_string = $1; 688 my $column_set = $self->column_set(); 690 # warn "Setting up
'$AUTOLOAD' method\n
"; 693 return $self->remove_all( 694 $self->_multi_column_filter($filter_string, \@_, $column_set), 697 goto &$AUTOLOAD; # restart the new method 699 } elsif($AUTOLOAD =~ /::update_(\w+)$/) { 700 my @columns_to_update = split(/_AND_/i, $1); 701 # warn "Setting up
'$AUTOLOAD' method\n
"; 702 *$AUTOLOAD = sub { my ($self, $object) = @_; return $self->update($object, @columns_to_update); }; 703 goto &$AUTOLOAD; # restart the new method 705 warn "sub
'$AUTOLOAD' not implemented
";