-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconfig_importer.py
executable file
·332 lines (281 loc) · 12.8 KB
/
config_importer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
#!/usr/bin/env python
from __future__ import annotations
import configparser
import hashlib
import logging
import os
import pathlib
import platform
import re
import shutil
import sys
import yaml
from abc import ABC, abstractmethod
from agent.config import AgentConfig, get_config
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from typing import Optional
# Number of bytes read per chunk when hashing a file (see hash_file).
HASH_BLOCK_SIZE = 65536
# Passed as `width` to yaml.dump when the imported config is written.
YAML_LINE_WIDTH = 999
# Option names looked up in the legacy NRPE config (input side, see CfgReader).
KEY_IP_ALLOWED_HOSTS = 'allowed_hosts'
KEY_IP_SERVER_PORT = 'server_port'
# Keys used when building the imported YAML data (output side, see ConfigImporter).
KEY_OP_SERVER = 'server'
KEY_OP_SERVER_ALLOWED_HOSTS = 'allowed_hosts'
KEY_OP_SERVER_PORT = 'port'
KEY_OP_COMMANDS = 'commands'
# Directories returned by the readers' get_agent_dir() and used to locate the legacy config files.
AGENT_PATH_LINUX = '/opt/opsview/agent'
AGENT_PATH_WINDOWS = 'C:/Program Files/Opsview Agent'
# Match a 'check_*' name with an optional directory prefix inside a command line.
# Named groups: 'dir' (optional, ends with a path separator) and 'check'.
RE_COMMAND_NIX = re.compile(r'(?P<dir>[\w/\-.]*/)?(?P<check>check_[\w\-.]+)')
RE_COMMAND_WIN = re.compile(r'(?P<dir>(?:[A-Z]:)?[\w \\\-/)(.]+[\\/])?(?P<check>check_[\w\-.]+)')
# Module logger; stays None until init_logging() assigns it.
logger: Optional[logging.Logger] = None
def is_windows() -> bool:
    """Report whether platform.system() identifies the host as Windows."""
    system_name = platform.system()
    lowered = system_name.lower()
    return 'windows' in lowered
def clean_partition(line: str, delim='=') -> tuple[str, str] | tuple[None, None]:
    """Split *line* at the first *delim* and return both sides with whitespace stripped.

    Returns ``(None, None)`` when *delim* does not occur in *line*.
    """
    head, found, tail = line.partition(delim)
    if found != delim:
        # str.partition yields an empty separator when the delimiter is absent.
        return None, None
    return head.strip(), tail.strip()
def clean_split(line: str | None, delim=',') -> list[str] | None:
    """Convert a delimited string into a list of stripped, non-empty items.

    :param line: the delimited text, or None
    :param delim: the separator to split on
    :return: None when *line* is None; otherwise the stripped items in order.
        Fragments that are empty after stripping (blank input, doubled or
        trailing delimiters) are dropped, so a blank value yields ``[]``
        rather than ``['']``.
    """
    if line is None:
        return None
    stripped = (fragment.strip() for fragment in line.split(delim))
    return [item for item in stripped if item]
def hash_file(file_path: str) -> str:
    """Return the hexadecimal SHA256 digest of the file at *file_path*."""
    digest = hashlib.sha256()
    with open(file_path, 'rb') as stream:
        # Two-argument iter() keeps calling read() until it returns b'' (EOF),
        # so the file is consumed in HASH_BLOCK_SIZE chunks.
        for block in iter(lambda: stream.read(HASH_BLOCK_SIZE), b''):
            digest.update(block)
    return digest.hexdigest()
def copy_file_if_different(src_file_path: str, dst_file_path: str) -> bool:
    """Copy the source file over the destination unless their contents already match.

    Contents are compared by SHA256 hash; a missing destination always counts as different.
    Returns True when a copy was performed, False otherwise.
    """
    source_digest = hash_file(src_file_path)
    target_digest = None
    if os.path.exists(dst_file_path):
        target_digest = hash_file(dst_file_path)
    if source_digest == target_digest:
        return False
    shutil.copy2(src_file_path, dst_file_path)
    return True
