|
| 1 | +import os |
| 2 | +import io |
| 3 | +import sys |
| 4 | + |
| 5 | +import minio |
| 6 | +from minio import Minio |
| 7 | +from minio.deleteobjects import DeleteObject |
| 8 | +import urllib3 |
| 9 | +from pg_probackup2.storage.fs_backup import TestBackupDir |
| 10 | +from pg_probackup2.init_helpers import init_params |
| 11 | +from s3.test_utils import config_provider |
| 12 | + |
| 13 | +root = os.path.realpath(os.path.join(os.path.dirname(__file__), '../..')) |
| 14 | +if root not in sys.path: |
| 15 | + sys.path.append(root) |
| 16 | + |
| 17 | +status_forcelist = [413, # RequestBodyTooLarge |
| 18 | + 429, # TooManyRequests |
| 19 | + 500, # InternalError |
| 20 | + 503, # ServerBusy |
| 21 | + ] |
| 22 | + |
| 23 | +DEFAULT_CONF_FILE = 's3/tests/s3.conf' |
| 24 | + |
| 25 | + |
| 26 | +class S3TestBackupDir(TestBackupDir): |
| 27 | + is_file_based = False |
| 28 | + |
| 29 | + def __init__(self, *, rel_path, backup): |
| 30 | + self.access_key = None |
| 31 | + self.secret_key = None |
| 32 | + self.s3_type = None |
| 33 | + self.tmp_path = None |
| 34 | + self.host = None |
| 35 | + self.port = None |
| 36 | + self.bucket_name = None |
| 37 | + self.region = None |
| 38 | + self.bucket = None |
| 39 | + self.path_suffix = None |
| 40 | + self.https = None |
| 41 | + self.s3_config_file = None |
| 42 | + self.ca_certificate = None |
| 43 | + |
| 44 | + self.set_s3_config_file() |
| 45 | + self.setup_s3_env() |
| 46 | + |
| 47 | + path = "pg_probackup" |
| 48 | + if self.path_suffix: |
| 49 | + path += "_" + self.path_suffix |
| 50 | + if self.tmp_path == '' or os.path.isabs(self.tmp_path): |
| 51 | + self.path = f"{path}{self.tmp_path}/{rel_path}/{backup}" |
| 52 | + else: |
| 53 | + self.path = f"{path}/{self.tmp_path}/{rel_path}/{backup}" |
| 54 | + |
| 55 | + secure: bool = False |
| 56 | + self.versioning: bool = False |
| 57 | + if self.https in ['ON', 'HTTPS']: |
| 58 | + secure = True |
| 59 | + if self.https and self.ca_certificate: |
| 60 | + http_client = urllib3.PoolManager(cert_reqs='CERT_REQUIRED', |
| 61 | + ca_certs=self.ca_certificate, |
| 62 | + retries=urllib3.Retry(total=5, |
| 63 | + backoff_factor=1, |
| 64 | + status_forcelist=status_forcelist)) |
| 65 | + else: |
| 66 | + http_client = urllib3.PoolManager(retries=urllib3.Retry(total=5, |
| 67 | + backoff_factor=1, |
| 68 | + status_forcelist=status_forcelist)) |
| 69 | + |
| 70 | + self.conn = Minio(self.host + ":" + self.port, secure=secure, access_key=self.access_key, |
| 71 | + secret_key=self.secret_key, http_client=http_client) |
| 72 | + if not self.conn.bucket_exists(self.bucket): |
| 73 | + raise Exception(f"Test bucket {self.bucket} does not exist.") |
| 74 | + |
| 75 | + try: |
| 76 | + config = self.conn.get_bucket_versioning(self.bucket) |
| 77 | + if config.status.lower() == "enabled" or config.status.lower() == "suspended": |
| 78 | + self.versioning = True |
| 79 | + else: |
| 80 | + self.versioning = False |
| 81 | + except Exception as e: |
| 82 | + if "NotImplemented" in repr(e): |
| 83 | + self.versioning = False |
| 84 | + else: |
| 85 | + raise e |
| 86 | + self.pb_args = ('-B', '/' + self.path, f'--s3={init_params.s3_type}') |
| 87 | + if self.s3_config_file: |
| 88 | + self.pb_args += (f'--s3-config-file={self.s3_config_file}',) |
| 89 | + return |
| 90 | + |
| 91 | + def setup_s3_env(self, s3_config=None): |
| 92 | + self.tmp_path = os.environ.get('PGPROBACKUP_TMP_DIR', default='') |
| 93 | + self.host = os.environ.get('PG_PROBACKUP_S3_HOST', default='') |
| 94 | + |
| 95 | + # If environment variables are not setup, use from config |
| 96 | + if self.s3_config_file or s3_config: |
| 97 | + minio_config = config_provider.read_config(self.s3_config_file or s3_config) |
| 98 | + self.access_key = minio_config['access-key'] |
| 99 | + self.secret_key = minio_config['secret-key'] |
| 100 | + self.host = minio_config['s3-host'] |
| 101 | + self.port = minio_config['s3-port'] |
| 102 | + self.bucket = minio_config['s3-bucket'] |
| 103 | + self.region = minio_config['s3-region'] |
| 104 | + self.https = minio_config['s3-secure'] |
| 105 | + init_params.s3_type = 'minio' |
| 106 | + else: |
| 107 | + self.access_key = os.environ.get('PG_PROBACKUP_S3_ACCESS_KEY') |
| 108 | + self.secret_key = os.environ.get('PG_PROBACKUP_S3_SECRET_ACCESS_KEY') |
| 109 | + self.host = os.environ.get('PG_PROBACKUP_S3_HOST') |
| 110 | + self.port = os.environ.get('PG_PROBACKUP_S3_PORT') |
| 111 | + self.bucket = os.environ.get('PG_PROBACKUP_S3_BUCKET_NAME') |
| 112 | + self.region = os.environ.get('PG_PROBACKUP_S3_REGION') |
| 113 | + self.https = os.environ.get('PG_PROBACKUP_S3_HTTPS') |
| 114 | + self.ca_certificate = os.environ.get('PG_PROBACKUP_S3_CA_CERTIFICATE') |
| 115 | + init_params.s3_type = os.environ.get('PG_PROBACKUP_S3_TEST') |
| 116 | + |
| 117 | + # multi-url case |
| 118 | + # remove all urls from string except the first one |
| 119 | + if ';' in self.host: |
| 120 | + self.host = self.host[:self.host.find(';')] |
| 121 | + if ':' in self.host: # also change port if it was overridden in multihost string |
| 122 | + self.port = self.host[self.host.find(':') + 1:] |
| 123 | + self.host = self.host[:self.host.find(':')] |
| 124 | + |
| 125 | + def set_s3_config_file(self): |
| 126 | + s3_config = os.environ.get('PG_PROBACKUP_S3_CONFIG_FILE') |
| 127 | + if s3_config is not None and s3_config.strip().lower() == "true": |
| 128 | + self.s3_config_file = DEFAULT_CONF_FILE |
| 129 | + else: |
| 130 | + self.s3_config_file = s3_config |
| 131 | + |
| 132 | + def list_instance_backups(self, instance): |
| 133 | + full_path = os.path.join(self.path, 'backups', instance) |
| 134 | + candidates = self.conn.list_objects(self.bucket, prefix=full_path, recursive=True) |
| 135 | + return [os.path.basename(os.path.dirname(x.object_name)) |
| 136 | + for x in candidates if x.object_name.endswith('backup.control')] |
| 137 | + |
| 138 | + def list_files(self, sub_dir, recursive=False): |
| 139 | + full_path = os.path.join(self.path, sub_dir) |
| 140 | + # Need '/' in the end to find inside the folder |
| 141 | + full_path_dir = full_path if full_path[-1] == '/' else full_path + '/' |
| 142 | + object_list = self.conn.list_objects(self.bucket, prefix=full_path_dir, recursive=recursive) |
| 143 | + return [obj.object_name.replace(full_path_dir, '', 1) |
| 144 | + for obj in object_list |
| 145 | + if not obj.is_dir] |
| 146 | + |
| 147 | + def list_dirs(self, sub_dir): |
| 148 | + full_path = os.path.join(self.path, sub_dir) |
| 149 | + # Need '/' in the end to find inside the folder |
| 150 | + full_path_dir = full_path if full_path[-1] == '/' else full_path + '/' |
| 151 | + object_list = self.conn.list_objects(self.bucket, prefix=full_path_dir, recursive=False) |
| 152 | + return [obj.object_name.replace(full_path_dir, '', 1).rstrip('\\/') |
| 153 | + for obj in object_list |
| 154 | + if obj.is_dir] |
| 155 | + |
| 156 | + def read_file(self, sub_path, *, text=True): |
| 157 | + full_path = os.path.join(self.path, sub_path) |
| 158 | + bytes = self.conn.get_object(self.bucket, full_path).read() |
| 159 | + if not text: |
| 160 | + return bytes |
| 161 | + return bytes.decode('utf-8') |
| 162 | + |
| 163 | + def write_file(self, sub_path, data, *, text=True): |
| 164 | + full_path = os.path.join(self.path, sub_path) |
| 165 | + if text: |
| 166 | + data = data.encode('utf-8') |
| 167 | + self.conn.put_object(self.bucket, full_path, io.BytesIO(data), length=len(data)) |
| 168 | + |
| 169 | + def cleanup(self, dir=''): |
| 170 | + self.remove_dir(dir) |
| 171 | + |
| 172 | + def remove_file(self, sub_path): |
| 173 | + full_path = os.path.join(self.path, sub_path) |
| 174 | + self.conn.remove_object(self.bucket, full_path) |
| 175 | + |
| 176 | + def remove_dir(self, sub_path): |
| 177 | + if sub_path: |
| 178 | + full_path = os.path.join(self.path, sub_path) |
| 179 | + else: |
| 180 | + full_path = self.path |
| 181 | + objs = self.conn.list_objects(self.bucket, prefix=full_path, recursive=True, |
| 182 | + include_version=self.versioning) |
| 183 | + delobjs = (DeleteObject(o.object_name, o.version_id) for o in objs) |
| 184 | + errs = list(self.conn.remove_objects(self.bucket, delobjs)) |
| 185 | + if errs: |
| 186 | + strerrs = "; ".join(str(err) for err in errs) |
| 187 | + raise Exception("There were errors: {0}".format(strerrs)) |
| 188 | + |
| 189 | + def exists(self, sub_path): |
| 190 | + full_path = os.path.join(self.path, sub_path) |
| 191 | + try: |
| 192 | + self.conn.stat_object(self.bucket, full_path) |
| 193 | + return True |
| 194 | + except minio.error.S3Error as s3err: |
| 195 | + if s3err.code == 'NoSuchKey': |
| 196 | + return False |
| 197 | + raise s3err |
| 198 | + except Exception as err: |
| 199 | + raise err |
| 200 | + |
| 201 | + def __str__(self): |
| 202 | + return '/' + self.path |
| 203 | + |
| 204 | + def __repr__(self): |
| 205 | + return "S3TestBackupDir" + str(self.path) |
| 206 | + |
| 207 | + def __fspath__(self): |
| 208 | + return self.path |
0 commit comments