From 0b1dae7b57641a179d11b3226775bb40cf3f5aae Mon Sep 17 00:00:00 2001 From: Taylor Date: Fri, 24 Jan 2025 22:48:30 -0800 Subject: [PATCH 1/2] Backfill more eval keys --- .../arm/checks/resource/ACRAnonymousPullDisabled.py | 5 ++++- checkov/arm/checks/resource/ACRContainerScanEnabled.py | 5 ++++- checkov/arm/checks/resource/ACREnableZoneRedundancy.py | 2 ++ .../checks/resource/AKSApiServerAuthorizedIpRanges.py | 5 ++++- checkov/arm/checks/resource/AKSDashboardDisabled.py | 5 +++++ checkov/arm/checks/resource/AKSLoggingEnabled.py | 3 +++ checkov/arm/checks/resource/AKSMaxPodsMinimum.py | 5 ++++- checkov/arm/checks/resource/AKSNetworkPolicy.py | 5 ++++- checkov/arm/checks/resource/AKSRbacEnabled.py | 3 +++ checkov/arm/checks/resource/APIManagementMinTLS12.py | 2 ++ .../arm/checks/resource/AppGWDefinesSecureProtocols.py | 6 +++++- .../arm/checks/resource/AppServiceAuthentication.py | 1 + .../arm/checks/resource/AppServiceClientCertificate.py | 5 ++++- checkov/arm/checks/resource/AppServiceHTTPSOnly.py | 3 +++ .../arm/checks/resource/AppServiceHttps20Enabled.py | 2 ++ checkov/arm/checks/resource/AppServiceIdentity.py | 5 ++++- .../arm/checks/resource/AppServiceInstanceMinimum.py | 5 ++++- .../arm/checks/resource/AppServiceUsedAzureFiles.py | 5 ++++- .../arm/checks/resource/AzureManagedDiscEncryption.py | 1 + checkov/arm/checks/resource/AzureSearchSLAIndex.py | 2 ++ ...reSynapseWorkspacesHaveNoIPFirewallRulesAttached.py | 1 + .../checks/resource/CosmosDBDisableAccessKeyWrite.py | 5 ++++- .../resource/CustomRoleDefinitionSubscriptionOwner.py | 5 ++++- ...WorkspaceDBFSRootEncryptedWithCustomerManagedKey.py | 5 ++++- .../checks/resource/DatabricksWorkspaceIsNotPublic.py | 5 ++++- .../checks/resource/FunctionAppsAccessibleOverHttps.py | 5 ++++- .../resource/FunctionAppsEnableAuthentication.py | 5 ++++- checkov/arm/checks/resource/KeyvaultRecoveryEnabled.py | 5 ++++- .../arm/checks/resource/MonitorLogProfileCategories.py | 3 +++ .../checks/resource/MonitorLogProfileRetentionDays.py | 2 ++ .../arm/checks/resource/NetworkWatcherFlowLogPeriod.py | 6 +++++- .../PostgreSQLServerConnectionThrottlingEnabled.py | 3 +++ .../resource/PostgreSQLServerLogCheckpointsEnabled.py | 3 +++ .../resource/PostgreSQLServerLogConnectionsEnabled.py | 5 ++++- .../arm/checks/resource/SQLServerAuditingEnabled.py | 4 +++- checkov/arm/checks/resource/SQLServerNoPublicAccess.py | 5 ++++- .../checks/resource/SQLServerThreatDetectionTypes.py | 1 + checkov/arm/checks/resource/SecretExpirationDate.py | 4 +++- .../checks/resource/SecurityCenterContactEmailAlert.py | 5 ++++- .../resource/SecurityCenterContactEmailAlertAdmins.py | 5 ++++- .../arm/checks/resource/SecurityCenterContactPhone.py | 5 ++++- .../checks/resource/SecurityCenterStandardPricing.py | 4 +++- .../StorageAccountAzureServicesAccessEnabled.py | 3 +++ .../resource/StorageAccountDefaultNetworkAccessDeny.py | 5 ++++- .../StorageAccountLoggingQueueServiceEnabled.py | 5 ++++- .../checks/resource/StorageAccountMinimumTlsVersion.py | 5 ++++- checkov/arm/checks/resource/StorageAccountName.py | 3 +++ .../resource/StorageAccountsTransportEncryption.py | 3 +++ .../StorageBlobServiceContainerPrivateAccess.py | 4 +++- ...SynapseWorkspaceAdministratorLoginPasswordHidden.py | 5 ++++- .../checks/resource/SynapseWorkspaceCMKEncryption.py | 3 +++ .../checks/resource/VMDisablePasswordAuthentication.py | 3 +++ .../checks/resource/aws/ALBDropHttpHeaders.py | 2 ++ .../checks/resource/aws/ALBListenerHTTPS.py | 5 ++++- .../checks/resource/aws/ALBListenerTLS12.py | 5 ++++- .../checks/resource/aws/APIGatewayAuthorization.py | 3 +++ .../aws/AbsSecurityGroupUnrestrictedIngress.py | 10 +++++++--- .../resource/aws/CloudfrontDistributionEncryption.py | 5 ++++- .../checks/resource/aws/CodeBuildProjectEncryption.py | 2 ++ .../cloudformation/checks/resource/aws/EC2PublicIP.py | 7 +++++-- .../checks/resource/aws/ECSClusterContainerInsights.py | 4 +++- .../aws/ECSTaskDefinitionEFSVolumeEncryption.py | 5 ++++- .../checks/resource/aws/ELBv2AccessLogs.py | 2 ++ ...acheReplicationGroupEncryptionAtTransitAuthToken.py | 5 ++++- .../checks/resource/aws/ElasticsearchDomainLogging.py | 2 ++ .../checks/resource/aws/GlueDataCatalogEncryption.py | 6 ++++-- .../checks/resource/aws/GlueSecurityConfiguration.py | 4 +++- .../checks/resource/aws/IAMAdminPolicyDocument.py | 6 +++++- .../resource/aws/IAMRoleAllowAssumeFromAccount.py | 4 +++- .../checks/resource/aws/IAMStarActionPolicyDocument.py | 6 +++++- .../resource/aws/LaunchConfigurationEBSEncryption.py | 8 +++++--- .../checks/resource/aws/MSKClusterLogging.py | 6 ++++-- .../cloudformation/checks/resource/aws/RedShiftSSL.py | 5 +++++ .../resource/aws/SecurityGroupRuleDescription.py | 3 +++ 74 files changed, 257 insertions(+), 53 deletions(-) diff --git a/checkov/arm/checks/resource/ACRAnonymousPullDisabled.py b/checkov/arm/checks/resource/ACRAnonymousPullDisabled.py index 812a084160c..4fa9f0e55d1 100644 --- a/checkov/arm/checks/resource/ACRAnonymousPullDisabled.py +++ b/checkov/arm/checks/resource/ACRAnonymousPullDisabled.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import Any +from typing import Any, List from checkov.common.models.enums import CheckResult, CheckCategories from checkov.arm.base_resource_check import BaseResourceCheck @@ -34,5 +34,8 @@ def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult: return CheckResult.PASSED + def get_evaluated_keys(self) -> List[str]: + return ['properties', 'properties/anonymousPullEnabled', 'sku'] + check = ACRAnonymousPullDisabled() diff --git a/checkov/arm/checks/resource/ACRContainerScanEnabled.py b/checkov/arm/checks/resource/ACRContainerScanEnabled.py index f588818dc4d..b573819b700 100644 --- a/checkov/arm/checks/resource/ACRContainerScanEnabled.py +++ b/checkov/arm/checks/resource/ACRContainerScanEnabled.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import Any, Dict +from typing import Any, Dict, List from checkov.common.models.enums import CheckResult, CheckCategories from checkov.arm.base_resource_check import BaseResourceCheck @@ -24,5 +24,8 @@ def scan_resource_conf(self, conf: Dict[str, Any]) -> CheckResult: return CheckResult.FAILED + def get_evaluated_keys(self) -> List[str]: + return ["sku", "sku/name"] + check = ACRContainerScanEnabled() diff --git a/checkov/arm/checks/resource/ACREnableZoneRedundancy.py b/checkov/arm/checks/resource/ACREnableZoneRedundancy.py index da146a84344..02d4c3a8983 100644 --- a/checkov/arm/checks/resource/ACREnableZoneRedundancy.py +++ b/checkov/arm/checks/resource/ACREnableZoneRedundancy.py @@ -23,7 +23,9 @@ def scan_resource_conf(self, conf: dict[str, list[Any]]) -> CheckResult: # check registry. default=false properties = conf.get("properties") if properties and isinstance(properties, dict): + self.evaluated_keys = ["properties"] if properties.get("zoneRedundancy") == "Disabled": + self.evaluated_keys = ["properties/zoneRedundancy"] return CheckResult.FAILED return CheckResult.PASSED diff --git a/checkov/arm/checks/resource/AKSApiServerAuthorizedIpRanges.py b/checkov/arm/checks/resource/AKSApiServerAuthorizedIpRanges.py index 1bdcaa45fcb..012ee113749 100644 --- a/checkov/arm/checks/resource/AKSApiServerAuthorizedIpRanges.py +++ b/checkov/arm/checks/resource/AKSApiServerAuthorizedIpRanges.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import Any +from typing import Any, List from checkov.common.models.enums import CheckResult, CheckCategories from checkov.arm.base_resource_check import BaseResourceCheck @@ -41,5 +41,8 @@ def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult: return CheckResult.PASSED return CheckResult.FAILED + def get_evaluated_keys(self) -> List[str]: + return ['properties', 'properties/apiServerAccessProfile', 'properties/apiServerAccessProfile/authorizedIPRanges'] + check = AKSApiServerAuthorizedIpRanges() diff --git a/checkov/arm/checks/resource/AKSDashboardDisabled.py b/checkov/arm/checks/resource/AKSDashboardDisabled.py index d684ec23b2f..e8800eb769f 100644 --- a/checkov/arm/checks/resource/AKSDashboardDisabled.py +++ b/checkov/arm/checks/resource/AKSDashboardDisabled.py @@ -19,16 +19,21 @@ def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult: if conf.get("apiVersion") is not None: if conf["apiVersion"] == "2017-08-31": # No addonProfiles option to configure + self.evaluated_keys = ["apiVersion"] return CheckResult.FAILED properties = conf.get("properties") + self.evaluated_keys = ["properties"] if properties is None or not isinstance(properties, dict): + self.evaluated_keys = ["properties"] return CheckResult.FAILED addon_profiles = conf["properties"].get("addonProfiles") if not isinstance(addon_profiles, dict): + self.evaluated_keys = ["properties/addonProfiles"] return CheckResult.FAILED kube_dashboard = addon_profiles.get("kubeDashboard") if not isinstance(kube_dashboard, dict): + self.evaluated_keys = ["properties/addonProfiles/kubeDashboard"] return CheckResult.FAILED enabled = kube_dashboard.get("enabled") if enabled is not None and str(enabled).lower() == "false": diff --git a/checkov/arm/checks/resource/AKSLoggingEnabled.py b/checkov/arm/checks/resource/AKSLoggingEnabled.py index 684361a5a7a..f127fb505e3 100644 --- a/checkov/arm/checks/resource/AKSLoggingEnabled.py +++ b/checkov/arm/checks/resource/AKSLoggingEnabled.py @@ -18,13 +18,16 @@ def __init__(self) -> None: def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult: if "apiVersion" in conf: if conf["apiVersion"] == "2017-08-31": + self.evaluated_keys = ["apiVersion"] # No addonProfiles option to configure return CheckResult.FAILED properties = conf.get("properties") + self.evaluated_keys = ["properties"] if isinstance(properties, dict): addon_profiles = properties.get("addonProfiles") if isinstance(addon_profiles, dict): + self.evaluated_keys = ["properties/addonProfiles"] omsagent = addon_profiles.get("omsagent") if not omsagent: # it can be written in lowercase or camelCase diff --git a/checkov/arm/checks/resource/AKSMaxPodsMinimum.py b/checkov/arm/checks/resource/AKSMaxPodsMinimum.py index 087a23d64f2..b6c1d9b4167 100644 --- a/checkov/arm/checks/resource/AKSMaxPodsMinimum.py +++ b/checkov/arm/checks/resource/AKSMaxPodsMinimum.py @@ -1,5 +1,5 @@ from __future__ import annotations -from typing import Any +from typing import Any, List from checkov.common.models.enums import CheckResult, CheckCategories from checkov.arm.base_resource_check import BaseResourceCheck from typing import Optional @@ -30,5 +30,8 @@ def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult: return CheckResult.PASSED + def get_evaluated_keys(self) -> List[str]: + return ["properties", "properties/agentPoolProfiles", "properties/agentPoolProfiles/maxPods"] + check = AKSMaxPodsMinimum() diff --git a/checkov/arm/checks/resource/AKSNetworkPolicy.py b/checkov/arm/checks/resource/AKSNetworkPolicy.py index e9ed0dc2226..30f52eab306 100644 --- a/checkov/arm/checks/resource/AKSNetworkPolicy.py +++ b/checkov/arm/checks/resource/AKSNetworkPolicy.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import Any +from typing import Any, List from checkov.common.models.enums import CheckResult, CheckCategories from checkov.arm.base_resource_check import BaseResourceCheck @@ -32,5 +32,8 @@ def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult: return CheckResult.PASSED return CheckResult.FAILED + def get_evaluated_keys(self) -> List[str]: + return ['properties', 'properties/networkProfile', 'properties/networkProfile/networkPolicy'] + check = AKSNetworkPolicy() diff --git a/checkov/arm/checks/resource/AKSRbacEnabled.py b/checkov/arm/checks/resource/AKSRbacEnabled.py index 54b874720a3..9883b2e26b0 100644 --- a/checkov/arm/checks/resource/AKSRbacEnabled.py +++ b/checkov/arm/checks/resource/AKSRbacEnabled.py @@ -19,14 +19,17 @@ def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult: if "apiVersion" in conf: if conf["apiVersion"] == "2017-08-31": # No enableRBAC option to configure + self.evaluated_keys = ["apiVersion"] return CheckResult.FAILED + self.evaluated_keys = ["properties"] properties = conf.get('properties') if not properties or not isinstance(properties, dict): return CheckResult.FAILED enable_RBAC = properties.get('enableRBAC') if str(enable_RBAC).lower() == "true": return CheckResult.PASSED + self.evaluated_keys.append("properties/enableRBAC") return CheckResult.FAILED diff --git a/checkov/arm/checks/resource/APIManagementMinTLS12.py b/checkov/arm/checks/resource/APIManagementMinTLS12.py index 9813b7f5c7a..0d05a23f960 100644 --- a/checkov/arm/checks/resource/APIManagementMinTLS12.py +++ b/checkov/arm/checks/resource/APIManagementMinTLS12.py @@ -17,8 +17,10 @@ def __init__(self) -> None: def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult: properties = conf.get("properties") if isinstance(properties, dict) and "customProperties" in properties: + self.evaluated_keys = ["properties"] customProperties = properties.get("customProperties") if isinstance(customProperties, dict): + self.evaluated_keys = ["properties/customProperties"] if customProperties.get("Microsoft.WindowsAzure.ApiManagement.Gateway.Security.Backend.Protocols.Ssl30"): return CheckResult.FAILED if customProperties.get("Microsoft.WindowsAzure.ApiManagement.Gateway.Security.Backend.Protocols.Tls10"): diff --git a/checkov/arm/checks/resource/AppGWDefinesSecureProtocols.py b/checkov/arm/checks/resource/AppGWDefinesSecureProtocols.py index a8003bb59d8..e7bef111e15 100644 --- a/checkov/arm/checks/resource/AppGWDefinesSecureProtocols.py +++ b/checkov/arm/checks/resource/AppGWDefinesSecureProtocols.py @@ -1,5 +1,5 @@ from __future__ import annotations -from typing import Any +from typing import Any, List from checkov.common.models.enums import CheckCategories, CheckResult from checkov.arm.base_resource_check import BaseResourceCheck @@ -67,5 +67,9 @@ def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult: return CheckResult.FAILED return CheckResult.FAILED + def get_evaluated_keys(self) -> List[str]: + return ["properties/sslPolicy", "properties/sslPolicy/policyType", "properties/sslPolicy/minProtocolVersion", + "properties/sslPolicy/cipherSuites"] + check = AppGWDefinesSecureProtocols() diff --git a/checkov/arm/checks/resource/AppServiceAuthentication.py b/checkov/arm/checks/resource/AppServiceAuthentication.py index 9d74b896c90..29c6507e9ec 100644 --- a/checkov/arm/checks/resource/AppServiceAuthentication.py +++ b/checkov/arm/checks/resource/AppServiceAuthentication.py @@ -17,6 +17,7 @@ def __init__(self) -> None: super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources) def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult: + self.evaluated_keys = ["name"] if self.entity_type == "Microsoft.Web/sites/config": if "name" in conf and "authsettings" in conf["name"]: if "properties" in conf and "enabled" in conf["properties"]: diff --git a/checkov/arm/checks/resource/AppServiceClientCertificate.py b/checkov/arm/checks/resource/AppServiceClientCertificate.py index 5c07a395c63..d904aedcf29 100644 --- a/checkov/arm/checks/resource/AppServiceClientCertificate.py +++ b/checkov/arm/checks/resource/AppServiceClientCertificate.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import Any +from typing import Any, List from checkov.common.models.enums import CheckResult, CheckCategories from checkov.arm.base_resource_check import BaseResourceCheck @@ -23,5 +23,8 @@ def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult: return CheckResult.PASSED return CheckResult.FAILED + def get_evaluated_keys(self) -> List[str]: + return ["properties", "properties/clientCertEnabled"] + check = AppServiceClientCertificate() diff --git a/checkov/arm/checks/resource/AppServiceHTTPSOnly.py b/checkov/arm/checks/resource/AppServiceHTTPSOnly.py index a90240a45d6..c4925e7585a 100644 --- a/checkov/arm/checks/resource/AppServiceHTTPSOnly.py +++ b/checkov/arm/checks/resource/AppServiceHTTPSOnly.py @@ -22,5 +22,8 @@ def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult: return CheckResult.PASSED return CheckResult.FAILED + def get_evaluated_keys(self) -> List[str]: + return ["properties", "properties/httpsOnly"] + check = AppServiceHTTPSOnly() diff --git a/checkov/arm/checks/resource/AppServiceHttps20Enabled.py b/checkov/arm/checks/resource/AppServiceHttps20Enabled.py index ae77eae9b8c..4051e56453e 100644 --- a/checkov/arm/checks/resource/AppServiceHttps20Enabled.py +++ b/checkov/arm/checks/resource/AppServiceHttps20Enabled.py @@ -18,8 +18,10 @@ def __init__(self) -> None: super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources) def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult: + self.evaluated_keys = ["properties"] http_20_enabled = find_in_dict(conf, "properties/siteConfig/http20Enabled") if http_20_enabled and "apiVersion" in conf: + self.evaluated_keys = ["properties/siteConfig/http20Enabled", "apiVersion"] if conf["apiVersion"] == "2018-11-01": if isinstance(http_20_enabled, str) and str(http_20_enabled).lower() == "true": return CheckResult.PASSED diff --git a/checkov/arm/checks/resource/AppServiceIdentity.py b/checkov/arm/checks/resource/AppServiceIdentity.py index 7ede61040a2..64fb35b9579 100644 --- a/checkov/arm/checks/resource/AppServiceIdentity.py +++ b/checkov/arm/checks/resource/AppServiceIdentity.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import Any +from typing import Any, List from checkov.common.models.enums import CheckResult, CheckCategories from checkov.arm.base_resource_check import BaseResourceCheck @@ -28,5 +28,8 @@ def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult: return CheckResult.PASSED return CheckResult.FAILED + def get_evaluated_keys(self) -> List[str]: + return ['identity', 'identity/type', 'identity/userAssignedIdentities'] + check = AppServiceIdentity() diff --git a/checkov/arm/checks/resource/AppServiceInstanceMinimum.py b/checkov/arm/checks/resource/AppServiceInstanceMinimum.py index 5954c70fc95..f407b3f62b1 100644 --- a/checkov/arm/checks/resource/AppServiceInstanceMinimum.py +++ b/checkov/arm/checks/resource/AppServiceInstanceMinimum.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import Dict +from typing import Dict, List from checkov.arm.base_resource_check import BaseResourceCheck from checkov.common.models.enums import CheckCategories, CheckResult @@ -30,5 +30,8 @@ def scan_resource_conf(self, conf: Dict[str, Dict[str, Dict[str, int]]]) -> Chec return CheckResult.PASSED return CheckResult.FAILED + def get_evaluated_keys(self) -> List[str]: + return ["properties", "properties/siteConfig", "properties/siteConfig/numberOfWorkers"] + check = AppServiceInstanceMinimum() diff --git a/checkov/arm/checks/resource/AppServiceUsedAzureFiles.py b/checkov/arm/checks/resource/AppServiceUsedAzureFiles.py index 6fe3b21019d..7d3b3cf41cf 100644 --- a/checkov/arm/checks/resource/AppServiceUsedAzureFiles.py +++ b/checkov/arm/checks/resource/AppServiceUsedAzureFiles.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import Any +from typing import Any, List from checkov.arm.base_resource_check import BaseResourceCheck from checkov.common.models.enums import CheckCategories, CheckResult @@ -24,5 +24,8 @@ def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult: return CheckResult.PASSED return CheckResult.FAILED + def get_evaluated_keys(self) -> List[str]: + return ['properties', 'properties/azureStorageAccounts'] + check = AppServiceUsedAzureFiles() diff --git a/checkov/arm/checks/resource/AzureManagedDiscEncryption.py b/checkov/arm/checks/resource/AzureManagedDiscEncryption.py index b2afb44c34b..afe67901ce0 100644 --- a/checkov/arm/checks/resource/AzureManagedDiscEncryption.py +++ b/checkov/arm/checks/resource/AzureManagedDiscEncryption.py @@ -18,6 +18,7 @@ def __init__(self) -> None: def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult: properties = conf.get("properties") if properties: + self.evaluated_keys = ["properties"] encryption = properties.get("encryption") if encryption: # if the block exists, then it is enabled diff --git a/checkov/arm/checks/resource/AzureSearchSLAIndex.py b/checkov/arm/checks/resource/AzureSearchSLAIndex.py index c13eda686f4..7d36c98b246 100644 --- a/checkov/arm/checks/resource/AzureSearchSLAIndex.py +++ b/checkov/arm/checks/resource/AzureSearchSLAIndex.py @@ -23,6 +23,7 @@ def __init__(self) -> None: def scan_resource_conf(self, conf: Dict[str, Any]) -> CheckResult: properties = conf.get("properties", {}) + self.evaluated_keys = ["properties"] if not isinstance(properties, dict): return CheckResult.FAILED replica_count = properties.get("replicaCount") @@ -30,6 +31,7 @@ def scan_resource_conf(self, conf: Dict[str, Any]) -> CheckResult: if replica_count >= 3: return CheckResult.PASSED else: + self.evaluated_keys = ["properties/replicaCount"] return CheckResult.FAILED else: return CheckResult.FAILED diff --git a/checkov/arm/checks/resource/AzureSynapseWorkspacesHaveNoIPFirewallRulesAttached.py b/checkov/arm/checks/resource/AzureSynapseWorkspacesHaveNoIPFirewallRulesAttached.py index 7c2d0bc94da..823a0473e37 100644 --- a/checkov/arm/checks/resource/AzureSynapseWorkspacesHaveNoIPFirewallRulesAttached.py +++ b/checkov/arm/checks/resource/AzureSynapseWorkspacesHaveNoIPFirewallRulesAttached.py @@ -16,6 +16,7 @@ def scan_resource_conf(self, conf: Dict[str, List[Any]]) -> CheckResult: if depends_on is None or not len(depends_on): return CheckResult.PASSED if any('Microsoft.Synapse/workspaces/firewallRules' in item for item in depends_on): + self.evaluated_keys = ["dependsOn"] return CheckResult.FAILED return CheckResult.PASSED diff --git a/checkov/arm/checks/resource/CosmosDBDisableAccessKeyWrite.py b/checkov/arm/checks/resource/CosmosDBDisableAccessKeyWrite.py index 27419a2b28f..15534737801 100644 --- a/checkov/arm/checks/resource/CosmosDBDisableAccessKeyWrite.py +++ b/checkov/arm/checks/resource/CosmosDBDisableAccessKeyWrite.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import Any +from typing import Any, List from checkov.common.models.enums import CheckResult, CheckCategories from checkov.arm.base_resource_check import BaseResourceCheck @@ -20,5 +20,8 @@ def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult: return CheckResult.PASSED return CheckResult.FAILED + def get_evaluated_keys(self) -> List[str]: + return ["properties", "properties/disableKeyBasedMetadataWriteAccess"] + check = CosmosDBDisableAccessKeyWrite() diff --git a/checkov/arm/checks/resource/CustomRoleDefinitionSubscriptionOwner.py b/checkov/arm/checks/resource/CustomRoleDefinitionSubscriptionOwner.py index bf2134a1947..de9c5830be9 100644 --- a/checkov/arm/checks/resource/CustomRoleDefinitionSubscriptionOwner.py +++ b/checkov/arm/checks/resource/CustomRoleDefinitionSubscriptionOwner.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import Any +from typing import Any, List from checkov.common.models.enums import CheckResult, CheckCategories from checkov.arm.base_resource_check import BaseResourceCheck @@ -35,5 +35,8 @@ def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult: return CheckResult.FAILED return CheckResult.PASSED + def get_evaluated_keys(self) -> List[str]: + return ["properties/assignableScopes", "properties/permissions/actions"] + check = CustomRoleDefinitionSubscriptionOwner() diff --git a/checkov/arm/checks/resource/DatabricksWorkspaceDBFSRootEncryptedWithCustomerManagedKey.py b/checkov/arm/checks/resource/DatabricksWorkspaceDBFSRootEncryptedWithCustomerManagedKey.py index ee16ea9731f..ae19d3f2a6d 100644 --- a/checkov/arm/checks/resource/DatabricksWorkspaceDBFSRootEncryptedWithCustomerManagedKey.py +++ b/checkov/arm/checks/resource/DatabricksWorkspaceDBFSRootEncryptedWithCustomerManagedKey.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import Any +from typing import Any, List from checkov.common.models.enums import CheckResult, CheckCategories from checkov.arm.base_resource_check import BaseResourceCheck @@ -28,5 +28,8 @@ def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult: return CheckResult.PASSED + def get_evaluated_keys(self) -> List[str]: + return ["properties/parameters"] + check = DatabricksWorkspaceDBFSRootEncryptedWithCustomerManagedKey() diff --git a/checkov/arm/checks/resource/DatabricksWorkspaceIsNotPublic.py b/checkov/arm/checks/resource/DatabricksWorkspaceIsNotPublic.py index aaae17689c8..2f59cd17335 100644 --- a/checkov/arm/checks/resource/DatabricksWorkspaceIsNotPublic.py +++ b/checkov/arm/checks/resource/DatabricksWorkspaceIsNotPublic.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import Any +from typing import Any, List from checkov.common.models.enums import CheckResult, CheckCategories from checkov.arm.base_resource_check import BaseResourceCheck @@ -23,5 +23,8 @@ def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult: return CheckResult.PASSED + def get_evaluated_keys(self) -> List[str]: + return ["properties", "properties/publicNetworkAccess"] + check = DatabricksWorkspaceIsNotPublic() diff --git a/checkov/arm/checks/resource/FunctionAppsAccessibleOverHttps.py b/checkov/arm/checks/resource/FunctionAppsAccessibleOverHttps.py index 87e4a2cd84b..98fceede6a7 100644 --- a/checkov/arm/checks/resource/FunctionAppsAccessibleOverHttps.py +++ b/checkov/arm/checks/resource/FunctionAppsAccessibleOverHttps.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import Any +from typing import Any, List from checkov.common.models.enums import CheckResult, CheckCategories from checkov.arm.base_resource_check import BaseResourceCheck @@ -41,5 +41,8 @@ def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult: return CheckResult.PASSED + def get_evaluated_keys(self) -> List[str]: + return ["properties", "properties/httpsOnly", "properties/httpSettings"] + check = FunctionAppsAccessibleOverHttps() diff --git a/checkov/arm/checks/resource/FunctionAppsEnableAuthentication.py b/checkov/arm/checks/resource/FunctionAppsEnableAuthentication.py index 3c6656b3e84..7484d691f79 100644 --- a/checkov/arm/checks/resource/FunctionAppsEnableAuthentication.py +++ b/checkov/arm/checks/resource/FunctionAppsEnableAuthentication.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import Any +from typing import Any, List from checkov.arm.base_resource_check import BaseResourceCheck from checkov.common.models.enums import CheckCategories, CheckResult @@ -28,5 +28,8 @@ def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult: return CheckResult.PASSED return CheckResult.FAILED + def get_evaluated_keys(self) -> List[str]: + return ['properties', 'properties/platform', 'properties/platform/enabled'] + check = FunctionAppsEnableAuthentication() diff --git a/checkov/arm/checks/resource/KeyvaultRecoveryEnabled.py b/checkov/arm/checks/resource/KeyvaultRecoveryEnabled.py index 7f4334c979b..f7f23a1c9c7 100644 --- a/checkov/arm/checks/resource/KeyvaultRecoveryEnabled.py +++ b/checkov/arm/checks/resource/KeyvaultRecoveryEnabled.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import Any +from typing import Any, List from checkov.common.models.enums import CheckResult, CheckCategories from checkov.arm.base_resource_check import BaseResourceCheck @@ -24,5 +24,8 @@ def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult: return CheckResult.PASSED return CheckResult.FAILED + def get_evaluated_keys(self) -> List[str]: + return ["properties", "properties/enablePurgeProtection", "properties/enableSoftDelete"] + check = KeyVaultRecoveryEnabled() diff --git a/checkov/arm/checks/resource/MonitorLogProfileCategories.py b/checkov/arm/checks/resource/MonitorLogProfileCategories.py index d86f35b087c..5f0e320c6b1 100644 --- a/checkov/arm/checks/resource/MonitorLogProfileCategories.py +++ b/checkov/arm/checks/resource/MonitorLogProfileCategories.py @@ -23,5 +23,8 @@ def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult: return CheckResult.PASSED return CheckResult.FAILED + def get_evaluated_keys(self) -> List[str]: + return ["properties", "properties/categories"] + check = MonitorLogProfileRetentionDays() diff --git a/checkov/arm/checks/resource/MonitorLogProfileRetentionDays.py b/checkov/arm/checks/resource/MonitorLogProfileRetentionDays.py index 35d574c9920..08b33946c65 100644 --- a/checkov/arm/checks/resource/MonitorLogProfileRetentionDays.py +++ b/checkov/arm/checks/resource/MonitorLogProfileRetentionDays.py @@ -18,7 +18,9 @@ def __init__(self) -> None: super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources) def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult: + self.evaluated_keys = ["properties"] if "properties" in conf and "retentionPolicy" in conf["properties"]: + self.evaluated_keys = ["properties/retentionPolicy"] retention = conf["properties"]["retentionPolicy"] if "enabled" in retention and str(retention["enabled"]).lower() == "true": if "days" in retention: diff --git a/checkov/arm/checks/resource/NetworkWatcherFlowLogPeriod.py b/checkov/arm/checks/resource/NetworkWatcherFlowLogPeriod.py index 946e459503e..c81020f2db4 100644 --- a/checkov/arm/checks/resource/NetworkWatcherFlowLogPeriod.py +++ b/checkov/arm/checks/resource/NetworkWatcherFlowLogPeriod.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import Any +from typing import Any, List from checkov.common.models.enums import CheckResult, CheckCategories from checkov.arm.base_resource_check import BaseResourceCheck @@ -34,5 +34,9 @@ def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult: return CheckResult.PASSED return CheckResult.FAILED + def get_evaluated_keys(self) -> List[str]: + return ['properties', 'properties/enabled', 'properties/retentionPolicy', 'properties/retentionPolicy/enabled', + 'properties/retentionPolicy/days'] + check = NetworkWatcherFlowLogPeriod() diff --git a/checkov/arm/checks/resource/PostgreSQLServerConnectionThrottlingEnabled.py b/checkov/arm/checks/resource/PostgreSQLServerConnectionThrottlingEnabled.py index dfb854cc1d9..6ca072b483e 100644 --- a/checkov/arm/checks/resource/PostgreSQLServerConnectionThrottlingEnabled.py +++ b/checkov/arm/checks/resource/PostgreSQLServerConnectionThrottlingEnabled.py @@ -24,6 +24,7 @@ def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult: if "value" in conf["properties"] and \ conf["properties"]["value"].lower() == "on": return CheckResult.PASSED + self.evaluated_keys = ['properties', 'properties/value'] return CheckResult.FAILED elif conf["type"] == "configurations": if "name" in conf and conf["name"] == "connection_throttling": @@ -33,8 +34,10 @@ def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult: if "value" in conf["properties"] and \ conf["properties"]["value"].lower() == "on": return CheckResult.PASSED + self.evaluated_keys = ['properties', 'properties/value'] return CheckResult.FAILED else: + self.evaluated_keys = ["properties"] return CheckResult.FAILED # If name not connection_throttling - don't report (neither pass nor fail) diff --git a/checkov/arm/checks/resource/PostgreSQLServerLogCheckpointsEnabled.py b/checkov/arm/checks/resource/PostgreSQLServerLogCheckpointsEnabled.py index 39aa02f60ef..edec2b95c73 100644 --- a/checkov/arm/checks/resource/PostgreSQLServerLogCheckpointsEnabled.py +++ b/checkov/arm/checks/resource/PostgreSQLServerLogCheckpointsEnabled.py @@ -18,6 +18,7 @@ def __init__(self) -> None: super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources) def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult: + self.evaluated_keys = ["properties"] if "type" in conf: if conf["type"] == "Microsoft.DBforPostgreSQL/servers/configurations": if "name" in conf and conf["name"] == "log_checkpoints": @@ -25,6 +26,7 @@ def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult: if "value" in conf["properties"] and \ conf["properties"]["value"].lower() == "on": return CheckResult.PASSED + self.evaluated_keys.append("properties/value") return CheckResult.FAILED elif conf["type"] == "configurations": if "name" in conf and conf["name"] == "log_checkpoints": @@ -34,6 +36,7 @@ def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult: if "value" in conf["properties"] and \ conf["properties"]["value"].lower() == "on": return CheckResult.PASSED + self.evaluated_keys.append("properties/value") return CheckResult.FAILED else: return CheckResult.FAILED diff --git a/checkov/arm/checks/resource/PostgreSQLServerLogConnectionsEnabled.py b/checkov/arm/checks/resource/PostgreSQLServerLogConnectionsEnabled.py index 4151b9ad66f..e997d6624ab 100644 --- a/checkov/arm/checks/resource/PostgreSQLServerLogConnectionsEnabled.py +++ b/checkov/arm/checks/resource/PostgreSQLServerLogConnectionsEnabled.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import Any +from typing import Any, List from checkov.common.models.enums import CheckResult, CheckCategories from checkov.arm.base_resource_check import BaseResourceCheck @@ -41,5 +41,8 @@ def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult: # If name not connection_throttling - don't report (neither pass nor fail) return CheckResult.UNKNOWN + def get_evaluated_keys(self) -> List[str]: + return ["type", "name", "properties/value"] + check = PostgreSQLServerLogConnectionsEnabled() diff --git a/checkov/arm/checks/resource/SQLServerAuditingEnabled.py b/checkov/arm/checks/resource/SQLServerAuditingEnabled.py index 8693ce76308..6adf0c9d8dd 100644 --- a/checkov/arm/checks/resource/SQLServerAuditingEnabled.py +++ b/checkov/arm/checks/resource/SQLServerAuditingEnabled.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import Any +from typing import Any, List from checkov.common.models.enums import CheckResult, CheckCategories from checkov.arm.base_resource_check import BaseResourceCheck @@ -38,5 +38,7 @@ def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult: return CheckResult.FAILED + def get_evaluated_keys(self) -> List[str]: + return ["resources"] check = SQLServerAuditingEnabled() diff --git a/checkov/arm/checks/resource/SQLServerNoPublicAccess.py b/checkov/arm/checks/resource/SQLServerNoPublicAccess.py index d526a90249f..11cba990069 100644 --- a/checkov/arm/checks/resource/SQLServerNoPublicAccess.py +++ b/checkov/arm/checks/resource/SQLServerNoPublicAccess.py @@ -21,7 +21,10 @@ def __init__(self) -> None: def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult: resources = conf.get("resources") if resources and isinstance(resources, list): - for resource in resources: + self.evaluated_keys = ["resources"] + for idx, resource in enumerate(resources): + self.evaluated_keys = [f"resources/[{idx}]/type", f"resources/[{idx}]/properties/startIpAddress", + f"resources/[{idx}]/properties/endIpAddress"] resource_type = resource.get("type") if resource_type in ("Microsoft.Sql/servers/firewallRules", "firewallRules", "firewallrules"): if "properties" in resource: diff --git a/checkov/arm/checks/resource/SQLServerThreatDetectionTypes.py b/checkov/arm/checks/resource/SQLServerThreatDetectionTypes.py index 0648bc30fbb..7f74794f7fc 100644 --- a/checkov/arm/checks/resource/SQLServerThreatDetectionTypes.py +++ b/checkov/arm/checks/resource/SQLServerThreatDetectionTypes.py @@ -19,6 +19,7 @@ def __init__(self) -> None: def scan_resource_conf(self, conf: Dict[str, Any]) -> CheckResult: resources = conf.get("resources") if isinstance(resources, list): + self.evaluated_keys = ["resources"] for resource in resources: if "type" in resource: if resource["type"] in ( diff --git a/checkov/arm/checks/resource/SecretExpirationDate.py b/checkov/arm/checks/resource/SecretExpirationDate.py index 1aa6e80dcdc..938fe994929 100644 --- a/checkov/arm/checks/resource/SecretExpirationDate.py +++ b/checkov/arm/checks/resource/SecretExpirationDate.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import Any +from typing import Any, List from checkov.common.models.enums import CheckResult, CheckCategories from checkov.arm.base_resource_check import BaseResourceCheck @@ -17,7 +17,9 @@ def __init__(self) -> None: def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult: if "properties" in conf: + self.evaluated_keys = ['properties'] if "attributes" in conf["properties"]: + self.evaluated_keys = ['properties/attributes'] if "exp" in conf["properties"]["attributes"]: if conf["properties"]["attributes"]["exp"]: return CheckResult.PASSED diff --git a/checkov/arm/checks/resource/SecurityCenterContactEmailAlert.py b/checkov/arm/checks/resource/SecurityCenterContactEmailAlert.py index 37eb676a8e3..ab214030adf 100644 --- a/checkov/arm/checks/resource/SecurityCenterContactEmailAlert.py +++ b/checkov/arm/checks/resource/SecurityCenterContactEmailAlert.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import Any +from typing import Any, List from checkov.common.models.enums import CheckResult, CheckCategories from checkov.arm.base_resource_check import BaseResourceCheck @@ -22,5 +22,8 @@ def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult: return CheckResult.PASSED return CheckResult.FAILED + def get_evaluated_keys(self) -> List[str]: + return ['properties', 'properties/alertNotifications'] + check = SecurityCenterContactEmailAlert() diff --git a/checkov/arm/checks/resource/SecurityCenterContactEmailAlertAdmins.py b/checkov/arm/checks/resource/SecurityCenterContactEmailAlertAdmins.py index 4af442ff57c..d5b6ddf2c8b 100644 --- a/checkov/arm/checks/resource/SecurityCenterContactEmailAlertAdmins.py +++ b/checkov/arm/checks/resource/SecurityCenterContactEmailAlertAdmins.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import Any +from typing import Any, List from checkov.common.models.enums import CheckResult, CheckCategories from checkov.arm.base_resource_check import BaseResourceCheck @@ -22,5 +22,8 @@ def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult: return CheckResult.PASSED return CheckResult.FAILED + def get_evaluated_keys(self) -> List[str]: + return ['properties', 'properties/alertsToAdmins'] + check = SecurityCenterContactEmailAlertAdmins() diff --git a/checkov/arm/checks/resource/SecurityCenterContactPhone.py b/checkov/arm/checks/resource/SecurityCenterContactPhone.py index 2e93bc0cf9c..504f29c0201 100644 --- a/checkov/arm/checks/resource/SecurityCenterContactPhone.py +++ b/checkov/arm/checks/resource/SecurityCenterContactPhone.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import Any +from typing import Any, List from checkov.common.models.enums import CheckResult, CheckCategories from checkov.arm.base_resource_check import BaseResourceCheck @@ -22,5 +22,8 @@ def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult: return CheckResult.PASSED return CheckResult.FAILED + def get_evaluated_keys(self) -> List[str]: + return ["properties", "properties/phone"] + check = SecurityCenterContactPhone() diff --git a/checkov/arm/checks/resource/SecurityCenterStandardPricing.py b/checkov/arm/checks/resource/SecurityCenterStandardPricing.py index 080cbadaee9..69cda447ba1 100644 --- a/checkov/arm/checks/resource/SecurityCenterStandardPricing.py +++ b/checkov/arm/checks/resource/SecurityCenterStandardPricing.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import Any +from typing import Any, List from checkov.common.models.enums import CheckResult, CheckCategories from checkov.arm.base_resource_check import BaseResourceCheck @@ -16,8 +16,10 @@ def __init__(self) -> None: super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources) def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult: + self.evaluated_keys = ["properties"] if "properties" in conf: if "pricingTier" in conf["properties"]: + self.evaluated_keys = ["properties/pricingTier"] if str(conf["properties"]["pricingTier"]).lower() == "standard": return CheckResult.PASSED return CheckResult.FAILED diff --git a/checkov/arm/checks/resource/StorageAccountAzureServicesAccessEnabled.py b/checkov/arm/checks/resource/StorageAccountAzureServicesAccessEnabled.py index 4504c93c815..b8146ed99ab 100644 --- a/checkov/arm/checks/resource/StorageAccountAzureServicesAccessEnabled.py +++ b/checkov/arm/checks/resource/StorageAccountAzureServicesAccessEnabled.py @@ -26,10 +26,13 @@ def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult: if year is None: return CheckResult.UNKNOWN # Should be handled by variable rendering if year < 2017: + self.evaluated_keys = ["apiVersion"] return CheckResult.FAILED + self.evaluated_keys = ["properties"] if "properties" in conf: if "networkAcls" in conf["properties"]: + self.evaluated_keys = ["properties/networkAcls"] if "defaultAction" in conf["properties"]["networkAcls"]: if not isinstance(conf["properties"]["networkAcls"], dict): return CheckResult.UNKNOWN diff --git a/checkov/arm/checks/resource/StorageAccountDefaultNetworkAccessDeny.py b/checkov/arm/checks/resource/StorageAccountDefaultNetworkAccessDeny.py index 1141f40839f..ff9b77d2bbb 100644 --- a/checkov/arm/checks/resource/StorageAccountDefaultNetworkAccessDeny.py +++ b/checkov/arm/checks/resource/StorageAccountDefaultNetworkAccessDeny.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import Any +from typing import Any, List from checkov.arm.base_resource_check import BaseResourceCheck from checkov.common.models.enums import CheckResult, CheckCategories @@ -38,5 +38,8 @@ def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult: return CheckResult.PASSED return CheckResult.FAILED + def get_evaluated_keys(self) -> List[str]: + return ["apiVersion", "properties", "properties/networkAcls"] + check = StorageAccountDefaultNetworkAccessDeny() diff --git a/checkov/arm/checks/resource/StorageAccountLoggingQueueServiceEnabled.py b/checkov/arm/checks/resource/StorageAccountLoggingQueueServiceEnabled.py index 3f6ac2fa82d..737ec0aea8b 100644 --- a/checkov/arm/checks/resource/StorageAccountLoggingQueueServiceEnabled.py +++ b/checkov/arm/checks/resource/StorageAccountLoggingQueueServiceEnabled.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import Any +from typing import Any, List from checkov.common.models.enums import CheckResult, CheckCategories from checkov.arm.base_resource_check import BaseResourceCheck @@ -37,5 +37,8 @@ def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult: return CheckResult.PASSED return CheckResult.FAILED + def get_evaluated_keys(self) -> List[str]: + return ['properties', 'properties/logs'] + check = StorageAccountLoggingQueueServiceEnabled() diff --git a/checkov/arm/checks/resource/StorageAccountMinimumTlsVersion.py b/checkov/arm/checks/resource/StorageAccountMinimumTlsVersion.py index 45bfd1ef0b5..874379e701f 100644 --- a/checkov/arm/checks/resource/StorageAccountMinimumTlsVersion.py +++ b/checkov/arm/checks/resource/StorageAccountMinimumTlsVersion.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import Any +from typing import Any, List from checkov.common.models.enums import CheckResult, CheckCategories from checkov.arm.base_resource_check import BaseResourceCheck @@ -28,5 +28,8 @@ def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult: return CheckResult.PASSED return CheckResult.FAILED + def get_evaluated_keys(self) -> List[str]: + return ['properties', 'properties/minimumTlsVersion'] + check = StorageAccountMinimumTlsVersion() diff --git a/checkov/arm/checks/resource/StorageAccountName.py b/checkov/arm/checks/resource/StorageAccountName.py index 41b5bfd6f42..e5937cd06aa 100644 --- a/checkov/arm/checks/resource/StorageAccountName.py +++ b/checkov/arm/checks/resource/StorageAccountName.py @@ -44,5 +44,8 @@ def scan_resource_conf(self, conf: dict[str, typing.Any]) -> CheckResult: return CheckResult.PASSED return CheckResult.FAILED + def get_evaluated_keys(self) -> typing.List[str]: + return ["name"] + check = StorageAccountName() diff --git a/checkov/arm/checks/resource/StorageAccountsTransportEncryption.py b/checkov/arm/checks/resource/StorageAccountsTransportEncryption.py index a57260cb706..85593104e9c 100644 --- a/checkov/arm/checks/resource/StorageAccountsTransportEncryption.py +++ b/checkov/arm/checks/resource/StorageAccountsTransportEncryption.py @@ -18,8 +18,10 @@ def __init__(self) -> None: super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources) def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult: + self.evaluated_keys = ["properties"] properties = conf.get("properties") if isinstance(properties, dict) and "supportsHttpsTrafficOnly" in properties: + self.evaluated_keys = ["properties/supportsHttpsTrafficOnly"] https = str(properties["supportsHttpsTrafficOnly"]).lower() return CheckResult.PASSED if https == "true" else CheckResult.FAILED @@ -31,6 +33,7 @@ def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult: if year is None: return CheckResult.UNKNOWN elif year < 2019: + self.evaluated_keys = ["apiVersion"] return CheckResult.FAILED else: return CheckResult.PASSED diff --git a/checkov/arm/checks/resource/StorageBlobServiceContainerPrivateAccess.py b/checkov/arm/checks/resource/StorageBlobServiceContainerPrivateAccess.py index 3d58df2e27f..1da839cda30 100644 --- a/checkov/arm/checks/resource/StorageBlobServiceContainerPrivateAccess.py +++ b/checkov/arm/checks/resource/StorageBlobServiceContainerPrivateAccess.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import Any +from typing import Any, List from checkov.common.models.enums import CheckResult, CheckCategories from checkov.arm.base_resource_check import BaseResourceCheck @@ -22,7 +22,9 @@ def __init__(self) -> None: def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult: if "properties" in conf: + self.evaluated_keys = ["properties"] if "publicAccess" in conf["properties"]: + self.evaluated_keys = ["properties/publicAccess"] if str(conf["properties"]["publicAccess"]).lower() == "container" or \ str(conf["properties"]["publicAccess"]).lower() == "blob": return CheckResult.FAILED diff --git a/checkov/arm/checks/resource/SynapseWorkspaceAdministratorLoginPasswordHidden.py b/checkov/arm/checks/resource/SynapseWorkspaceAdministratorLoginPasswordHidden.py index bfff551fa39..e4d14af0a6f 100644 --- a/checkov/arm/checks/resource/SynapseWorkspaceAdministratorLoginPasswordHidden.py +++ b/checkov/arm/checks/resource/SynapseWorkspaceAdministratorLoginPasswordHidden.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import Any +from typing import Any, List from checkov.common.models.enums import CheckResult, CheckCategories from checkov.arm.base_resource_check import BaseResourceCheck @@ -19,5 +19,8 @@ def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult: return CheckResult.FAILED return CheckResult.PASSED + def get_evaluated_keys(self) -> List[str]: + return ['properties/sqlAdministratorLoginPassword'] + check = SynapseWorkspaceAdministratorLoginPasswordHidden() diff --git a/checkov/arm/checks/resource/SynapseWorkspaceCMKEncryption.py b/checkov/arm/checks/resource/SynapseWorkspaceCMKEncryption.py index 225c6ee31c9..bb488720834 100644 --- a/checkov/arm/checks/resource/SynapseWorkspaceCMKEncryption.py +++ b/checkov/arm/checks/resource/SynapseWorkspaceCMKEncryption.py @@ -20,5 +20,8 @@ def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult: return CheckResult.FAILED + def get_evaluated_keys(self) -> list[str]: + return ['properties', 'properties/encryption'] + check = SynapseWorkspaceCMKEncryption() diff --git a/checkov/arm/checks/resource/VMDisablePasswordAuthentication.py b/checkov/arm/checks/resource/VMDisablePasswordAuthentication.py index e1cedfc61e7..7ee3a3f9686 100644 --- a/checkov/arm/checks/resource/VMDisablePasswordAuthentication.py +++ b/checkov/arm/checks/resource/VMDisablePasswordAuthentication.py @@ -22,15 +22,18 @@ def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult: properties = conf.get("properties") if properties and isinstance(properties, dict): + self.evaluated_keys = ["properties"] if self.entity_type == "Microsoft.Compute/virtualMachines": tmp_os_profile = properties.get("osProfile") if tmp_os_profile and isinstance(tmp_os_profile, dict): + self.evaluated_keys = ["properties/osProfile"] os_profile = tmp_os_profile elif self.entity_type == "Microsoft.Compute/virtualMachineScaleSets": vm_profile = properties.get("virtualMachineProfile") if vm_profile and isinstance(vm_profile, dict): tmp_os_profile = vm_profile.get("osProfile") if tmp_os_profile and isinstance(tmp_os_profile, dict): + self.evaluated_keys = ["properties/virtualMachineProfile/osProfile"] os_profile = tmp_os_profile if os_profile is None: diff --git a/checkov/cloudformation/checks/resource/aws/ALBDropHttpHeaders.py b/checkov/cloudformation/checks/resource/aws/ALBDropHttpHeaders.py index 85813914157..cab1fc01def 100644 --- a/checkov/cloudformation/checks/resource/aws/ALBDropHttpHeaders.py +++ b/checkov/cloudformation/checks/resource/aws/ALBDropHttpHeaders.py @@ -21,9 +21,11 @@ def scan_resource_conf(self, conf): alb = False # If lb is alb then drop headers must be present and true + self.evaluated_keys = ['Properties'] if alb: lb_attributes = properties.get('LoadBalancerAttributes', {}) if isinstance(lb_attributes, list): + self.evaluated_keys = ['Properties/LoadBalancerAttributes'] for item in lb_attributes: key = item.get('Key') if key == 'routing.http.drop_invalid_header_fields.enabled': diff --git a/checkov/cloudformation/checks/resource/aws/ALBListenerHTTPS.py b/checkov/cloudformation/checks/resource/aws/ALBListenerHTTPS.py index ae24741b54a..8549c090fe7 100644 --- a/checkov/cloudformation/checks/resource/aws/ALBListenerHTTPS.py +++ b/checkov/cloudformation/checks/resource/aws/ALBListenerHTTPS.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import Any +from typing import Any, List from checkov.common.models.enums import CheckResult, CheckCategories from checkov.cloudformation.checks.resource.base_resource_check import BaseResourceCheck @@ -45,5 +45,8 @@ def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult: return CheckResult.PASSED return CheckResult.FAILED + def get_evaluated_keys(self) -> List[str]: + return ["Properties/Protocol", "Properties/DefaultActions"] + check = ALBListenerHTTPS() diff --git a/checkov/cloudformation/checks/resource/aws/ALBListenerTLS12.py b/checkov/cloudformation/checks/resource/aws/ALBListenerTLS12.py index d78f6655159..b00165f3050 100644 --- a/checkov/cloudformation/checks/resource/aws/ALBListenerTLS12.py +++ b/checkov/cloudformation/checks/resource/aws/ALBListenerTLS12.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import Any +from typing import Any, List from checkov.common.models.enums import CheckResult, CheckCategories from checkov.cloudformation.checks.resource.base_resource_check import BaseResourceCheck @@ -55,5 +55,8 @@ def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult: return CheckResult.PASSED return CheckResult.FAILED + def get_evaluated_keys(self) -> List[str]: + return ['Properties/Protocol', 'Properties/SslPolicy'] + check = ALBListenerTLS12() diff --git a/checkov/cloudformation/checks/resource/aws/APIGatewayAuthorization.py b/checkov/cloudformation/checks/resource/aws/APIGatewayAuthorization.py index f867099468c..ede1951d574 100644 --- a/checkov/cloudformation/checks/resource/aws/APIGatewayAuthorization.py +++ b/checkov/cloudformation/checks/resource/aws/APIGatewayAuthorization.py @@ -15,10 +15,13 @@ def __init__(self) -> None: super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources) def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult: + self.evaluated_keys = ["Properties"] if 'Properties' in conf.keys(): if 'HttpMethod' in conf['Properties'].keys() and 'AuthorizationType' in conf['Properties'].keys(): if conf['Properties']['HttpMethod'] != "OPTIONS" and conf['Properties']['AuthorizationType'] == "NONE": if 'ApiKeyRequired' not in conf['Properties'].keys() or conf['Properties']['ApiKeyRequired'] is False: + self.evaluated_keys = ["Properties/HttpMethod", "Properties/AuthorizationType", + "Properties/ApiKeyRequired"] return CheckResult.FAILED return CheckResult.PASSED diff --git a/checkov/cloudformation/checks/resource/aws/AbsSecurityGroupUnrestrictedIngress.py b/checkov/cloudformation/checks/resource/aws/AbsSecurityGroupUnrestrictedIngress.py index 1830165ef59..468f13f8ed5 100644 --- a/checkov/cloudformation/checks/resource/aws/AbsSecurityGroupUnrestrictedIngress.py +++ b/checkov/cloudformation/checks/resource/aws/AbsSecurityGroupUnrestrictedIngress.py @@ -1,16 +1,18 @@ +from typing import List + from checkov.common.models.enums import CheckResult, CheckCategories from checkov.cloudformation.checks.resource.base_resource_check import BaseResourceCheck class AbsSecurityGroupUnrestrictedIngress(BaseResourceCheck): - def __init__(self, check_id, port): + def __init__(self, check_id, port) -> None: name = "Ensure no security groups allow ingress from 0.0.0.0:0 to port %d" % port supported_resources = ['AWS::EC2::SecurityGroup', 'AWS::EC2::SecurityGroupIngress'] categories = [CheckCategories.NETWORKING] super().__init__(name=name, id=check_id, categories=categories, supported_resources=supported_resources) self.port = port - def scan_resource_conf(self, conf): + def scan_resource_conf(self, conf) -> CheckResult: """ Looks for configuration at security group ingress rules: https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-ec2-security-group-ingress.html @@ -21,9 +23,11 @@ def scan_resource_conf(self, conf): if conf['Type'] == 'AWS::EC2::SecurityGroup': if 'Properties' in conf.keys(): if 'SecurityGroupIngress' in conf['Properties'].keys(): + self.evaluated_keys = ['Properties/SecurityGroupIngress'] rules = conf['Properties']['SecurityGroupIngress'] elif conf['Type'] == 'AWS::EC2::SecurityGroupIngress': if 'Properties' in conf.keys(): + self.evaluated_keys = ['Properties'] rules = [] rules.append(conf['Properties']) @@ -44,7 +48,7 @@ def scan_resource_conf(self, conf): return CheckResult.FAILED return CheckResult.PASSED - def range(self, rule): + def range(self, rule) -> bool: if int(rule['FromPort']) <= int(self.port) <= int(rule['ToPort']): return True return False diff --git a/checkov/cloudformation/checks/resource/aws/CloudfrontDistributionEncryption.py b/checkov/cloudformation/checks/resource/aws/CloudfrontDistributionEncryption.py index 6a26085cf1e..1d3c2b5409e 100644 --- a/checkov/cloudformation/checks/resource/aws/CloudfrontDistributionEncryption.py +++ b/checkov/cloudformation/checks/resource/aws/CloudfrontDistributionEncryption.py @@ -19,18 +19,21 @@ def scan_resource_conf(self, conf): :return: """ + self.evaluated_keys = ["Properties"] if 'Properties' in conf.keys(): if 'DistributionConfig' in conf['Properties'].keys(): if 'DefaultCacheBehavior' in conf['Properties']['DistributionConfig'].keys(): if 'ViewerProtocolPolicy' in conf['Properties']['DistributionConfig']['DefaultCacheBehavior'].keys(): if conf['Properties']['DistributionConfig']['DefaultCacheBehavior']['ViewerProtocolPolicy'] == 'allow-all': + self.evaluated_keys = ["Properties/DistributionConfig/DefaultCacheBehavior/ViewerProtocolPolicy"] return CheckResult.FAILED if 'CacheBehaviors' in conf['Properties']['DistributionConfig'].keys(): if not isinstance(conf['Properties']['DistributionConfig']['CacheBehaviors'], list): return CheckResult.UNKNOWN - for behavior in range(len(conf['Properties']['DistributionConfig']['CacheBehaviors'])): + for idx, behavior in enumerate(range(len(conf['Properties']['DistributionConfig']['CacheBehaviors']))): if 'ViewerProtocolPolicy' in conf['Properties']['DistributionConfig']['CacheBehaviors'][behavior].keys(): if conf['Properties']['DistributionConfig']['CacheBehaviors'][behavior]['ViewerProtocolPolicy'] == 'allow-all': + self.evaluated_keys = [f"Properties/DistributionConfig/CacheBehaviors/[{idx}]/ViewerProtocolPolicy"] return CheckResult.FAILED return CheckResult.PASSED diff --git a/checkov/cloudformation/checks/resource/aws/CodeBuildProjectEncryption.py b/checkov/cloudformation/checks/resource/aws/CodeBuildProjectEncryption.py index 44b5e3ae16c..6b897d51a87 100644 --- a/checkov/cloudformation/checks/resource/aws/CodeBuildProjectEncryption.py +++ b/checkov/cloudformation/checks/resource/aws/CodeBuildProjectEncryption.py @@ -20,8 +20,10 @@ def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult: encryption_disabled = False properties = conf.get("Properties") if properties: + self.evaluated_keys = ["Properties"] artifacts = properties.get("Artifacts") if artifacts and isinstance(artifacts, dict): + self.evaluated_keys = ["Properties/Artifacts"] if "Type" in artifacts.keys(): artifact_type = artifacts["Type"] if "EncryptionDisabled" in artifacts.keys(): diff --git a/checkov/cloudformation/checks/resource/aws/EC2PublicIP.py b/checkov/cloudformation/checks/resource/aws/EC2PublicIP.py index ba3e7202ee2..afd903cdc3f 100644 --- a/checkov/cloudformation/checks/resource/aws/EC2PublicIP.py +++ b/checkov/cloudformation/checks/resource/aws/EC2PublicIP.py @@ -15,15 +15,17 @@ def __init__(self) -> None: super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources) def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult: + self.evaluated_keys = ['Properties'] properties = conf.get('Properties') if properties: # For AWS::EC2::Instance if 'NetworkInterfaces' in properties.keys(): network_interfaces = properties['NetworkInterfaces'] if isinstance(network_interfaces, list): - for network_interface in network_interfaces: + for idx, network_interface in enumerate(network_interfaces): if 'AssociatePublicIpAddress' in network_interface.keys(): if network_interface['AssociatePublicIpAddress'] is True: + self.evaluated_keys = [f'Properties/NetworkInterfaces/[{idx}]/AssociatePublicIpAddress'] return CheckResult.FAILED else: # If not made explicit then default is true if default subnet and false otherwise. @@ -35,9 +37,10 @@ def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult: if 'NetworkInterfaces' in properties['LaunchTemplateData'].keys(): network_interfaces = properties['LaunchTemplateData']['NetworkInterfaces'] if isinstance(network_interfaces, list): - for network_interface in network_interfaces: + for idx, network_interface in enumerate(network_interfaces): if 'AssociatePublicIpAddress' in network_interface.keys(): if network_interface['AssociatePublicIpAddress'] is True: + self.evaluated_keys = [f'Properties/LaunchTemplateData/NetworkInterfaces/[{idx}]/AssociatePublicIpAddress'] return CheckResult.FAILED else: return CheckResult.UNKNOWN diff --git a/checkov/cloudformation/checks/resource/aws/ECSClusterContainerInsights.py b/checkov/cloudformation/checks/resource/aws/ECSClusterContainerInsights.py index 4db1b85c07b..1616edc60b4 100644 --- a/checkov/cloudformation/checks/resource/aws/ECSClusterContainerInsights.py +++ b/checkov/cloudformation/checks/resource/aws/ECSClusterContainerInsights.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import Any +from typing import Any, List from checkov.common.models.enums import CheckResult, CheckCategories from checkov.cloudformation.checks.resource.base_resource_check import BaseResourceCheck @@ -21,10 +21,12 @@ def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult: :param conf: AWS::ECS::Cluster configuration :return: """ + self.evaluated_keys = ["Properties"] properties = conf.get("Properties") if properties and isinstance(properties, dict): settings = properties.get("ClusterSettings") if settings and isinstance(settings, list): + self.evaluated_keys = ["Properties/ClusterSettings"] for setting in settings: if setting["Name"] == "containerInsights" and setting["Value"] == "enabled": return CheckResult.PASSED diff --git a/checkov/cloudformation/checks/resource/aws/ECSTaskDefinitionEFSVolumeEncryption.py b/checkov/cloudformation/checks/resource/aws/ECSTaskDefinitionEFSVolumeEncryption.py index 7b18c34c1b5..958f2f3d842 100644 --- a/checkov/cloudformation/checks/resource/aws/ECSTaskDefinitionEFSVolumeEncryption.py +++ b/checkov/cloudformation/checks/resource/aws/ECSTaskDefinitionEFSVolumeEncryption.py @@ -17,11 +17,14 @@ def __init__(self) -> None: def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult: properties = conf.get("Properties") if properties and isinstance(properties, dict): + self.evaluated_keys = ["Properties"] volumes = properties.get("Volumes") if volumes and isinstance(volumes, list): - for volume in volumes: + self.evaluated_keys = ["Properties/Volumes"] + for idx, volume in enumerate(volumes): efs_config = volume.get("EFSVolumeConfiguration") if efs_config and isinstance(efs_config, dict): + self.evaluated_keys = [f"Properties/Volumes/[{idx}]/EFSVolumeConfiguration"] if efs_config.get("TransitEncryption") == "ENABLED": return CheckResult.PASSED else: diff --git a/checkov/cloudformation/checks/resource/aws/ELBv2AccessLogs.py b/checkov/cloudformation/checks/resource/aws/ELBv2AccessLogs.py index dff060a22b4..09fd7ed88b2 100644 --- a/checkov/cloudformation/checks/resource/aws/ELBv2AccessLogs.py +++ b/checkov/cloudformation/checks/resource/aws/ELBv2AccessLogs.py @@ -12,7 +12,9 @@ def __init__(self): def scan_resource_conf(self, conf): if 'Properties' in conf.keys(): + self.evaluated_keys = ['Properties'] if 'LoadBalancerAttributes' in conf['Properties'].keys(): + self.evaluated_keys = ['Properties/LoadBalancerAttributes'] if isinstance(conf['Properties']['LoadBalancerAttributes'], list): for item in conf['Properties']['LoadBalancerAttributes']: if 'Key' in item.keys() and 'Value' in item.keys(): diff --git a/checkov/cloudformation/checks/resource/aws/ElasticacheReplicationGroupEncryptionAtTransitAuthToken.py b/checkov/cloudformation/checks/resource/aws/ElasticacheReplicationGroupEncryptionAtTransitAuthToken.py index 6bddcecb7a1..9c363926523 100644 --- a/checkov/cloudformation/checks/resource/aws/ElasticacheReplicationGroupEncryptionAtTransitAuthToken.py +++ b/checkov/cloudformation/checks/resource/aws/ElasticacheReplicationGroupEncryptionAtTransitAuthToken.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import Any +from typing import Any, List from checkov.common.models.enums import CheckResult, CheckCategories from checkov.cloudformation.checks.resource.base_resource_check import BaseResourceCheck @@ -29,5 +29,8 @@ def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult: return CheckResult.PASSED return CheckResult.FAILED + def get_evaluated_keys(self) -> List[str]: + return ["Properties", "Properties/TransitEncryptionEnabled", "Properties/AuthToken", "Properties/UserGroupIds"] + check = ElasticacheReplicationGroupEncryptionAtTransitAuthToken() diff --git a/checkov/cloudformation/checks/resource/aws/ElasticsearchDomainLogging.py b/checkov/cloudformation/checks/resource/aws/ElasticsearchDomainLogging.py index c9474540d09..7a023b44d60 100644 --- a/checkov/cloudformation/checks/resource/aws/ElasticsearchDomainLogging.py +++ b/checkov/cloudformation/checks/resource/aws/ElasticsearchDomainLogging.py @@ -17,8 +17,10 @@ def __init__(self) -> None: def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult: properties = conf.get("Properties") if properties: + self.evaluated_keys = ["Properties"] options = properties.get("LogPublishingOptions") if options: + self.evaluated_keys = ["Properties/LogPublishingOptions"] for option_conf in options.values(): if isinstance(option_conf, dict) and option_conf.get("Enabled"): return CheckResult.PASSED diff --git a/checkov/cloudformation/checks/resource/aws/GlueDataCatalogEncryption.py b/checkov/cloudformation/checks/resource/aws/GlueDataCatalogEncryption.py index d83ae3bb0a5..655f5b3dec2 100644 --- a/checkov/cloudformation/checks/resource/aws/GlueDataCatalogEncryption.py +++ b/checkov/cloudformation/checks/resource/aws/GlueDataCatalogEncryption.py @@ -3,18 +3,20 @@ class GlueDataCatalogEncryption(BaseResourceCheck): - def __init__(self): + def __init__(self) -> None: name = "Ensure Glue Data Catalog Encryption is enabled" id = "CKV_AWS_94" supported_resources = ['AWS::Glue::DataCatalogEncryptionSettings'] categories = [CheckCategories.LOGGING] super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources) - def scan_resource_conf(self, conf): + def scan_resource_conf(self, conf) -> CheckResult: connection_encrypted = False encrypted_at_rest = False if 'Properties' in conf.keys(): + self.evaluated_keys = ['Properties'] if 'DataCatalogEncryptionSettings' in conf['Properties'].keys(): + self.evaluated_keys = ['Properties/DataCatalogEncryptionSettings'] dc_enc_settings = conf['Properties']['DataCatalogEncryptionSettings'] if 'ConnectionPasswordEncryption' in dc_enc_settings.keys(): con_pass_enc = dc_enc_settings['ConnectionPasswordEncryption'] diff --git a/checkov/cloudformation/checks/resource/aws/GlueSecurityConfiguration.py b/checkov/cloudformation/checks/resource/aws/GlueSecurityConfiguration.py index c8440358d04..43b4fd6eb62 100644 --- a/checkov/cloudformation/checks/resource/aws/GlueSecurityConfiguration.py +++ b/checkov/cloudformation/checks/resource/aws/GlueSecurityConfiguration.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import Any +from typing import Any, List from checkov.common.models.enums import CheckResult, CheckCategories from checkov.cloudformation.checks.resource.base_resource_check import BaseResourceCheck @@ -20,7 +20,9 @@ def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult: cw_enc = False book_enc = False if 'Properties' in conf.keys(): + self.evaluated_keys = ['Properties'] if 'EncryptionConfiguration' in conf['Properties'].keys(): + self.evaluated_keys = ['Properties/EncryptionConfiguration'] enc_conf = conf['Properties']['EncryptionConfiguration'] if 'CloudWatchEncryption' in enc_conf.keys(): diff --git a/checkov/cloudformation/checks/resource/aws/IAMAdminPolicyDocument.py b/checkov/cloudformation/checks/resource/aws/IAMAdminPolicyDocument.py index a90f3b6aba9..1cbc765ca4f 100644 --- a/checkov/cloudformation/checks/resource/aws/IAMAdminPolicyDocument.py +++ b/checkov/cloudformation/checks/resource/aws/IAMAdminPolicyDocument.py @@ -17,13 +17,16 @@ def scan_resource_conf(self, conf): # catch for inline policies if isinstance(my_properties, dict) and type != 'AWS::IAM::Policy': + self.evaluated_keys = ['Properties'] if 'Policies' in my_properties.keys(): + self.evaluated_keys = ['Properties/Policies'] policies = my_properties['Policies'] if len(policies) > 0: - for policy in policies: + for idx, policy in enumerate(policies): if not isinstance(policy, dict): return CheckResult.UNKNOWN if policy.get('PolicyDocument'): + self.evaluated_keys = [f'Properties/Policies/[{idx}]/PolicyDocument'] result = check_policy(policy['PolicyDocument']) if result == CheckResult.FAILED: return result @@ -32,6 +35,7 @@ def scan_resource_conf(self, conf): return CheckResult.UNKNOWN # this is just for Policy resources if isinstance(my_properties, dict) and 'PolicyDocument' in my_properties.keys(): + self.evaluated_keys = ['Properties/PolicyDocument'] return check_policy(my_properties['PolicyDocument']) return CheckResult.UNKNOWN diff --git a/checkov/cloudformation/checks/resource/aws/IAMRoleAllowAssumeFromAccount.py b/checkov/cloudformation/checks/resource/aws/IAMRoleAllowAssumeFromAccount.py index 6bbede60825..23ecded4665 100644 --- a/checkov/cloudformation/checks/resource/aws/IAMRoleAllowAssumeFromAccount.py +++ b/checkov/cloudformation/checks/resource/aws/IAMRoleAllowAssumeFromAccount.py @@ -2,7 +2,7 @@ import json import re -from typing import Any +from typing import Any, List from checkov.cloudformation.checks.resource.base_resource_check import BaseResourceCheck from checkov.common.models.enums import CheckResult, CheckCategories @@ -19,6 +19,7 @@ def __init__(self) -> None: super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources) def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult: + self.evaluated_keys = ['Properties'] properties = conf.get('Properties') if properties and 'AssumeRolePolicyDocument' in properties: assume_role_policy_doc = properties['AssumeRolePolicyDocument'] @@ -45,6 +46,7 @@ def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult: if isinstance(assume_role_block['Statement'][0]['Principal']['AWS'], list) \ and isinstance(assume_role_block['Statement'][0]['Principal']['AWS'][0], str): if re.match(ACCOUNT_ACCESS, assume_role_block['Statement'][0]['Principal']['AWS'][0]): + self.evaluated_keys = ['Properties/AssumeRolePolicyDocument/Statement'] return CheckResult.FAILED return CheckResult.PASSED diff --git a/checkov/cloudformation/checks/resource/aws/IAMStarActionPolicyDocument.py b/checkov/cloudformation/checks/resource/aws/IAMStarActionPolicyDocument.py index e4967229b7b..e50a6a2d86d 100644 --- a/checkov/cloudformation/checks/resource/aws/IAMStarActionPolicyDocument.py +++ b/checkov/cloudformation/checks/resource/aws/IAMStarActionPolicyDocument.py @@ -15,16 +15,19 @@ def __init__(self): def scan_resource_conf(self, conf): myproperties = conf.get("Properties") type = conf['Type'] + self.evaluated_keys = ['Properties'] # catch for inline policies if type != 'AWS::IAM::Policy': if isinstance(myproperties, dict) and 'Policies' in myproperties.keys(): policies = myproperties['Policies'] if len(policies) > 0: - for policy in policies: + for idx, policy in enumerate(policies): + self.evaluated_keys = [f"Properties/Policies"] if not isinstance(policy, dict): return CheckResult.UNKNOWN if policy.get('PolicyDocument'): + self.evaluated_keys = [f"Properties/Policies/{idx}/PolicyDocument"] result = check_policy(policy['PolicyDocument']) if result == CheckResult.FAILED: return result @@ -33,6 +36,7 @@ def scan_resource_conf(self, conf): return CheckResult.UNKNOWN # this is just for Policy resources if isinstance(myproperties, dict) and 'PolicyDocument' in myproperties.keys(): + self.evaluated_keys = [f"Properties/PolicyDocument"] return check_policy(myproperties['PolicyDocument']) return CheckResult.UNKNOWN diff --git a/checkov/cloudformation/checks/resource/aws/LaunchConfigurationEBSEncryption.py b/checkov/cloudformation/checks/resource/aws/LaunchConfigurationEBSEncryption.py index 04653f462b0..b6fc14fe0e6 100644 --- a/checkov/cloudformation/checks/resource/aws/LaunchConfigurationEBSEncryption.py +++ b/checkov/cloudformation/checks/resource/aws/LaunchConfigurationEBSEncryption.py @@ -3,20 +3,21 @@ class LaunchConfigurationEBSEncryption(BaseResourceCheck): - def __init__(self): + def __init__(self) -> None: name = "Ensure all data stored in the Launch configuration EBS is securely encrypted" id = "CKV_AWS_8" supported_resources = ['AWS::AutoScaling::LaunchConfiguration'] categories = [CheckCategories.ENCRYPTION] super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources) - def scan_resource_conf(self, conf): + def scan_resource_conf(self, conf) -> CheckResult: """ Looks for encryption configuration of device block mapping in an AWS launch configurations https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-as-launchconfig-blockdev-template.html :param conf: aws_launch_configuration configuration :return: """ + self.evaluated_keys = ['Properties'] properties = conf.get('Properties', {}) if properties is None: return CheckResult.UNKNOWN @@ -25,11 +26,12 @@ def scan_resource_conf(self, conf): return CheckResult.UNKNOWN if not isinstance(block_device_mappings, list): return CheckResult.UNKNOWN - for block_device_mapping in block_device_mappings: + for idx, block_device_mapping in enumerate(block_device_mappings): if not isinstance(block_device_mapping, dict): return CheckResult.UNKNOWN if block_device_mapping.get('Ebs'): if not block_device_mapping['Ebs'].get('Encrypted'): + self.evaluated_keys = [f'Properties/BlockDeviceMappings/[{idx}]/Ebs/Encrypted'] return CheckResult.FAILED return CheckResult.PASSED diff --git a/checkov/cloudformation/checks/resource/aws/MSKClusterLogging.py b/checkov/cloudformation/checks/resource/aws/MSKClusterLogging.py index 088115dd12c..03c0599ced0 100644 --- a/checkov/cloudformation/checks/resource/aws/MSKClusterLogging.py +++ b/checkov/cloudformation/checks/resource/aws/MSKClusterLogging.py @@ -1,4 +1,4 @@ -from typing import Any +from typing import Any, List from checkov.common.models.enums import CheckResult, CheckCategories from checkov.cloudformation.checks.resource.base_resource_check import BaseResourceCheck @@ -12,9 +12,11 @@ def __init__(self) -> None: categories = [CheckCategories.LOGGING] super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources) - def scan_resource_conf(self, conf) -> Any: + def scan_resource_conf(self, conf) -> CheckResult: if 'Properties' in conf.keys(): + self.evaluated_keys = ['Properties'] if 'LoggingInfo' in conf['Properties'].keys(): + self.evaluated_keys = ['Properties/LoggingInfo'] if 'BrokerLogs' in conf['Properties']['LoggingInfo'].keys(): logging = conf['Properties']['LoggingInfo']['BrokerLogs'] types = ["CloudWatchLogs", "Firehose", "S3"] diff --git a/checkov/cloudformation/checks/resource/aws/RedShiftSSL.py b/checkov/cloudformation/checks/resource/aws/RedShiftSSL.py index a56050dd2e0..a5a7e3827bc 100644 --- a/checkov/cloudformation/checks/resource/aws/RedShiftSSL.py +++ b/checkov/cloudformation/checks/resource/aws/RedShiftSSL.py @@ -1,3 +1,5 @@ +from typing import List + from checkov.cloudformation.checks.resource.base_resource_check import BaseResourceCheck from checkov.common.parsers.node import DictNode from checkov.common.models.enums import CheckCategories, CheckResult @@ -24,5 +26,8 @@ def scan_resource_conf(self, conf: DictNode) -> CheckResult: return CheckResult.FAILED + def get_evaluated_keys(self) -> List[str]: + return ['Properties', 'Properties/Parameters'] + check = RedShiftSSL() diff --git a/checkov/cloudformation/checks/resource/aws/SecurityGroupRuleDescription.py b/checkov/cloudformation/checks/resource/aws/SecurityGroupRuleDescription.py index 2efd375759a..1fd3b55f5a9 100644 --- a/checkov/cloudformation/checks/resource/aws/SecurityGroupRuleDescription.py +++ b/checkov/cloudformation/checks/resource/aws/SecurityGroupRuleDescription.py @@ -24,15 +24,18 @@ def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult: if conf['Type'] == 'AWS::EC2::SecurityGroup': if 'Properties' in conf: + self.evaluated_keys = ['Properties'] security_group_ingress_rules = conf.get('Properties', {}).get('SecurityGroupIngress') if security_group_ingress_rules: for rule in security_group_ingress_rules: if isinstance(rule, dict) and ('Description' not in rule or not rule['Description']): + self.evaluated_keys = ['Properties/SecurityGroupIngress'] return CheckResult.FAILED security_group_egress_rules = conf.get('Properties', {}).get('SecurityGroupEgress') if security_group_egress_rules: for rule in security_group_egress_rules: if isinstance(rule, dict) and ('Description' not in rule.keys() or not rule['Description']): + self.evaluated_keys = ['Properties/SecurityGroupEgress'] return CheckResult.FAILED return CheckResult.PASSED From 142a970cd62260d4b61cc5c2e7e9e6dd7885efa5 Mon Sep 17 00:00:00 2001 From: Taylor Date: Fri, 24 Jan 2025 23:04:04 -0800 Subject: [PATCH 2/2] fix flake8 --- checkov/arm/checks/resource/AppServiceHTTPSOnly.py | 2 +- checkov/arm/checks/resource/MonitorLogProfileCategories.py | 2 +- checkov/arm/checks/resource/SQLServerAuditingEnabled.py | 3 ++- checkov/arm/checks/resource/SecretExpirationDate.py | 2 +- checkov/arm/checks/resource/SecurityCenterStandardPricing.py | 2 +- .../resource/StorageBlobServiceContainerPrivateAccess.py | 2 +- .../checks/resource/aws/APIGatewayAuthorization.py | 4 ++-- .../resource/aws/AbsSecurityGroupUnrestrictedIngress.py | 2 -- .../checks/resource/aws/ECSClusterContainerInsights.py | 2 +- .../checks/resource/aws/GlueSecurityConfiguration.py | 2 +- .../checks/resource/aws/IAMRoleAllowAssumeFromAccount.py | 2 +- .../checks/resource/aws/IAMStarActionPolicyDocument.py | 4 ++-- .../cloudformation/checks/resource/aws/MSKClusterLogging.py | 2 -- .../checks/resource/aws/SecurityGroupRuleDescription.py | 4 ++-- 14 files changed, 16 insertions(+), 19 deletions(-) diff --git a/checkov/arm/checks/resource/AppServiceHTTPSOnly.py b/checkov/arm/checks/resource/AppServiceHTTPSOnly.py index c4925e7585a..657b7372ca8 100644 --- a/checkov/arm/checks/resource/AppServiceHTTPSOnly.py +++ b/checkov/arm/checks/resource/AppServiceHTTPSOnly.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import Any +from typing import Any, List from checkov.common.models.enums import CheckResult, CheckCategories from checkov.arm.base_resource_check import BaseResourceCheck diff --git a/checkov/arm/checks/resource/MonitorLogProfileCategories.py b/checkov/arm/checks/resource/MonitorLogProfileCategories.py index 5f0e320c6b1..24f9d86ad5d 100644 --- a/checkov/arm/checks/resource/MonitorLogProfileCategories.py +++ b/checkov/arm/checks/resource/MonitorLogProfileCategories.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import Any +from typing import Any, List from checkov.common.models.enums import CheckResult, CheckCategories from checkov.arm.base_resource_check import BaseResourceCheck diff --git a/checkov/arm/checks/resource/SQLServerAuditingEnabled.py b/checkov/arm/checks/resource/SQLServerAuditingEnabled.py index 6adf0c9d8dd..7c85f9cde6c 100644 --- a/checkov/arm/checks/resource/SQLServerAuditingEnabled.py +++ b/checkov/arm/checks/resource/SQLServerAuditingEnabled.py @@ -39,6 +39,7 @@ def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult: return CheckResult.FAILED def get_evaluated_keys(self) -> List[str]: - return ["resources"] + return ["resources"] + check = SQLServerAuditingEnabled() diff --git a/checkov/arm/checks/resource/SecretExpirationDate.py b/checkov/arm/checks/resource/SecretExpirationDate.py index 938fe994929..1864cca0614 100644 --- a/checkov/arm/checks/resource/SecretExpirationDate.py +++ b/checkov/arm/checks/resource/SecretExpirationDate.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import Any, List +from typing import Any from checkov.common.models.enums import CheckResult, CheckCategories from checkov.arm.base_resource_check import BaseResourceCheck diff --git a/checkov/arm/checks/resource/SecurityCenterStandardPricing.py b/checkov/arm/checks/resource/SecurityCenterStandardPricing.py index 69cda447ba1..b746f526948 100644 --- a/checkov/arm/checks/resource/SecurityCenterStandardPricing.py +++ b/checkov/arm/checks/resource/SecurityCenterStandardPricing.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import Any, List +from typing import Any from checkov.common.models.enums import CheckResult, CheckCategories from checkov.arm.base_resource_check import BaseResourceCheck diff --git a/checkov/arm/checks/resource/StorageBlobServiceContainerPrivateAccess.py b/checkov/arm/checks/resource/StorageBlobServiceContainerPrivateAccess.py index 1da839cda30..e6fa2f563fa 100644 --- a/checkov/arm/checks/resource/StorageBlobServiceContainerPrivateAccess.py +++ b/checkov/arm/checks/resource/StorageBlobServiceContainerPrivateAccess.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import Any, List +from typing import Any from checkov.common.models.enums import CheckResult, CheckCategories from checkov.arm.base_resource_check import BaseResourceCheck diff --git a/checkov/cloudformation/checks/resource/aws/APIGatewayAuthorization.py b/checkov/cloudformation/checks/resource/aws/APIGatewayAuthorization.py index ede1951d574..d59356e6480 100644 --- a/checkov/cloudformation/checks/resource/aws/APIGatewayAuthorization.py +++ b/checkov/cloudformation/checks/resource/aws/APIGatewayAuthorization.py @@ -20,8 +20,8 @@ def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult: if 'HttpMethod' in conf['Properties'].keys() and 'AuthorizationType' in conf['Properties'].keys(): if conf['Properties']['HttpMethod'] != "OPTIONS" and conf['Properties']['AuthorizationType'] == "NONE": if 'ApiKeyRequired' not in conf['Properties'].keys() or conf['Properties']['ApiKeyRequired'] is False: - self.evaluated_keys = ["Properties/HttpMethod", "Properties/AuthorizationType", - "Properties/ApiKeyRequired"] + self.evaluated_keys = ["Properties/HttpMethod", "Properties/AuthorizationType", + "Properties/ApiKeyRequired"] return CheckResult.FAILED return CheckResult.PASSED diff --git a/checkov/cloudformation/checks/resource/aws/AbsSecurityGroupUnrestrictedIngress.py b/checkov/cloudformation/checks/resource/aws/AbsSecurityGroupUnrestrictedIngress.py index 468f13f8ed5..77144964f3e 100644 --- a/checkov/cloudformation/checks/resource/aws/AbsSecurityGroupUnrestrictedIngress.py +++ b/checkov/cloudformation/checks/resource/aws/AbsSecurityGroupUnrestrictedIngress.py @@ -1,5 +1,3 @@ -from typing import List - from checkov.common.models.enums import CheckResult, CheckCategories from checkov.cloudformation.checks.resource.base_resource_check import BaseResourceCheck diff --git a/checkov/cloudformation/checks/resource/aws/ECSClusterContainerInsights.py b/checkov/cloudformation/checks/resource/aws/ECSClusterContainerInsights.py index 1616edc60b4..04eb0e287e9 100644 --- a/checkov/cloudformation/checks/resource/aws/ECSClusterContainerInsights.py +++ b/checkov/cloudformation/checks/resource/aws/ECSClusterContainerInsights.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import Any, List +from typing import Any from checkov.common.models.enums import CheckResult, CheckCategories from checkov.cloudformation.checks.resource.base_resource_check import BaseResourceCheck diff --git a/checkov/cloudformation/checks/resource/aws/GlueSecurityConfiguration.py b/checkov/cloudformation/checks/resource/aws/GlueSecurityConfiguration.py index 43b4fd6eb62..aa33eb29df4 100644 --- a/checkov/cloudformation/checks/resource/aws/GlueSecurityConfiguration.py +++ b/checkov/cloudformation/checks/resource/aws/GlueSecurityConfiguration.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import Any, List +from typing import Any from checkov.common.models.enums import CheckResult, CheckCategories from checkov.cloudformation.checks.resource.base_resource_check import BaseResourceCheck diff --git a/checkov/cloudformation/checks/resource/aws/IAMRoleAllowAssumeFromAccount.py b/checkov/cloudformation/checks/resource/aws/IAMRoleAllowAssumeFromAccount.py index 23ecded4665..860ab81101e 100644 --- a/checkov/cloudformation/checks/resource/aws/IAMRoleAllowAssumeFromAccount.py +++ b/checkov/cloudformation/checks/resource/aws/IAMRoleAllowAssumeFromAccount.py @@ -2,7 +2,7 @@ import json import re -from typing import Any, List +from typing import Any from checkov.cloudformation.checks.resource.base_resource_check import BaseResourceCheck from checkov.common.models.enums import CheckResult, CheckCategories diff --git a/checkov/cloudformation/checks/resource/aws/IAMStarActionPolicyDocument.py b/checkov/cloudformation/checks/resource/aws/IAMStarActionPolicyDocument.py index e50a6a2d86d..9481e8f3509 100644 --- a/checkov/cloudformation/checks/resource/aws/IAMStarActionPolicyDocument.py +++ b/checkov/cloudformation/checks/resource/aws/IAMStarActionPolicyDocument.py @@ -23,7 +23,7 @@ def scan_resource_conf(self, conf): policies = myproperties['Policies'] if len(policies) > 0: for idx, policy in enumerate(policies): - self.evaluated_keys = [f"Properties/Policies"] + self.evaluated_keys = ["Properties/Policies"] if not isinstance(policy, dict): return CheckResult.UNKNOWN if policy.get('PolicyDocument'): @@ -36,7 +36,7 @@ def scan_resource_conf(self, conf): return CheckResult.UNKNOWN # this is just for Policy resources if isinstance(myproperties, dict) and 'PolicyDocument' in myproperties.keys(): - self.evaluated_keys = [f"Properties/PolicyDocument"] + self.evaluated_keys = ["Properties/PolicyDocument"] return check_policy(myproperties['PolicyDocument']) return CheckResult.UNKNOWN diff --git a/checkov/cloudformation/checks/resource/aws/MSKClusterLogging.py b/checkov/cloudformation/checks/resource/aws/MSKClusterLogging.py index 03c0599ced0..afcf5446117 100644 --- a/checkov/cloudformation/checks/resource/aws/MSKClusterLogging.py +++ b/checkov/cloudformation/checks/resource/aws/MSKClusterLogging.py @@ -1,5 +1,3 @@ -from typing import Any, List - from checkov.common.models.enums import CheckResult, CheckCategories from checkov.cloudformation.checks.resource.base_resource_check import BaseResourceCheck diff --git a/checkov/cloudformation/checks/resource/aws/SecurityGroupRuleDescription.py b/checkov/cloudformation/checks/resource/aws/SecurityGroupRuleDescription.py index 1fd3b55f5a9..9a585daf39e 100644 --- a/checkov/cloudformation/checks/resource/aws/SecurityGroupRuleDescription.py +++ b/checkov/cloudformation/checks/resource/aws/SecurityGroupRuleDescription.py @@ -29,13 +29,13 @@ def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult: if security_group_ingress_rules: for rule in security_group_ingress_rules: if isinstance(rule, dict) and ('Description' not in rule or not rule['Description']): - self.evaluated_keys = ['Properties/SecurityGroupIngress'] + self.evaluated_keys = ['Properties/SecurityGroupIngress'] return CheckResult.FAILED security_group_egress_rules = conf.get('Properties', {}).get('SecurityGroupEgress') if security_group_egress_rules: for rule in security_group_egress_rules: if isinstance(rule, dict) and ('Description' not in rule.keys() or not rule['Description']): - self.evaluated_keys = ['Properties/SecurityGroupEgress'] + self.evaluated_keys = ['Properties/SecurityGroupEgress'] return CheckResult.FAILED return CheckResult.PASSED