|
33 | 33 | import java.io.File;
|
34 | 34 | import java.io.FileInputStream;
|
35 | 35 | import java.io.IOException;
|
| 36 | +import java.lang.reflect.Field; |
36 | 37 | import java.nio.ByteBuffer;
|
37 | 38 | import java.util.List;
|
| 39 | +import java.util.Set; |
38 | 40 | import org.apache.bookkeeper.bookie.Bookie;
|
39 | 41 | import org.apache.bookkeeper.bookie.Bookie.NoEntryException;
|
40 | 42 | import org.apache.bookkeeper.bookie.BookieException;
|
|
52 | 54 | import org.apache.bookkeeper.conf.TestBKConfiguration;
|
53 | 55 | import org.apache.bookkeeper.proto.BookieProtocol;
|
54 | 56 | import org.junit.After;
|
| 57 | +import org.junit.Assert; |
55 | 58 | import org.junit.Before;
|
56 | 59 | import org.junit.Test;
|
57 | 60 | import org.slf4j.Logger;
|
@@ -822,4 +825,60 @@ public void testSingleLedgerDirectoryCheckpoint() throws Exception {
|
822 | 825 | assertEquals(7, logMark.getLogFileId());
|
823 | 826 | assertEquals(8, logMark.getLogFileOffset());
|
824 | 827 | }
|
| 828 | + |
| 829 | + @Test |
| 830 | + public void testSingleLedgerDirectoryCheckpointTriggerRemovePendingDeletedLedgers() |
| 831 | + throws Exception { |
| 832 | + int gcWaitTime = 1000; |
| 833 | + File ledgerDir = new File(tmpDir, "dir"); |
| 834 | + ServerConfiguration conf = TestBKConfiguration.newServerConfiguration(); |
| 835 | + conf.setGcWaitTime(gcWaitTime); |
| 836 | + conf.setProperty(DbLedgerStorage.WRITE_CACHE_MAX_SIZE_MB, 4); |
| 837 | + conf.setProperty(DbLedgerStorage.READ_AHEAD_CACHE_MAX_SIZE_MB, 4); |
| 838 | + conf.setLedgerStorageClass(DbLedgerStorage.class.getName()); |
| 839 | + conf.setLedgerDirNames(new String[] { ledgerDir.getCanonicalPath() }); |
| 840 | + |
| 841 | + BookieImpl bookie = new TestBookieImpl(conf); |
| 842 | + ByteBuf entry1 = Unpooled.buffer(1024); |
| 843 | + entry1.writeLong(1); // ledger id |
| 844 | + entry1.writeLong(2); // entry id |
| 845 | + entry1.writeBytes("entry-1".getBytes()); |
| 846 | + bookie.getLedgerStorage().addEntry(entry1); |
| 847 | + |
| 848 | + bookie.getJournals().get(0).getLastLogMark().getCurMark().setLogMark(1, 2); |
| 849 | + ((DbLedgerStorage) bookie.getLedgerStorage()).getLedgerStorageList().get(0).flush(); |
| 850 | + |
| 851 | + File ledgerDirMark = new File(ledgerDir + "/current", "lastMark"); |
| 852 | + try { |
| 853 | + LogMark logMark = readLogMark(ledgerDirMark); |
| 854 | + assertEquals(1, logMark.getLogFileId()); |
| 855 | + assertEquals(2, logMark.getLogFileOffset()); |
| 856 | + } catch (Exception e) { |
| 857 | + fail(); |
| 858 | + } |
| 859 | + |
| 860 | + ByteBuf entry2 = Unpooled.buffer(1024); |
| 861 | + entry2.writeLong(2); // ledger id |
| 862 | + entry2.writeLong(1); // entry id |
| 863 | + entry2.writeBytes("entry-2".getBytes()); |
| 864 | + |
| 865 | + bookie.getLedgerStorage().addEntry(entry2); |
| 866 | + // write one entry to first ledger directory and flush with logMark(1, 2), |
| 867 | + // only the first ledger directory should have lastMark |
| 868 | + bookie.getJournals().get(0).getLastLogMark().getCurMark().setLogMark(4, 5); |
| 869 | + |
| 870 | + SingleDirectoryDbLedgerStorage storage1 = |
| 871 | + ((DbLedgerStorage) bookie.getLedgerStorage()).getLedgerStorageList().get(0); |
| 872 | + Field field = SingleDirectoryDbLedgerStorage.class.getDeclaredField("ledgerIndex"); |
| 873 | + field.setAccessible(true); |
| 874 | + LedgerMetadataIndex ledgerMetadataIndex = (LedgerMetadataIndex) field.get(storage1); |
| 875 | + Field field1 = LedgerMetadataIndex.class.getDeclaredField("pendingDeletedLedgers"); |
| 876 | + field1.setAccessible(true); |
| 877 | + Set<Long> pendingDeletedLedgers = (Set<Long>) field1.get(ledgerMetadataIndex); |
| 878 | + |
| 879 | + Assert.assertEquals(pendingDeletedLedgers.size(), 0); |
| 880 | + pendingDeletedLedgers.add(2L); |
| 881 | + bookie.getLedgerStorage().flush(); |
| 882 | + Assert.assertEquals(pendingDeletedLedgers.size(), 0); |
| 883 | + } |
825 | 884 | }
|
0 commit comments