Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(general): fix proxy access from git and registry loader #6992

Merged
merged 18 commits into from
Feb 6, 2025
34 changes: 22 additions & 12 deletions checkov/common/goget/github/get_git.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
import logging
import re
import shutil
import os

from checkov.common.goget.base_getter import BaseGetter
from checkov.common.resource_code_logger_filter import add_resource_code_filter_to_logger
from checkov.common.util.contextmanagers import temp_environ
from checkov.common.util.env_vars_config import env_vars_config

try:
from git import Repo
Expand Down Expand Up @@ -64,9 +66,7 @@
if git_import_error is not None:
raise ImportError("Unable to load git module (is the git executable available?)") \
from git_import_error

git_url, internal_dir = self._source_subdir()

clone_dir = self.temp_dir + "/clone/" if self.create_clone_and_res_dirs else self.temp_dir
self._clone(git_url, clone_dir)

Expand All @@ -81,17 +81,27 @@
return clone_dir

def _clone(self, git_url: str, clone_dir: str) -> None:
    """Clone ``git_url`` into ``clone_dir`` exactly once, via the proxy when PROXY_URL is set.

    The environment overrides for git are collected first and applied for a
    single ``_clone_helper`` call, so the repository is never cloned a second
    time into the directory that the first clone just populated.

    :param git_url: repository URL handed to ``_clone_helper``.
    :param clone_dir: destination directory handed to ``_clone_helper``.
    """
    # Drop any "user:token@" prefix so credentials embedded in the URL are not logged.
    safe_url = self.url if '@' not in self.url else self.url.split('@')[1]
    self.logger.debug(f"cloning {safe_url} to {clone_dir}")

    git_env = {"GIT_TERMINAL_PROMPT": "0"}  # disables user prompts originating from GIT

    proxy_url = os.getenv('PROXY_URL')
    if proxy_url:
        self.logger.debug(f'Performing clone through proxy - {proxy_url}')
        git_env["https_proxy"] = proxy_url

        # Only forward settings that are actually configured: an unset variable
        # would otherwise be passed on as None / rendered as the text "None".
        proxy_ca_path = os.getenv('PROXY_CA_PATH')
        if proxy_ca_path:
            git_env["GIT_SSL_CAINFO"] = proxy_ca_path

        header_key = os.getenv('PROXY_HEADER_KEY')
        header_value = os.getenv('PROXY_HEADER_VALUE')
        if header_key and header_value:
            git_env["GIT_CONFIG_PARAMETERS"] = f"'http.extraHeader={header_key}:{header_value}'"

    # NOTE(review): temp_environ is assumed to set these variables for the
    # duration of the block and restore the previous values on exit - confirm.
    with temp_environ(**git_env):
        self._clone_helper(clone_dir, git_url)

def _clone_helper(self, clone_dir: str, git_url: str) -> None:
    """Run the actual ``Repo.clone_from`` call for the ref configured on this getter.

    The first attribute that is set wins, in this order: ``self.branch``,
    ``self.commit_id``, ``self.tag``; with none set the repository's default
    ref is cloned.

    :param clone_dir: destination directory for the clone.
    :param git_url: repository URL to clone from.
    """
    if self.branch:
        Repo.clone_from(git_url, clone_dir, branch=self.branch, depth=1)  # depth=1 for shallow clone
    elif self.commit_id:  # no commit id support for branch
        # A specific commit is checked out after cloning, so this clone is not
        # depth-limited (needs to be a full git clone) and skips the initial checkout.
        repo = Repo.clone_from(git_url, clone_dir, no_checkout=True)
        repo.git.checkout(self.commit_id)
    elif self.tag:
        Repo.clone_from(git_url, clone_dir, depth=1, b=self.tag)
    else:
        Repo.clone_from(git_url, clone_dir, depth=1)

# Split source url into Git url and subdirectory path, e.g. test.com/repo//repo/subpath becomes ('test.com/repo', '/repo/subpath')
# Also see reference implementation @ go-getter https://github.com/hashicorp/go-getter/blob/main/source.go
Expand Down
16 changes: 8 additions & 8 deletions checkov/common/proxy/proxy_client.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,23 @@
from __future__ import annotations