class BaseReader(ABC):
    """Abstract base class for configuration file readers"""

    @abstractmethod
    def scan_config(self, existing_plugins: set[str]) -> tuple[dict[str, str], list[str] | None, int | None]:
        """Scan the legacy configuration.

        :param existing_plugins: command names to leave out of the result
        :return: a tuple of (commands by name, allowed hosts or None, server port or None).
            This base implementation returns the "nothing found" result.
        """
        command_dict: dict[str, str] = {}
        # None (rather than '' / 0) matches what the concrete readers return for a missing value.
        allowed_hosts: list[str] | None = None
        server_port: int | None = None
        return command_dict, allowed_hosts, server_port

    @abstractmethod
    def get_agent_dir(self) -> str:
        """Return the directory the legacy agent is installed in."""
        return ''

    def write_path(self, cmd_line) -> str:
        """Allows paths to be re-written according to the OS"""
        return cmd_line
class CfgReader(BaseReader):
    """Manages reading from NRPE based config files (Linux)"""

    # Matches option names of the form 'command[<name>]' and captures <name>.
    RE_COMMAND = re.compile(r'command\[([\w-]+)]')

    def scan_config(self, existing_plugins: set[str]) -> tuple[dict[str, str], list[str] | None, int | None]:
        """Read etc/nrpe.cfg under the Linux agent directory, following its includes.

        :param existing_plugins: command names to leave out of the result
        :return: (commands by name, allowed hosts or None, server port or None)
        """
        data_dict = self._read_file_data(os.path.join(AGENT_PATH_LINUX, 'etc/nrpe.cfg'))
        command_dict = {}
        allowed_hosts = None
        server_port = None
        for key, value in data_dict.items():
            if key == KEY_IP_SERVER_PORT:
                try:
                    server_port = int(value)
                except ValueError:
                    # A malformed port must not abort the whole import; treat it as unset.
                    logger.warning("Invalid %s value '%s'; ignoring", key, value)
            elif key == KEY_IP_ALLOWED_HOSTS:
                allowed_hosts = clean_split(value)
            else:
                m_command = self.RE_COMMAND.match(key)
                if m_command:
                    command_name: str = m_command.group(1)
                    if command_name not in existing_plugins:
                        command_dict[command_name] = value
                        logger.debug("Command: %s: %s", command_name, value)
        return command_dict, allowed_hosts, server_port

    def get_agent_dir(self) -> str:
        """Return the Linux agent installation directory."""
        return AGENT_PATH_LINUX

    @classmethod
    def _read_file_data(cls, file_path) -> dict[str, str]:
        """Parse 'key=value' lines from *file_path* into a dict.

        Blank lines and lines starting with '#' are skipped, as are lines without a value.
        'include' and 'include_dir' values are resolved relative to the directory of
        *file_path* and read recursively; later values override earlier ones.
        A missing or unreadable file is logged and contributes nothing.
        """
        logger.debug("Importing file: %s", file_path)
        file_dir = os.path.dirname(file_path)
        data_dict: dict[str, str] = {}
        try:
            with open(file_path) as f:
                for line in f:
                    clean_line: str = line.strip()
                    if not clean_line or clean_line.startswith('#'):
                        continue
                    key, value = clean_partition(clean_line)
                    if not value:
                        logger.warning("Invalid option value for %s", key)
                        continue
                    if key == 'include':
                        child_data_dict = cls._read_file_data(os.path.join(file_dir, value))
                        data_dict.update(child_data_dict)
                    elif key == 'include_dir':
                        child_dir = os.path.join(file_dir, value)
                        logger.debug("Including dir: %s", child_dir)
                        for child_file in pathlib.Path(child_dir).rglob('*.cfg'):
                            # rglob results already start with child_dir (which includes
                            # file_dir), so they must not be joined onto file_dir again.
                            child_data_dict = cls._read_file_data(str(child_file))
                            data_dict.update(child_data_dict)
                    else:
                        data_dict[key] = value
        except (FileNotFoundError, PermissionError) as ex:
            logger.warning("Cannot import from '%s': %s", file_path, ex)
        return data_dict
