ensembl-hive  2.8.1
SpeciesToVolumes.pm
Go to the documentation of this file.
1 =head1 LICENSE
2 
3 See the NOTICE file distributed with this work for additional information
4 regarding copyright ownership.
5 
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
9 
10  http://www.apache.org/licenses/LICENSE-2.0
11 
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 
18 =cut
19 
20 package EnsCloud::Cmd::Command::SpeciesToVolumes;
21 use Moose;
22 use autodie qw(:all);
23 use Data::Dump qw(dump);
24 use Data::Dumper;
25 use File::Spec;
26 use Moose::Util::TypeConstraints;
27 use EnsCloud::Image;
31 use namespace::autoclean;
32 use IPC::Cmd qw[can_run run];
33 
34 # use List::Utils qw(first);
35 use Log::Log4perl qw(:easy);
36 with 'MooseX::Log::Log4perl';
37 extends qw(MooseX::App::Cmd::Command);
38 
# Initialise Log::Log4perl with its easy_init() defaults as soon as this
# file is compiled.
BEGIN { Log::Log4perl->easy_init(); }
42 
# ABSTRACT: build ensembl MySQL instances by species

# Returns the fixed one-line description of this command.
sub abstract {
    my $summary = "build ensembl MySQL instances by species";
    return $summary;
}
49 
50 
51 # Instructions
52 #
53 # * This expects a volume containing the Ensembl MYDs attached to the instance (Public Snapshot of 65 dbs = snap-56c9ab32)
54 # - default path is /vols/ensembl_mysql_data
55 # Run like this: ( N.B --base_snapshot is the base AMI with the OS)
56 #
57 # ecloud speciestovolumes --base_snapshot snap-e36fde86 --species saccharomyces_cerevisiae (omit --species to do all)
58 # or by dbtype
59 # ecloud speciestovolumes --base_snapshot snap-e36fde86 --species homo_sapiens --dbtype variation
60 #
61 # * Let it run until completion, then wait until all PENDING snapshots are complete.
62 # Once all PENDING snapshots are complete you will have one snapshot per species.
63 # (Do not start the process again while there are still PENDING snapshots.)
64 # Now run it again with the same parameters - this will loop over the completed snapshots and combine each with the base_snapshot OS
65 
66 # has 'type' => (
67 # is => 'rw',
68 # isa => 'Str',
69 # traits => ['Getopt'],
70 #
71 # # cmd_aliases => "h",
72 # documentation => "instances volumes or images",
73 # required => 1,
74 #
75 # # default => sub { die "bucket name required" },
76 # );
77 
# Region alias option (read/write string). The accepted aliases are listed
# in the documentation string; the default is 'useast'.
has 'region_alias' => (
    traits        => ['Getopt'],
    isa           => 'Str',
    is            => 'rw',
    default       => 'useast',
    required      => 1,
    documentation => "asia, useast, uswest or eu",
);

# Directory whose sub-directories are measured with du when the queue is
# built (read-only string).
has 'volume_path' => (
    traits        => ['Getopt'],
    isa           => 'Str',
    is            => 'ro',
    default       => '/vols/ensembl_mysql_data',
    required      => 1,
    documentation => "path to the species DBs",
);
101 
# Names of the database groups that can be requested via 'dbtypes'.
# The values are wrapped in an array reference: the bare-list form of
# enum() is deprecated in Moose.
enum 'CompositeGroup' => [qw(core coreplusvariation corepluscompara variation compara all)];

# Which group of database types to copy. The documentation string now
# states the real default ('all'); it previously claimed 'core'.
has 'dbtypes' => (
    is            => 'ro',
    isa           => 'CompositeGroup',
    traits        => ['Getopt'],
    documentation => "database types to copy: default=all",
    required      => 1,
    default       => 'all',
);

