diff --git a/pkg/cmd/roachtest/cluster_test.go b/pkg/cmd/roachtest/cluster_test.go index 3d6822c3f88a..222692533d29 100644 --- a/pkg/cmd/roachtest/cluster_test.go +++ b/pkg/cmd/roachtest/cluster_test.go @@ -8,6 +8,7 @@ package main import ( "context" "fmt" + "math/rand" "strconv" "strings" "testing" @@ -66,6 +67,35 @@ func TestClusterNodes(t *testing.T) { } } +func TestSeededRandGroups(t *testing.T) { + rng := rand.New(rand.NewSource(1)) + testCases := []struct { + numNodes int + numGroups int + expected []string + }{ + {numNodes: 1, numGroups: 1, expected: []string{":1"}}, + {numNodes: 10, numGroups: 1, expected: []string{":1-10"}}, + {numNodes: 10, numGroups: 2, expected: []string{":1,3,8,10", ":2,4-7,9"}}, + {numNodes: 3, numGroups: 3, expected: []string{":3", ":2", ":1"}}, + {numNodes: 5, numGroups: 3, expected: []string{":2", ":1,3-4", ":5"}}, + } + for _, tc := range testCases { + t.Run("", func(t *testing.T) { + c := &clusterImpl{spec: spec.MakeClusterSpec(tc.numNodes)} + nodes := c.All() + groups, err := nodes.SeededRandGroups(rng, tc.numGroups) + require.NoError(t, err) + for i, group := range groups { + nodeList := c.MakeNodes(group) + if tc.expected[i] != nodeList { + t.Errorf("expected %s, but found %s", tc.expected[i], nodeList) + } + } + }) + } +} + type testWrapper struct { *testing.T l *logger.Logger diff --git a/pkg/cmd/roachtest/option/node_list_option.go b/pkg/cmd/roachtest/option/node_list_option.go index 0fea800398f6..26915a9e68c5 100644 --- a/pkg/cmd/roachtest/option/node_list_option.go +++ b/pkg/cmd/roachtest/option/node_list_option.go @@ -83,6 +83,29 @@ func (n NodeListOption) SeededRandList(rand *rand.Rand, size int) (NodeListOptio return nodes[:size], nil } +// SeededRandGroups splits up the NodeListOption into numGroups groups of nodes using +// a seeded rand object. Nodes are not guaranteed to be evenly distributed among groups, +// but groups are guaranteed to have at least one node. +func (n NodeListOption) SeededRandGroups(rand *rand.Rand, numGroups int) ([]NodeListOption, error) { + if numGroups > len(n) { + return nil, fmt.Errorf("cannot partition nodes - numGroups: %d > len: %d", numGroups, len(n)) + } + + nodes := append([]int{}, n...) + rand.Shuffle(len(nodes), func(i, j int) { nodes[i], nodes[j] = nodes[j], nodes[i] }) + + groups := make([]NodeListOption, numGroups) + // Assign each group at least one node. + for i := 0; i < numGroups; i++ { + groups[i] = NodeListOption{nodes[i]} + } + for i := numGroups; i < len(nodes); i++ { + groupIdx := rand.Intn(numGroups) + groups[groupIdx] = append(groups[groupIdx], nodes[i]) + } + return groups, nil +} + // NodeIDsString returns the nodes in the NodeListOption, separated by spaces. 
func (n NodeListOption) NodeIDsString() string { result := "" diff --git a/pkg/cmd/roachtest/tests/BUILD.bazel b/pkg/cmd/roachtest/tests/BUILD.bazel index 19f5c9b9eb05..d3de7dbfdbd5 100644 --- a/pkg/cmd/roachtest/tests/BUILD.bazel +++ b/pkg/cmd/roachtest/tests/BUILD.bazel @@ -226,6 +226,7 @@ go_library( "//pkg/clusterversion", "//pkg/cmd/cmpconn", "//pkg/cmd/roachprod-microbench/util", + "//pkg/cmd/roachprod/grafana", "//pkg/cmd/roachtest/cluster", "//pkg/cmd/roachtest/clusterstats", "//pkg/cmd/roachtest/grafana", diff --git a/pkg/cmd/roachtest/tests/failure_injection.go b/pkg/cmd/roachtest/tests/failure_injection.go index 4c546c75b650..516233c772a5 100644 --- a/pkg/cmd/roachtest/tests/failure_injection.go +++ b/pkg/cmd/roachtest/tests/failure_injection.go @@ -9,8 +9,10 @@ import ( "context" "fmt" "math/rand" + "strings" "time" + "github.com/cockroachdb/cockroach/pkg/cmd/roachprod/grafana" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry" @@ -21,17 +23,35 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachprod/failureinjection/failures" "github.com/cockroachdb/cockroach/pkg/roachprod/install" "github.com/cockroachdb/cockroach/pkg/roachprod/logger" + "github.com/cockroachdb/cockroach/pkg/util/randutil" "github.com/cockroachdb/errors" ) +// failureSmokeTest is a validation test for a FailureMode. The order of steps is: +// 1. Start workload in the background. +// 2. Setup() +// 3. Inject() +// 4. validateFailure() +// 5. WaitForFailureToPropagate() +// 6. Recover() +// 7. WaitForFailureToRecover() +// 8. validateRecover() +// 9. Cleanup() +// +// Note the asymmetry in the order of validateFailure() being called before +// WaitForFailureToPropagate() and validateRecover() being called after WaitForFailureToRecover(). +// Both wait methods block until the cluster has stabilized, so we want to validate the failure +// before the cluster stabilizes but validate the recover after. type failureSmokeTest struct { testName string failureName string args failures.FailureArgs - // Validate that the failure was injected correctly, called after Setup() and Inject(). - validateFailure func(ctx context.Context, l *logger.Logger, c cluster.Cluster) error - // Validate that the failure was restored correctly, called after Restore(). - validateRestore func(ctx context.Context, l *logger.Logger, c cluster.Cluster) error + // Validate that the failure was injected correctly. + validateFailure func(ctx context.Context, l *logger.Logger, c cluster.Cluster, f failures.FailureMode) error + // Validate that the failure was recovered correctly. + validateRecover func(ctx context.Context, l *logger.Logger, c cluster.Cluster, f failures.FailureMode) error + // The workload to be run during the failureSmokeTest, if nil, defaultSmokeTestWorkload is used. + workload func(ctx context.Context, c cluster.Cluster, args ...string) error } func (t *failureSmokeTest) run( @@ -52,7 +72,8 @@ func (t *failureSmokeTest) run( quietLogger = l } l.Printf("%s: Running Cleanup(); details in %s.log", t.failureName, file) - err = errors.CombineErrors(err, failureMode.Cleanup(ctx, quietLogger, t.args)) + // Best effort try to clean up the test even if the test context gets cancelled. 
+ err = errors.CombineErrors(err, failureMode.Cleanup(context.Background(), quietLogger, t.args)) }() quietLogger, file, err := roachtestutil.LoggerForCmd(l, c.CRDBNodes(), t.testName, "setup") @@ -69,11 +90,21 @@ func (t *failureSmokeTest) run( return err } l.Printf("%s: Running Inject(); details in %s.log", t.failureName, file) + if err = c.AddGrafanaAnnotation(ctx, l, grafana.AddAnnotationRequest{ + Text: fmt.Sprintf("%s injected", t.testName), + }); err != nil { + return err + } if err = failureMode.Inject(ctx, quietLogger, t.args); err != nil { return err } - // Allow the failure to take effect. + l.Printf("validating failure was properly injected") + if err = t.validateFailure(ctx, l, c, failureMode); err != nil { + return err + } + + // Allow the cluster to stabilize after the failure. quietLogger, file, err = roachtestutil.LoggerForCmd(l, c.CRDBNodes(), t.testName, "wait for propagate") if err != nil { return err @@ -83,42 +114,46 @@ func (t *failureSmokeTest) run( return err } - l.Printf("validating failure was properly injected") - if err = t.validateFailure(ctx, l, c); err != nil { + quietLogger, file, err = roachtestutil.LoggerForCmd(l, c.CRDBNodes(), t.testName, "recover") + if err != nil { return err } - - quietLogger, file, err = roachtestutil.LoggerForCmd(l, c.CRDBNodes(), t.testName, "restore") - if err != nil { + l.Printf("%s: Running Recover(); details in %s.log", t.failureName, file) + if err = c.AddGrafanaAnnotation(ctx, l, grafana.AddAnnotationRequest{ + Text: fmt.Sprintf("%s recovered", t.testName), + }); err != nil { return err } - l.Printf("%s: Running Restore(); details in %s.log", t.failureName, file) - if err = failureMode.Restore(ctx, quietLogger, t.args); err != nil { + if err = failureMode.Recover(ctx, quietLogger, t.args); err != nil { return err } // Allow the cluster to return to normal. 
- quietLogger, file, err = roachtestutil.LoggerForCmd(l, c.CRDBNodes(), t.testName, "wait for restore") + quietLogger, file, err = roachtestutil.LoggerForCmd(l, c.CRDBNodes(), t.testName, "wait for recover") if err != nil { return err } - l.Printf("%s: Running WaitForFailureToRestore(); details in %s.log", t.failureName, file) - if err = failureMode.WaitForFailureToRestore(ctx, quietLogger, t.args); err != nil { + l.Printf("%s: Running WaitForFailureToRecover(); details in %s.log", t.failureName, file) + if err = failureMode.WaitForFailureToRecover(ctx, quietLogger, t.args); err != nil { return err } - l.Printf("validating failure was properly restored") - return t.validateRestore(ctx, l, c) + l.Printf("validating failure was properly recovered") + return t.validateRecover(ctx, l, c, failureMode) } func (t *failureSmokeTest) noopRun( - ctx context.Context, l *logger.Logger, c cluster.Cluster, _ *failures.FailureRegistry, + ctx context.Context, l *logger.Logger, c cluster.Cluster, fr *failures.FailureRegistry, ) error { - if err := t.validateFailure(ctx, l, c); err == nil { + failureMode, err := fr.GetFailureMode(c.MakeNodes(c.CRDBNodes()), t.failureName, l, c.IsSecure()) + if err != nil { + return err + } + if err := t.validateFailure(ctx, l, c, failureMode); err == nil { return errors.New("no failure was injected but validation still passed") } - if err := t.validateRestore(ctx, l, c); err != nil { - return errors.Wrapf(err, "no failure was injected but post restore validation still failed") + if err := t.validateRecover(ctx, l, c, failureMode); err != nil { + return errors.Wrapf(err, "no failure was injected but post recover validation still failed") } return nil } @@ -141,7 +176,7 @@ var bidirectionalNetworkPartitionTest = func(c cluster.Cluster) failureSmokeTest Type: failures.Bidirectional, }}, }, - validateFailure: func(ctx context.Context, l *logger.Logger, c cluster.Cluster) error { + validateFailure: func(ctx context.Context, l *logger.Logger, c cluster.Cluster, f failures.FailureMode) error { blocked, err := roachtestutil.CheckPortBlocked(ctx, l, c, c.Nodes(srcNode), c.Nodes(destNode), fmt.Sprintf("{pgport:%d}", destNode)) if err != nil { return err @@ -159,7 +194,7 @@ var bidirectionalNetworkPartitionTest = func(c cluster.Cluster) failureSmokeTest } return nil }, - validateRestore: func(ctx context.Context, l *logger.Logger, c cluster.Cluster) error { + validateRecover: func(ctx context.Context, l *logger.Logger, c cluster.Cluster, f failures.FailureMode) error { blocked, err := roachtestutil.CheckPortBlocked(ctx, l, c, c.Nodes(srcNode), c.Nodes(destNode), fmt.Sprintf("{pgport:%d}", destNode)) if err != nil { return err @@ -189,7 +224,7 @@ var asymmetricIncomingNetworkPartitionTest = func(c cluster.Cluster) failureSmok Type: failures.Incoming, }}, }, - validateFailure: func(ctx context.Context, l *logger.Logger, c cluster.Cluster) error { + validateFailure: func(ctx context.Context, l *logger.Logger, c cluster.Cluster, f failures.FailureMode) error { blocked, err := roachtestutil.CheckPortBlocked(ctx, l, c, c.Nodes(srcNode), c.Nodes(destNode), fmt.Sprintf("{pgport:%d}", destNode)) if err != nil { return err @@ -207,7 +242,7 @@ var asymmetricIncomingNetworkPartitionTest = func(c cluster.Cluster) failureSmok } return nil }, - validateRestore: func(ctx context.Context, l *logger.Logger, c cluster.Cluster) error { + validateRecover: func(ctx context.Context, l *logger.Logger, c cluster.Cluster, f failures.FailureMode) error { blocked, err := roachtestutil.CheckPortBlocked(ctx, l, c, 
c.Nodes(srcNode), c.Nodes(destNode), fmt.Sprintf("{pgport:%d}", destNode))
if err != nil {
return err
@@ -237,7 +272,7 @@ var asymmetricOutgoingNetworkPartitionTest = func(c cluster.Cluster) failureSmok
Type: failures.Outgoing,
}},
},
- validateFailure: func(ctx context.Context, l *logger.Logger, c cluster.Cluster) error {
+ validateFailure: func(ctx context.Context, l *logger.Logger, c cluster.Cluster, f failures.FailureMode) error {
blocked, err := roachtestutil.CheckPortBlocked(ctx, l, c,
c.Nodes(srcNode), c.Nodes(destNode), fmt.Sprintf("{pgport:%d}", destNode))
if err != nil {
return err
@@ -255,7 +290,7 @@ var asymmetricOutgoingNetworkPartitionTest = func(c cluster.Cluster) failureSmok
}
return nil
},
- validateRestore: func(ctx context.Context, l *logger.Logger, c cluster.Cluster) error {
+ validateRecover: func(ctx context.Context, l *logger.Logger, c cluster.Cluster, f failures.FailureMode) error {
blocked, err := roachtestutil.CheckPortBlocked(ctx, l, c,
c.Nodes(srcNode), c.Nodes(destNode), fmt.Sprintf("{pgport:%d}", destNode))
if err != nil {
return err
@@ -293,7 +328,7 @@ var latencyTest = func(c cluster.Cluster) failureSmokeTest {
},
},
},
- validateFailure: func(ctx context.Context, l *logger.Logger, c cluster.Cluster) error {
+ validateFailure: func(ctx context.Context, l *logger.Logger, c cluster.Cluster, f failures.FailureMode) error {
// Note that this is one way latency, since the sender doesn't have the matching port.
delayedLatency, err := roachtestutil.PortLatency(ctx, l, c, c.Nodes(srcNode), c.Nodes(destNode))
if err != nil {
@@ -311,7 +346,7 @@ var latencyTest = func(c cluster.Cluster) failureSmokeTest {
}
return nil
},
- validateRestore: func(ctx context.Context, l *logger.Logger, c cluster.Cluster) error {
+ validateRecover: func(ctx context.Context, l *logger.Logger, c cluster.Cluster, f failures.FailureMode) error {
delayedLatency, err := roachtestutil.PortLatency(ctx, l, c, c.Nodes(srcNode), c.Nodes(destNode))
if err != nil {
return err
@@ -324,58 +359,322 @@ var latencyTest = func(c cluster.Cluster) failureSmokeTest {
return errors.Errorf("expected latency between nodes with artificial latency (n%d and n%d) to be close to latency between nodes without (n%d and n%d)", srcNode, destNode, unaffectedNode, destNode)
}
if delayedLatency > 500*time.Millisecond {
- return errors.Errorf("expected latency between nodes with artificial latency (n%d and n%d) to have restored to at least less than 500ms", srcNode, destNode)
+ return errors.Errorf("expected latency between nodes with artificial latency (n%d and n%d) to have recovered to less than 500ms", srcNode, destNode)
+ }
+ return nil
+ },
+ }
+}
+
+var cgroupsDiskStallTests = func(c cluster.Cluster) []failureSmokeTest {
+ type rwBytes struct {
+ stalledRead int
+ stalledWrite int
+ unaffectedRead int
+ unaffectedWrite int
+ }
+ getRWBytes := func(ctx context.Context, l *logger.Logger, c cluster.Cluster, f *failures.CGroupDiskStaller, stalledNode, unaffectedNode option.NodeListOption) (rwBytes, error) {
+ stalledReadBytes, stalledWriteBytes, err := f.GetReadWriteBytes(ctx, l, stalledNode.InstallNodes())
+ if err != nil {
+ return rwBytes{}, err
+ }
+ unaffectedNodeReadBytes, unaffectedNodeWriteBytes, err := f.GetReadWriteBytes(ctx, l, unaffectedNode.InstallNodes())
+ if err != nil {
+ return rwBytes{}, err
+ }
+
+ return rwBytes{
+ stalledRead: stalledReadBytes,
+ stalledWrite: stalledWriteBytes,
+ unaffectedRead: unaffectedNodeReadBytes,
+ unaffectedWrite: unaffectedNodeWriteBytes,
+ }, nil
+ }
+
+ // Returns
the read and write bytes read/written to disk over the last 30 seconds of the + // stalled node and a control unaffected node. + getRWBytesOverTime := func(ctx context.Context, l *logger.Logger, c cluster.Cluster, f *failures.CGroupDiskStaller, stalledNode, unaffectedNode option.NodeListOption) (rwBytes, error) { + beforeRWBytes, err := getRWBytes(ctx, l, c, f, stalledNode, unaffectedNode) + if err != nil { + return rwBytes{}, err + } + // Evict the store from memory on each VM to force them to read from disk. + // Without this, we don't run the workload long enough for the process to read + // from disk instead of memory. + c.Run(ctx, option.WithNodes(c.CRDBNodes()), "vmtouch -ve /mnt/data1") + select { + case <-ctx.Done(): + return rwBytes{}, ctx.Err() + case <-time.After(30 * time.Second): + } + + afterRWBytes, err := getRWBytes(ctx, l, c, f, stalledNode, unaffectedNode) + if err != nil { + return rwBytes{}, err + } + + return rwBytes{ + stalledRead: afterRWBytes.stalledRead - beforeRWBytes.stalledRead, + stalledWrite: afterRWBytes.stalledWrite - beforeRWBytes.stalledWrite, + unaffectedRead: afterRWBytes.unaffectedRead - beforeRWBytes.unaffectedRead, + unaffectedWrite: afterRWBytes.unaffectedWrite - beforeRWBytes.unaffectedWrite, + }, nil + } + + assertRWBytes := func(ctx context.Context, l *logger.Logger, bytes rwBytes, readsStalled, writesStalled bool) error { + // The threshold of bytes that we consider i/o to be "stalled". It would be nice to + // assert that it is 0 (it usually is), but because of how cgroups throttles io, + // (throughput is not a hard limit and limits can accumulate for bursts of io) we + // sometimes see some throughput. The threshold of 90k bytes was chosen as double + // the estimated throughput cgroups will allow over 30 seconds. + threshold := 90000 + + // If writes are stalled, assert that we observe no writes to disk on the stalled node. + if writesStalled { + if bytes.stalledWrite > threshold { + return errors.Errorf("expected stalled node to have written no bytes to disk, but wrote %d", bytes.stalledWrite) + } + } else { + // If writes are not stalled, e.g. read only stall or failure mode was recovered, + // then assert that we observe writes to disk on the stalled node. + if bytes.stalledWrite < threshold { + return errors.Errorf("writes were not stalled on the stalled node, but only %d bytes were written", bytes.stalledWrite) + } + } + // The unaffected node should always be writing to disk. + if bytes.unaffectedWrite < threshold { + return errors.Errorf("writes were not stalled on the unaffected node, but only %d bytes were written", bytes.unaffectedWrite) + } + + // When checking that reads _aren't_ stalled, we have to use a weaker assertion. + // Trying to force the process to read a consistent amount of bytes from disk + // proves to be difficult and sometimes the process simply doesn't read that + // many bytes. One reason is that the smoke test may choose to stall a majority + // of nodes, so we lose quorum greatly decreasing overall throughput. However, + // even within the same test configuration we still see high variation. + readLowerThreshold := 4096 + + // Same assertions as above but for reads instead. 
+ if readsStalled { + if bytes.stalledRead > threshold { + return errors.Errorf("expected stalled node to have read no bytes from disk, but read %d", bytes.stalledRead) + } + } else { + if bytes.stalledRead < readLowerThreshold { + return errors.Errorf("reads were not stalled on the stalled node, but only %d bytes were read", bytes.stalledRead) + } + } + if bytes.unaffectedRead < readLowerThreshold { + return errors.Errorf("reads were not stalled on the unaffected node, but only %d bytes were read", bytes.unaffectedRead) + } + return nil + } + + var tests []failureSmokeTest + for _, stallWrites := range []bool{true, false} { + for _, stallReads := range []bool{true, false} { + if !stallWrites && !stallReads { + continue + } + + rng, _ := randutil.NewPseudoRand() + // SeededRandGroups only returns an error if the requested size is larger than the + // number of nodes, so we can safely ignore the error. + groups, _ := c.CRDBNodes().SeededRandGroups(rng, 2 /* numGroups */) + stalledNodeGroup := groups[0] + unaffectedNodeGroup := groups[1] + // To simplify the smoke test, only run validation on these two + // randomly chosen nodes. + stalledNode := stalledNodeGroup.SeededRandNode(rng) + unaffectedNode := unaffectedNodeGroup.SeededRandNode(rng) + + testName := fmt.Sprintf("%s/WritesStalled=%t/ReadsStalled=%t", failures.CgroupsDiskStallName, stallWrites, stallReads) + tests = append(tests, failureSmokeTest{ + testName: testName, + failureName: failures.CgroupsDiskStallName, + args: failures.DiskStallArgs{ + StallWrites: stallWrites, + StallReads: stallReads, + RestartNodes: true, + Nodes: stalledNodeGroup.InstallNodes(), + }, + validateFailure: func(ctx context.Context, l *logger.Logger, c cluster.Cluster, f failures.FailureMode) error { + l.Printf("Stalled nodes: %d, Unaffected nodes: %d, Stalled validation node: %d, Unaffected validation node: %d", stalledNodeGroup, unaffectedNodeGroup, stalledNode, unaffectedNode) + res, err := getRWBytesOverTime(ctx, l, c, f.(*failures.CGroupDiskStaller), stalledNode, unaffectedNode) + if err != nil { + return err + } + l.Printf("ReadBytes and WriteBytes over last 30 seconds:%+v", res) + + return assertRWBytes(ctx, l, res, stallReads, stallWrites) + }, + validateRecover: func(ctx context.Context, l *logger.Logger, c cluster.Cluster, f failures.FailureMode) error { + // Wait for replication since the stalled node may have just restarted. + // TODO(darryl): The failure mode itself should do this in WaitForFailureToRecover. + // It should also wait for replicas to rebalance, although this test is not large + // enough for that to matter. + db := c.Conn(ctx, l, stalledNode[0]) + defer db.Close() + if err := roachtestutil.WaitForReplication(ctx, l, db, 3 /* replicationFactor */, roachtestutil.AtLeastReplicationFactor); err != nil { + return err + } + res, err := getRWBytesOverTime(ctx, l, c, f.(*failures.CGroupDiskStaller), stalledNode, unaffectedNode) + if err != nil { + return err + } + l.Printf("ReadBytes and WriteBytes over last 30 seconds:%+v", res) + + // The cluster should be fully recovered from the stalls, so assert that + // reads and writes are not stalled on both the stalled and unaffected nodes. + return assertRWBytes(ctx, l, res, false /*readsStalled*/, false /*writesStalled*/) + }, + workload: func(ctx context.Context, c cluster.Cluster, args ...string) error { + return defaultFailureSmokeTestWorkload(ctx, c, + // Tolerate errors as we expect nodes to fatal. + "--tolerate-errors", + // Without this, kv workload will read from keys with no values. 
+ "--sequential", "--cycle-length=1000", "--read-percent=50", + // Bump up the block bytes in attempt to make the test more stable. This will + // both increase the throughput and reduce the chance we see cgroups allow io + // since any given io request will be much larger. + "--min-block-bytes=65536", "--max-block-bytes=65536") + }, + }) + } + } + + return tests +} + +var dmsetupDiskStallTest = func(c cluster.Cluster) failureSmokeTest { + rng, _ := randutil.NewPseudoRand() + // SeededRandGroups only returns an error if the requested size is larger than the + // number of nodes, so we can safely ignore the error. + groups, _ := c.CRDBNodes().SeededRandGroups(rng, 2 /* numGroups */) + stalledNodeGroup := groups[0] + unaffectedNodeGroup := groups[1] + // These are the nodes that we will run validation on. + stalledNode := stalledNodeGroup.SeededRandNode(rng) + unaffectedNode := unaffectedNodeGroup.SeededRandNode(rng) + + // touchFile attempts to create a file and returns whether it succeeded. + touchFile := func(ctx context.Context, l *logger.Logger, c cluster.Cluster, node option.NodeListOption) bool { + timeoutCtx, cancel := context.WithTimeout(ctx, 20*time.Second) + defer cancel() + err := c.RunE(timeoutCtx, option.WithNodes(node), "touch /mnt/data1/test.txt") + if err != nil { + l.Printf("failed to create file on node %d: %v", node, err) + return false + } + return true + } + + return failureSmokeTest{ + testName: failures.DmsetupDiskStallName, + failureName: failures.DmsetupDiskStallName, + args: failures.DiskStallArgs{ + Nodes: stalledNodeGroup.InstallNodes(), + RestartNodes: true, + }, + validateFailure: func(ctx context.Context, l *logger.Logger, c cluster.Cluster, f failures.FailureMode) error { + l.Printf("Stalled nodes: %d, Unaffected nodes: %d, Stalled validation node: %d, Unaffected validation node: %d", stalledNodeGroup, unaffectedNodeGroup, stalledNode, unaffectedNode) + if touchFile(ctx, l, c, stalledNode) { + return errors.Errorf("expected node %d to be stalled and creating a file to hang", stalledNode) + } + if !touchFile(ctx, l, c, unaffectedNode) { + return errors.Errorf("expected creating a file to work on unaffected node %d", stalledNode) } return nil }, + validateRecover: func(ctx context.Context, l *logger.Logger, c cluster.Cluster, f failures.FailureMode) error { + if !touchFile(ctx, l, c, stalledNode) { + return errors.Errorf("expected creating a file to work on stalled node %d", stalledNode) + } + return nil + }, + workload: func(ctx context.Context, c cluster.Cluster, args ...string) error { + // Tolerate errors as we expect nodes to fatal. + return defaultFailureSmokeTestWorkload(ctx, c, "--tolerate-errors") + }, } } -func setupFailureSmokeTests(ctx context.Context, t test.Test, c cluster.Cluster) error { +func defaultFailureSmokeTestWorkload(ctx context.Context, c cluster.Cluster, args ...string) error { + workloadArgs := strings.Join(args, " ") + cmd := roachtestutil.NewCommand("./cockroach workload run kv %s", workloadArgs). + Arg("{pgurl%s}", c.CRDBNodes()). + String() + return c.RunE(ctx, option.WithNodes(c.WorkloadNode()), cmd) +} + +func setupFailureSmokeTests( + ctx context.Context, t test.Test, c cluster.Cluster, fr *failures.FailureRegistry, +) error { // Download any dependencies needed. if err := c.Install(ctx, t.L(), c.CRDBNodes(), "nmap"); err != nil { return err } - c.Start(ctx, t.L(), option.DefaultStartOpts(), install.MakeClusterSettings(), c.CRDBNodes()) - // Run a light workload in the background so we have some traffic in the database. 
+ if err := c.Install(ctx, t.L(), c.CRDBNodes(), "vmtouch"); err != nil { + return err + } + startSettings := install.MakeClusterSettings() + startSettings.Env = append(startSettings.Env, + // Increase the time writes must be stalled before a node fatals. Disk stall tests + // want to query read/write bytes for 30 seconds to validate that the stall is working, + // so we set max sync duration to 2 minutes to make sure the node doesn't die before then. + // Don't disable outright as we still want to test that the node eventually dies. + fmt.Sprintf("COCKROACH_LOG_MAX_SYNC_DURATION=%s", 2*time.Minute), + fmt.Sprintf("COCKROACH_ENGINE_MAX_SYNC_DURATION_DEFAULT=%s", 2*time.Minute)) + c.Start(ctx, t.L(), option.DefaultStartOpts(), startSettings, c.CRDBNodes()) + + // Initialize the workloads we will use. c.Run(ctx, option.WithNodes(c.WorkloadNode()), "./cockroach workload init kv {pgurl:1}") - t.Go(func(goCtx context.Context, l *logger.Logger) error { - return c.RunE(goCtx, option.WithNodes(c.WorkloadNode()), "./cockroach workload run kv {pgurl:1-3}") - }, task.WithContext(ctx)) return nil } func runFailureSmokeTest(ctx context.Context, t test.Test, c cluster.Cluster, noopFailer bool) { - if err := setupFailureSmokeTests(ctx, t, c); err != nil { - t.Fatal(err) - } fr := failures.NewFailureRegistry() fr.Register() + if err := setupFailureSmokeTests(ctx, t, c, fr); err != nil { + t.Error(err) + } var failureSmokeTests = []failureSmokeTest{ bidirectionalNetworkPartitionTest(c), asymmetricIncomingNetworkPartitionTest(c), asymmetricOutgoingNetworkPartitionTest(c), latencyTest(c), + dmsetupDiskStallTest(c), } + failureSmokeTests = append(failureSmokeTests, cgroupsDiskStallTests(c)...) // Randomize the order of the tests in case any of the failures have unexpected side - // effects that may mask failures, e.g. a cgroups disk stall isn't properly restored + // effects that may mask failures, e.g. a cgroups disk stall isn't properly recovered // which causes a dmsetup disk stall to appear to work even if it doesn't. 
rand.Shuffle(len(failureSmokeTests), func(i, j int) { failureSmokeTests[i], failureSmokeTests[j] = failureSmokeTests[j], failureSmokeTests[i] }) for _, test := range failureSmokeTests { - t.L().Printf("running %s test", test.testName) + t.L().Printf("\n=====running %s test=====", test.testName) if noopFailer { if err := test.noopRun(ctx, t.L(), c, fr); err != nil { t.Fatal(err) } } else { - if err := test.run(ctx, t.L(), c, fr); err != nil { + backgroundWorkload := defaultFailureSmokeTestWorkload + if test.workload != nil { + backgroundWorkload = test.workload + } + cancel := t.GoWithCancel(func(goCtx context.Context, l *logger.Logger) error { + return backgroundWorkload(goCtx, c) + }, task.Name(fmt.Sprintf("%s-workload", test.testName))) + err := test.run(ctx, t.L(), c, fr) + cancel() + if err != nil { t.Fatal(errors.Wrapf(err, "%s failed", test.testName)) } + } t.L().Printf("%s test complete", test.testName) } diff --git a/pkg/roachprod/failureinjection/failures/BUILD.bazel b/pkg/roachprod/failureinjection/failures/BUILD.bazel index 0f2e2e76cc6c..f889e1eac21e 100644 --- a/pkg/roachprod/failureinjection/failures/BUILD.bazel +++ b/pkg/roachprod/failureinjection/failures/BUILD.bazel @@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library") go_library( name = "failures", srcs = [ + "disk_stall.go", "failure.go", "latency.go", "network_partition.go", @@ -14,6 +15,8 @@ go_library( "//pkg/roachprod", "//pkg/roachprod/install", "//pkg/roachprod/logger", + "//pkg/util/retry", + "//pkg/util/timeutil", "@com_github_cockroachdb_errors//:errors", ], ) diff --git a/pkg/roachprod/failureinjection/failures/disk_stall.go b/pkg/roachprod/failureinjection/failures/disk_stall.go new file mode 100644 index 000000000000..da33f7ea6de2 --- /dev/null +++ b/pkg/roachprod/failureinjection/failures/disk_stall.go @@ -0,0 +1,495 @@ +// Copyright 2025 The Cockroach Authors. +// +// Use of this software is governed by the CockroachDB Software License +// included in the /LICENSE file. + +package failures + +import ( + "context" + "fmt" + "math/rand" + "path/filepath" + "strconv" + "strings" + "time" + + "github.com/cockroachdb/cockroach/pkg/roachprod" + "github.com/cockroachdb/cockroach/pkg/roachprod/install" + "github.com/cockroachdb/cockroach/pkg/roachprod/logger" + "github.com/cockroachdb/errors" +) + +var cockroachIOController = filepath.Join("/sys/fs/cgroup/system.slice", install.VirtualClusterLabel(install.SystemInterfaceName, 0)+".service", "io.max") + +const CgroupsDiskStallName = "cgroup-disk-stall" + +type CGroupDiskStaller struct { + GenericFailure +} + +func MakeCgroupDiskStaller(clusterName string, l *logger.Logger, secure bool) (FailureMode, error) { + c, err := roachprod.GetClusterFromCache(l, clusterName, install.SecureOption(secure)) + if err != nil { + return nil, err + } + genericFailure := GenericFailure{c: c, runTitle: CgroupsDiskStallName} + return &CGroupDiskStaller{GenericFailure: genericFailure}, nil +} + +func registerCgroupDiskStall(r *FailureRegistry) { + r.add(CgroupsDiskStallName, DiskStallArgs{}, MakeCgroupDiskStaller) +} + +type DiskStallArgs struct { + StallLogs bool + StallReads bool + StallWrites bool + // If true, allow the failure mode to restart nodes as needed. E.g. dmsetup requires + // the cockroach process to not be running to properly setup. If RestartNodes is true, + // then the failure mode will restart the cluster for the user. + RestartNodes bool + // Throughput is the bytes per second to throttle reads/writes to. 
If unset, will
+ // stall reads/writes completely. Supported only for cgroup disk staller, dmsetup
+ // only supports fully stalling reads/writes.
+ Throughput int
+ Nodes install.Nodes
+}
+
+func (s *CGroupDiskStaller) Description() string {
+ return CgroupsDiskStallName
+}
+
+func (s *CGroupDiskStaller) Setup(ctx context.Context, l *logger.Logger, args FailureArgs) error {
+ diskStallArgs := args.(DiskStallArgs)
+
+ // To stall logs we need to create a symlink that points to our stalled
+ // store directory. In order to do that we need to temporarily move the
+ // existing logs directory and copy the contents over after. If a symlink
+ // already exists, don't attempt to recreate it.
+ if diskStallArgs.StallLogs {
+ createSymlinkCmd := `
+if [ ! -L logs ]; then
+ echo "creating symlink";
+ mkdir -p {store-dir}/logs;
+ ln -s {store-dir}/logs logs;
+fi
+`
+ if err := s.Run(ctx, l, diskStallArgs.Nodes, createSymlinkCmd); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+func (s *CGroupDiskStaller) Cleanup(ctx context.Context, l *logger.Logger, args FailureArgs) error {
+ stallType := []bandwidthType{readBandwidth, writeBandwidth}
+ nodes := args.(DiskStallArgs).Nodes
+
+ // Setting cgroup limits is idempotent so attempt to unlimit reads/writes in case
+ // something went wrong in Recover.
+ err := s.setThroughput(ctx, l, stallType, throughput{limited: false}, nodes, cockroachIOController)
+ if err != nil {
+ l.PrintfCtx(ctx, "error unstalling the disk; stumbling on: %v", err)
+ }
+ if args.(DiskStallArgs).StallLogs {
+ if err = s.Run(ctx, l, nodes, "unlink logs/logs"); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+func getStallTypes(diskStallArgs DiskStallArgs) ([]bandwidthType, error) {
+ var stallTypes []bandwidthType
+ if diskStallArgs.StallWrites {
+ stallTypes = []bandwidthType{writeBandwidth}
+ } else if diskStallArgs.StallLogs {
+ return nil, errors.New("stalling logs is not supported without stalling writes")
+ }
+ if diskStallArgs.StallReads {
+ stallTypes = append(stallTypes, readBandwidth)
+ }
+ if len(stallTypes) == 0 {
+ return nil, errors.New("at least one of reads or writes must be stalled")
+ }
+ return stallTypes, nil
+}
+
+func (s *CGroupDiskStaller) Inject(ctx context.Context, l *logger.Logger, args FailureArgs) error {
+ diskStallArgs := args.(DiskStallArgs)
+ stallTypes, err := getStallTypes(diskStallArgs)
+ if err != nil {
+ return err
+ }
+
+ // N.B. Although the cgroupsv2 documentation states that "limits are in the range [0, max]",
+ // attempting to set a bytesPerSecond=0 results in a `Numerical result out of range` error
+ // from the io.max cgroupv2 API. Upon inspection of the blk-throttle implementation, we can
+ // see an explicit `if (!val)` error check disallowing 0 values.
+ //
+ // Similarly, attempting to set a bytesPerSecond=1 results in an `Invalid argument` error
+ // due to an additional check that `val > 1`. Interestingly, this appears to be an Ubuntu
+ // 22.04+ addition, as older distributions and the upstream cgroup implementation do not
+ // have this check.
+ //
+ // This additional check appears to protect against the io hanging when allowing bursts
+ // of io, i.e. allowing io limits to gradually accumulate even if the soft limit is too low
+ // to serve the system's request. Said burst allowance is calculated roughly as:
+ // `adj_limit = limit + (limit >> 1) * adj_limit`. When the limit is 1, we can see
+ // the adjusted limit will never increase, potentially blocking io requests indefinitely.
+ // While this is exactly what we want, it's not the intended use case and is invalid. + bytesPerSecond := 2 + if diskStallArgs.Throughput == 1 { + return errors.New("cgroups v2 requires a io throughput of at least 2 bytes per second") + } else if diskStallArgs.Throughput > 1 { + bytesPerSecond = diskStallArgs.Throughput + } + + nodes := diskStallArgs.Nodes + + // Shuffle the order of read and write stall initiation. + rand.Shuffle(len(stallTypes), func(i, j int) { + stallTypes[i], stallTypes[j] = stallTypes[j], stallTypes[i] + }) + + defer func() { + // Log the cgroup bandwidth limits for debugging purposes. + err = s.Run(ctx, l, nodes, "cat", cockroachIOController) + if err != nil { + l.Printf("failed to log cgroup bandwidth limits: %v", err) + } + }() + + l.Printf("stalling disk I/O on nodes %d", nodes) + if err := s.setThroughput(ctx, l, stallTypes, throughput{limited: true, bytesPerSecond: fmt.Sprintf("%d", bytesPerSecond)}, nodes, cockroachIOController); err != nil { + return err + } + + return nil +} + +func (s *CGroupDiskStaller) Recover(ctx context.Context, l *logger.Logger, args FailureArgs) error { + diskStallArgs := args.(DiskStallArgs) + stallTypes, err := getStallTypes(diskStallArgs) + if err != nil { + return err + } + + nodes := diskStallArgs.Nodes + + cockroachIOController := filepath.Join("/sys/fs/cgroup/system.slice", install.VirtualClusterLabel(install.SystemInterfaceName, 0)+".service", "io.max") + + l.Printf("unstalling disk I/O on nodes %d", nodes) + // N.B. cgroups v2 relies on systemd running, however if our disk stall + // was injected for too long, the cockroach process will detect a disk stall + // and exit. This deletes the cockroach service and there is no need to + // unlimit anything. Instead, restart the node if RestartNodes is true. + err = s.setThroughput(ctx, l, stallTypes, throughput{limited: false}, nodes, cockroachIOController) + return forEachNode(diskStallArgs.Nodes, func(n install.Nodes) error { + err = s.Run(ctx, l, n, "cat", cockroachIOController) + if err != nil && diskStallArgs.RestartNodes { + l.Printf("failed to log cgroup bandwidth limits, assuming n%d exited and restarting: %v", n, err) + return s.StartNodes(ctx, l, n) + } + return nil + }) +} + +func (s *CGroupDiskStaller) WaitForFailureToPropagate( + ctx context.Context, l *logger.Logger, args FailureArgs, +) error { + diskStallArgs := args.(DiskStallArgs) + if diskStallArgs.StallWrites { + // If writes are stalled, we expect the disk stall detection to kick in + // and kill the node. 
+ return forEachNode(diskStallArgs.Nodes, func(n install.Nodes) error {
+ return s.WaitForSQLUnavailable(ctx, l, n, 3*time.Minute)
+ })
+ }
+ return nil
+}
+
+func (s *CGroupDiskStaller) WaitForFailureToRecover(
+ ctx context.Context, l *logger.Logger, args FailureArgs,
+) error {
+ nodes := args.(DiskStallArgs).Nodes
+ return forEachNode(nodes, func(n install.Nodes) error {
+ return s.WaitForSQLReady(ctx, l, n, time.Minute)
+ })
+}
+
+type throughput struct {
+ limited bool
+ bytesPerSecond string
+}
+
+type bandwidthType int8
+
+const (
+ readBandwidth bandwidthType = iota
+ writeBandwidth
+)
+
+func (rw bandwidthType) cgroupV2BandwidthProp() string {
+ switch rw {
+ case readBandwidth:
+ return "rbps"
+ case writeBandwidth:
+ return "wbps"
+ default:
+ panic("unreachable")
+ }
+}
+
+func (s *CGroupDiskStaller) setThroughput(
+ ctx context.Context,
+ l *logger.Logger,
+ readOrWrite []bandwidthType,
+ bw throughput,
+ nodes install.Nodes,
+ cockroachIOController string,
+) error {
+ maj, min, err := s.DiskDeviceMajorMinor(ctx, l)
+ if err != nil {
+ return err
+ }
+
+ var limits []string
+ for _, rw := range readOrWrite {
+ bytesPerSecondStr := "max"
+ if bw.limited {
+ bytesPerSecondStr = bw.bytesPerSecond
+ }
+ limits = append(limits, fmt.Sprintf("%s=%s", rw.cgroupV2BandwidthProp(), bytesPerSecondStr))
+ }
+ l.Printf("setting cgroup bandwidth limits:\n%v", limits)
+
+ return s.Run(ctx, l, nodes, "sudo", "/bin/bash", "-c", fmt.Sprintf(
+ `'echo %d:%d %s > %s'`,
+ maj,
+ min,
+ strings.Join(limits, " "),
+ cockroachIOController,
+ ))
+}
+
+// GetReadWriteBytes parses the io.stat file to get the number of bytes read and written.
+// TODO(darryl): switch to using a lightweight exporter instead: https://github.com/cockroachdb/cockroach/issues/144052
+func (s *CGroupDiskStaller) GetReadWriteBytes(
+ ctx context.Context, l *logger.Logger, node install.Nodes,
+) (int, int, error) {
+ maj, min, err := s.DiskDeviceMajorMinor(ctx, l)
+ if err != nil {
+ return 0, 0, err
+ }
+ // Check the number of bytes read and written to disk.
+ res, err := s.RunWithDetails( + ctx, l, node, + fmt.Sprintf(`grep -E '%d:%d' /sys/fs/cgroup/system.slice/io.stat |`, maj, min), + `grep -oE 'rbytes=[0-9]+|wbytes=[0-9]+' |`, + `awk -F= '{printf "%s ", $2} END {print ""}'`, + ) + if err != nil { + return 0, 0, err + } + fields := strings.Fields(res.Stdout) + if len(fields) != 2 { + return 0, 0, errors.Errorf("expected 2 fields, got %d: %s", len(fields), res.Stdout) + } + + readBytes, err := strconv.Atoi(fields[0]) + if err != nil { + return 0, 0, err + } + writeBytes, err := strconv.Atoi(fields[1]) + if err != nil { + return 0, 0, err + } + + return readBytes, writeBytes, nil +} + +const DmsetupDiskStallName = "dmsetup-disk-stall" + +type DmsetupDiskStaller struct { + GenericFailure +} + +func MakeDmsetupDiskStaller( + clusterName string, l *logger.Logger, secure bool, +) (FailureMode, error) { + c, err := roachprod.GetClusterFromCache(l, clusterName, install.SecureOption(secure)) + if err != nil { + return nil, err + } + + genericFailure := GenericFailure{c: c, runTitle: DmsetupDiskStallName} + return &DmsetupDiskStaller{GenericFailure: genericFailure}, nil +} + +func registerDmsetupDiskStall(r *FailureRegistry) { + r.add(DmsetupDiskStallName, DiskStallArgs{}, MakeDmsetupDiskStaller) +} + +func (s *DmsetupDiskStaller) Description() string { + return "dmsetup disk staller" +} + +func (s *DmsetupDiskStaller) Setup(ctx context.Context, l *logger.Logger, args FailureArgs) error { + diskStallArgs := args.(DiskStallArgs) + var err error + + // Disabling journaling requires the cockroach process to not have been started yet. + if diskStallArgs.RestartNodes { + // Use the default stop opts, if the user wants more control, they should manage + // the cluster restart themselves. + stopOpts := roachprod.DefaultStopOpts() + if err = s.StopCluster(ctx, l, stopOpts); err != nil { + return err + } + } + + dev, err := s.DiskDeviceName(ctx, l) + if err != nil { + return err + } + + // snapd will run "snapd auto-import /dev/dm-0" via udev triggers when + // /dev/dm-0 is created. This possibly interferes with the dmsetup create + // reload, so uninstall snapd. + if err = s.Run(ctx, l, s.c.Nodes, `sudo apt-get purge -y snapd`); err != nil { + return err + } + if err = s.Run(ctx, l, s.c.Nodes, `sudo umount -f /mnt/data1 || true`); err != nil { + return err + } + if err = s.Run(ctx, l, s.c.Nodes, `sudo dmsetup remove_all`); err != nil { + return err + } + // See https://github.com/cockroachdb/cockroach/issues/129619#issuecomment-2316147244. + if err = s.Run(ctx, l, s.c.Nodes, `sudo tune2fs -O ^has_journal `+dev); err != nil { + return errors.WithHintf(err, "disabling journaling fails if the cluster has been started") + } + if err = s.Run(ctx, l, s.c.Nodes, `echo "0 $(sudo blockdev --getsz `+dev+`) linear `+dev+` 0" | `+ + `sudo dmsetup create data1`); err != nil { + return err + } + // This has occasionally been seen to fail with "Device or resource busy", + // with no clear explanation. Try to find out who it is. 
+ if err = s.Run(ctx, l, s.c.Nodes, "sudo bash -c 'ps aux; dmsetup status; mount; lsof'"); err != nil { + return err + } + + if err = s.Run(ctx, l, s.c.Nodes, `sudo mount /dev/mapper/data1 /mnt/data1`); err != nil { + return err + } + + if diskStallArgs.RestartNodes { + if err = s.StartCluster(ctx, l); err != nil { + return err + } + } + return nil +} + +func (s *DmsetupDiskStaller) Inject(ctx context.Context, l *logger.Logger, args FailureArgs) error { + nodes := args.(DiskStallArgs).Nodes + l.Printf("stalling disk I/O on nodes %d", nodes) + return s.Run(ctx, l, nodes, `sudo dmsetup suspend --noflush --nolockfs data1`) +} + +func (s *DmsetupDiskStaller) Recover( + ctx context.Context, l *logger.Logger, args FailureArgs, +) error { + diskStallArgs := args.(DiskStallArgs) + nodes := diskStallArgs.Nodes + l.Printf("unstalling disk I/O on nodes %d", nodes) + if err := s.Run(ctx, l, nodes, `sudo dmsetup resume data1`); err != nil { + return err + } + // If the disk stall was injected for long enough that the cockroach process + // detected it and shut down the node, then restart it. + return forEachNode(nodes, func(n install.Nodes) error { + if err := s.PingNode(ctx, l, n); err != nil && diskStallArgs.RestartNodes { + l.Printf("failed to connect to n%d, assuming node exited and restarting: %v", n, err) + return s.StartNodes(ctx, l, n) + } + return nil + }) +} + +func (s *DmsetupDiskStaller) Cleanup( + ctx context.Context, l *logger.Logger, args FailureArgs, +) error { + diskStallArgs := args.(DiskStallArgs) + if diskStallArgs.RestartNodes { + stopOpts := roachprod.DefaultStopOpts() + if err := s.StopCluster(ctx, l, stopOpts); err != nil { + return err + } + } + + dev, err := s.DiskDeviceName(ctx, l) + if err != nil { + return err + } + + if err := s.Run(ctx, l, s.c.Nodes, `sudo dmsetup resume data1`); err != nil { + return err + } + if err := s.Run(ctx, l, s.c.Nodes, `sudo umount /mnt/data1`); err != nil { + return err + } + if err := s.Run(ctx, l, s.c.Nodes, `sudo dmsetup remove data1`); err != nil { + return err + } + if err := s.Run(ctx, l, s.c.Nodes, `sudo tune2fs -O has_journal `+dev); err != nil { + return err + } + if err := s.Run(ctx, l, s.c.Nodes, `sudo mount /mnt/data1`); err != nil { + return err + } + // Reinstall snapd. + if err := s.Run(ctx, l, s.c.Nodes, `sudo apt-get install -y snapd`); err != nil { + return err + } + + // When we unmounted the disk in setup, the cgroups controllers may have been removed, re-add them. + if err := s.Run(ctx, l, s.c.Nodes, "sudo", "/bin/bash", "-c", + `'echo "+cpuset +cpu +io +memory +pids" > /sys/fs/cgroup/cgroup.subtree_control'`); err != nil { + return err + } + if err := s.Run(ctx, l, s.c.Nodes, "sudo", "/bin/bash", "-c", + `'echo "+cpuset +cpu +io +memory +pids" > /sys/fs/cgroup/system.slice/cgroup.subtree_control'`); err != nil { + return err + } + + if diskStallArgs.RestartNodes { + if err := s.StartCluster(ctx, l); err != nil { + return err + } + } + return nil +} + +func (s *DmsetupDiskStaller) WaitForFailureToPropagate( + ctx context.Context, l *logger.Logger, args FailureArgs, +) error { + nodes := args.(DiskStallArgs).Nodes + return forEachNode(nodes, func(n install.Nodes) error { + // If writes are stalled, we expect the disk stall detection to kick in + // and kill the node. 
+ return s.WaitForSQLUnavailable(ctx, l, n, 3*time.Minute)
+ })
+}
+
+func (s *DmsetupDiskStaller) WaitForFailureToRecover(
+ ctx context.Context, l *logger.Logger, args FailureArgs,
+) error {
+ nodes := args.(DiskStallArgs).Nodes
+ return forEachNode(nodes, func(n install.Nodes) error {
+ return s.WaitForSQLReady(ctx, l, n, time.Minute)
+ })
+}
diff --git a/pkg/roachprod/failureinjection/failures/failure.go b/pkg/roachprod/failureinjection/failures/failure.go
index 7bf7eceb9d9d..acc2f1cfc0a8 100644
--- a/pkg/roachprod/failureinjection/failures/failure.go
+++ b/pkg/roachprod/failureinjection/failures/failure.go
@@ -7,10 +7,16 @@ package failures
import (
"context"
+ "fmt"
+ "strconv"
"strings"
+ "time"

+ "github.com/cockroachdb/cockroach/pkg/roachprod"
"github.com/cockroachdb/cockroach/pkg/roachprod/install"
"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
+ "github.com/cockroachdb/cockroach/pkg/util/retry"
+ "github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
)
@@ -36,9 +42,9 @@ type FailureMode interface {
// Inject a failure into the system.
Inject(ctx context.Context, l *logger.Logger, args FailureArgs) error
- // Restore reverses the effects of Inject. The same args passed to Inject
- // must be passed to Restore.
- Restore(ctx context.Context, l *logger.Logger, args FailureArgs) error
+ // Recover reverses the effects of Inject. The same args passed to Inject
+ // must be passed to Recover.
+ Recover(ctx context.Context, l *logger.Logger, args FailureArgs) error
// Cleanup uninstalls any dependencies that were installed by Setup.
Cleanup(ctx context.Context, l *logger.Logger, args FailureArgs) error
@@ -46,8 +52,14 @@ type FailureMode interface {
// WaitForFailureToPropagate waits until the failure is at full effect.
WaitForFailureToPropagate(ctx context.Context, l *logger.Logger, args FailureArgs) error
- // WaitForFailureToRestore waits until the failure was restored completely along with any side effects.
- WaitForFailureToRestore(ctx context.Context, l *logger.Logger, args FailureArgs) error
+ // WaitForFailureToRecover waits until the failure has recovered completely along with any side effects.
+ WaitForFailureToRecover(ctx context.Context, l *logger.Logger, args FailureArgs) error
+}
+
+type diskDevice struct {
+ name string
+ major int
+ minor int
+}
// GenericFailure is a generic helper struct that FailureModes can embed to
@@ -59,18 +71,20 @@ type GenericFailure struct {
// runTitle is the title to prefix command output with.
runTitle string
networkInterfaces []string
+ diskDevice diskDevice
}
func (f *GenericFailure) Run(
ctx context.Context, l *logger.Logger, node install.Nodes, args ...string,
) error {
cmd := strings.Join(args, " ")
+ l.Printf("running cmd: %s", cmd)
// In general, most failures shouldn't be run locally out of caution.
if f.c.IsLocal() { - l.Printf("Local cluster detected, logging command instead of running:\n%s", cmd) + l.Printf("Local cluster detected, skipping command execution") return nil } - return f.c.Run(ctx, l, l.Stdout, l.Stderr, install.WithNodes(node), f.runTitle, cmd) + return f.c.Run(ctx, l, l.Stdout, l.Stderr, install.WithNodes(node), fmt.Sprintf("%s-%d", f.runTitle, node), cmd) } func (f *GenericFailure) RunWithDetails( @@ -82,7 +96,7 @@ func (f *GenericFailure) RunWithDetails( l.Printf("Local cluster detected, logging command instead of running:\n%s", cmd) return install.RunResultDetails{}, nil } - res, err := f.c.RunWithDetails(ctx, l, install.WithNodes(node), f.runTitle, cmd) + res, err := f.c.RunWithDetails(ctx, l, install.WithNodes(node), fmt.Sprintf("%s-%d", f.runTitle, node), cmd) if err != nil { return install.RunResultDetails{}, err } @@ -107,3 +121,135 @@ func (f *GenericFailure) NetworkInterfaces( } return f.networkInterfaces, nil } + +func getDiskDevice(ctx context.Context, f *GenericFailure, l *logger.Logger) error { + if f.diskDevice.name == "" { + res, err := f.c.RunWithDetails(ctx, l, install.WithNodes(f.c.Nodes[:1]), "Get Disk Device", "lsblk -o NAME,MAJ:MIN,MOUNTPOINTS | grep /mnt/data1 | awk '{print $1, $2}'") + if err != nil { + return errors.Wrapf(err, "error when determining block device") + } + parts := strings.Split(strings.TrimSpace(res[0].Stdout), " ") + if len(parts) != 2 { + return errors.Newf("unexpected output from lsblk: %s", res[0].Stdout) + } + f.diskDevice.name = strings.TrimSpace(parts[0]) + major, minor, found := strings.Cut(parts[1], ":") + if !found { + return errors.Newf("unexpected output from lsblk: %s", res[0].Stdout) + } + if f.diskDevice.major, err = strconv.Atoi(major); err != nil { + return err + } + if f.diskDevice.minor, err = strconv.Atoi(minor); err != nil { + return err + } + } + return nil +} + +func (f *GenericFailure) DiskDeviceName(ctx context.Context, l *logger.Logger) (string, error) { + if err := getDiskDevice(ctx, f, l); err != nil { + return "", err + } + return "/dev/" + f.diskDevice.name, nil +} + +func (f *GenericFailure) DiskDeviceMajorMinor( + ctx context.Context, l *logger.Logger, +) (int, int, error) { + if err := getDiskDevice(ctx, f, l); err != nil { + return 0, 0, err + } + return f.diskDevice.major, f.diskDevice.minor, nil +} + +func (f *GenericFailure) PingNode( + ctx context.Context, l *logger.Logger, nodes install.Nodes, +) error { + // TODO(darryl): Consider having failure modes accept a db connection pool + // in makeFailureFunc() or having each failureMode manage it's own pool. + res, err := f.c.ExecSQL( + ctx, l, nodes, install.SystemInterfaceName, + 0, install.AuthUserCert, "", /* database */ + []string{"-e", "SELECT 1"}, + ) + + return errors.CombineErrors(err, res[0].Err) +} + +func (f *GenericFailure) WaitForSQLReady( + ctx context.Context, l *logger.Logger, node install.Nodes, timeout time.Duration, +) error { + start := timeutil.Now() + err := retryForDuration(ctx, timeout, func() error { + if err := f.PingNode(ctx, l, node); err == nil { + l.Printf("Connected to node %d after %s", node, timeutil.Since(start)) + return nil + } + return errors.Newf("unable to connect to node %d", node) + }) + + return errors.Wrapf(err, "never connected to node %d after %s", node, timeout) +} + +// WaitForSQLUnavailable pings a node until the SQL connection is unavailable. 
+func (f *GenericFailure) WaitForSQLUnavailable(
+ ctx context.Context, l *logger.Logger, node install.Nodes, timeout time.Duration,
+) error {
+ start := timeutil.Now()
+ err := retryForDuration(ctx, timeout, func() error {
+ if err := f.PingNode(ctx, l, node); err != nil {
+ l.Printf("Connections to node %d unavailable after %s", node, timeutil.Since(start))
+ //nolint:returnerrcheck
+ return nil
+ }
+ return errors.Newf("still able to connect to node %d", node)
+ })
+
+ return errors.Wrapf(err, "connections to node %d never unavailable after %s", node, timeout)
+}
+
+func (f *GenericFailure) StopCluster(
+ ctx context.Context, l *logger.Logger, stopOpts roachprod.StopOpts,
+) error {
+ return f.c.Stop(ctx, l, stopOpts.Sig, stopOpts.Wait, stopOpts.GracePeriod, "" /* VirtualClusterName*/)
+}
+
+func (f *GenericFailure) StartCluster(ctx context.Context, l *logger.Logger) error {
+ return f.StartNodes(ctx, l, f.c.Nodes)
+}
+
+func (f *GenericFailure) StartNodes(
+ ctx context.Context, l *logger.Logger, nodes install.Nodes,
+) error {
+ // Invoke the cockroach start script directly so we restart the nodes with the same
+ // arguments as before.
+ return f.Run(ctx, l, nodes, "./cockroach.sh")
+}
+
+// retryForDuration retries the given function until it returns nil or
+// the context timeout is exceeded.
+func retryForDuration(ctx context.Context, timeout time.Duration, fn func() error) error {
+ timeoutCtx, cancel := context.WithTimeout(ctx, timeout)
+ defer cancel()
+ retryOpts := retry.Options{MaxRetries: 0}
+ r := retry.StartWithCtx(timeoutCtx, retryOpts)
+ for r.Next() {
+ if err := fn(); err == nil {
+ return nil
+ }
+ }
+ return errors.Newf("failed after %s", timeout)
+}
+
+// forEachNode is a helper function that calls fn for each node in nodes.
+func forEachNode(nodes install.Nodes, fn func(install.Nodes) error) error {
+ // TODO (darryl): Consider parallelizing this, for now all usages
+ // are fast enough for sequential calls.
+ for _, node := range nodes { + if err := fn(install.Nodes{node}); err != nil { + return err + } + } + return nil +} diff --git a/pkg/roachprod/failureinjection/failures/latency.go b/pkg/roachprod/failureinjection/failures/latency.go index bc7c52fa9b60..64799ac93296 100644 --- a/pkg/roachprod/failureinjection/failures/latency.go +++ b/pkg/roachprod/failureinjection/failures/latency.go @@ -80,6 +80,7 @@ func (f *NetworkLatency) Setup(ctx context.Context, l *logger.Logger, args Failu if err != nil { return err } + l.Printf("Setting up root htb qdisc on interfaces: %s", interfaces) cmd := failScriptEarlyCmd for _, iface := range interfaces { // Ignore the loopback interface since no CRDB traffic should go through it @@ -89,7 +90,6 @@ func (f *NetworkLatency) Setup(ctx context.Context, l *logger.Logger, args Failu } cmd += fmt.Sprintf(setupQdiscsCmd, iface) } - l.Printf("Setting up root htb qdisc with cmd: %s", cmd) if err := f.Run(ctx, l, f.c.Nodes, cmd); err != nil { return err } @@ -195,7 +195,7 @@ func (f *NetworkLatency) Inject(ctx context.Context, l *logger.Logger, args Fail } cmd += fmt.Sprintf(addFilterCmd, iface, class, handle, latency.Delay, dest) } - l.Printf("Adding artificial latency from nodes %d to node %d with cmd: %s", latency.Source, dest, cmd) + l.Printf("Adding artificial latency from nodes %d to node %d", latency.Source, dest) if err := f.Run(ctx, l, latency.Source, cmd); err != nil { return err } @@ -204,7 +204,7 @@ func (f *NetworkLatency) Inject(ctx context.Context, l *logger.Logger, args Fail return nil } -func (f *NetworkLatency) Restore(ctx context.Context, l *logger.Logger, args FailureArgs) error { +func (f *NetworkLatency) Recover(ctx context.Context, l *logger.Logger, args FailureArgs) error { latencies := args.(NetworkLatencyArgs).ArtificialLatencies for _, latency := range latencies { for _, dest := range latency.Destination { @@ -215,7 +215,7 @@ func (f *NetworkLatency) Restore(ctx context.Context, l *logger.Logger, args Fai class, ok := f.filterNameToClassMap[latency.String()] if !ok { - return errors.New("failed trying to restore latency failure, ArtificialLatency rule was not found: %+v") + return errors.New("failed trying to recover latency failure, ArtificialLatency rule was not found: %+v") } cmd := failScriptEarlyCmd @@ -266,7 +266,7 @@ func (f *NetworkLatency) WaitForFailureToPropagate( return nil } -func (f *NetworkLatency) WaitForFailureToRestore( +func (f *NetworkLatency) WaitForFailureToRecover( ctx context.Context, l *logger.Logger, args FailureArgs, ) error { // TODO(Darryl): Monitor cluster (e.g. for replica convergence) and block until it's stable. diff --git a/pkg/roachprod/failureinjection/failures/network_partition.go b/pkg/roachprod/failureinjection/failures/network_partition.go index cdb7edc09933..c5fc0842b298 100644 --- a/pkg/roachprod/failureinjection/failures/network_partition.go +++ b/pkg/roachprod/failureinjection/failures/network_partition.go @@ -40,9 +40,9 @@ type NetworkPartitionArgs struct { // where not all packets are dropped. Partitions []NetworkPartition - // List of nodes to drop iptables rules for when restoring. If empty, all nodes + // List of nodes to drop iptables rules for when recovering. If empty, all nodes // will have their rules dropped. 
- NodesToRestore install.Nodes + NodesToRecover install.Nodes } type IPTablesPartitionFailure struct { @@ -138,13 +138,13 @@ func (f *IPTablesPartitionFailure) Inject( switch partition.Type { case Bidirectional: cmd = constructIPTablesRule(bidirectionalPartitionCmd, destinationNode, true /* addRule */) - l.Printf("Dropping packets between nodes %d and node %d with cmd: %s", partition.Source, destinationNode, cmd) + l.Printf("Dropping packets between nodes %d and node %d", partition.Source, destinationNode) case Incoming: cmd = constructIPTablesRule(asymmetricInputPartitionCmd, destinationNode, true /* addRule */) - l.Printf("Dropping packets from node %d to nodes %d with cmd: %s", destinationNode, partition.Source, cmd) + l.Printf("Dropping packets from node %d to nodes %d", destinationNode, partition.Source) case Outgoing: cmd = constructIPTablesRule(asymmetricOutputPartitionCmd, destinationNode, true /* addRule */) - l.Printf("Dropping packets from nodes %d to node %d with cmd: %s", partition.Source, destinationNode, cmd) + l.Printf("Dropping packets from nodes %d to node %d", partition.Source, destinationNode) default: panic("unhandled default case") } @@ -156,7 +156,7 @@ func (f *IPTablesPartitionFailure) Inject( return nil } -func (f *IPTablesPartitionFailure) Restore( +func (f *IPTablesPartitionFailure) Recover( ctx context.Context, l *logger.Logger, args FailureArgs, ) error { partitions := args.(NetworkPartitionArgs).Partitions @@ -166,13 +166,13 @@ func (f *IPTablesPartitionFailure) Restore( switch partition.Type { case Bidirectional: cmd = constructIPTablesRule(bidirectionalPartitionCmd, destinationNode, false /* addRule */) - l.Printf("Resuming packets between nodes %d and node %d with cmd: %s", partition.Source, destinationNode, cmd) + l.Printf("Resuming packets between nodes %d and node %d", partition.Source, destinationNode) case Incoming: cmd = constructIPTablesRule(asymmetricInputPartitionCmd, destinationNode, false /* addRule */) - l.Printf("Resuming packets from node %d to nodes %d with cmd: %s", destinationNode, partition.Source, cmd) + l.Printf("Resuming packets from node %d to nodes %d", destinationNode, partition.Source) case Outgoing: cmd = constructIPTablesRule(asymmetricOutputPartitionCmd, destinationNode, false /* addRule */) - l.Printf("Resuming packets from nodes %d to node %d with cmd: %s", partition.Source, destinationNode, cmd) + l.Printf("Resuming packets from nodes %d to node %d", partition.Source, destinationNode) default: panic("unhandled default case") } @@ -197,7 +197,7 @@ func (f *IPTablesPartitionFailure) WaitForFailureToPropagate( return nil } -func (f *IPTablesPartitionFailure) WaitForFailureToRestore( +func (f *IPTablesPartitionFailure) WaitForFailureToRecover( _ context.Context, _ *logger.Logger, _ FailureArgs, ) error { // TODO(Darryl): Monitor cluster (e.g. for replica convergence) and block until it's stable. 
diff --git a/pkg/roachprod/failureinjection/failures/registry.go b/pkg/roachprod/failureinjection/failures/registry.go index dc4b87e4953c..b4b0543e96ef 100644 --- a/pkg/roachprod/failureinjection/failures/registry.go +++ b/pkg/roachprod/failureinjection/failures/registry.go @@ -27,6 +27,8 @@ func NewFailureRegistry() *FailureRegistry { } func (r *FailureRegistry) Register() { + registerCgroupDiskStall(r) + registerDmsetupDiskStall(r) registerIPTablesPartitionFailure(r) registerNetworkLatencyFailure(r) } diff --git a/pkg/roachprod/install/install.go b/pkg/roachprod/install/install.go index 86a1e0a18bc0..f3e4f230bacb 100644 --- a/pkg/roachprod/install/install.go +++ b/pkg/roachprod/install/install.go @@ -113,6 +113,11 @@ sudo apt-get install -y bzip2; "nmap": ` sudo apt-get update; sudo apt-get install -y nmap; +`, + + "vmtouch": ` +sudo apt-get update; +sudo apt-get install -y vmtouch; `, }