class IniReader(BaseReader):
    """Manages reading from NSClient based config files (Windows)"""

    # These checks are included in the existing Opsview Agent, but are not yet
    # included in the new Agent (we can't just copy over, since they have dependencies)
    ORIGINAL_CHECKS = {
        'check_mountpoint',
        'check_services',
        'check_clustergroup',
        'check_windows_base',
        'check_msmq',
        'check_ms_iis',
        'check_ms_dns',
        'check_ms_sql_database_states',
        'check_ms_sql_performance',
        'check_ms_sql_system',
        'check_ms_hyperv_server',
        'check_microsoft_exchange2016_backpressure',
        'check_microsoft_exchange2013_backpressure',
        'check_microsoft_exchange_counters',
        'check_microsoft_exchange',
        'check_active_directory',
        'check_windows_updates',
        'check_file_age',
        'check_http',
        'check_ssl',
    }

    def scan_config(self, existing_plugins: set[str]) -> tuple[dict, list[str] | None, int | None]:
        """Read opsview.ini from the Windows agent directory.

        :param existing_plugins: command names to leave out of the result
        :return: (custom commands by name, allowed hosts or None, server port or None)
        """
        # interpolation=None: command lines are plain text and may contain '%', which the
        # default BasicInterpolation would reject with InterpolationSyntaxError on access.
        config = configparser.ConfigParser(allow_no_value=True, strict=False, interpolation=None)
        config.read(os.path.join(AGENT_PATH_WINDOWS, 'opsview.ini'))
        custom_plugins = {}
        try:
            commands = config['NRPE Handlers']
        except KeyError:
            pass
        else:
            for name, command_line in commands.items():
                if not command_line:
                    # allow_no_value=True yields None for a bare key; there is nothing to import.
                    logger.warning("Ignoring handler '%s' with no command line", name)
                    continue
                if name not in self.ORIGINAL_CHECKS and name not in existing_plugins:
                    custom_plugins[name] = command_line
            logger.debug("custom_plugins=%s", custom_plugins)
        try:
            server_port = int(config['NRPE']['port'])
            logger.debug("server_port=%d", server_port)
        except KeyError:
            server_port = None
        except (TypeError, ValueError):
            # TypeError: key present without a value (None); ValueError: not a number.
            logger.warning("Invalid NRPE port value; ignoring")
            server_port = None
        try:
            allowed_hosts = clean_split(config['Settings']['allowed_hosts'])
            logger.debug("allowed_hosts=%s", allowed_hosts)
        except KeyError:
            allowed_hosts = None
        return custom_plugins, allowed_hosts, server_port

    def get_agent_dir(self) -> str:
        """Return the Windows agent installation directory."""
        return AGENT_PATH_WINDOWS

    def write_path(self, cmd_line) -> str:
        """Rewrite *cmd_line* with forward slashes and backslash-escaped spaces."""
        # Attempt to fix windows paths to be of the form: "C:/Program\ Files/something"
        # Already-escaped spaces are un-escaped first so they are not turned into '/ '.
        return cmd_line.replace('\\ ', ' ').replace('\\', '/').replace(' ', '\\ ')
def create_config_reader(is_win: bool):
    """Factory: build an IniReader when *is_win* is truthy, otherwise a CfgReader."""
    if is_win:
        return IniReader()
    return CfgReader()
