@@ -20,6 +20,10 @@ import (
20
20
"github.com/outbrain/golib/sqlutils"
21
21
)
22
22
23
+ const (
24
+ atomicCutOverMagicHint = "ghost-cut-over-sentry"
25
+ )
26
+
23
27
// Applier connects and writes the the applier-server, which is the server where migration
24
28
// happens. This is typically the master, but could be a replica when `--test-on-replica` or
25
29
// `--execute-on-replica` are given.
@@ -77,15 +81,21 @@ func (this *Applier) validateConnection(db *gosql.DB) error {
77
81
return nil
78
82
}
79
83
80
- // tableExists checks if a given table exists in database
81
- func (this * Applier ) tableExists (tableName string ) (tableFound bool ) {
84
+ // showTableStatus returns the output of `show table status like '...'` command
85
+ func (this * Applier ) showTableStatus (tableName string ) (rowMap sqlutils.RowMap ) {
86
+ rowMap = nil
82
87
query := fmt .Sprintf (`show /* gh-ost */ table status from %s like '%s'` , sql .EscapeName (this .migrationContext .DatabaseName ), tableName )
83
-
84
88
sqlutils .QueryRowsMap (this .db , query , func (m sqlutils.RowMap ) error {
85
- tableFound = true
89
+ rowMap = m
86
90
return nil
87
91
})
88
- return tableFound
92
+ return rowMap
93
+ }
94
+
95
+ // tableExists checks if a given table exists in database
96
+ func (this * Applier ) tableExists (tableName string ) (tableFound bool ) {
97
+ m := this .showTableStatus (tableName )
98
+ return (m != nil )
89
99
}
90
100
91
101
// ValidateOrDropExistingTables verifies ghost and changelog tables do not exist,
@@ -775,6 +785,195 @@ func (this *Applier) ExpectProcess(sessionId int64, stateHint, infoHint string)
775
785
return nil
776
786
}
777
787
788
+ // DropAtomicCutOverSentryTableIfExists checks if the "old" table name
789
+ // happens to be a cut-over magic table; if so, it drops it.
790
+ func (this * Applier ) DropAtomicCutOverSentryTableIfExists () error {
791
+ log .Infof ("Looking for magic cut-over table" )
792
+ tableName := this .migrationContext .GetOldTableName ()
793
+ rowMap := this .showTableStatus (tableName )
794
+ if rowMap == nil {
795
+ // Table does not exist
796
+ return nil
797
+ }
798
+ if rowMap ["Comment" ].String != atomicCutOverMagicHint {
799
+ return fmt .Errorf ("Expected magic comment on %s, did not find it" , tableName )
800
+ }
801
+ log .Infof ("Dropping magic cut-over table" )
802
+ return this .dropTable (tableName )
803
+ }
804
+
805
+ // DropAtomicCutOverSentryTableIfExists checks if the "old" table name
806
+ // happens to be a cut-over magic table; if so, it drops it.
807
+ func (this * Applier ) CreateAtomicCutOverSentryTable () error {
808
+ if err := this .DropAtomicCutOverSentryTableIfExists (); err != nil {
809
+ return err
810
+ }
811
+ tableName := this .migrationContext .GetOldTableName ()
812
+
813
+ query := fmt .Sprintf (`create /* gh-ost */ table %s.%s (
814
+ id int auto_increment primary key
815
+ ) comment='%s'
816
+ ` ,
817
+ sql .EscapeName (this .migrationContext .DatabaseName ),
818
+ sql .EscapeName (tableName ),
819
+ atomicCutOverMagicHint ,
820
+ )
821
+ log .Infof ("Creating magic cut-over table %s.%s" ,
822
+ sql .EscapeName (this .migrationContext .DatabaseName ),
823
+ sql .EscapeName (tableName ),
824
+ )
825
+ if _ , err := sqlutils .ExecNoPrepare (this .db , query ); err != nil {
826
+ return err
827
+ }
828
+ log .Infof ("Magic cut-over table created" )
829
+
830
+ return nil
831
+ }
832
+
833
+ // AtomicCutOverMagicLock
834
+ func (this * Applier ) AtomicCutOverMagicLock (sessionIdChan chan int64 , tableLocked chan <- error , okToUnlockTable <- chan bool , tableUnlocked chan <- error ) error {
835
+ tx , err := this .db .Begin ()
836
+ if err != nil {
837
+ tableLocked <- err
838
+ return err
839
+ }
840
+ defer func () {
841
+ sessionIdChan <- - 1
842
+ tableLocked <- fmt .Errorf ("Unexpected error in AtomicCutOverMagicLock(), injected to release blocking channel reads" )
843
+ tableUnlocked <- fmt .Errorf ("Unexpected error in AtomicCutOverMagicLock(), injected to release blocking channel reads" )
844
+ tx .Rollback ()
845
+ }()
846
+
847
+ var sessionId int64
848
+ if err := tx .QueryRow (`select connection_id()` ).Scan (& sessionId ); err != nil {
849
+ tableLocked <- err
850
+ return err
851
+ }
852
+ sessionIdChan <- sessionId
853
+
854
+ lockResult := 0
855
+ query := `select get_lock(?, 0)`
856
+ lockName := this .GetSessionLockName (sessionId )
857
+ log .Infof ("Grabbing voluntary lock: %s" , lockName )
858
+ if err := tx .QueryRow (query , lockName ).Scan (& lockResult ); err != nil || lockResult != 1 {
859
+ err := fmt .Errorf ("Unable to acquire lock %s" , lockName )
860
+ tableLocked <- err
861
+ return err
862
+ }
863
+
864
+ tableLockTimeoutSeconds := this .migrationContext .SwapTablesTimeoutSeconds * 2
865
+ log .Infof ("Setting LOCK timeout as %d seconds" , tableLockTimeoutSeconds )
866
+ query = fmt .Sprintf (`set session lock_wait_timeout:=%d` , tableLockTimeoutSeconds )
867
+ if _ , err := tx .Exec (query ); err != nil {
868
+ tableLocked <- err
869
+ return err
870
+ }
871
+
872
+ if err := this .CreateAtomicCutOverSentryTable (); err != nil {
873
+ tableLocked <- err
874
+ return err
875
+ }
876
+
877
+ query = fmt .Sprintf (`lock /* gh-ost */ tables %s.%s write, %s.%s write` ,
878
+ sql .EscapeName (this .migrationContext .DatabaseName ),
879
+ sql .EscapeName (this .migrationContext .OriginalTableName ),
880
+ sql .EscapeName (this .migrationContext .DatabaseName ),
881
+ sql .EscapeName (this .migrationContext .GetOldTableName ()),
882
+ )
883
+ log .Infof ("Locking %s.%s, %s.%s" ,
884
+ sql .EscapeName (this .migrationContext .DatabaseName ),
885
+ sql .EscapeName (this .migrationContext .OriginalTableName ),
886
+ sql .EscapeName (this .migrationContext .DatabaseName ),
887
+ sql .EscapeName (this .migrationContext .GetOldTableName ()),
888
+ )
889
+ this .migrationContext .LockTablesStartTime = time .Now ()
890
+ if _ , err := tx .Exec (query ); err != nil {
891
+ tableLocked <- err
892
+ return err
893
+ }
894
+ log .Infof ("Tables locked" )
895
+ tableLocked <- nil // No error.
896
+
897
+ // From this point on, we are committed to UNLOCK TABLES. No matter what happens,
898
+ // the UNLOCK must execute (or, alternatively, this connection dies, which gets the same impact)
899
+
900
+ // The cut-over phase will proceed to apply remaining backlog onto ghost table,
901
+ // and issue RENAME. We wait here until told to proceed.
902
+ <- okToUnlockTable
903
+ log .Infof ("Will now proceed to drop magic table and unlock tables" )
904
+
905
+ // The magic table is here because we locked it. And we are the only ones allowed to drop it.
906
+ // And in fact, we will:
907
+ log .Infof ("Dropping magic cut-over table" )
908
+ query = fmt .Sprintf (`drop /* gh-ost */ table if exists %s.%s` ,
909
+ sql .EscapeName (this .migrationContext .DatabaseName ),
910
+ sql .EscapeName (this .migrationContext .GetOldTableName ()),
911
+ )
912
+ if _ , err := tx .Exec (query ); err != nil {
913
+ log .Errore (err )
914
+ // We DO NOT return here because we must `UNLOCK TABLES`!
915
+ }
916
+
917
+ // Tables still locked
918
+ log .Infof ("Releasing lock from %s.%s, %s.%s" ,
919
+ sql .EscapeName (this .migrationContext .DatabaseName ),
920
+ sql .EscapeName (this .migrationContext .OriginalTableName ),
921
+ sql .EscapeName (this .migrationContext .DatabaseName ),
922
+ sql .EscapeName (this .migrationContext .GetOldTableName ()),
923
+ )
924
+ query = `unlock tables`
925
+ if _ , err := tx .Exec (query ); err != nil {
926
+ tableUnlocked <- err
927
+ return log .Errore (err )
928
+ }
929
+ log .Infof ("Tables unlocked" )
930
+ tableUnlocked <- nil
931
+ return nil
932
+ }
933
+
934
+ // RenameOriginalTable will attempt renaming the original table into _old
935
+ func (this * Applier ) AtomicCutoverRename (sessionIdChan chan int64 , tablesRenamed chan <- error ) error {
936
+ tx , err := this .db .Begin ()
937
+ if err != nil {
938
+ return err
939
+ }
940
+ defer func () {
941
+ tx .Rollback ()
942
+ sessionIdChan <- - 1
943
+ tablesRenamed <- fmt .Errorf ("Unexpected error in AtomicCutoverRename(), injected to release blocking channel reads" )
944
+ }()
945
+ var sessionId int64
946
+ if err := tx .QueryRow (`select connection_id()` ).Scan (& sessionId ); err != nil {
947
+ return err
948
+ }
949
+ sessionIdChan <- sessionId
950
+
951
+ log .Infof ("Setting RENAME timeout as %d seconds" , this .migrationContext .SwapTablesTimeoutSeconds )
952
+ query := fmt .Sprintf (`set session lock_wait_timeout:=%d` , this .migrationContext .SwapTablesTimeoutSeconds )
953
+ if _ , err := tx .Exec (query ); err != nil {
954
+ return err
955
+ }
956
+
957
+ query = fmt .Sprintf (`rename /* gh-ost */ table %s.%s to %s.%s, %s.%s to %s.%s` ,
958
+ sql .EscapeName (this .migrationContext .DatabaseName ),
959
+ sql .EscapeName (this .migrationContext .OriginalTableName ),
960
+ sql .EscapeName (this .migrationContext .DatabaseName ),
961
+ sql .EscapeName (this .migrationContext .GetOldTableName ()),
962
+ sql .EscapeName (this .migrationContext .DatabaseName ),
963
+ sql .EscapeName (this .migrationContext .GetGhostTableName ()),
964
+ sql .EscapeName (this .migrationContext .DatabaseName ),
965
+ sql .EscapeName (this .migrationContext .OriginalTableName ),
966
+ )
967
+ log .Infof ("Issuing and expecting this to block: %s" , query )
968
+ if _ , err := tx .Exec (query ); err != nil {
969
+ tablesRenamed <- err
970
+ return log .Errore (err )
971
+ }
972
+ tablesRenamed <- nil
973
+ log .Infof ("Tables renamed" )
974
+ return nil
975
+ }
976
+
778
977
func (this * Applier ) ShowStatusVariable (variableName string ) (result int64 , err error ) {
779
978
query := fmt .Sprintf (`show global status like '%s'` , variableName )
780
979
if err := this .db .QueryRow (query ).Scan (& variableName , & result ); err != nil {
0 commit comments