Commit b5431b2

HDFS-17703. Change the lock level of the invalidateMissingBlock method to read lock.
1 parent 7e67358 commit b5431b2

File tree

2 files changed: +58 -4 lines changed

  • hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
  • hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java

+7 -4
@@ -2431,17 +2431,20 @@ public void invalidateMissingBlock(String bpid, Block block, boolean checkFiles)
     // deleted local block file here may lead to missing-block
     // when it with only 1 replication left now.
     // So remove if from volume map notify namenode is ok.
-    try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl,
-        bpid)) {
+    ReplicaInfo replica;
+    try (AutoCloseableLock lock = lockManager.readLock(LockLevel.BLOCK_POOl, bpid)) {
       // Check if this block is on the volume map.
-      ReplicaInfo replica = volumeMap.get(bpid, block);
+      replica = volumeMap.get(bpid, block);
       // Double-check block or meta file existence when checkFiles as true.
       if (replica != null && (!checkFiles ||
           (!replica.blockDataExists() || !replica.metadataExists()))) {
         volumeMap.remove(bpid, block);
-        invalidate(bpid, replica);
       }
     }
+    // Call invalidate method outside the lock
+    if (replica != null) {
+      invalidate(bpid, replica);
+    }
   }
 
   public void invalidateMissingBlock(String bpid, Block block) {
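
For context, the change follows a common lock-narrowing pattern: consult and update the in-memory replica map while holding only the shared block-pool read lock, then perform the slow deletion after the lock is released so other block-pool operations are not blocked. Below is a minimal, self-contained Java sketch of that pattern; BlockStore, Replica and deleteOnDisk() are hypothetical names for illustration, not HDFS APIs, and the map is a ConcurrentHashMap so that removing a single entry under the read lock stays thread-safe.

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class BlockStore {
  // Read lock for per-block operations; the write lock would be reserved for
  // pool-wide operations that must exclude everything else.
  private final ReentrantReadWriteLock poolLock = new ReentrantReadWriteLock();
  // Thread-safe map, so a single-entry remove is safe under the shared lock.
  private final ConcurrentMap<Long, Replica> replicas = new ConcurrentHashMap<>();

  record Replica(long id, String path) {}

  public void add(Replica r) {
    replicas.put(r.id(), r);
  }

  public void invalidateMissing(long blockId) {
    Replica removed;
    poolLock.readLock().lock();
    try {
      // Fast part: look up and unlink the entry under the shared lock.
      removed = replicas.remove(blockId);
    } finally {
      poolLock.readLock().unlock();
    }
    // Slow part: the expensive deletion runs outside the lock.
    if (removed != null) {
      deleteOnDisk(removed);
    }
  }

  private void deleteOnDisk(Replica r) {
    // Placeholder for the expensive I/O (in the commit this is the invalidate() path).
    System.out.println("deleting " + r.path());
  }

  public static void main(String[] args) {
    BlockStore store = new BlockStore();
    store.add(new Replica(1L, "/data/blk_1"));
    store.invalidateMissing(1L);
  }
}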

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java

+51
@@ -685,6 +685,57 @@ public void run() {
     }
   }
 
+  @Test(timeout = 30000)
+  public void testInvalidateMissingBlockConcurrent() throws Exception {
+    // Feed FsDataset with block metadata.
+    final int numBlocks = 1000;
+    final int threadCount = 10;
+
+    ExecutorService pool = Executors.newFixedThreadPool(threadCount);
+    List<Future<?>> futureList = new ArrayList<>();
+
+    // Random write block and use invalidateMissingBlock() method half of them.
+    Random random = new Random();
+    for (int i = 0; i < threadCount; i++) {
+      class BlockProcessor implements Runnable {
+        @Override
+        public void run() {
+          try {
+            String bpid = BLOCK_POOL_IDS[random.nextInt(BLOCK_POOL_IDS.length)];
+            for (int blockId = 0; blockId < numBlocks; blockId++) {
+              ExtendedBlock eb = new ExtendedBlock(bpid, blockId);
+              ReplicaHandler replica = null;
+              try {
+                replica = dataset.createRbw(StorageType.DEFAULT, null, eb, false);
+                if (blockId % 2 > 0) {
+                  dataset.invalidateMissingBlock(bpid, eb.getLocalBlock());
+                }
+              } finally {
+                if (replica != null) {
+                  replica.close();
+                }
+              }
+            }
+          } catch (Exception ignore) {
+            // ignore
+          }
+        }
+      }
+      futureList.add(pool.submit(new BlockProcessor()));
+    }
+
+    // Wait for data generation
+    for (Future<?> f : futureList) {
+      f.get();
+    }
+    // Wait for the async deletion task finish.
+    GenericTestUtils.waitFor(() -> dataset.asyncDiskService.countPendingDeletions() == 0,
+        100, 10000);
+    for (String bpid : dataset.volumeMap.getBlockPoolList()) {
+      assertEquals(numBlocks / 2, dataset.volumeMap.size(bpid));
+    }
+  }
+
   @Test(timeout = 5000)
   public void testRemoveNewlyAddedVolume() throws IOException {
     final int numExistingVolumes = getNumVolumes();
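
The new test drains the DataNode's asynchronous deletions with GenericTestUtils.waitFor(condition, 100, 10000), that is, it polls the condition roughly every 100 ms for up to 10 seconds before asserting on the volume map sizes. For readers unfamiliar with that helper, here is a minimal sketch of the same polling pattern, assuming that (check, checkEveryMillis, waitForMillis) signature; it is an illustration, not the Hadoop implementation.

import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

public final class WaitFor {
  private WaitFor() {}

  // Poll check every checkEveryMillis until it returns true, or fail after waitForMillis.
  public static void waitFor(Supplier<Boolean> check, long checkEveryMillis, long waitForMillis)
      throws TimeoutException, InterruptedException {
    long deadline = System.currentTimeMillis() + waitForMillis;
    while (!check.get()) {
      if (System.currentTimeMillis() > deadline) {
        throw new TimeoutException("Condition not met within " + waitForMillis + " ms");
      }
      Thread.sleep(checkEveryMillis);
    }
  }

  public static void main(String[] args) throws Exception {
    long start = System.currentTimeMillis();
    // Example: wait until at least 200 ms have elapsed, checking every 50 ms.
    waitFor(() -> System.currentTimeMillis() - start >= 200, 50, 10_000);
    System.out.println("condition met");
  }
}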

0 commit comments

Comments
 (0)