# Maps each CompositeGroup name to the list of database type names that
# belong to it.
has db_type_lookup => (
    is       => 'ro',
    isa      => 'HashRef',
    required => 1,
    default  => sub {

        # Types shared by every "core"-based group.
        my @core_like = qw(core coreexpressionatlas coreexpressionest funcgen otherfeatures rnaseq vega);

        # Explicit return of a hash reference, so the braces can never be
        # parsed as a bare block.
        return {
            core              => [@core_like],
            coreplusvariation => [ @core_like, 'variation' ],
            corepluscompara   => [ @core_like, 'compara' ],
            all               => [ @core_like, 'compara', 'variation' ],
            variation         => ['variation'],
            compara           => ['compara'],
        };
    },
);
135 
# Species to process. The literal 'all' (the default) is turned into a
# match-everything pattern by build_queue.
has 'species' => (
    traits        => ['Getopt'],
    isa           => 'Str',
    is            => 'ro',
    default       => 'all',
    required      => 1,
    documentation => "the species to copy. default=all",
);

# Pattern marking the first du line to include; build_queue uses it as the
# opening regex of a range match. The default matches any line.
has 'start_from' => (
    traits        => ['Getopt'],
    isa           => 'Str',
    is            => 'ro',
    default       => '.*',
    documentation => "the species to start dumping from",
);

# Pattern marking the last du line to include; build_queue uses it as the
# closing regex of a range match. The default only matches an empty line.
has 'end_at' => (
    traits        => ['Getopt'],
    isa           => 'Str',
    is            => 'ro',
    default       => '^$',
    documentation => "the species to stop dumping at",
);
# Work queue of volumes to snapshot / turn into images. No direct accessor
# is generated; the queue is only reachable through the delegated array
# helpers below.
has 'volume_dump_queue' => (
    traits => ['Array'],
    isa    => 'ArrayRef[EnsCloud::Image::VolumeBundle::Volume]',

    # Start with an empty list so the delegated helpers (count, elements,
    # sort_in_place, ...) work even when nothing has been queued yet.
    default => sub { [] },
    handles => {
        all_queued_volumes     => 'elements',
        volume_add_to_queue    => 'push',
        next_volume_from_queue => 'shift',

        # Largest total_size first: the comparator's arguments are swapped.
        sort_queue_by_size_desc => [ sort_in_place => ( sub { $_[1]->total_size <=> $_[0]->total_size } ) ],
        find_tag                => 'first',
        queue_length            => 'count',
        filter_queue            => 'grep'
    },
);
180 
# Hash of database detail objects; starts empty.
# The value type previously read 'EnsCloud:::DatabaseDetails' (triple
# colon), which names no real class, so no non-empty hash could ever pass
# the constraint. It now names the DatabaseDetails class this module uses.
# NOTE(review): the hash keys are not established anywhere in this file.
has db_species_details => (
    is      => 'rw',
    isa     => 'HashRef[EnsCloud::Image::VolumeBundle::Volume::DatabaseDetails]',
    default => sub { {} }
);
188 
# Id of the instance this command runs on, read with curl from the
# 169.254.169.254 metadata address. Trailing whitespace is stripped so the
# value can be passed straight to attach_volume.
has this_instance_id => (
    is      => 'ro',
    isa     => 'Str',
    default => sub {
        my $instance_id = `curl -s 169.254.169.254/latest/meta-data/instance-id`;
        $instance_id =~ s/\s+\z// if defined $instance_id;
        return $instance_id;
    },
    required => 1
);

# Directory the temporary volume is mounted on and the MYD directories are
# copied into.
has myd_destination_folder => (
    is       => 'ro',
    isa      => 'Str',
    default  => '/vols/MYDCOPY',
    required => 1
);

# Device name used when attaching, formatting and mounting the temporary
# volume.
has device => (
    is       => 'ro',
    isa      => 'Str',
    default  => '/dev/sdi',
    required => 1
);

# Snapshot used as the root device (/dev/sda1) of every image registered
# by make_image. Has no default, so it must be supplied.
has base_snapshot => (
    is       => 'ro',
    isa      => 'Str',
    required => 1
);

# Availability zone of this instance, read with curl from the same
# metadata address; new volumes are created in this zone. Trailing
# whitespace is stripped.
has this_zone => (
    is      => 'ro',
    isa     => 'Str',
    default => sub {
        my $zone = `curl -s 169.254.169.254/latest/meta-data/placement/availability-zone/`;
        $zone =~ s/\s+\z// if defined $zone;
        return $zone;
    },
    required => 1
);

