Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(general): fix proxy access from git and registry loader #6992

Merged
merged 18 commits into from
Feb 6, 2025
33 changes: 21 additions & 12 deletions checkov/common/goget/github/get_git.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
import logging
import re
import shutil
import os

from checkov.common.goget.base_getter import BaseGetter
from checkov.common.resource_code_logger_filter import add_resource_code_filter_to_logger
from checkov.common.util.contextmanagers import temp_environ
from checkov.common.util.env_vars_config import env_vars_config

try:
from git import Repo
Expand Down Expand Up @@ -64,9 +66,7 @@
if git_import_error is not None:
raise ImportError("Unable to load git module (is the git executable available?)") \
from git_import_error

git_url, internal_dir = self._source_subdir()

clone_dir = self.temp_dir + "/clone/" if self.create_clone_and_res_dirs else self.temp_dir
self._clone(git_url, clone_dir)

Expand All @@ -81,17 +81,26 @@
return clone_dir

def _clone(self, git_url: str, clone_dir: str) -> None:
    """Clone ``git_url`` into ``clone_dir``, going through a proxy when ``PROXY_URL`` is set.

    The clone is performed exactly once: either inside the proxy environment
    or directly. Git's interactive prompts are disabled in both cases.

    :param git_url: repository URL handed to ``Repo.clone_from``
    :param clone_dir: destination directory of the clone
    """
    # log only the part after '@' so credentials embedded in the URL are not written out
    safe_url = self.url if '@' not in self.url else self.url.split('@')[1]
    self.logger.info(f"cloning {safe_url} to {clone_dir}")

    proxy_url = os.getenv('PROXY_URL')
    with temp_environ(GIT_TERMINAL_PROMPT="0"):  # disables user prompts originating from GIT
        if not proxy_url:
            self._clone_helper(clone_dir, git_url)
            return

        # Only export the proxy settings that are actually configured, so git never
        # receives a ``None`` CA path or a literal "None:None" extra header.
        proxy_env = {"https_proxy": proxy_url}
        ca_path = os.getenv('PROXY_CA_PATH')
        if ca_path:
            proxy_env["GIT_SSL_CAINFO"] = ca_path
        header_key = os.getenv('PROXY_HEADER_KEY')
        header_value = os.getenv('PROXY_HEADER_VALUE')
        if header_key and header_value:
            proxy_env["GIT_CONFIG_PARAMETERS"] = f"'http.extraHeader={header_key}:{header_value}'"

        # a proxy URL may carry user:password@host - keep only the host part in the log
        safe_proxy = proxy_url.rsplit('@', 1)[-1]
        self.logger.info(f'Performing clone through proxy - {safe_proxy}')
        with temp_environ(**proxy_env):
            # Previously the helper was invoked a second time after this block,
            # which re-ran the clone against the now non-empty ``clone_dir``.
            self._clone_helper(clone_dir, git_url)

def _clone_helper(self, clone_dir: str, git_url: str) -> None:
    """Run the ``Repo.clone_from`` call that matches the configured ref.

    ``self.branch``, ``self.commit_id`` and ``self.tag`` are checked in that
    order and only the first one set is used; with none set, a shallow clone
    without an explicit ref is made.

    :param clone_dir: destination directory of the clone
    :param git_url: repository URL to clone from
    """
    if self.branch:
        Repo.clone_from(git_url, clone_dir, branch=self.branch, depth=1)  # depth=1 for shallow clone
        return

    if self.commit_id:  # no commit id support for branch
        # need to be a full git clone so the requested commit is available for checkout
        repo = Repo.clone_from(git_url, clone_dir, no_checkout=True)
        repo.git.checkout(self.commit_id)
        return

    if self.tag:
        Repo.clone_from(git_url, clone_dir, depth=1, b=self.tag)
        return

    Repo.clone_from(git_url, clone_dir, depth=1)

# Split source url into Git url and subdirectory path e.g. test.com/repo//repo/subpath becomes 'test.com/repo', '/repo/subpath')
# Also see reference implementation @ go-getter https://github.com/hashicorp/go-getter/blob/main/source.go
Expand Down
15 changes: 7 additions & 8 deletions checkov/common/proxy/proxy_client.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,23 @@
from __future__ import annotations