class ConfigImporter:
    """Responsible for importing the configuration from a previous agent installation.

    The result is written to 'cfg/imported.yml' under the base directory, and any custom
    plugin files are copied to 'plugins/imported'. The presence of the output file is what
    marks the import as done.
    """

    def __init__(self, existing_config: AgentConfig, is_win: bool):
        """
        :param existing_config: current agent config; its command names are excluded from the import
        :param is_win: True to read the Windows (ini) config, False for the NRPE (cfg) config
        """
        self._existing_config = existing_config
        self._is_win = is_win
        self._re_cmd = RE_COMMAND_WIN if is_win else RE_COMMAND_NIX
        base_dir = pathlib.Path(__file__).parent
        if getattr(sys, 'frozen', False):
            # When sys.frozen is set, the base directory is three levels above this file.
            base_dir = base_dir / '../../../'
        base_dir_abs = base_dir.resolve()
        logger.info("ConfigImporter base_dir_abs = '%s'", base_dir_abs)
        self._old_config_file_path = base_dir_abs / 'cfg/custom/imported.yml'
        self._output_config_file_path = base_dir_abs / 'cfg/imported.yml'
        self._output_script_dir = base_dir_abs / 'plugins/imported'

    def run_if_required(self) -> bool:
        """Runs the import if it hasn't already been run.

        Returns True if the config has changed and needs reading again.
        """
        if os.path.isfile(self._output_config_file_path):
            logger.info("Imported config file already exists at '%s'", self._output_config_file_path)
            return False
        if os.path.isfile(self._old_config_file_path):
            # An import file at the old location is moved into place instead of re-importing.
            os.replace(self._old_config_file_path, self._output_config_file_path)
            logger.info("Moved imported config file to '%s'", self._output_config_file_path)
            return True
        output_config_dir = self._output_config_file_path.parent
        os.makedirs(output_config_dir, exist_ok=True)
        rdr: BaseReader = create_config_reader(self._is_win)
        existing_plugins = set(self._existing_config.commands.keys())
        custom_plugins, allowed_hosts, server_port = rdr.scan_config(existing_plugins)
        data_dict = {}
        if custom_plugins:
            copied_plugins = {}
            for command_name, command_line in custom_plugins.items():
                try:
                    new_command_line: str = self._copy_custom_plugin(rdr, command_line)
                except OSError as ex:
                    # OSError covers FileNotFoundError, PermissionError and similar copy failures:
                    # one plugin that cannot be copied must not abort the rest of the import.
                    logger.warning("Could not import '%s': %s", command_name, ex)
                    continue
                if new_command_line:
                    copied_plugins[command_name] = {'path': new_command_line}
            if copied_plugins:
                data_dict[KEY_OP_COMMANDS] = copied_plugins
        if allowed_hosts or server_port:
            server_section = {}
            if allowed_hosts:
                server_section[KEY_OP_SERVER_ALLOWED_HOSTS] = allowed_hosts
            if server_port:
                server_section[KEY_OP_SERVER_PORT] = server_port
            data_dict[KEY_OP_SERVER] = server_section
        logger.info("Creating imported config file: '%s'", self._output_config_file_path)
        # The file is written even when nothing was imported, so the import is not repeated.
        with open(self._output_config_file_path, 'w') as f_out:
            f_out.write('---\n')
            f_out.write('# Imported configuration file.\n')
            f_out.write('# Warning:\n')
            f_out.write('# If this file is deleted/moved, the import process will be\n')
            f_out.write('# automatically run again when the Agent next starts.\n\n')
            if data_dict:
                yaml.dump(data_dict, f_out, width=YAML_LINE_WIDTH)
        os.chmod(self._output_config_file_path, 0o640)
        return True

    def _copy_custom_plugin(self, rdr: BaseReader, command_line: str) -> Optional[str]:
        """Copies the existing plugin to the new location and updates the path

        Every 'check_*' reference matched in *command_line* is copied into the imported
        plugins directory and replaced with its new path.

        :return: the rewritten command line, or None when no reference was matched
        :raises OSError: when a referenced plugin file cannot be read or copied
        """
        # Filled by the callback below; non-empty means at least one substitution happened.
        did_sub: list[bool] = []

        def check_path_matcher(matcher: re.Match) -> str:
            dir_name: str = matcher.group('dir')
            check_name: str = matcher.group('check')
            agent_dir = rdr.get_agent_dir()
            # os.path.join discards agent_dir when dir_name is an absolute path.
            src_file_path = os.path.join(agent_dir, dir_name or '', check_name)
            dest_file_path = os.path.join(self._output_script_dir, check_name)
            os.makedirs(self._output_script_dir, exist_ok=True)
            os.chmod(self._output_script_dir, 0o750)
            if copy_file_if_different(src_file_path, dest_file_path):
                logger.debug("Copied file '%s' -> '%s'", src_file_path, dest_file_path)
            did_sub.append(True)
            return rdr.write_path(dest_file_path)

        modified_cmd = self._re_cmd.sub(check_path_matcher, command_line)
        return modified_cmd if did_sub else None
def init_logging():
    """Set up the module-level logger.

    Keeps the file usable both as a command-line script and through function calls:
    basic logging is only configured when the logger's parent has no handlers yet.
    """
    global logger
    logger = logging.getLogger(__name__)
    if logger.parent.handlers:
        # Handlers are already present on the parent logger; leave them alone.
        return
    logging.basicConfig(format='%(asctime)s [%(levelname)s] %(message)s', level=logging.DEBUG)
def import_config_if_required(existing_config: AgentConfig) -> bool:
    """Entry point: initialise logging, then run the config import unless it has run before.

    Returns whatever ConfigImporter.run_if_required() returns.
    """
    init_logging()
    importer = ConfigImporter(existing_config, is_windows())
    return importer.run_if_required()
if __name__ == '__main__':
    # Script entry point: load the current agent config and run the import if needed.
    try:
        import_config_if_required(get_config())
    except KeyboardInterrupt:
        # Interrupted by the user: exit without a traceback.
        pass