ensembl-hive  2.5
MySQLTransfer.pm
Go to the documentation of this file.
1 =pod
2 
3 =head1 NAME
4 
 Bio::EnsEMBL::Hive::RunnableDB::MySQLTransfer

=head1 SYNOPSIS
8 
9  standaloneJob.pl Bio::EnsEMBL::Hive::RunnableDB::MySQLTransfer --table meta_foo \
10  --src_db_conn mysql://ensadmin:${ENSADMIN_PSW}@127.0.0.1:2913/lg4_compara_homology_merged_64 \
11  --dest_db_conn mysql://ensadmin:${ENSADMIN_PSW}@127.0.0.1:2912/lg4_compara_families_64
12 
13 =head1 DESCRIPTION
14 
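 This RunnableDB module lets you copy/merge rows from a table in one database into a table with the same name (or, with 'renamed_table', a different name) in another.
 There are three modes: 'overwrite' dumps the table definition together with the rows, so the target table is recreated from the source;
 'topup' and 'insertignore' only dump the rows (as INSERT IGNORE statements) into the existing target table, and differ only in how the result is checked afterwards.
 Also, the 'where' parameter allows selecting a subset of rows to be copied/merged over.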
18 
19 =head1 LICENSE
20 
21  Copyright [1999-2015] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute
22  Copyright [2016-2022] EMBL-European Bioinformatics Institute
23 
24  Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
25  You may obtain a copy of the License at
26 
27  http://www.apache.org/licenses/LICENSE-2.0
28 
29  Unless required by applicable law or agreed to in writing, software distributed under the License
30  is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
31  See the License for the specific language governing permissions and limitations under the License.
32 
33 =head1 CONTACT
34 
35  Please subscribe to the Hive mailing list: http://listserver.ebi.ac.uk/mailman/listinfo/ehive-users to discuss Hive-related questions or to be notified of our updates
36 
37 =cut
38 
39 
40 package Bio::EnsEMBL::Hive::RunnableDB::MySQLTransfer;
41 
42 use strict;
43 use warnings;
44 
45 use Bio::EnsEMBL::Hive::Utils ('go_figure_dbc', 'stringify');
46 
47 use base ('Bio::EnsEMBL::Hive::RunnableDB::SystemCmd');
48 
sub param_defaults {
    my ($self) = @_;

    # Inherit every SystemCmd default first; the entries set below take precedence.
    my %defaults = %{ $self->SUPER::param_defaults() };

    # An empty connection string means "use the hive's own data_dbc" (see fetch_input).
    $defaults{'src_db_conn'}   = '';
    $defaults{'dest_db_conn'}  = '';
    $defaults{'mode'}          = 'overwrite';
    $defaults{'where'}         = undef;
    $defaults{'filter_cmd'}    = undef;

    # Optional: give the table a different name in the destination database.
    $defaults{'renamed_table'} = undef;

    # Shell filter used when 'renamed_table' is set. It relies on mysqldump
    # backticking the table name; only override it if your data itself
    # contains the backticked table name AND you know how to work around that.
    $defaults{'rename_filter'} = 'sed "s/\`#table#\`/\`#renamed_table#\`/" | sed "s/\`#table#_ibfk/\`#renamed_table#_ibfk/"';

    # Values that override the ones inherited from SystemCmd.
    $defaults{'use_bash_pipefail'} = 1;
    $defaults{'lock_tables'}       = 0;

    return \%defaults;
}
67 
68 
69 =head2 fetch_input
70 
71  Description : Implements fetch_input() interface method of Bio::EnsEMBL::Hive::Process that is used to read in parameters and load data.
 Here it parses parameters, creates up to two database handles and finds the pre-execution row count of the source table, filtered by 'where'.
73 
74  param('src_db_conn'): connection parameters to the source database (if different from hive_db)
75 
76  param('dest_db_conn'): connection parameters to the destination database (if different from hive_db - at least one of the two will have to be different)
77 
78  param('mode'): 'overwrite' (default), 'topup' or 'insertignore'
79 
80  param('where'): filter for rows to be copied/merged.
81 
82  param('table'): table name to be copied/merged.
83 
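 param('lock_tables'): [boolean] when 1, lock tables when dumping the source database. Default if not set (or set to 0) is to not lock (runs mysqldump with --skip-lock-tables)

 param('renamed_table'): optional new name for the table in the destination database. The dump is then piped through param('rename_filter'), which rewrites the backticked table name and its '_ibfk' foreign-key names.

 param('filter_cmd'): optional shell command inserted into the pipe after mysqldump (and after the rename filter, if any), before the destination database client.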
85 
86 =cut
87 
sub fetch_input {
    my $self = shift;

    my $src_db_conn  = $self->param('src_db_conn');
    my $dest_db_conn = $self->param('dest_db_conn');

    # Everything checked in this section is a configuration problem that a
    # retry cannot fix, so failures here must not be flagged as transient.
    $self->input_job->transient_error(0);
    if ($src_db_conn eq $dest_db_conn) {
        die "Please either specify 'src_db_conn' or 'dest_db_conn' or make them different\n";
    }
    my $table = $self->param_required('table');
    my $mode  = $self->param_required('mode');

    # Extra mysqldump options per mode. 'topup' and 'insertignore' only dump
    # the rows (no table definition) as INSERT IGNORE statements; they differ
    # solely in how write_output() checks the result.
    my %mode_options_for = (
        'overwrite'    => [],
        'topup'        => [qw(--no-create-info --insert-ignore)],
        'insertignore' => [qw(--no-create-info --insert-ignore)],
    );
    my $mode_options = $mode_options_for{$mode};
    die "Mode '$mode' not recognized. Should be 'overwrite', 'topup' or 'insertignore'\n" unless $mode_options;
    $self->input_job->transient_error(1);

    # An empty connection string means "use the hive's own data_dbc".
    my $src_dbc  = $src_db_conn  ? go_figure_dbc( $src_db_conn )  : $self->data_dbc;
    my $dest_dbc = $dest_db_conn ? go_figure_dbc( $dest_db_conn ) : $self->data_dbc;

    $self->param('src_dbc',  $src_dbc);
    $self->param('dest_dbc', $dest_dbc);

    my $where = $self->param('where');

    # Remembered so that write_output() can compare it with the target's count.
    $self->param('src_before', $self->get_row_count($src_dbc, $table, $where) );

    # Without the table definition in the dump, rows are inserted into the
    # existing target table, so its columns must match the source's.
    if ($mode ne 'overwrite') {
        $self->_assert_same_table_schema($src_dbc, $dest_dbc, $table);
    }

    push @{$mode_options}, '--skip-lock-tables' unless $self->param('lock_tables');

    my $filter_cmd    = $self->param('filter_cmd');
    my $renamed_table = $self->param('renamed_table');
    my $rename_filter = $renamed_table ? $self->param('rename_filter') . ' | ' : '';

    # The WHERE clause is handed to the shell inside single quotes. A single
    # quote cannot appear inside a single-quoted shell word, so each embedded
    # one is rewritten as '\'' (close quote, escaped quote, reopen quote).
    my $where_option = '';
    if (defined $where) {
        (my $shell_where = $where) =~ s{'}{'\\''}g;
        $where_option = "--where '$shell_where'";
    }

    # Must be joined into a single string because of the pipes
    my $cmd = join(' ',
        @{ $src_dbc->to_cmd('mysqldump', $mode_options, undef, undef, 1) },
        $table,
        $where_option,
        '|',
        $rename_filter,
        ($filter_cmd ? "$filter_cmd | " : ''),
        @{ $dest_dbc->to_cmd(undef, undef, undef, undef, 1) },
    );
    $self->param('cmd', $cmd);
}
139 
140 =head2 write_output
141 
142  Description : Implements write_output() interface method of Bio::EnsEMBL::Hive::Process that is used to deal with job's output after the execution.
143  Here we compare the number of rows and detect problems.
144 
145 =cut
146 
sub write_output {
    my $self = shift;

    # Error processing of the command itself is delegated to SystemCmd.
    $self->SUPER::write_output();

    my $dest_dbc = $self->param('dest_dbc');

    my $mode      = $self->param('mode');
    my $table     = $self->param('table');
    my $ren_table = $self->param('renamed_table') || $table;
    my $where     = $self->param('where');

    # Only the source was counted before the transfer (in fetch_input); the
    # destination is counted now, with the same 'where' filter.
    my $src_before = $self->param('src_before');
    my $dest_after = $self->get_row_count($dest_dbc, $ren_table, $where);

    if ($mode eq 'overwrite') {
        # The dump carries the table definition in this mode, so the target
        # should end up holding exactly the source's rows.
        if ($src_before == $dest_after) {
            $self->warning("Successfully copied $src_before '$table' rows");
        } else {
            die "Could not copy '$table' rows: $src_before rows from source copied into $dest_after rows in target\n";
        }
    } elsif ($mode eq 'topup') {
        # Existing target rows are kept (INSERT IGNORE into the existing table),
        # so the target must hold at least as many matching rows as the source.
        if ($dest_after >= $src_before) {
            $self->warning("Cannot check success in this mode, but the number of '$ren_table' rows in target ($dest_after) is indeed at least $src_before");
        } else {
            die "Could not copy '$table' rows: $src_before rows from source copied into $dest_after rows in target\n";
        }
    } else {
        # 'insertignore': the target's row count before the transfer was never
        # recorded, so the real increase cannot be computed. Report both counts
        # rather than their difference, which is not an increase.
        $self->warning("Cannot check success/failure in this mode, but the number of '$ren_table' rows in target is now $dest_after ($src_before in source)");
    }
}
183 
184 ########################### private subroutines ####################################
185 
sub get_row_count {
    my ($self, $dbc, $table, $where) = @_;

    # Build "SELECT count(*) FROM <table> [WHERE <where>]"; $where is a raw SQL fragment.
    my $count_sql = "SELECT count(*) FROM $table";
    $count_sql .= " WHERE $where" if defined $where;

    my $count_sth = $dbc->prepare($count_sql);
    $count_sth->execute();
    my @first_row = $count_sth->fetchrow_array();
    $count_sth->finish;

    # Release the connection if nothing else is using it.
    $dbc->disconnect_if_idle();

    return $first_row[0];
}
200 
sub _assert_same_table_schema {
    my ($self, $src_dbc, $dest_dbc, $table) = @_;

    # Fetch the DBI column_info rows for $table on one connection and return
    # them serialised with stringify(), so the two sides can be compared as strings.
    my $describe_columns = sub {
        my ($dbc) = @_;
        my $column_sth = $dbc->column_info(undef, undef, $table, '%');
        my $columns    = $column_sth->fetchall_arrayref;
        $column_sth->finish();
        return stringify($columns);
    };

    my $src_description  = $describe_columns->($src_dbc);
    my $dest_description = $describe_columns->($dest_dbc);

    die "'$table' has a different schema in the two databases. Do SHOW CREATE TABLE in both databases and compare the outputs." if $src_description ne $dest_description;
}
214 
215 
216 1;
217