import logging
import os
from typing import Any

import requests

from checkov.common.util.env_vars_config import env_vars_config


class ProxyClient:
def __init__(self) -> None:
self.identity = env_vars_config.PROXY_HEADER_VALUE
self.proxy_ca_path = env_vars_config.PROXY_CA_PATH
self.identity = os.getenv('PROXY_HEADER_VALUE')
self.proxy_ca_path = os.getenv('PROXY_CA_PATH')
if self.proxy_ca_path is None:
logging.warning("[ProxyClient] CA certificate path is missing")

def get_session(self) -> requests.Session:
if not env_vars_config.PROXY_URL:
if not os.getenv('PROXY_URL'):
logging.warning('Please provide "PROXY_URL" env var')
proxy_url = env_vars_config.PROXY_URL
proxy_url = os.getenv('PROXY_URL')
session = requests.Session()
proxies = {
"http": proxy_url,
Expand All @@ -28,8 +27,8 @@ def get_session(self) -> requests.Session:
return session

def update_request_header(self, request: requests.Request) -> None:
if env_vars_config.PROXY_HEADER_VALUE:
request.headers[env_vars_config.PROXY_HEADER_VALUE] = self.identity
if os.getenv('PROXY_HEADER_KEY'):
request.headers[os.getenv('PROXY_HEADER_KEY')] = self.identity

def send_request(self, request: requests.Request) -> requests.Response:
session = self.get_session()
Expand Down
2 changes: 1 addition & 1 deletion checkov/common/util/env_vars_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def __init__(self) -> None:
self.PROXY_CA_PATH = os.getenv('PROXY_CA_PATH', None)
self.PROXY_URL = os.getenv('PROXY_URL', None)
self.PROXY_HEADER_VALUE = os.getenv('PROXY_HEADER_VALUE', None)
self.PROXY_HEADER_KEY = os.getenv('PROXY_HEADER_VALUE', None)
self.PROXY_HEADER_KEY = os.getenv('PROXY_HEADER_KEY', None)
self.ENABLE_CONFIG_FILE_VALIDATION = convert_str_to_bool(os.getenv("ENABLE_CONFIG_FILE_VALIDATION", False))


Expand Down
3 changes: 3 additions & 0 deletions checkov/terraform/module_loading/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,18 @@ def load(self, module_params: ModuleParams) -> ModuleContent:
"""
self.discover(module_params)
if not self._is_matching_loader(module_params):
self.logger.info(f'the module params of {module_params.module_source} do not match loader {self.__class__}')
return ModuleContent(dir=None)

module_path = self._find_module_path(module_params)
if os.path.exists(module_path):
self.logger.info(f'path {module_path} exists so no need to load')
return ModuleContent(dir=module_path)

self.logger.debug(f"Using {self.__class__.__name__} attempting to get module "
f"{module_params.module_source if '@' not in module_params.module_source else module_params.module_source.split('@')[1]} "
f"version: {module_params.version}")
self.logger.info(f'about to load {module_params.module_source} with {self.__class__}')
return self._load_module(module_params)

@abstractmethod
Expand Down
3 changes: 2 additions & 1 deletion checkov/terraform/module_loading/loaders/git_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,10 @@ def _load_module(self, module_params: ModuleParams) -> ModuleContent:
# but the current loader (ex. GithubLoader) is not using it
return ModuleContent(dir=None, failed_url=module_params.module_source)
if 'File exists' not in str_e and 'already exists and is not an empty directory' not in str_e:
self.logger.warning(f"failed to get {module_params.module_source} because of {e}")
self.logger.warning(f"failed to get {module_params.module_source} in git loader because of {e}")
return ModuleContent(dir=None, failed_url=module_params.module_source)
return_dir = module_params.dest_dir
self.logger.info(f'finished loading {module_params.module_source}')
if module_params.inner_module:
return_dir = os.path.join(module_params.dest_dir, module_params.inner_module)
return ModuleContent(dir=return_dir)
Expand Down
35 changes: 24 additions & 11 deletions checkov/terraform/module_loading/loaders/registry_loader.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from __future__ import annotations

import os
import logging
from http import HTTPStatus
from typing import List, Dict, TYPE_CHECKING

Expand Down Expand Up @@ -81,16 +80,16 @@ def _load_module(self, module_params: ModuleParams) -> ModuleContent:
return ModuleContent(dir=None)

request_download_url = urljoin(module_params.tf_modules_endpoint, "/".join((module_params.module_source, best_version, "download")))
logging.debug(f"Best version for {module_params.module_source} is {best_version} based on the version constraint {module_params.version}.")
logging.debug(f"Module download url: {request_download_url}")
self.logger.debug(f"Best version for {module_params.module_source} is {best_version} based on the version constraint {module_params.version}.")
self.logger.debug(f"Module download url: {request_download_url} and proxy: {os.getenv('PROXY_URL')}")
try:
request = requests.Request(
method='GET',
url=request_download_url,
headers={"Authorization": f"Bearer {module_params.token}"} if module_params.token else None
)
if os.getenv('PROXY_URL'):
logging.info('Sending request with proxy')
self.logger.info(f'Sending request to {request.url} through proxy')
response = call_http_request_with_proxy(request)
else:
session = requests.Session()
Expand All @@ -117,7 +116,7 @@ def _load_module(self, module_params: ModuleParams) -> ModuleContent:
except Exception as e:
str_e = str(e)
if 'File exists' not in str_e and 'already exists and is not an empty directory' not in str_e:
self.logger.error(f"failed to get {module_params.module_source} because of {e}")
self.logger.error(f"failed to get {module_params.module_source} in registry loader because of {e}")
return ModuleContent(dir=None, failed_url=module_params.module_source)
if module_params.inner_module:
return_dir = os.path.join(module_params.dest_dir, module_params.inner_module)
Expand Down Expand Up @@ -157,11 +156,18 @@ def _cache_available_versions(self, module_params: ModuleParams) -> bool:
return False

try:
response = requests.get(
url=module_params.tf_modules_versions_endpoint,
request = requests.Request(
method='GET',
headers={"Authorization": f"Bearer {module_params.token}"} if module_params.token else None,
timeout=DEFAULT_TIMEOUT,
url=module_params.tf_modules_versions_endpoint
)
if os.getenv('PROXY_URL'):
self.logger.info(f'Sending request to {request.url} through proxy')
response = call_http_request_with_proxy(request)
else:
session = requests.Session()
prepared_request = session.prepare_request(request)
response = session.send(prepared_request, timeout=DEFAULT_TIMEOUT)
response.raise_for_status()
available_versions = [
v.get("version") for v in response.json().get("modules", [{}])[0].get("versions", {})
Expand Down Expand Up @@ -193,10 +199,17 @@ def _determine_tf_api_endpoints(self, module_params: ModuleParams) -> None:
# https://www.terraform.io/internals/remote-service-discovery#remote-service-discovery
module_params.module_source = module_params.module_source.replace(f"{module_params.tf_host_name}/", "")
try:
response = requests.get(
url=f"https://{module_params.tf_host_name}/.well-known/terraform.json",
timeout=DEFAULT_TIMEOUT,
request = requests.Request(
method='GET',
url=f"https://{module_params.tf_host_name}/.well-known/terraform.json"
)
if os.getenv('PROXY_URL'):
self.logger.info(f'Sending request to {request.url} through proxy')
response = call_http_request_with_proxy(request)
else:
session = requests.Session()
prepared_request = session.prepare_request(request)
response = session.send(prepared_request, timeout=DEFAULT_TIMEOUT)
response.raise_for_status()
except HTTPError as e:
self.logger.debug(e)
Expand Down
5 changes: 3 additions & 2 deletions checkov/terraform/module_loading/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,11 @@ def load(
inner_module=inner_module,
tf_managed=tf_managed,
)
logging.info(f"Attempting loading via {loader.__class__} loader")
logging.info(f"Attempting loading {source} via {loader.__class__} loader")
content = loader.load(module_params)
logging.info(f"Loading result of {module_address}={content.loaded()} via {loader.__class__} loader")
except Exception as e:
logging.warning(f'Module {module_address} failed to load via {loader.__class__}')
logging.warning(f'Module {module_address} failed to load via {loader.__class__} due to: {e}')
last_exception = e
continue
if content.next_url:
Expand Down
Loading