# NOTE(review): the 'ec2' method used throughout this class is presumably
# provided by this role - confirm.
with 'EnsCloud::Describer';
225 
# Entry point of the command.
#
# Builds the queue of per-species volumes, then for every queued volume
# either registers an image (when the volume already has a snapshot whose
# status is 'completed') or creates the snapshot. Finishes by logging how
# many volumes were queued, how many images were made and how many queue
# entries still report "no snapshot".
sub execute {
    my ($self) = @_;
    $self->build_queue();

    # Only complain about an empty queue when it really is empty; this
    # message used to be logged on every run. The eval keeps the check
    # safe when the queue attribute has never been populated.
    my $queued = eval { $self->queue_length } || 0;
    unless ($queued) {
        $self->log->fatal("No databases in the queue");
        return;
    }

    $self->sort_queue_by_size_desc;

    my @image_list;
    foreach my $volume ( $self->all_queued_volumes ) {
        $self->log->info( "Doing: " . $volume->species );

        # Refresh snapshot ids / statuses before deciding what to do.
        $self->update_queue;
        if ( $volume->snapshot_id && $volume->status eq 'completed' ) {

            # make_image returns nothing when the image already exists or
            # registration failed; only count images that were created.
            my $image = $self->make_image($volume);
            push @image_list, $image if defined $image;
        }
        else {
            $self->make_snapshot($volume);
        }
    }
    $self->update_queue;

    my $image_count = scalar @image_list;
    $self->log->info( "Finished, made ", $self->queue_length, " Volumes" );
    $self->log->info("Finished, made $image_count Images");
    my @no_snapshots = $self->filter_queue( sub { $_->status =~ /no snapshot/ } );
    $self->log->info( scalar(@no_snapshots) . " Snapshots still waiting" );
}
256 
# Refreshes the queued volumes from the snapshots owned by this account.
#
# Every snapshot whose description (trailing whitespace removed) equals
# the tag of a queued volume has its snapshot id and status copied onto
# that volume. Snapshots without a description are skipped.
sub update_queue {
    my ($self) = @_;

    my $ec2_snapshots = $self->ec2->describe_snapshots( Owner => 'self' );

    # Nothing came back: nothing to update.
    return unless defined $ec2_snapshots;

    # Anything other than a plain list cannot be iterated. Report it
    # instead of dying on the array dereference.
    # NOTE(review): assumes failures come back as an object with an
    # errors() method, as the other EC2 calls in this class do - confirm.
    unless ( ref $ec2_snapshots eq 'ARRAY' ) {
        my $detail = eval { $self->pp_ec2_errors( $ec2_snapshots->errors ) };
        $detail = 'unexpected response' unless defined $detail && length $detail;
        $self->log->error( "[Error Listing Snapshots] " . $detail );
        return;
    }

    foreach my $ec2_snap (@$ec2_snapshots) {
        my $description = $ec2_snap->{description} or next;
        $description =~ s/\s+$//;

        # A queued volume without a tag simply never matches.
        my $has_snapshot = $self->find_tag( sub { ( $_->tag // '' ) eq $description } ) or next;
        $has_snapshot->snapshot_id( $ec2_snap->{snapshot_id} );
        $has_snapshot->status( $ec2_snap->{status} );
    }
    return;
}
272 
# Registers an image for one queued volume.
#
# $snapshot is a queue entry that already has a snapshot. The image maps
# /dev/sda1 to base_snapshot and /dev/sdh to the volume's own snapshot.
#
# Returns the value returned by register_image on success. Returns nothing
# when an image with the same description already exists, when the
# existing images cannot be listed, or when registration fails.
sub make_image {
    my ( $self, $snapshot ) = @_;

    $self->log->debug("MAKE IMAGE");

    my $release = $snapshot->ensembl_release;
    my $species = $snapshot->species;
    my $tag     = "Ensembl" . $release . " " . $species . " [" . ( join " ", map { $_->type } $snapshot->all_dbs ) . "]";
    my $name    = "Ensembl" . $release . " " . $species . " MySQL AMI";

    # Has it already been created?
    my $existing_images = $self->ec2->describe_images( Owner => 'self' );
    if ( defined $existing_images && ref $existing_images ne 'ARRAY' ) {

        # Not a list, so the duplicate check cannot be made; do not risk
        # registering the same image twice.
        # NOTE(review): assumes failures carry an errors() method - confirm.
        my $detail = eval { $self->pp_ec2_errors( $existing_images->errors ) };
        $detail = 'unexpected response' unless defined $detail && length $detail;
        $self->log->error( "[Error Listing Images] " . $detail );
        return;
    }
    foreach my $i ( @{ $existing_images // [] } ) {

        # Images without a description can never be the one we want.
        next unless ( $i->{description} // '' ) eq $tag;
        $self->log->warn( "Image " . $i->{image_id} . " already exists for " . $tag );
        return;
    }

    my $create_image = $self->ec2->register_image(
        Name               => $name,
        Description        => $tag,
        Architecture       => 'x86_64',
        KernelId           => 'aki-427d952b',
        RootDeviceName     => '/dev/sda1',
        BlockDeviceMapping => [
            {
                deviceName => '/dev/sda1',
                ebs        => {
                    snapshotId          => $self->base_snapshot,
                    deleteOnTermination => 'true'
                }
            },
            {
                deviceName => '/dev/sdh',

                # Use the accessor, as the rest of this class does, rather
                # than reaching into the object's hash.
                ebs => { snapshotId => $snapshot->snapshot_id, deleteOnTermination => 'true' }
            },
        ]
    );
    unless ( defined $create_image ) {
        $self->log->error("[Error Creating Image] no response for $tag");
        return;
    }
    if ( $create_image->can('errors') ) {
        $self->log->error( "[Error Creating Image] " . $self->pp_ec2_errors( $create_image->errors ) );
        return;
    }
    $self->log->info("Created Image $create_image for $tag");
    return $create_image;
}
320 
# Creates the snapshot for one queued volume.
#
# Steps: create a volume sized from the bundle's total_size, attach it to
# this instance, make an XFS filesystem on it, mount it on
# myd_destination_folder, copy every database directory of the bundle
# onto it, unmount and detach it, snapshot it with the bundle's tag as
# description, delete the volume and tag the snapshot.
#
# $bag_of_dbs is the queue entry to dump.
#
# Returns 1 when every step succeeded and nothing when a step failed (the
# temporary volume is cleaned up for failures up to and including the
# copy). Dies when the volume never reaches 'attached' or cannot be
# unmounted after a failed copy.
sub make_snapshot {
    my ( $self, $bag_of_dbs ) = @_;
    my $size_as_float = $bag_of_dbs->total_size;
    $self->log->debug( "Making SNAPSHOT " . $bag_of_dbs->tag . " Current status=" . $bag_of_dbs->status );

    # Add two and truncate, i.e. always leave more than 1 Gb of headroom:
    # rounding to the nearest Gb was not enough.
    my $round_up_size = int( $size_as_float + 2 );

    # Looks a tool up on the PATH; logs the given warning when missing.
    my $find_tool = sub {
        my ( $tool, $missing_message ) = @_;
        my $path = can_run($tool);
        $self->log->warn($missing_message) unless $path;
        return $path;
    };

    # Logs at fatal level, then dies. The die no longer depends on the
    # logger call returning a true value.
    my $give_up = sub {
        my ($message) = @_;
        $self->log->fatal($message);
        die "$message\n";
    };

    my $volume = $self->ec2->create_volume( Size => $round_up_size, AvailabilityZone => $self->this_zone );
    if ( $volume->can('errors') ) {
        $self->log->error( "[Error Creating Volume] " . $self->pp_ec2_errors( $volume->errors ) );
        return;
    }
    $self->log->info( "Created ", $volume->volume_id, " ", $volume->size, " Gb" );

    my $do_attach = $self->ec2->attach_volume(
        VolumeId   => $volume->volume_id,
        InstanceId => $self->this_instance_id,
        Device     => $self->device
    );
    $self->log->info( "Attaching " . $volume->volume_id . " Device " . $self->device );
    if ( $do_attach->can('errors') ) {
        $self->log->error( "[Error attaching Volume] " . $self->pp_ec2_errors( $do_attach->errors ) );
        $self->log->info( "Deleting " . $volume->volume_id );
        $self->ec2->delete_volume( VolumeId => $volume->volume_id );
        return;
    }

    # Poll every 10 seconds, for at most 30 seconds, until attached.
    my $attach_wait   = 0;
    my $attach_status = '';
    while ( $attach_status ne 'attached' && $attach_wait < 30 ) {
        $self->log->info("Waiting 10 seconds for volume to become available");
        sleep 10;
        $attach_wait += 10;

        # A missing attachment record counts as "not attached yet".
        $attach_status = eval {
            $self->ec2->describe_volumes( VolumeId => $volume->volume_id )->[0]->attachments->[0]->{status};
        } // '';
        $self->log->info("Volume $attach_status");
    }
    unless ( $attach_status eq 'attached' ) {
        $self->log->error("ERROR ATTACHING VOLUME AFTER 30 seconds... DETACHING AND DELETING");
        $self->clean_up_by_volume($volume);

        # todo, check for errors in the delete call
        $give_up->("COULD NOT ATTACH VOLUME. THIS IS BAD. EXITING");
    }

    # Make filesystem on the new volume
    $self->log->info( "Making filesystem on " . $self->device );
    my $mkfs_path = $find_tool->( 'mkfs.xfs', 'mkfs.xfs not installed!' );
    unless ( $mkfs_path && $self->run_command( [ 'sudo', $mkfs_path, $self->device ] ) ) {
        $self->log->info("Cleaning up after failed mkfs");
        $self->clean_up_by_volume($volume);
        return;
    }

    # Make the directory
    unless ( $self->run_command( [ 'sudo', 'mkdir', '-p', $self->myd_destination_folder ] ) ) {
        $self->log->info("Cleaning up after failed mkdir");
        $self->clean_up_by_volume($volume);
        return;
    }

    # Mount new volume to it
    $self->log->info( "Mounting " . $self->device );
    my $mount_path = $find_tool->( 'mount', 'mount not installed!' );
    unless ( $mount_path
        && $self->run_command( [ 'sudo', $mount_path, $self->device, $self->myd_destination_folder ] ) )
    {
        $self->log->info("Cleaning up after failed mount");
        $self->clean_up_by_volume($volume);
        return;
    }

    # Copy each MYD dir
    foreach my $myd_dir ( $bag_of_dbs->all_dbs ) {
        my $copy_cmd = [ 'sudo', 'cp', '-r', $myd_dir->myd_path, $self->myd_destination_folder ];
        $self->log->debug( join " ", @$copy_cmd );
        unless ( $self->run_command($copy_cmd) ) {
            $self->log->info("Cleaning up after failed copy");

            # The volume has to be unmounted before it can be detached.
            my $umount_path = $find_tool->( 'umount', 'umount not installed!' );
            unless ( $umount_path && $self->run_command( [ 'sudo', $umount_path, $self->device ] ) ) {
                $give_up->( "CANNOT UMOUNT " . $self->device . " EXITING" );
            }
            $self->clean_up_by_volume($volume);
            return;
        }
        $myd_dir->is_copied(1);
    }

    $self->log->info( "umounting " . $self->device );
    my $umount_path = $find_tool->( 'umount', 'umount not installed!' );
    return unless $umount_path && $self->run_command( [ 'sudo', $umount_path, $self->device ] );

    # Detach and poll every 10 seconds, for at most 60 seconds, until the
    # volume no longer reports attachments.
    my $detach_wait = 0;
    $self->ec2->detach_volume( VolumeId => $volume->volume_id );
    while ( defined eval { $self->ec2->describe_volumes( VolumeId => $volume->volume_id )->[0]->attachments }
        && $detach_wait < 60 )
    {
        $self->log->info("Waiting 10 seconds for volume to detach");
        sleep 10;
        $detach_wait += 10;
    }

    # The tag is what update_queue later matches snapshot descriptions on.
    my $snapshot_description = $bag_of_dbs->tag;
    $self->log->info("Creating Snapshot");
    my $snapshot =
        $self->ec2->create_snapshot( VolumeId => $volume->volume_id, Description => $snapshot_description );

    if ( $snapshot->can('errors') ) {
        $self->log->error( "[Snapshot Creation Error] " . $self->pp_ec2_errors( $snapshot->errors ) );
        return;
    }

    $self->log->info( "Created Snapshot ", $snapshot->snapshot_id, " ", $volume->size, " Gb" );
    $self->log->info( "Deleting " . $volume->volume_id );
    $self->ec2->delete_volume( VolumeId => $volume->volume_id );

    $self->log->info("Tagging the Snapshot");
    my $tag_path = $find_tool->( 'ec2-create-tags', 'ec2-create-tags not found' );
    return unless $tag_path;
    return unless $self->run_command( [ $tag_path, $snapshot->snapshot_id, '-t', "Name=$snapshot_description" ] );

    return 1;
}
465 
# Detaches and deletes a temporary volume after a failed step.
#
# $volume is the volume object returned by create_volume.
#
# Dies when the detach request reports errors; the volume is not deleted
# in that case. Otherwise polls every 10 seconds, for at most 60 seconds,
# until the volume no longer reports attachments, then deletes it.
# Returns nothing.
sub clean_up_by_volume {
    my ( $self, $volume ) = @_;

    my $volume_id = $volume->volume_id;

    # detach volume
    $self->log->info( "CLEANUP: Detaching " . $volume_id );
    my $do_detach = $self->ec2->detach_volume( VolumeId => $volume_id );

    if ( $do_detach->can('errors') ) {
        $self->log->error( "[Error detaching Volume] " . $self->pp_ec2_errors( $do_detach->errors ) );

        # Log, then die unconditionally: the die used to be chained onto
        # the logger call with && and was skipped if that returned false.
        my $message = "CANNOT DETACH VOLUME DURING CLEANUP. THIS IS BAD. EXITING :" . $volume_id;
        $self->log->fatal($message);
        die "$message\n";
    }

    my $wait_time = 0;
    while ( defined eval { $self->ec2->describe_volumes( VolumeId => $volume_id )->[0]->attachments }
        && $wait_time < 60 )
    {
        $self->log->info("Waiting 10 seconds for volume to detach");
        sleep 10;
        $wait_time += 10;
    }

    # delete it
    $self->log->info( "CLEANUP: Deleting " . $volume_id );
    $self->ec2->delete_volume( VolumeId => $volume_id );
    return;
}
501 
# Formats a list of EC2 error objects for logging.
#
# $error_obj is an array reference of objects with a message() method.
#
# Returns the messages joined together, each followed by a newline.
# Returns an empty string (previously undef) when the list is empty, and
# no longer dies when no list is passed, so the result can always be
# concatenated into a log line.
sub pp_ec2_errors {
    my ( $self, $error_obj ) = @_;
    return join '', map { $_->message . "\n" } @{ $error_obj // [] };
}
511 
# Runs an external command through IPC::Cmd::run and logs the outcome.
#
# $cmd is an array reference holding the command and its arguments.
#
# Returns the captured standard output when the command succeeded and
# printed something, the success flag from run() when it succeeded without
# output, and 0 when it failed.
sub run_command {
    my ( $self, $cmd ) = @_;

    ### in list context ###
    my ( $success, $error_message, $full_buf, $stdout_buf, $stderr_buf ) = run( command => $cmd, verbose => 0 );

    # run() returns no buffers when output capture is unavailable; treat
    # missing buffers as empty instead of dying on the dereference.
    my $stdout_str = join "", @{ $stdout_buf // [] };
    my $stderr_str = join "", @{ $stderr_buf // [] };

    unless ($success) {
        $self->log->error( "Failed!!: " . ( $error_message // '' ) . "\n" . $stderr_str );
        return 0;
    }

    $self->log->info( join " ", @$cmd, " Successful" );
    return length($stdout_str) > 0 ? $stdout_str : $success;
}
534 
# Fills the volume queue from the database directories under volume_path.
#
# Runs du on volume_path, keeps the output lines between the start_from
# and end_at patterns, and groups the directories named
# <genus>_<species>_<type>_<release>_<suffix> by species, keeping only the
# species and database types that were asked for. A directory named
# ensembl_compara_<release> becomes a bundle of its own when 'compara' is
# among the wanted types. One Volume object per group is added to the
# queue.
#
# Returns nothing. Dies when du fails or yields no usable line.
sub build_queue {
    my ($self) = @_;

    # Get the directory sizes
    $self->log->info("Getting Directory Sizes");
    can_run('du') or $self->log->warn('du not installed!');
    my $du_command = [ 'du', $self->volume_path, '|', 'sort', '-n', '-k', '1' ];
    my $du_output  = $self->run_command($du_command);

    # run_command returns 0 when the command failed; there is nothing to
    # parse in that case.
    my @lines;
    if ($du_output) {
        my ( $start, $end ) = ( $self->start_from, $self->end_at );
        @lines = grep { /$start/ .. /$end/ } split "\n", $du_output;
    }
    unless (@lines) {
        my $message = "Nothing found at " . $self->volume_path;
        $self->log->fatal($message);
        die "$message\n";
    }

    my $species_filter = $self->species eq 'all' ? '.*' : $self->species;

    # Database types wanted for the selected group, taken from the
    # db_type_lookup attribute instead of a second copy of the same table.
    my %is_wanted = map { $_ => 1 } @{ $self->db_type_lookup->{ $self->dbtypes } || [] };

    # species (or compara database name) => [ [ size, path, type, release, name ], ... ]
    my %dbs_for;
    foreach my $du_line (@lines) {

        # du prints "<size><whitespace><path>"; limit the split so a path
        # containing whitespace stays in one piece.
        my ( $db_size, $db_path ) = split /\s+/, $du_line, 2;
        next unless defined $db_path;
        my ( undef, undef, $db_name ) = File::Spec->splitpath($db_path);

        if ( my ( $db_species, $db_type, $db_release ) = $db_name =~ /^([a-z]+_[a-z]+)_([a-z]+)_(\d+)_\w+$/ ) {
            next unless $db_species =~ /$species_filter/;
            next unless $is_wanted{$db_type};
            push @{ $dbs_for{$db_species} }, [ $db_size, $db_path, $db_type, $db_release, $db_name ];
        }
        elsif ( $db_name =~ /ensembl_compara_(\d+)/ && $is_wanted{compara} ) {
            push @{ $dbs_for{$db_name} }, [ $db_size, $db_path, 'compara', $1, $db_name ];
        }
    }

    foreach my $species ( sort keys %dbs_for ) {
        my $volume = EnsCloud::Image::VolumeBundle::Volume->new( species => $species );
        foreach my $db ( @{ $dbs_for{$species} } ) {
            my $db_detail = EnsCloud::Image::VolumeBundle::Volume::DatabaseDetails->new(
                myd_path => $db->[1],
                name     => $db->[4],
                type     => $db->[2],

                # NOTE(review): presumably du reports Kb, making this Gb -
                # confirm against the du in use.
                size => $db->[0] / 1024 / 1024
            );
            $volume->add_db($db_detail);
            $volume->add_size( $db_detail->size );

            # The tag lists the types added so far; it is later compared
            # with snapshot descriptions to find this volume's snapshot.
            my $tag = "e$db->[3] MYD $species [" . ( join " ", map { $_->type } $volume->all_dbs ) . "]";
            $volume->tag($tag);
            $volume->ensembl_release( $db->[3] );
        }
        $volume->sort_in_place_curried;
        $self->volume_add_to_queue($volume);
    }
    return;
}
608 
609 __PACKAGE__->meta->make_immutable;
610 
611 1;
612 __END__
613 
614 
615 
map
public map()
EnsCloud::Image::VolumeBundle::Volume::DatabaseDetails
Definition: DatabaseDetails.pm:7
EnsCloud::Image::VolumeBundle
Definition: DatabaseDetails.pm:5
EnsCloud::Image
Definition: DatabaseDetails.pm:4
BEGIN
public BEGIN()
EnsCloud::Image::VolumeBundle::Volume
Definition: DatabaseDetails.pm:6
run
public run()