Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Query Resource Isolation] Workload Configs #15109

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,14 @@
import org.apache.pinot.common.utils.SchemaUtils;
import org.apache.pinot.common.utils.config.AccessControlUserConfigUtils;
import org.apache.pinot.common.utils.config.TableConfigUtils;
import org.apache.pinot.common.utils.config.WorkloadConfigUtils;
import org.apache.pinot.spi.config.ConfigUtils;
import org.apache.pinot.spi.config.DatabaseConfig;
import org.apache.pinot.spi.config.table.QuotaConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.user.UserConfig;
import org.apache.pinot.spi.config.workload.WorkloadConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.JsonUtils;
Expand Down Expand Up @@ -76,6 +78,7 @@ private ZKMetadataProvider() {
private static final String PROPERTYSTORE_CLUSTER_CONFIGS_PREFIX = "/CONFIGS/CLUSTER";
private static final String PROPERTYSTORE_SEGMENT_LINEAGE = "/SEGMENT_LINEAGE";
private static final String PROPERTYSTORE_MINION_TASK_METADATA_PREFIX = "/MINION_TASK_METADATA";
private static final String PROPERTYSTORE_WORKLOAD_CONFIGS_PREFIX = "/WORKLOAD_CONFIGS";

public static void setUserConfig(ZkHelixPropertyStore<ZNRecord> propertyStore, String username, ZNRecord znRecord) {
propertyStore.set(constructPropertyStorePathForUserConfig(username), znRecord, AccessOption.PERSISTENT);
Expand Down Expand Up @@ -299,6 +302,14 @@ public static String constructPropertyStorePathForMinionTaskMetadata(String tabl
return StringUtil.join("/", PROPERTYSTORE_MINION_TASK_METADATA_PREFIX, tableNameWithType);
}

public static String getPropertyStoreWorkloadConfigsPrefix() {
return PROPERTYSTORE_WORKLOAD_CONFIGS_PREFIX;
}

public static String constructPropertyStorePathForWorkloadConfig(String workloadName) {
return StringUtil.join("/", PROPERTYSTORE_WORKLOAD_CONFIGS_PREFIX, workloadName);
}

@Deprecated
public static String constructPropertyStorePathForMinionTaskMetadataDeprecated(String taskType,
String tableNameWithType) {
Expand Down Expand Up @@ -845,4 +856,53 @@ private static Map<String, Double> toApplicationQpsQuotas(Map<String, String> qu
return result;
}
}

@Nullable
public static List<WorkloadConfig> getAllWorkloadConfigs(ZkHelixPropertyStore<ZNRecord> propertyStore)
throws Exception {
List<ZNRecord> znRecords =
propertyStore.getChildren(getPropertyStoreWorkloadConfigsPrefix(), null, AccessOption.PERSISTENT,
CommonConstants.Helix.ZkClient.RETRY_COUNT, CommonConstants.Helix.ZkClient.RETRY_INTERVAL_MS);
if (znRecords != null) {
int numZNRecords = znRecords.size();
List<WorkloadConfig> workloadConfigs = new ArrayList<>(numZNRecords);
for (ZNRecord znRecord : znRecords) {
workloadConfigs.add(WorkloadConfigUtils.fromZNRecord(znRecord));
}
return workloadConfigs;
}
return null;
}

@Nullable
public static WorkloadConfig getWorkloadConfig(ZkHelixPropertyStore<ZNRecord> propertyStore,
String workloadName) throws Exception {
ZNRecord znRecord = propertyStore.get(constructPropertyStorePathForWorkloadConfig(workloadName),
null, AccessOption.PERSISTENT);
if (znRecord == null) {
return null;
}
return WorkloadConfigUtils.fromZNRecord(znRecord);
}

public static boolean setWorkloadConfig(ZkHelixPropertyStore<ZNRecord> propertyStore,
WorkloadConfig workloadConfig) throws Exception {

String path = constructPropertyStorePathForWorkloadConfig(workloadConfig.getWorkloadName());
boolean isNewConfig = !propertyStore.exists(path, AccessOption.PERSISTENT);
ZNRecord znRecord = isNewConfig ? new ZNRecord(path)
: propertyStore.get(path, null, AccessOption.PERSISTENT);
// Update the record with new workload configuration
WorkloadConfigUtils.updateZNRecordWithWorkloadConfig(znRecord, workloadConfig);
// Create or update based on existence
return isNewConfig ? propertyStore.create(path, znRecord, AccessOption.PERSISTENT)
: propertyStore.set(path, znRecord, AccessOption.PERSISTENT);
}

public static void deleteWorkloadConfig(ZkHelixPropertyStore<ZNRecord> propertyStore, String workloadName) {
String propertyStorePath = constructPropertyStorePathForWorkloadConfig(workloadName);
if (propertyStore.exists(propertyStorePath, AccessOption.PERSISTENT)) {
propertyStore.remove(propertyStorePath, AccessOption.PERSISTENT);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.common.utils.config;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.spi.config.workload.EnforcementProfile;
import org.apache.pinot.spi.config.workload.WorkloadConfig;


public class WorkloadConfigUtils {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

private WorkloadConfigUtils() {
}

/**
* Converts a ZNRecord into a WorkloadConfig object by extracting mapFields.
*
* @param znRecord The ZNRecord containing workload config data.
* @return A WorkloadConfig object.
*/
public static WorkloadConfig fromZNRecord(ZNRecord znRecord) throws Exception {
Preconditions.checkNotNull(znRecord, "ZNRecord cannot be null");
String workloadId = znRecord.getId();
String enforcementProfileJson = znRecord.getSimpleField("enforcementProfile");
Preconditions.checkArgument(enforcementProfileJson != null, "enforcementProfile field is missing");
EnforcementProfile enforcementProfile = OBJECT_MAPPER.readValue(enforcementProfileJson, EnforcementProfile.class);
return new WorkloadConfig(workloadId, enforcementProfile);
}

/**
* Updates a ZNRecord with the fields from a WorkloadConfig object.
*
* @param workloadConfig The WorkloadConfig object to convert.
* @param znRecord The ZNRecord to update.
*/
public static void updateZNRecordWithWorkloadConfig(ZNRecord znRecord, WorkloadConfig workloadConfig)
throws Exception {
Preconditions.checkNotNull(workloadConfig, "WorkloadConfig cannot be null");
Preconditions.checkNotNull(znRecord, "ZNRecord cannot be null");
String enforcementProfileJson = OBJECT_MAPPER.writeValueAsString(workloadConfig.getEnforcementProfile());
znRecord.setSimpleField("enforcementProfile", enforcementProfileJson);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.common.utils.config;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.ArrayList;
import java.util.List;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.spi.config.workload.EnforcementProfile;
import org.apache.pinot.spi.config.workload.WorkloadConfig;
import org.apache.pinot.spi.config.workload.WorkloadCost;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;


public class WorkloadConfigUtilsTest {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

@Test(dataProvider = "fromZNRecordDataProvider")
public void testFromZNRecord(ZNRecord znRecord, WorkloadConfig expectedWorkloadConfig, boolean shouldFail) {
try {
WorkloadConfig actualWorkloadConfig = WorkloadConfigUtils.fromZNRecord(znRecord);
if (shouldFail) {
Assert.fail("Expected an exception but none was thrown");
}
Assert.assertEquals(actualWorkloadConfig, expectedWorkloadConfig);
} catch (Exception e) {
if (!shouldFail) {
Assert.fail("Caught unexpected exception: " + e.getMessage(), e);
}
}
}

@DataProvider(name = "fromZNRecordDataProvider")
public Object[][] fromZNRecordDataProvider() {
List<Object[]> data = new ArrayList<>();

WorkloadCost validWorkloadCost = new WorkloadCost(100.0, 200.0);
EnforcementProfile validEnforcementProfile = new EnforcementProfile(validWorkloadCost, validWorkloadCost);
WorkloadConfig validWorkloadConfig = new WorkloadConfig("workloadId", validEnforcementProfile);

// Valid case
ZNRecord validZnRecord = new ZNRecord("workloadId");
validZnRecord.setSimpleField("enforcementProfile", convertToJson(validEnforcementProfile));
data.add(new Object[]{validZnRecord, validWorkloadConfig, false});

// Case: Missing `enforcementProfile`
ZNRecord missingEnforcementProfile = new ZNRecord("workloadId");
data.add(new Object[]{missingEnforcementProfile, null, true});

// Case: Invalid JSON in `enforcementProfile`
ZNRecord invalidJsonRecord = new ZNRecord("workloadId");
invalidJsonRecord.setSimpleField("enforcementProfile", "{invalid_json}");
data.add(new Object[]{invalidJsonRecord, null, true});

// Case: Null ZNRecord
data.add(new Object[]{null, null, true});

// Case: Empty ZNRecord
ZNRecord emptyZnRecord = new ZNRecord("");
data.add(new Object[]{emptyZnRecord, null, true});

// Case: Negative workload costs
WorkloadCost negativeWorkloadCost = new WorkloadCost(-100.0, -200.0);
EnforcementProfile negativeEnforcementProfile = new EnforcementProfile(negativeWorkloadCost, negativeWorkloadCost);
WorkloadConfig negativeWorkloadConfig = new WorkloadConfig("negativeWorkload", negativeEnforcementProfile);

ZNRecord negativeZnRecord = new ZNRecord("negativeWorkload");
negativeZnRecord.setSimpleField("enforcementProfile", convertToJson(negativeEnforcementProfile));
data.add(new Object[]{negativeZnRecord, negativeWorkloadConfig, false});

// Case: Zero workload costs
WorkloadCost zeroWorkloadCost = new WorkloadCost(0.0, 0.0);
EnforcementProfile zeroEnforcementProfile = new EnforcementProfile(zeroWorkloadCost, zeroWorkloadCost);
WorkloadConfig zeroWorkloadConfig = new WorkloadConfig("zeroWorkload", zeroEnforcementProfile);

ZNRecord zeroZnRecord = new ZNRecord("zeroWorkload");
zeroZnRecord.setSimpleField("enforcementProfile", convertToJson(zeroEnforcementProfile));
data.add(new Object[]{zeroZnRecord, zeroWorkloadConfig, false});

// Case: Unexpected additional fields
ZNRecord extraFieldsRecord = new ZNRecord("workloadId");
extraFieldsRecord.setSimpleField("enforcementProfile", convertToJson(validEnforcementProfile));
extraFieldsRecord.setSimpleField("extraField", "extraValue");
data.add(new Object[]{extraFieldsRecord, validWorkloadConfig, false});

return data.toArray(new Object[0][]);
}

private static String convertToJson(EnforcementProfile enforcementProfile) {
try {
return OBJECT_MAPPER.writeValueAsString(enforcementProfile);
} catch (Exception e) {
Assert.fail("Caught exception while converting enforcementProfile to JSON", e);
return null;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.controller.api.resources;

import io.swagger.annotations.Api;
import io.swagger.annotations.ApiKeyAuthDefinition;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.Authorization;
import io.swagger.annotations.SecurityDefinition;
import io.swagger.annotations.SwaggerDefinition;
import javax.inject.Inject;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.apache.pinot.controller.api.access.AccessType;
import org.apache.pinot.controller.api.access.Authenticate;
import org.apache.pinot.controller.api.exception.ControllerApplicationException;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.core.auth.Actions;
import org.apache.pinot.core.auth.Authorize;
import org.apache.pinot.core.auth.TargetType;
import org.apache.pinot.spi.config.workload.WorkloadConfig;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.JsonUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_KEY;

@Api(tags = Constants.APPLICATION_TAG, authorizations = {@Authorization(value = SWAGGER_AUTHORIZATION_KEY)})
@SwaggerDefinition(securityDefinition = @SecurityDefinition(apiKeyAuthDefinitions = {
@ApiKeyAuthDefinition(name = HttpHeaders.AUTHORIZATION, in = ApiKeyAuthDefinition.ApiKeyLocation.HEADER, key =
SWAGGER_AUTHORIZATION_KEY, description =
"The format of the key is ```\"Basic <token>\" or \"Bearer "
+ "<token>\"```"), @ApiKeyAuthDefinition(name = CommonConstants.WORKLOAD, in =
ApiKeyAuthDefinition.ApiKeyLocation.HEADER, key = CommonConstants.WORKLOAD, description =
"Workload context passed through http header. If no context is provided 'default' workload "
+ "context will be considered.")
}))
@Path("/")
public class PinotWorkloadConfigRestletResource {
public static final Logger LOGGER = LoggerFactory.getLogger(PinotWorkloadConfigRestletResource.class);

@Inject
PinotHelixResourceManager _pinotHelixResourceManager;

/**
* API to get all workload configs
*/
@GET
@Produces(MediaType.APPLICATION_JSON)
@Path("/workloadConfigs/{workloadName}")
@Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.GET_WORKLOAD_CONFIG)
@Authenticate(AccessType.READ)
@ApiOperation(value = "Get workload configs", notes = "Get workload configs for the workload name")
public String getWorkloadConfig(@PathParam("workloadName") String workloadName, @Context HttpHeaders httpHeaders) {
try {
LOGGER.info("Received request to get workload config for workload: {}", workloadName);
WorkloadConfig workloadConfig = _pinotHelixResourceManager.getWorkloadConfig(workloadName);
if (workloadConfig == null) {
throw new ControllerApplicationException(LOGGER, "Workload config not found for workload: " + workloadName,
Response.Status.NOT_FOUND, null);
}
return workloadConfig.toJsonString();
} catch (Exception e) {
LOGGER.error("Caught exception while getting workload config for workload: {}", workloadName, e);
throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e);
}
}

@POST
@Produces(MediaType.APPLICATION_JSON)
@Path("/workloadConfigs")
@Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.UPDATE_WORKLOAD_CONFIG)
@Authenticate(AccessType.UPDATE)
@ApiOperation(value = "Update workload config", notes = "Update workload config for the workload name")
public void updateWorkloadConfig(String requestString, @Context HttpHeaders httpHeaders) {
try {
WorkloadConfig workloadConfig = JsonUtils.stringToObject(requestString, WorkloadConfig.class);
LOGGER.info("Received request to update workload config for workload: {}", workloadConfig.getWorkloadName());
_pinotHelixResourceManager.setWorkloadConfig(workloadConfig);
} catch (Exception e) {
LOGGER.error("Caught exception while updating workload config for request: {}", requestString, e);
throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e);
}
}

@DELETE
@Produces(MediaType.APPLICATION_JSON)
@Path("/workloadConfigs/{workloadName}")
@Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.DELETE_WORKLOAD_CONFIG)
@Authenticate(AccessType.DELETE)
@ApiOperation(value = "Delete workload config", notes = "Delete workload config for the workload name")
public void deleteWorkloadConfig(@PathParam("workloadName") String workloadName, @Context HttpHeaders httpHeaders) {
try {
LOGGER.info("Received request to delete workload config for workload: {}", workloadName);
_pinotHelixResourceManager.deleteWorkloadConfig(workloadName);
} catch (Exception e) {
LOGGER.error("Caught exception while deleting workload config for workload: {}", workloadName, e);
throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e);
}
}
}

Loading
Loading