import logging
import os
from typing import Any

import requests

from checkov.common.util.env_vars_config import env_vars_config


class ProxyClient:
def __init__(self) -> None:
self.identity = env_vars_config.PROXY_HEADER_VALUE
self.proxy_ca_path = env_vars_config.PROXY_CA_PATH
self.identity = os.getenv('PROXY_HEADER_VALUE')
self.proxy_ca_path = os.getenv('PROXY_CA_PATH')
if self.proxy_ca_path is None:
logging.warning("[ProxyClient] CA certificate path is missing")

def get_session(self) -> requests.Session:
if not env_vars_config.PROXY_URL:
if not os.getenv('PROXY_URL'):
logging.warning('Please provide "PROXY_URL" env var')
proxy_url = env_vars_config.PROXY_URL
proxy_url = os.getenv('PROXY_URL')
session = requests.Session()
proxies = {
"http": proxy_url,
Expand All @@ -28,8 +27,8 @@ def get_session(self) -> requests.Session:
return session

def update_request_header(self, request: requests.Request) -> None:
    """Attach the proxy identity to ``request`` under the configured header name.

    The header name comes from the ``PROXY_HEADER_KEY`` environment variable
    and the header value is ``self.identity``. When ``PROXY_HEADER_KEY`` is
    unset or empty the request is left untouched.

    :param request: request whose ``headers`` mapping is updated in place.
    """
    # The header *name* must come from PROXY_HEADER_KEY; keying the header by
    # PROXY_HEADER_VALUE would send the identity under a header named after itself.
    header_key = os.getenv('PROXY_HEADER_KEY')
    if header_key:
        request.headers[header_key] = self.identity

def send_request(self, request: requests.Request) -> requests.Response:
session = self.get_session()
Expand All @@ -39,5 +38,6 @@ def send_request(self, request: requests.Request) -> requests.Response:


def call_http_request_with_proxy(request: requests.Request) -> Any:
    """Send ``request`` through a newly created ``ProxyClient``.

    :param request: the request to send.
    :return: whatever ``ProxyClient.send_request`` returns for this request.
    """
    # Log only the method and URL. Callers pass requests whose headers can hold
    # an "Authorization: Bearer ..." token, so the request's full attribute
    # dict must never be written to stdout or to the logs.
    logging.debug(f'Calling through proxy url: {request.method} {request.url}')
    proxy_client = ProxyClient()
    return proxy_client.send_request(request=request)
2 changes: 1 addition & 1 deletion checkov/common/util/env_vars_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def __init__(self) -> None:
self.PROXY_CA_PATH = os.getenv('PROXY_CA_PATH', None)
self.PROXY_URL = os.getenv('PROXY_URL', None)
self.PROXY_HEADER_VALUE = os.getenv('PROXY_HEADER_VALUE', None)
self.PROXY_HEADER_KEY = os.getenv('PROXY_HEADER_VALUE', None)
self.PROXY_HEADER_KEY = os.getenv('PROXY_HEADER_KEY', None)
self.ENABLE_CONFIG_FILE_VALIDATION = convert_str_to_bool(os.getenv("ENABLE_CONFIG_FILE_VALIDATION", False))


Expand Down
3 changes: 3 additions & 0 deletions checkov/terraform/module_loading/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,18 @@
"""
self.discover(module_params)
if not self._is_matching_loader(module_params):
print(f'the module params of {module_params.module_source} do not match loader {self.__class__} so returning None')
Fixed Show fixed Hide fixed
return ModuleContent(dir=None)

module_path = self._find_module_path(module_params)
if os.path.exists(module_path):
print(f'path {module_path} exists so no need to load')
return ModuleContent(dir=module_path)

self.logger.debug(f"Using {self.__class__.__name__} attempting to get module "
f"{module_params.module_source if '@' not in module_params.module_source else module_params.module_source.split('@')[1]} "
f"version: {module_params.version}")
print(f'about to load {module_params.module_source} with {self.__class__}')
Fixed Show fixed Hide fixed
return self._load_module(module_params)

@abstractmethod
Expand Down
6 changes: 5 additions & 1 deletion checkov/terraform/module_loading/loaders/git_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,13 @@

def _load_module(self, module_params: ModuleParams) -> ModuleContent:
try:
print(f'attempting to load module {module_params.module_source} via git loader: {module_params.__dict__}')
Fixed Show fixed Hide fixed
self._process_generic_git_repo(module_params)
module_source = module_params.module_source.replace("git::", "")
print(f'running git getter with {module_source} via git loader: {module_params.__dict__}')
Fixed Show fixed Hide fixed
git_getter = GitGetter(module_source, create_clone_and_result_dirs=False)
git_getter.temp_dir = module_params.dest_dir
print('performing git operation')
git_getter.do_get()
except Exception as e:
str_e = str(e)
Expand All @@ -72,9 +75,10 @@
# but the current loader (ex. GithubLoader) is not using it
return ModuleContent(dir=None, failed_url=module_params.module_source)
if 'File exists' not in str_e and 'already exists and is not an empty directory' not in str_e:
self.logger.warning(f"failed to get {module_params.module_source} because of {e}")
self.logger.warning(f"failed to get {module_params.module_source} in git loader because of {e}")
return ModuleContent(dir=None, failed_url=module_params.module_source)
return_dir = module_params.dest_dir
self.logger.info(f'finished loading {module_params.module_source}')
if module_params.inner_module:
return_dir = os.path.join(module_params.dest_dir, module_params.inner_module)
return ModuleContent(dir=return_dir)
Expand Down
10 changes: 6 additions & 4 deletions checkov/terraform/module_loading/loaders/registry_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
return False

def _load_module(self, module_params: ModuleParams) -> ModuleContent:
print(f'loading NOW {module_params.module_source} via registry loader')
Fixed Show fixed Hide fixed
if module_params.best_version:
best_version = module_params.best_version
else:
Expand All @@ -81,16 +82,17 @@
return ModuleContent(dir=None)

request_download_url = urljoin(module_params.tf_modules_endpoint, "/".join((module_params.module_source, best_version, "download")))
logging.debug(f"Best version for {module_params.module_source} is {best_version} based on the version constraint {module_params.version}.")
logging.debug(f"Module download url: {request_download_url}")
self.logger.debug(f"Best version for {module_params.module_source} is {best_version} based on the version constraint {module_params.version}.")
self.logger.debug(f"Module download url: {request_download_url}")
print(f'The proxy url currently: {os.getenv("PROXY_URL")}')
try:
request = requests.Request(
method='GET',
url=request_download_url,
headers={"Authorization": f"Bearer {module_params.token}"} if module_params.token else None
)
if os.getenv('PROXY_URL'):
logging.info('Sending request with proxy')
self.logger.info('Sending request with proxy')
response = call_http_request_with_proxy(request)
else:
session = requests.Session()
Expand All @@ -117,7 +119,7 @@
except Exception as e:
str_e = str(e)
if 'File exists' not in str_e and 'already exists and is not an empty directory' not in str_e:
self.logger.error(f"failed to get {module_params.module_source} because of {e}")
self.logger.error(f"failed to get {module_params.module_source} in registry loader because of {e}")
return ModuleContent(dir=None, failed_url=module_params.module_source)
if module_params.inner_module:
return_dir = os.path.join(module_params.dest_dir, module_params.inner_module)
Expand Down
7 changes: 4 additions & 3 deletions checkov/terraform/module_loading/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,9 @@ def load(
next_url = ""
if source in self.failed_urls_cache:
break
logging.info(f"Iterating over {len(self.loaders)} loaders")
print(f"Iterating over {len(self.loaders)} loaders")
for loader in self.loaders:
print(f"Trying loader {loader.__class__} loader")
if not self.download_external_modules and loader.is_external:
continue
try:
Expand All @@ -82,10 +83,10 @@ def load(
inner_module=inner_module,
tf_managed=tf_managed,
)
logging.info(f"Attempting loading via {loader.__class__} loader")
print(f"Attempting loading {source} via {loader.__class__} loader")
content = loader.load(module_params)
except Exception as e:
logging.warning(f'Module {module_address} failed to load via {loader.__class__}')
logging.warning(f'Module {module_address} failed to load via {loader.__class__} due to: {e}')
last_exception = e
continue
if content.next_url:
Expand Down
Loading