diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java index 37ba1499e365..20322cb0a464 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java @@ -40,12 +40,14 @@ import org.apache.pinot.common.utils.SchemaUtils; import org.apache.pinot.common.utils.config.AccessControlUserConfigUtils; import org.apache.pinot.common.utils.config.TableConfigUtils; +import org.apache.pinot.common.utils.config.WorkloadConfigUtils; import org.apache.pinot.spi.config.ConfigUtils; import org.apache.pinot.spi.config.DatabaseConfig; import org.apache.pinot.spi.config.table.QuotaConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.config.user.UserConfig; +import org.apache.pinot.spi.config.workload.WorkloadConfig; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.JsonUtils; @@ -76,6 +78,7 @@ private ZKMetadataProvider() { private static final String PROPERTYSTORE_CLUSTER_CONFIGS_PREFIX = "/CONFIGS/CLUSTER"; private static final String PROPERTYSTORE_SEGMENT_LINEAGE = "/SEGMENT_LINEAGE"; private static final String PROPERTYSTORE_MINION_TASK_METADATA_PREFIX = "/MINION_TASK_METADATA"; + private static final String PROPERTYSTORE_WORKLOAD_CONFIGS_PREFIX = "/WORKLOAD_CONFIGS"; public static void setUserConfig(ZkHelixPropertyStore propertyStore, String username, ZNRecord znRecord) { propertyStore.set(constructPropertyStorePathForUserConfig(username), znRecord, AccessOption.PERSISTENT); @@ -299,6 +302,14 @@ public static String constructPropertyStorePathForMinionTaskMetadata(String tabl return StringUtil.join("/", PROPERTYSTORE_MINION_TASK_METADATA_PREFIX, tableNameWithType); } + public static String getPropertyStoreWorkloadConfigsPrefix() { + return PROPERTYSTORE_WORKLOAD_CONFIGS_PREFIX; + } + + public static String constructPropertyStorePathForWorkloadConfig(String workloadName) { + return StringUtil.join("/", PROPERTYSTORE_WORKLOAD_CONFIGS_PREFIX, workloadName); + } + @Deprecated public static String constructPropertyStorePathForMinionTaskMetadataDeprecated(String taskType, String tableNameWithType) { @@ -845,4 +856,53 @@ private static Map toApplicationQpsQuotas(Map qu return result; } } + + @Nullable + public static List getAllWorkloadConfigs(ZkHelixPropertyStore propertyStore) + throws Exception { + List znRecords = + propertyStore.getChildren(getPropertyStoreWorkloadConfigsPrefix(), null, AccessOption.PERSISTENT, + CommonConstants.Helix.ZkClient.RETRY_COUNT, CommonConstants.Helix.ZkClient.RETRY_INTERVAL_MS); + if (znRecords != null) { + int numZNRecords = znRecords.size(); + List workloadConfigs = new ArrayList<>(numZNRecords); + for (ZNRecord znRecord : znRecords) { + workloadConfigs.add(WorkloadConfigUtils.fromZNRecord(znRecord)); + } + return workloadConfigs; + } + return null; + } + + @Nullable + public static WorkloadConfig getWorkloadConfig(ZkHelixPropertyStore propertyStore, + String workloadName) throws Exception { + ZNRecord znRecord = propertyStore.get(constructPropertyStorePathForWorkloadConfig(workloadName), + null, AccessOption.PERSISTENT); + if (znRecord == null) { + return null; + } + return WorkloadConfigUtils.fromZNRecord(znRecord); + } + + public static boolean setWorkloadConfig(ZkHelixPropertyStore propertyStore, + WorkloadConfig workloadConfig) throws Exception { + + String path = constructPropertyStorePathForWorkloadConfig(workloadConfig.getWorkloadName()); + boolean isNewConfig = !propertyStore.exists(path, AccessOption.PERSISTENT); + ZNRecord znRecord = isNewConfig ? new ZNRecord(path) + : propertyStore.get(path, null, AccessOption.PERSISTENT); + // Update the record with new workload configuration + WorkloadConfigUtils.updateZNRecordWithWorkloadConfig(znRecord, workloadConfig); + // Create or update based on existence + return isNewConfig ? propertyStore.create(path, znRecord, AccessOption.PERSISTENT) + : propertyStore.set(path, znRecord, AccessOption.PERSISTENT); + } + + public static void deleteWorkloadConfig(ZkHelixPropertyStore propertyStore, String workloadName) { + String propertyStorePath = constructPropertyStorePathForWorkloadConfig(workloadName); + if (propertyStore.exists(propertyStorePath, AccessOption.PERSISTENT)) { + propertyStore.remove(propertyStorePath, AccessOption.PERSISTENT); + } + } } diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/WorkloadConfigUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/WorkloadConfigUtils.java new file mode 100644 index 000000000000..dc5989d7de06 --- /dev/null +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/WorkloadConfigUtils.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.utils.config; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.apache.pinot.spi.config.workload.EnforcementProfile; +import org.apache.pinot.spi.config.workload.WorkloadConfig; + + +public class WorkloadConfigUtils { + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private WorkloadConfigUtils() { + } + + /** + * Converts a ZNRecord into a WorkloadConfig object by extracting mapFields. + * + * @param znRecord The ZNRecord containing workload config data. + * @return A WorkloadConfig object. + */ + public static WorkloadConfig fromZNRecord(ZNRecord znRecord) throws Exception { + Preconditions.checkNotNull(znRecord, "ZNRecord cannot be null"); + String workloadId = znRecord.getId(); + String enforcementProfileJson = znRecord.getSimpleField("enforcementProfile"); + Preconditions.checkArgument(enforcementProfileJson != null, "enforcementProfile field is missing"); + EnforcementProfile enforcementProfile = OBJECT_MAPPER.readValue(enforcementProfileJson, EnforcementProfile.class); + return new WorkloadConfig(workloadId, enforcementProfile); + } + + /** + * Updates a ZNRecord with the fields from a WorkloadConfig object. + * + * @param workloadConfig The WorkloadConfig object to convert. + * @param znRecord The ZNRecord to update. + */ + public static void updateZNRecordWithWorkloadConfig(ZNRecord znRecord, WorkloadConfig workloadConfig) + throws Exception { + Preconditions.checkNotNull(workloadConfig, "WorkloadConfig cannot be null"); + Preconditions.checkNotNull(znRecord, "ZNRecord cannot be null"); + String enforcementProfileJson = OBJECT_MAPPER.writeValueAsString(workloadConfig.getEnforcementProfile()); + znRecord.setSimpleField("enforcementProfile", enforcementProfileJson); + } +} diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/WorkloadConfigUtilsTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/WorkloadConfigUtilsTest.java new file mode 100644 index 000000000000..cda665153675 --- /dev/null +++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/WorkloadConfigUtilsTest.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.utils.config; + +import com.fasterxml.jackson.databind.ObjectMapper; +import java.util.ArrayList; +import java.util.List; +import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.apache.pinot.spi.config.workload.EnforcementProfile; +import org.apache.pinot.spi.config.workload.WorkloadConfig; +import org.apache.pinot.spi.config.workload.WorkloadCost; +import org.testng.Assert; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + + +public class WorkloadConfigUtilsTest { + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + @Test(dataProvider = "fromZNRecordDataProvider") + public void testFromZNRecord(ZNRecord znRecord, WorkloadConfig expectedWorkloadConfig, boolean shouldFail) { + try { + WorkloadConfig actualWorkloadConfig = WorkloadConfigUtils.fromZNRecord(znRecord); + if (shouldFail) { + Assert.fail("Expected an exception but none was thrown"); + } + Assert.assertEquals(actualWorkloadConfig, expectedWorkloadConfig); + } catch (Exception e) { + if (!shouldFail) { + Assert.fail("Caught unexpected exception: " + e.getMessage(), e); + } + } + } + + @DataProvider(name = "fromZNRecordDataProvider") + public Object[][] fromZNRecordDataProvider() { + List data = new ArrayList<>(); + + WorkloadCost validWorkloadCost = new WorkloadCost(100.0, 200.0); + EnforcementProfile validEnforcementProfile = new EnforcementProfile(validWorkloadCost, validWorkloadCost); + WorkloadConfig validWorkloadConfig = new WorkloadConfig("workloadId", validEnforcementProfile); + + // Valid case + ZNRecord validZnRecord = new ZNRecord("workloadId"); + validZnRecord.setSimpleField("enforcementProfile", convertToJson(validEnforcementProfile)); + data.add(new Object[]{validZnRecord, validWorkloadConfig, false}); + + // Case: Missing `enforcementProfile` + ZNRecord missingEnforcementProfile = new ZNRecord("workloadId"); + data.add(new Object[]{missingEnforcementProfile, null, true}); + + // Case: Invalid JSON in `enforcementProfile` + ZNRecord invalidJsonRecord = new ZNRecord("workloadId"); + invalidJsonRecord.setSimpleField("enforcementProfile", "{invalid_json}"); + data.add(new Object[]{invalidJsonRecord, null, true}); + + // Case: Null ZNRecord + data.add(new Object[]{null, null, true}); + + // Case: Empty ZNRecord + ZNRecord emptyZnRecord = new ZNRecord(""); + data.add(new Object[]{emptyZnRecord, null, true}); + + // Case: Negative workload costs + WorkloadCost negativeWorkloadCost = new WorkloadCost(-100.0, -200.0); + EnforcementProfile negativeEnforcementProfile = new EnforcementProfile(negativeWorkloadCost, negativeWorkloadCost); + WorkloadConfig negativeWorkloadConfig = new WorkloadConfig("negativeWorkload", negativeEnforcementProfile); + + ZNRecord negativeZnRecord = new ZNRecord("negativeWorkload"); + negativeZnRecord.setSimpleField("enforcementProfile", convertToJson(negativeEnforcementProfile)); + data.add(new Object[]{negativeZnRecord, negativeWorkloadConfig, false}); + + // Case: Zero workload costs + WorkloadCost zeroWorkloadCost = new WorkloadCost(0.0, 0.0); + EnforcementProfile zeroEnforcementProfile = new EnforcementProfile(zeroWorkloadCost, zeroWorkloadCost); + WorkloadConfig zeroWorkloadConfig = new WorkloadConfig("zeroWorkload", zeroEnforcementProfile); + + ZNRecord zeroZnRecord = new ZNRecord("zeroWorkload"); + zeroZnRecord.setSimpleField("enforcementProfile", convertToJson(zeroEnforcementProfile)); + data.add(new Object[]{zeroZnRecord, zeroWorkloadConfig, false}); + + // Case: Unexpected additional fields + ZNRecord extraFieldsRecord = new ZNRecord("workloadId"); + extraFieldsRecord.setSimpleField("enforcementProfile", convertToJson(validEnforcementProfile)); + extraFieldsRecord.setSimpleField("extraField", "extraValue"); + data.add(new Object[]{extraFieldsRecord, validWorkloadConfig, false}); + + return data.toArray(new Object[0][]); + } + + private static String convertToJson(EnforcementProfile enforcementProfile) { + try { + return OBJECT_MAPPER.writeValueAsString(enforcementProfile); + } catch (Exception e) { + Assert.fail("Caught exception while converting enforcementProfile to JSON", e); + return null; + } + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotWorkloadConfigRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotWorkloadConfigRestletResource.java new file mode 100644 index 000000000000..08b72e1a8b50 --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotWorkloadConfigRestletResource.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.api.resources; + +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiKeyAuthDefinition; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.Authorization; +import io.swagger.annotations.SecurityDefinition; +import io.swagger.annotations.SwaggerDefinition; +import javax.inject.Inject; +import javax.ws.rs.DELETE; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.HttpHeaders; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import org.apache.pinot.controller.api.access.AccessType; +import org.apache.pinot.controller.api.access.Authenticate; +import org.apache.pinot.controller.api.exception.ControllerApplicationException; +import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.core.auth.Actions; +import org.apache.pinot.core.auth.Authorize; +import org.apache.pinot.core.auth.TargetType; +import org.apache.pinot.spi.config.workload.WorkloadConfig; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.JsonUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_KEY; + +@Api(tags = Constants.APPLICATION_TAG, authorizations = {@Authorization(value = SWAGGER_AUTHORIZATION_KEY)}) +@SwaggerDefinition(securityDefinition = @SecurityDefinition(apiKeyAuthDefinitions = { + @ApiKeyAuthDefinition(name = HttpHeaders.AUTHORIZATION, in = ApiKeyAuthDefinition.ApiKeyLocation.HEADER, key = + SWAGGER_AUTHORIZATION_KEY, description = + "The format of the key is ```\"Basic \" or \"Bearer " + + "\"```"), @ApiKeyAuthDefinition(name = CommonConstants.WORKLOAD, in = + ApiKeyAuthDefinition.ApiKeyLocation.HEADER, key = CommonConstants.WORKLOAD, description = + "Workload context passed through http header. If no context is provided 'default' workload " + + "context will be considered.") +})) +@Path("/") +public class PinotWorkloadConfigRestletResource { + public static final Logger LOGGER = LoggerFactory.getLogger(PinotWorkloadConfigRestletResource.class); + + @Inject + PinotHelixResourceManager _pinotHelixResourceManager; + + /** + * API to get all workload configs + */ + @GET + @Produces(MediaType.APPLICATION_JSON) + @Path("/workloadConfigs/{workloadName}") + @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.GET_WORKLOAD_CONFIG) + @Authenticate(AccessType.READ) + @ApiOperation(value = "Get workload configs", notes = "Get workload configs for the workload name") + public String getWorkloadConfig(@PathParam("workloadName") String workloadName, @Context HttpHeaders httpHeaders) { + try { + LOGGER.info("Received request to get workload config for workload: {}", workloadName); + WorkloadConfig workloadConfig = _pinotHelixResourceManager.getWorkloadConfig(workloadName); + if (workloadConfig == null) { + throw new ControllerApplicationException(LOGGER, "Workload config not found for workload: " + workloadName, + Response.Status.NOT_FOUND, null); + } + return workloadConfig.toJsonString(); + } catch (Exception e) { + LOGGER.error("Caught exception while getting workload config for workload: {}", workloadName, e); + throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e); + } + } + + @POST + @Produces(MediaType.APPLICATION_JSON) + @Path("/workloadConfigs") + @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.UPDATE_WORKLOAD_CONFIG) + @Authenticate(AccessType.UPDATE) + @ApiOperation(value = "Update workload config", notes = "Update workload config for the workload name") + public void updateWorkloadConfig(String requestString, @Context HttpHeaders httpHeaders) { + try { + WorkloadConfig workloadConfig = JsonUtils.stringToObject(requestString, WorkloadConfig.class); + LOGGER.info("Received request to update workload config for workload: {}", workloadConfig.getWorkloadName()); + _pinotHelixResourceManager.setWorkloadConfig(workloadConfig); + } catch (Exception e) { + LOGGER.error("Caught exception while updating workload config for request: {}", requestString, e); + throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e); + } + } + + @DELETE + @Produces(MediaType.APPLICATION_JSON) + @Path("/workloadConfigs/{workloadName}") + @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.DELETE_WORKLOAD_CONFIG) + @Authenticate(AccessType.DELETE) + @ApiOperation(value = "Delete workload config", notes = "Delete workload config for the workload name") + public void deleteWorkloadConfig(@PathParam("workloadName") String workloadName, @Context HttpHeaders httpHeaders) { + try { + LOGGER.info("Received request to delete workload config for workload: {}", workloadName); + _pinotHelixResourceManager.deleteWorkloadConfig(workloadName); + } catch (Exception e) { + LOGGER.error("Caught exception while deleting workload config for workload: {}", workloadName, e); + throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e); + } + } +} + diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index fd36bf44eadc..f72c6298fc15 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -175,6 +175,7 @@ import org.apache.pinot.spi.config.user.ComponentType; import org.apache.pinot.spi.config.user.RoleType; import org.apache.pinot.spi.config.user.UserConfig; +import org.apache.pinot.spi.config.workload.WorkloadConfig; import org.apache.pinot.spi.data.DateTimeFieldSpec; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.utils.CommonConstants; @@ -4604,6 +4605,27 @@ public Map minimumInstancesRequiredForTags() { return tagMinInstanceMap; } + @Nullable + public List getAllWorkloadConfigs() throws Exception { + return ZKMetadataProvider.getAllWorkloadConfigs(_propertyStore); + } + + @Nullable + public WorkloadConfig getWorkloadConfig(String workload) throws Exception { + return ZKMetadataProvider.getWorkloadConfig(_propertyStore, workload); + } + + public void setWorkloadConfig(WorkloadConfig workloadConfig) throws Exception { + if (!ZKMetadataProvider.setWorkloadConfig(_propertyStore, workloadConfig)) { + throw new RuntimeException("Failed to set workload config for workload: " + workloadConfig.getWorkloadName()); + } + } + + public void deleteWorkloadConfig(String workload) { + ZKMetadataProvider.deleteWorkloadConfig(_propertyStore, workload); + } + + /* * Uncomment and use for testing on a real cluster public static void main(String[] args) throws Exception { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java b/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java index 96e4f27790a7..cd6994b79c10 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java @@ -99,6 +99,9 @@ public static class Cluster { public static final String UPDATE_INSTANCE_PARTITIONS = "UpdateInstancePartitions"; public static final String GET_RESPONSE_STORE = "GetResponseStore"; public static final String DELETE_RESPONSE_STORE = "DeleteResponseStore"; + public static final String GET_WORKLOAD_CONFIG = "GetWorkloadConfig"; + public static final String UPDATE_WORKLOAD_CONFIG = "UpdateWorkloadConfig"; + public static final String DELETE_WORKLOAD_CONFIG = "DeleteWorkloadConfig"; } // Action names for table diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/workload/EnforcementProfile.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/workload/EnforcementProfile.java new file mode 100644 index 000000000000..da908584ef5c --- /dev/null +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/workload/EnforcementProfile.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.spi.config.workload; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonPropertyDescription; +import javax.annotation.Nullable; +import org.apache.pinot.spi.config.BaseJsonConfig; + + +@JsonIgnoreProperties(ignoreUnknown = true) +@JsonInclude(JsonInclude.Include.NON_NULL) +public class EnforcementProfile extends BaseJsonConfig { + + private static final String BROKER_COST = "brokerCost"; + private static final String SERVER_COST = "serverCost"; + + + @JsonPropertyDescription("Workload cost for the broker") + private WorkloadCost _brokerCost; + + @JsonPropertyDescription("Workload cost for the server") + private WorkloadCost _serverCost; + + @JsonCreator + public EnforcementProfile(@JsonProperty(BROKER_COST) @Nullable WorkloadCost brokerCost, + @JsonProperty(SERVER_COST) @Nullable WorkloadCost serveCost) { + _brokerCost = brokerCost; + _serverCost = serveCost; + } + + public WorkloadCost getBrokerCost() { + return _brokerCost; + } + + public WorkloadCost getServerCost() { + return _serverCost; + } + + public void setBrokerCost(WorkloadCost brokerCost) { + _brokerCost = brokerCost; + } + + public void setServerCost(WorkloadCost serverCost) { + _serverCost = serverCost; + } +} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/workload/WorkloadConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/workload/WorkloadConfig.java new file mode 100644 index 000000000000..b61b710037cb --- /dev/null +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/workload/WorkloadConfig.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.spi.config.workload; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonPropertyDescription; +import com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.pinot.spi.config.BaseJsonConfig; +import org.apache.pinot.spi.utils.JsonUtils; + + +public class WorkloadConfig extends BaseJsonConfig { + + private static final String WORKLOAD_ID = "id"; + private static final String ENFORCEMENT_PROFILE = "enforcementProfile"; + + @JsonPropertyDescription("Unique identifier for the workload") + private String _workloadName; + + @JsonPropertyDescription("Enforcement profile for the workload") + private EnforcementProfile _enforcementProfile; + + @JsonCreator + public WorkloadConfig(@JsonProperty(value = WORKLOAD_ID, required = true) String workloadId, + @JsonProperty(value = ENFORCEMENT_PROFILE, required = true) EnforcementProfile enforcementProfile) { + _workloadName = workloadId; + _enforcementProfile = enforcementProfile; + } + + public String getWorkloadName() { + return _workloadName; + } + + public EnforcementProfile getEnforcementProfile() { + return _enforcementProfile; + } + + public void setWorkloadName(String workloadName) { + _workloadName = workloadName; + } + + public void setEnforcementProfile(EnforcementProfile enforcementProfile) { + _enforcementProfile = enforcementProfile; + } + + @Override + public String toJsonString() { + try { + return JsonUtils.objectToString(this); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } +} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/workload/WorkloadCost.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/workload/WorkloadCost.java new file mode 100644 index 000000000000..b1d6e3e223a5 --- /dev/null +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/workload/WorkloadCost.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.spi.config.workload; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonPropertyDescription; +import org.apache.pinot.spi.config.BaseJsonConfig; + + +@JsonIgnoreProperties(ignoreUnknown = true) +@JsonInclude(JsonInclude.Include.NON_NULL) +public class WorkloadCost extends BaseJsonConfig { + + private static final String CPU_COST = "cpuCost"; + private static final String MEMORY_COST = "memoryCost"; + + @JsonPropertyDescription("Max CPU cost allowed for the workload") + private double _cpuCost; + + @JsonPropertyDescription("Max memory cost allowed for the workload") + private double _memoryCost; + + public WorkloadCost(@JsonProperty(CPU_COST) double cpuCost, @JsonProperty(MEMORY_COST) double memoryCost) { + _cpuCost = cpuCost; + _memoryCost = memoryCost; + } + + public double getCpuCost() { + return _cpuCost; + } + + public double getMemoryCost() { + return _memoryCost; + } + + public void setCpuCost(double cpuCost) { + _cpuCost = cpuCost; + } + + public void setMemoryCost(double memoryCost) { + _memoryCost = memoryCost; + } +} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index 284759354c0c..614bb12e47ff 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -71,6 +71,7 @@ private CommonConstants() { public static final String DEFAULT_EXECUTORS_FIXED_NUM_THREADS = "-1"; public static final String CONFIG_OF_PINOT_TAR_COMPRESSION_CODEC_NAME = "pinot.tar.compression.codec.name"; + public static final String WORKLOAD = "workload"; /** * The state of the consumer for a given segment