@@ -415,7 +415,6 @@ func (this *Migrator) Migrate() (err error) {
415
415
this .consumeRowCopyComplete ()
416
416
log .Infof ("Row copy complete" )
417
417
this .printStatus (ForcePrintStatusRule )
418
- this .migrationContext .MarkPointOfInterest ()
419
418
420
419
if err := this .cutOver (); err != nil {
421
420
return err
@@ -435,10 +434,12 @@ func (this *Migrator) cutOver() (err error) {
435
434
log .Debugf ("Noop operation; not really swapping tables" )
436
435
return nil
437
436
}
437
+ this .migrationContext .MarkPointOfInterest ()
438
438
this .throttle (func () {
439
439
log .Debugf ("throttling before swapping tables" )
440
440
})
441
441
442
+ this .migrationContext .MarkPointOfInterest ()
442
443
this .sleepWhileTrue (
443
444
func () (bool , error ) {
444
445
if this .migrationContext .PostponeCutOverFlagFile == "" {
@@ -454,6 +455,7 @@ func (this *Migrator) cutOver() (err error) {
454
455
},
455
456
)
456
457
atomic .StoreInt64 (& this .migrationContext .IsPostponingCutOver , 0 )
458
+ this .migrationContext .MarkPointOfInterest ()
457
459
458
460
if this .migrationContext .TestOnReplica {
459
461
// With `--test-on-replica` we stop replication thread, and then proceed to use
@@ -478,15 +480,20 @@ func (this *Migrator) cutOver() (err error) {
478
480
return err
479
481
}
480
482
if this .migrationContext .CutOverType == base .CutOverTwoStep {
481
- err := this .retryOperation (this .cutOverTwoStep )
483
+ err := this .retryOperation (
484
+ func () error {
485
+ return this .executeAndThrottleOnError (this .cutOverTwoStep )
486
+ },
487
+ )
482
488
return err
483
489
}
484
- return nil
490
+ return log . Fatalf ( "Unknown cut-over type: %d; should never get here!" , this . migrationContext . CutOverType )
485
491
}
486
492
487
493
// Inject the "AllEventsUpToLockProcessed" state hint, wait for it to appear in the binary logs,
488
494
// make sure the queue is drained.
489
495
func (this * Migrator ) waitForEventsUpToLock () (err error ) {
496
+ this .migrationContext .MarkPointOfInterest ()
490
497
waitForEventsUpToLockStartTime := time .Now ()
491
498
492
499
log .Infof ("Writing changelog state: %+v" , AllEventsUpToLockProcessed )
@@ -541,19 +548,28 @@ func (this *Migrator) safeCutOver() (err error) {
541
548
542
549
okToUnlockTable := make (chan bool , 2 )
543
550
originalTableRenamed := make (chan error , 1 )
551
+ var originalTableRenameIntended int64
544
552
defer func () {
553
+ log .Infof ("Checking to see if we need to roll back" )
545
554
// The following is to make sure we unlock the table no-matter-what!
546
555
// There's enough buffer in the channel to support a redundant write here.
547
556
okToUnlockTable <- true
548
- // We need to make sure we wait for the original-rename, successful or not,
549
- // so as to be able to rollback in case the ghost-rename fails.
550
- <- originalTableRenamed
551
-
557
+ if atomic .LoadInt64 (& originalTableRenameIntended ) == 1 {
558
+ log .Infof ("Waiting for original table rename result" )
559
+ // We need to make sure we wait for the original-rename, successful or not,
560
+ // so as to be able to rollback in case the ghost-rename fails.
561
+ // But we only wait on this queue if there's actually going to be a rename.
562
+ // As an example, what happens should the initial `lock tables` fail? We would
563
+ // never proceed to rename the table, hence this queue is never written to.
564
+ <- originalTableRenamed
565
+ }
552
566
// Rollback operation
553
567
if ! this .applier .tableExists (this .migrationContext .OriginalTableName ) {
554
568
log .Infof ("Cannot find %s, rolling back" , this .migrationContext .OriginalTableName )
555
569
err := this .applier .RenameTable (this .migrationContext .GetOldTableName (), this .migrationContext .OriginalTableName )
556
570
log .Errore (err )
571
+ } else {
572
+ log .Info ("No need for rollback" )
557
573
}
558
574
}()
559
575
lockOriginalSessionIdChan := make (chan int64 , 1 )
@@ -577,6 +593,8 @@ func (this *Migrator) safeCutOver() (err error) {
577
593
// We now attempt a RENAME on the original table, and expect it to block
578
594
renameOriginalSessionIdChan := make (chan int64 , 1 )
579
595
this .migrationContext .RenameTablesStartTime = time .Now ()
596
+ atomic .StoreInt64 (& originalTableRenameIntended , 1 )
597
+
580
598
go func () {
581
599
this .applier .RenameOriginalTable (renameOriginalSessionIdChan , originalTableRenamed )
582
600
}()
0 commit comments