Add server level dynamically configurable segment download throttler (#…
somandal authored Feb 13, 2025
1 parent df62894 commit 80d3caf
Showing 30 changed files with 1,123 additions and 879 deletions.
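
This commit replaces the SegmentPreprocessThrottler passed through the segment load path with a broader SegmentOperationsThrottler, which bundles the two existing preprocess throttlers with a new instance-level SegmentDownloadThrottler that caps parallel segment downloads across all tables on a server. The old per-table download semaphore carried a TODO (removed below) noting that resizing it required a server restart; per the commit title, the new throttler is dynamically configurable. Every ImmutableSegmentLoader.load/preprocess call site now receives the combined throttler, so a single object carries both download and preprocess permits. A minimal construction sketch, lifted from the test changes near the bottom of this diff (the numeric arguments are just the permit counts that test happens to use, and the meaning of the constructor parameters is assumed from context, not documented here):

    SegmentOperationsThrottler segmentOperationsThrottler = new SegmentOperationsThrottler(
        new SegmentAllIndexPreprocessThrottler(8, 10, true),  // existing all-index preprocess throttle
        new SegmentStarTreePreprocessThrottler(4, 8, true),   // existing star-tree preprocess throttle
        new SegmentDownloadThrottler(10, 20, true));          // new instance-level download throttle
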
@@ -71,8 +71,9 @@
import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils;
import org.apache.pinot.segment.local.startree.StarTreeBuilderUtils;
import org.apache.pinot.segment.local.startree.v2.builder.StarTreeV2BuilderConfig;
import org.apache.pinot.segment.local.utils.SegmentDownloadThrottler;
import org.apache.pinot.segment.local.utils.SegmentLocks;
import org.apache.pinot.segment.local.utils.SegmentPreprocessThrottler;
import org.apache.pinot.segment.local.utils.SegmentOperationsThrottler;
import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.IndexSegment;
@@ -126,9 +127,8 @@ public abstract class BaseTableDataManager implements TableDataManager {
protected String _peerDownloadScheme;
protected long _streamSegmentDownloadUntarRateLimitBytesPerSec;
protected boolean _isStreamSegmentDownloadUntar;
protected SegmentPreprocessThrottler _segmentPreprocessThrottler;
// Semaphore to restrict the maximum number of parallel segment downloads for a table
// TODO: Make this configurable via ZK cluster configs to avoid server restarts to update
protected SegmentOperationsThrottler _segmentOperationsThrottler;
// Semaphore to restrict the maximum number of parallel segment downloads from deep store for a table
private Semaphore _segmentDownloadSemaphore;

// Fixed size LRU cache with TableName - SegmentName pair as key, and segment related errors as the value.
@@ -142,7 +142,7 @@ public abstract class BaseTableDataManager implements TableDataManager {
public void init(InstanceDataManagerConfig instanceDataManagerConfig, HelixManager helixManager,
SegmentLocks segmentLocks, TableConfig tableConfig, @Nullable ExecutorService segmentPreloadExecutor,
@Nullable Cache<Pair<String, String>, SegmentErrorInfo> errorCache,
@Nullable SegmentPreprocessThrottler segmentPreprocessThrottler) {
@Nullable SegmentOperationsThrottler segmentOperationsThrottler) {
LOGGER.info("Initializing table data manager for table: {}", tableConfig.getTableName());

_instanceDataManagerConfig = instanceDataManagerConfig;
@@ -170,7 +170,7 @@ public void init(InstanceDataManagerConfig instanceDataManagerConfig, HelixManag
+ "Please check for available space and write-permissions for this directory.", _resourceTmpDir);
}
_errorCache = errorCache;
_segmentPreprocessThrottler = segmentPreprocessThrottler;
_segmentOperationsThrottler = segmentOperationsThrottler;
_recentlyDeletedSegments =
CacheBuilder.newBuilder().maximumSize(instanceDataManagerConfig.getDeletedSegmentsCacheSize())
.expireAfterWrite(instanceDataManagerConfig.getDeletedSegmentsCacheTtlMinutes(), TimeUnit.MINUTES).build();
@@ -408,7 +408,7 @@ public void downloadAndLoadSegment(SegmentZKMetadata zkMetadata, IndexLoadingCon
String segmentName = zkMetadata.getSegmentName();
_logger.info("Downloading and loading segment: {}", segmentName);
File indexDir = downloadSegment(zkMetadata);
addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig, _segmentPreprocessThrottler));
addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig, _segmentOperationsThrottler));
_logger.info("Downloaded and loaded segment: {} with CRC: {} on tier: {}", segmentName, zkMetadata.getCrc(),
TierConfigUtils.normalizeTierName(zkMetadata.getTier()));
}
@@ -701,7 +701,7 @@ public void reloadSegment(String segmentName, IndexLoadingConfig indexLoadingCon
_logger.info("Loading segment: {} from indexDir: {} to tier: {}", segmentName, indexDir,
TierConfigUtils.normalizeTierName(zkMetadata.getTier()));
ImmutableSegment segment = ImmutableSegmentLoader.load(indexDir, indexLoadingConfig, schema,
_segmentPreprocessThrottler);
_segmentOperationsThrottler);
addSegment(segment);

// Remove backup directory to mark the completion of segment reloading.
@@ -808,45 +808,61 @@ protected File downloadSegmentFromDeepStore(SegmentZKMetadata zkMetadata)
File tempRootDir = getTmpSegmentDataDir("tmp-" + segmentName + "-" + UUID.randomUUID());
if (_segmentDownloadSemaphore != null) {
long startTime = System.currentTimeMillis();
_logger.info("Acquiring segment download semaphore for segment: {}, queue-length: {} ", segmentName,
_logger.info("Acquiring table level segment download semaphore for segment: {}, queue-length: {} ", segmentName,
_segmentDownloadSemaphore.getQueueLength());
_segmentDownloadSemaphore.acquire();
_logger.info("Acquired segment download semaphore for segment: {} (lock-time={}ms, queue-length={}).",
_logger.info("Acquired table level segment download semaphore for segment: {} (lock-time={}ms, queue-length={}).",
segmentName, System.currentTimeMillis() - startTime, _segmentDownloadSemaphore.getQueueLength());
}
try {
File untarredSegmentDir;
if (_isStreamSegmentDownloadUntar && zkMetadata.getCrypterName() == null) {
_logger.info("Downloading segment: {} using streamed download-untar with maxStreamRateInByte: {}", segmentName,
_streamSegmentDownloadUntarRateLimitBytesPerSec);
AtomicInteger failedAttempts = new AtomicInteger(0);
try {
untarredSegmentDir = SegmentFetcherFactory.fetchAndStreamUntarToLocal(downloadUrl, tempRootDir,
_streamSegmentDownloadUntarRateLimitBytesPerSec, failedAttempts);
_logger.info("Downloaded and untarred segment: {} from: {}, failed attempts: {}", segmentName, downloadUrl,
failedAttempts.get());
} finally {
_serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.SEGMENT_STREAMED_DOWNLOAD_UNTAR_FAILURES,
failedAttempts.get());
if (_segmentOperationsThrottler != null) {
long startTime = System.currentTimeMillis();
SegmentDownloadThrottler segmentDownloadThrottler = _segmentOperationsThrottler.getSegmentDownloadThrottler();
_logger.info("Acquiring instance level segment download semaphore for segment: {}, queue-length: {} ",
segmentName, segmentDownloadThrottler.getQueueLength());
segmentDownloadThrottler.acquire();
_logger.info("Acquired instance level segment download semaphore for segment: {} (lock-time={}ms, "
+ "queue-length={}).", segmentName, System.currentTimeMillis() - startTime,
segmentDownloadThrottler.getQueueLength());
}
try {
File untarredSegmentDir;
if (_isStreamSegmentDownloadUntar && zkMetadata.getCrypterName() == null) {
_logger.info("Downloading segment: {} using streamed download-untar with maxStreamRateInByte: {}",
segmentName, _streamSegmentDownloadUntarRateLimitBytesPerSec);
AtomicInteger failedAttempts = new AtomicInteger(0);
try {
untarredSegmentDir = SegmentFetcherFactory.fetchAndStreamUntarToLocal(downloadUrl, tempRootDir,
_streamSegmentDownloadUntarRateLimitBytesPerSec, failedAttempts);
_logger.info("Downloaded and untarred segment: {} from: {}, failed attempts: {}", segmentName, downloadUrl,
failedAttempts.get());
} finally {
_serverMetrics.addMeteredTableValue(_tableNameWithType,
ServerMeter.SEGMENT_STREAMED_DOWNLOAD_UNTAR_FAILURES, failedAttempts.get());
}
} else {
File segmentTarFile = new File(tempRootDir, segmentName + TarCompressionUtils.TAR_COMPRESSED_FILE_EXTENSION);
SegmentFetcherFactory.fetchAndDecryptSegmentToLocal(downloadUrl, segmentTarFile, zkMetadata.getCrypterName());
_logger.info("Downloaded tarred segment: {} from: {} to: {}, file length: {}", segmentName, downloadUrl,
segmentTarFile, segmentTarFile.length());
untarredSegmentDir = untarSegment(segmentName, segmentTarFile, tempRootDir);
}
} else {
File segmentTarFile = new File(tempRootDir, segmentName + TarCompressionUtils.TAR_COMPRESSED_FILE_EXTENSION);
SegmentFetcherFactory.fetchAndDecryptSegmentToLocal(downloadUrl, segmentTarFile, zkMetadata.getCrypterName());
_logger.info("Downloaded tarred segment: {} from: {} to: {}, file length: {}", segmentName, downloadUrl,
segmentTarFile, segmentTarFile.length());
untarredSegmentDir = untarSegment(segmentName, segmentTarFile, tempRootDir);
File indexDir = moveSegment(segmentName, untarredSegmentDir);
_logger.info("Downloaded segment: {} from: {} to: {}", segmentName, downloadUrl, indexDir);
return indexDir;
} catch (Exception e) {
_serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.SEGMENT_DOWNLOAD_FROM_REMOTE_FAILURES, 1);
throw e;
} finally {
if (_segmentOperationsThrottler != null) {
_segmentOperationsThrottler.getSegmentDownloadThrottler().release();
}
FileUtils.deleteQuietly(tempRootDir);
}
File indexDir = moveSegment(segmentName, untarredSegmentDir);
_logger.info("Downloaded segment: {} from: {} to: {}", segmentName, downloadUrl, indexDir);
return indexDir;
} catch (Exception e) {
_serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.SEGMENT_DOWNLOAD_FROM_REMOTE_FAILURES, 1);
throw e;
} finally {
if (_segmentDownloadSemaphore != null) {
_segmentDownloadSemaphore.release();
}
FileUtils.deleteQuietly(tempRootDir);
}
}
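
The hunk above layers the new instance-level permit on top of the existing table-level semaphore: each guard is acquired with queue-length logging before the fetch, and the download permit is released in the finally block so a failed download cannot leak it. Condensed to its skeleton (downloadAndUntar() is a hypothetical stand-in for the fetch/untar/move logic above):

    if (_segmentOperationsThrottler != null) {
      // May block until another in-flight download releases a permit.
      _segmentOperationsThrottler.getSegmentDownloadThrottler().acquire();
    }
    try {
      downloadAndUntar();
    } finally {
      if (_segmentOperationsThrottler != null) {
        _segmentOperationsThrottler.getSegmentDownloadThrottler().release();
      }
    }
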

@@ -858,6 +874,16 @@ protected File downloadSegmentFromPeers(SegmentZKMetadata zkMetadata)
_logger.info("Downloading segment: {} from peers", segmentName);
File tempRootDir = getTmpSegmentDataDir("tmp-" + segmentName + "-" + UUID.randomUUID());
File segmentTarFile = new File(tempRootDir, segmentName + TarCompressionUtils.TAR_COMPRESSED_FILE_EXTENSION);
if (_segmentOperationsThrottler != null) {
long startTime = System.currentTimeMillis();
SegmentDownloadThrottler segmentDownloadThrottler = _segmentOperationsThrottler.getSegmentDownloadThrottler();
_logger.info("Acquiring instance level segment download semaphore for peer downloading segment: {}, "
+ "queue-length: {} ", segmentName, segmentDownloadThrottler.getQueueLength());
segmentDownloadThrottler.acquire();
_logger.info("Acquired instance level segment download semaphore for peer downloading segment: {} "
+ "(lock-time={}ms, queue-length={}).", segmentName, System.currentTimeMillis() - startTime,
segmentDownloadThrottler.getQueueLength());
}
try {
SegmentFetcherFactory.fetchAndDecryptSegmentToLocal(segmentName, _peerDownloadScheme, () -> {
List<URI> peerServerURIs =
@@ -875,6 +901,9 @@ protected File downloadSegmentFromPeers(SegmentZKMetadata zkMetadata)
_serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.SEGMENT_DOWNLOAD_FROM_PEERS_FAILURES, 1);
throw e;
} finally {
if (_segmentOperationsThrottler != null) {
_segmentOperationsThrottler.getSegmentDownloadThrottler().release();
}
FileUtils.deleteQuietly(tempRootDir);
}
}
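
Note that the peer-download path gains the same instance-level guard even though it is not covered by the per-table _segmentDownloadSemaphore, so deep-store and peer downloads now draw from one shared pool of permits. As a design aside, the acquire/try/finally-release boilerplate could be folded into try-with-resources; the helper below is purely hypothetical and not part of this commit, assuming acquire() throws InterruptedException as a Semaphore-backed throttler would:

    // Hypothetical sketch, NOT in this diff: an AutoCloseable lease over the download throttler.
    final class DownloadPermit implements AutoCloseable {
      private final SegmentDownloadThrottler _throttler;

      private DownloadPermit(SegmentDownloadThrottler throttler) {
        _throttler = throttler;
      }

      static DownloadPermit acquire(SegmentDownloadThrottler throttler) throws InterruptedException {
        throttler.acquire();  // blocks until a permit is available
        return new DownloadPermit(throttler);
      }

      @Override
      public void close() {
        _throttler.release();
      }
    }

    // Usage: try (DownloadPermit permit = DownloadPermit.acquire(throttler)) { downloadFromPeers(); }
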
@@ -1045,7 +1074,7 @@ public boolean tryLoadExistingSegment(SegmentZKMetadata zkMetadata, IndexLoading
segmentDirectory.copyTo(indexDir);
// Close the stale SegmentDirectory object and recreate it with reprocessed segment.
closeSegmentDirectoryQuietly(segmentDirectory);
ImmutableSegmentLoader.preprocess(indexDir, indexLoadingConfig, schema, _segmentPreprocessThrottler);
ImmutableSegmentLoader.preprocess(indexDir, indexLoadingConfig, schema, _segmentOperationsThrottler);
segmentDirectory = initSegmentDirectory(segmentName, String.valueOf(zkMetadata.getCrc()), indexLoadingConfig);
}
ImmutableSegment segment = ImmutableSegmentLoader.load(segmentDirectory, indexLoadingConfig, schema);
@@ -31,7 +31,7 @@
import org.apache.pinot.core.data.manager.realtime.SegmentUploader;
import org.apache.pinot.core.util.SegmentRefreshSemaphore;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.segment.local.utils.SegmentPreprocessThrottler;
import org.apache.pinot.segment.local.utils.SegmentOperationsThrottler;
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.spi.annotations.InterfaceAudience;
import org.apache.pinot.spi.env.PinotConfiguration;
@@ -51,7 +51,7 @@ public interface InstanceDataManager {
* <p>NOTE: The config is the subset of server config with prefix 'pinot.server.instance'
*/
void init(PinotConfiguration config, HelixManager helixManager, ServerMetrics serverMetrics,
@Nullable SegmentPreprocessThrottler segmentPreprocessThrottler)
@Nullable SegmentOperationsThrottler segmentOperationsThrottler)
throws Exception;

/**
@@ -33,7 +33,7 @@
import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.segment.local.utils.SegmentLocks;
import org.apache.pinot.segment.local.utils.SegmentPreprocessThrottler;
import org.apache.pinot.segment.local.utils.SegmentOperationsThrottler;
import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.stream.StreamConfigProperties;
@@ -49,17 +49,17 @@ public class DefaultTableDataManagerProvider implements TableDataManagerProvider
private HelixManager _helixManager;
private SegmentLocks _segmentLocks;
private Semaphore _segmentBuildSemaphore;
private SegmentPreprocessThrottler _segmentPreprocessThrottler;
private SegmentOperationsThrottler _segmentOperationsThrottler;

@Override
public void init(InstanceDataManagerConfig instanceDataManagerConfig, HelixManager helixManager,
SegmentLocks segmentLocks, @Nullable SegmentPreprocessThrottler segmentPreprocessThrottler) {
SegmentLocks segmentLocks, @Nullable SegmentOperationsThrottler segmentOperationsThrottler) {
_instanceDataManagerConfig = instanceDataManagerConfig;
_helixManager = helixManager;
_segmentLocks = segmentLocks;
int maxParallelSegmentBuilds = instanceDataManagerConfig.getMaxParallelSegmentBuilds();
_segmentBuildSemaphore = maxParallelSegmentBuilds > 0 ? new Semaphore(maxParallelSegmentBuilds, true) : null;
_segmentPreprocessThrottler = segmentPreprocessThrottler;
_segmentOperationsThrottler = segmentOperationsThrottler;
}

@Override
@@ -89,7 +89,7 @@ public TableDataManager getTableDataManager(TableConfig tableConfig, @Nullable E
throw new IllegalStateException();
}
tableDataManager.init(_instanceDataManagerConfig, _helixManager, _segmentLocks, tableConfig, segmentPreloadExecutor,
errorCache, _segmentPreprocessThrottler);
errorCache, _segmentOperationsThrottler);
return tableDataManager;
}
}
@@ -27,7 +27,7 @@
import org.apache.pinot.common.restlet.resources.SegmentErrorInfo;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.segment.local.utils.SegmentLocks;
import org.apache.pinot.segment.local.utils.SegmentPreprocessThrottler;
import org.apache.pinot.segment.local.utils.SegmentOperationsThrottler;
import org.apache.pinot.spi.annotations.InterfaceAudience;
import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
import org.apache.pinot.spi.config.table.TableConfig;
@@ -40,7 +40,7 @@
public interface TableDataManagerProvider {

void init(InstanceDataManagerConfig instanceDataManagerConfig, HelixManager helixManager, SegmentLocks segmentLocks,
@Nullable SegmentPreprocessThrottler segmentPreprocessThrottler);
@Nullable SegmentOperationsThrottler segmentOperationsThrottler);

default TableDataManager getTableDataManager(TableConfig tableConfig) {
return getTableDataManager(tableConfig, null, null, () -> true);
@@ -813,7 +813,7 @@ public void downloadAndReplaceConsumingSegment(SegmentZKMetadata zkMetadata)
// Get a new index loading config with latest table config and schema to load the segment
IndexLoadingConfig indexLoadingConfig = fetchIndexLoadingConfig();
indexLoadingConfig.setSegmentTier(zkMetadata.getTier());
addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig, _segmentPreprocessThrottler));
addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig, _segmentOperationsThrottler));
_logger.info("Downloaded and replaced CONSUMING segment: {}", segmentName);
}

@@ -826,7 +826,7 @@ public void replaceConsumingSegment(String segmentName)
File indexDir = new File(_indexDir, segmentName);
// Get a new index loading config with latest table config and schema to load the segment
IndexLoadingConfig indexLoadingConfig = fetchIndexLoadingConfig();
addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig, _segmentPreprocessThrottler));
addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig, _segmentOperationsThrottler));
_logger.info("Replaced CONSUMING segment: {}", segmentName);
}

@@ -36,8 +36,9 @@
import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.segment.local.utils.SegmentAllIndexPreprocessThrottler;
import org.apache.pinot.segment.local.utils.SegmentDownloadThrottler;
import org.apache.pinot.segment.local.utils.SegmentLocks;
import org.apache.pinot.segment.local.utils.SegmentPreprocessThrottler;
import org.apache.pinot.segment.local.utils.SegmentOperationsThrottler;
import org.apache.pinot.segment.local.utils.SegmentStarTreePreprocessThrottler;
import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.SegmentMetadata;
@@ -120,11 +121,12 @@ private TableDataManager makeTestableManager()
when(instanceDataManagerConfig.getDeletedSegmentsCacheSize()).thenReturn(DELETED_SEGMENTS_CACHE_SIZE);
when(instanceDataManagerConfig.getDeletedSegmentsCacheTtlMinutes()).thenReturn(DELETED_SEGMENTS_TTL_MINUTES);
TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
SegmentPreprocessThrottler segmentPreprocessThrottler = new SegmentPreprocessThrottler(
new SegmentAllIndexPreprocessThrottler(8, 10, true), new SegmentStarTreePreprocessThrottler(4, 8, true));
SegmentOperationsThrottler segmentOperationsThrottler = new SegmentOperationsThrottler(
new SegmentAllIndexPreprocessThrottler(8, 10, true), new SegmentStarTreePreprocessThrottler(4, 8, true),
new SegmentDownloadThrottler(10, 20, true));
TableDataManager tableDataManager = new OfflineTableDataManager();
tableDataManager.init(instanceDataManagerConfig, mock(HelixManager.class), new SegmentLocks(), tableConfig, null,
null, segmentPreprocessThrottler);
null, segmentOperationsThrottler);
tableDataManager.start();
Field segsMapField = BaseTableDataManager.class.getDeclaredField("_segmentDataManagerMap");
segsMapField.setAccessible(true);
@@ -183,7 +183,7 @@ private static ImmutableSegmentDataManager createImmutableSegmentDataManager(Tab

IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(tableConfig, schema);
ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(indexDir, indexLoadingConfig,
BaseTableDataManagerTest.SEGMENT_PREPROCESS_THROTTLER);
BaseTableDataManagerTest.SEGMENT_OPERATIONS_THROTTLER);
when(segmentDataManager.getSegment()).thenReturn(immutableSegment);
return segmentDataManager;
}