@@ -114,6 +114,10 @@ enum Workload {
114
114
#[ clap( flatten) ]
115
115
cfg : RandReadWriteWorkload ,
116
116
} ,
117
+ /// Run IO, and as soon as we get a final ACK, drop the volume to
118
+ /// see if we can leave IOs outstanding on one of the downstairs.
119
+ /// This test works best if one of the downstairs is running with
120
+ /// lossy option set, which will make it go slower than the others.
117
121
Repair ,
118
122
/// Test the downstairs replay path.
119
123
/// Stop a downstairs, then run some IO, then start that downstairs back
@@ -1006,6 +1010,9 @@ async fn main() -> Result<()> {
1006
1010
let ( volume, mut targets) =
1007
1011
make_a_volume ( & opt, volume_logger. clone ( ) , & test_log, pr) . await ?;
1008
1012
1013
+ let downstairs_in_volume = targets. len ( ) - ( targets. len ( ) % 3 ) ;
1014
+ info ! ( test_log, "Downstairs in volume = {downstairs_in_volume}" ) ;
1015
+
1009
1016
if let Workload :: CliServer { listen, port } = opt. workload {
1010
1017
cli:: start_cli_server (
1011
1018
& volume,
@@ -1315,8 +1322,14 @@ async fn main() -> Result<()> {
1315
1322
bail ! ( "Replay workload requires a dsc endpoint" ) ;
1316
1323
}
1317
1324
} ;
1318
- replay_workload ( & volume, & mut wtq, & mut region_info, dsc_client)
1319
- . await ?;
1325
+ replay_workload (
1326
+ & volume,
1327
+ & mut wtq,
1328
+ & mut region_info,
1329
+ dsc_client,
1330
+ downstairs_in_volume as u32 ,
1331
+ )
1332
+ . await ?;
1320
1333
}
1321
1334
Workload :: Replace {
1322
1335
fast_fill,
@@ -1367,6 +1380,18 @@ async fn main() -> Result<()> {
1367
1380
// Add to the list of targets for our volume the replacement
1368
1381
// target provided on the command line
1369
1382
targets. push ( replacement) ;
1383
+
1384
+ // Verify the number of targets dsc has matches what the number
1385
+ // of targets we found.
1386
+ let res = dsc_client. dsc_get_region_count ( ) . await . unwrap ( ) ;
1387
+ let region_count = res. into_inner ( ) ;
1388
+ if region_count != targets. len ( ) as u32 {
1389
+ bail ! (
1390
+ "Downstairs targets:{} does not match dsc targets: {}" ,
1391
+ region_count,
1392
+ targets. len( ) ,
1393
+ ) ;
1394
+ }
1370
1395
replace_before_active (
1371
1396
& volume,
1372
1397
wtq,
@@ -1401,6 +1426,18 @@ async fn main() -> Result<()> {
1401
1426
// Add to the list of targets for our volume the replacement
1402
1427
// target provided on the command line
1403
1428
targets. push ( replacement) ;
1429
+
1430
+ // Verify the number of targets dsc has matches what the number
1431
+ // of targets we found.
1432
+ let res = dsc_client. dsc_get_region_count ( ) . await . unwrap ( ) ;
1433
+ let region_count = res. into_inner ( ) ;
1434
+ if region_count != targets. len ( ) as u32 {
1435
+ bail ! (
1436
+ "Downstairs targets:{} does not match dsc targets: {}" ,
1437
+ region_count,
1438
+ targets. len( ) ,
1439
+ ) ;
1440
+ }
1404
1441
replace_while_reconcile (
1405
1442
& volume,
1406
1443
wtq,
@@ -1489,7 +1526,7 @@ async fn main() -> Result<()> {
1489
1526
return Ok ( ( ) ) ;
1490
1527
} else if opt. stable
1491
1528
&& wc. up_count + wc. ds_count == 0
1492
- && wc. active_count == 3
1529
+ && wc. active_count == downstairs_in_volume
1493
1530
{
1494
1531
println ! ( "CLIENT: All jobs finished, all DS active." ) ;
1495
1532
return Ok ( ( ) ) ;
@@ -2211,13 +2248,14 @@ async fn replay_workload(
2211
2248
wtq : & mut WhenToQuit ,
2212
2249
ri : & mut RegionInfo ,
2213
2250
dsc_client : Client ,
2251
+ ds_count : u32 ,
2214
2252
) -> Result < ( ) > {
2215
2253
let mut rng = rand_chacha:: ChaCha8Rng :: from_entropy ( ) ;
2216
2254
let mut generic_wtq = WhenToQuit :: Count { count : 300 } ;
2217
2255
2218
2256
for c in 1 .. {
2219
2257
// Pick a DS at random
2220
- let stopped_ds = rng. gen_range ( 0 ..3 ) ;
2258
+ let stopped_ds = rng. gen_range ( 0 ..ds_count ) ;
2221
2259
dsc_client. dsc_stop ( stopped_ds) . await . unwrap ( ) ;
2222
2260
loop {
2223
2261
let res = dsc_client. dsc_get_ds_state ( stopped_ds) . await . unwrap ( ) ;
@@ -2290,10 +2328,12 @@ async fn replace_while_reconcile(
2290
2328
mut gen : u64 ,
2291
2329
log : Logger ,
2292
2330
) -> Result < ( ) > {
2293
- assert ! ( targets. len( ) == 4 ) ;
2331
+ assert ! ( targets. len( ) % 3 == 1 ) ;
2294
2332
2333
+ // The total number of downstairs we have that are part of the Volume.
2334
+ let ds_total = targets. len ( ) - 1 ;
2295
2335
let mut old_ds = 0 ;
2296
- let mut new_ds = 3 ;
2336
+ let mut new_ds = targets . len ( ) - 1 ;
2297
2337
let mut c = 1 ;
2298
2338
// How long we wait for reconcile to start before we replace
2299
2339
let mut active_wait = 6 ;
@@ -2431,15 +2471,15 @@ async fn replace_while_reconcile(
2431
2471
wc. ds_count,
2432
2472
wc. active_count
2433
2473
) ;
2434
- if wc. up_count + wc. ds_count == 0 && wc. active_count == 3 {
2435
- info ! ( log, "[{c}] Replay: All jobs finished, all DS active." ) ;
2474
+ if wc. up_count + wc. ds_count == 0 && wc. active_count == ds_total {
2475
+ info ! ( log, "[{c}] All jobs finished, all DS active." ) ;
2436
2476
break ;
2437
2477
}
2438
2478
tokio:: time:: sleep ( tokio:: time:: Duration :: from_secs ( 4 ) ) . await ;
2439
2479
}
2440
2480
2441
- old_ds = ( old_ds + 1 ) % 4 ;
2442
- new_ds = ( new_ds + 1 ) % 4 ;
2481
+ old_ds = ( old_ds + 1 ) % ( ds_total as u32 + 1 ) ;
2482
+ new_ds = ( new_ds + 1 ) % ( ds_total + 1 ) ;
2443
2483
2444
2484
c += 1 ;
2445
2485
match wtq {
@@ -2485,11 +2525,18 @@ async fn replace_before_active(
2485
2525
mut gen : u64 ,
2486
2526
log : Logger ,
2487
2527
) -> Result < ( ) > {
2488
- assert ! ( targets. len( ) == 4 ) ;
2528
+ assert ! ( targets. len( ) % 3 == 1 ) ;
2489
2529
2490
2530
info ! ( log, "Begin replacement before activation test" ) ;
2531
+ // We need to start from a known state and be sure that all three of the
2532
+ // current downstairs are consistent with each other. To guarantee this
2533
+ // we write to every block, then flush, then read. This way we know
2534
+ // that the initial downstairs are all synced up on the same flush and
2535
+ // generation numbers.
2536
+ fill_workload ( volume, ri, true ) . await ?;
2537
+ let ds_total = targets. len ( ) - 1 ;
2491
2538
let mut old_ds = 0 ;
2492
- let mut new_ds = 3 ;
2539
+ let mut new_ds = targets . len ( ) - 1 ;
2493
2540
for c in 1 .. {
2494
2541
info ! ( log, "[{c}] Touch every extent" ) ;
2495
2542
fill_sparse_workload ( volume. as_ref ( ) , ri) . await ?;
@@ -2586,15 +2633,15 @@ async fn replace_before_active(
2586
2633
wc. ds_count,
2587
2634
wc. active_count
2588
2635
) ;
2589
- if wc. up_count + wc. ds_count == 0 && wc. active_count == 3 {
2590
- info ! ( log, "[{c}] Replay: All jobs finished, all DS active." ) ;
2636
+ if wc. up_count + wc. ds_count == 0 && wc. active_count == ds_total {
2637
+ info ! ( log, "[{c}] All jobs finished, all DS active." ) ;
2591
2638
break ;
2592
2639
}
2593
2640
tokio:: time:: sleep ( tokio:: time:: Duration :: from_secs ( 4 ) ) . await ;
2594
2641
}
2595
2642
2596
- old_ds = ( old_ds + 1 ) % 4 ;
2597
- new_ds = ( new_ds + 1 ) % 4 ;
2643
+ old_ds = ( old_ds + 1 ) % ( ds_total as u32 + 1 ) ;
2644
+ new_ds = ( new_ds + 1 ) % ( ds_total + 1 ) ;
2598
2645
2599
2646
match wtq {
2600
2647
WhenToQuit :: Count { count } => {
@@ -2637,7 +2684,10 @@ async fn replace_workload(
2637
2684
targets : Vec < SocketAddr > ,
2638
2685
fill : bool ,
2639
2686
) -> Result < ( ) > {
2640
- assert ! ( targets. len( ) == 4 ) ;
2687
+ assert ! ( targets. len( ) % 3 == 1 ) ;
2688
+
2689
+ // The total number of downstairs we have that are part of the Volume.
2690
+ let ds_total = targets. len ( ) - 1 ;
2641
2691
2642
2692
if fill {
2643
2693
fill_sparse_workload ( volume. as_ref ( ) , ri) . await ?;
@@ -2656,7 +2706,7 @@ async fn replace_workload(
2656
2706
let volume_c = volume. clone ( ) ;
2657
2707
let handle = tokio:: spawn ( async move {
2658
2708
let mut old_ds = 0 ;
2659
- let mut new_ds = 3 ;
2709
+ let mut new_ds = ds_total ;
2660
2710
let mut c = 1 ;
2661
2711
loop {
2662
2712
println ! (
@@ -2678,7 +2728,7 @@ async fn replace_workload(
2678
2728
}
2679
2729
// Wait for the replacement to be reflected in the downstairs status.
2680
2730
let mut wc = volume_c. show_work ( ) . await ?;
2681
- while wc. active_count == 3 {
2731
+ while wc. active_count == ds_total {
2682
2732
// Wait for one of the DS to start repair
2683
2733
println ! (
2684
2734
"[{c}] Waiting for replace to start: up:{} ds:{} act:{}" ,
@@ -2689,7 +2739,7 @@ async fn replace_workload(
2689
2739
}
2690
2740
2691
2741
// We have started live repair, now wait for it to finish.
2692
- while wc. active_count != 3 {
2742
+ while wc. active_count != ds_total {
2693
2743
println ! (
2694
2744
"[{c}] Waiting for replace to finish: up:{} ds:{} act:{}" ,
2695
2745
wc. up_count, wc. ds_count, wc. active_count
@@ -2714,8 +2764,8 @@ async fn replace_workload(
2714
2764
}
2715
2765
2716
2766
// No stopping yet, let's do another loop.
2717
- old_ds = ( old_ds + 1 ) % 4 ;
2718
- new_ds = ( new_ds + 1 ) % 4 ;
2767
+ old_ds = ( old_ds + 1 ) % ( ds_total + 1 ) ;
2768
+ new_ds = ( new_ds + 1 ) % ( ds_total + 1 ) ;
2719
2769
c += 1 ;
2720
2770
}
2721
2771
println ! ( "Replace tasks ends after {c} loops" ) ;
@@ -2771,7 +2821,7 @@ async fn replace_workload(
2771
2821
"Replace test done: up:{} ds:{} act:{}" ,
2772
2822
wc. up_count, wc. ds_count, wc. active_count
2773
2823
) ;
2774
- if wc. up_count + wc. ds_count == 0 && wc. active_count == 3 {
2824
+ if wc. up_count + wc. ds_count == 0 && wc. active_count == ds_total {
2775
2825
println ! ( "Replace: All jobs finished, all DS active." ) ;
2776
2826
break ;
2777
2827
}
0 commit comments