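"""Aggregate traceroute-derived features from Kafka.

Read messages from the input Kafka topic for the requested time interval,
extract the enabled features (AS hops, IP hops, RTTs) per (peer, destination)
ASN pair, and write one aggregated message per pair to the output topic.

Usage:
    aggregate_traceroute_features.py <config> <start> <stop>

<start> and <stop> are UNIX epoch timestamps (in seconds or milliseconds) or
YYYY-MM-DDThh:mm strings.
"""
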
import argparse
import configparser
import logging
import sys
from collections import defaultdict
from collections.abc import Iterator
from datetime import datetime
from kafka_wrapper.kafka_reader import KafkaReader
from kafka_wrapper.kafka_writer import KafkaWriter
from network_dependency.utils.helper_functions import parse_timestamp_argument
from metis.shared_extract_functions import AS_HOPS_FEATURE, IP_HOPS_FEATURE, \
RTT_FEATURE, VALID_FEATURES, VALID_MODES, \
extract_as_hops, extract_ip_hops, \
extract_rtts


def parse_csv(value: str) -> list:
    """Split a comma-separated string into a list of stripped entries."""
    return [entry.strip() for entry in value.split(',')]


def check_config(config_path: str) -> configparser.ConfigParser:
    """Read and validate the configuration file.

    Return an empty ConfigParser (no sections) if a required section or
    option is missing, or if an invalid feature or mode is specified.
    """
config = configparser.ConfigParser(converters={'csv': parse_csv})
config.read(config_path)
try:
config.get('input', 'kafka_topic')
config.get('output', 'kafka_topic')
enabled_features = config.getcsv('options', 'enabled_features')
mode = config.get('options', 'mode')
config.get('kafka', 'bootstrap_servers')
except configparser.NoSectionError as e:
logging.error(f'Missing section in config file: {e}')
return configparser.ConfigParser()
except configparser.NoOptionError as e:
logging.error(f'Missing option in config file: {e}')
return configparser.ConfigParser()
for feature in enabled_features:
if feature not in VALID_FEATURES:
logging.error(f'Invalid feature specified: {feature}')
return configparser.ConfigParser()
if mode not in VALID_MODES:
logging.error(f'Invalid mode specified: {mode}')
return configparser.ConfigParser()
return config
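

# For reference, check_config() expects an INI file along the lines of the
# sketch below. The placeholder values are illustrative, not values taken
# from this repository; enabled_features and mode must match entries in
# VALID_FEATURES and VALID_MODES.
#
#   [input]
#   kafka_topic = <input topic with traceroute messages>
#
#   [output]
#   kafka_topic = <output topic for aggregated features>
#
#   [options]
#   enabled_features = <comma-separated subset of VALID_FEATURES>
#   mode = <one of VALID_MODES>
#
#   [kafka]
#   bootstrap_servers = localhost:9092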


def generate_messages(interval_start: int,
                      interval_end: int,
                      feature_values: dict) -> Iterator[dict]:
    """Yield one message per (peer, dst) ASN pair with all features
    available for that pair in the interval.
    """
logging.info(f'Building ASN set for interval: {interval_start} - '
f'{interval_end}')
asn_set = set()
for feature in feature_values:
for peer in feature_values[feature]:
for dst in feature_values[feature][peer]:
asn_set.add((peer, dst))
logging.info(f'Found {len(asn_set)} ASN pairs.')
for peer, dst in sorted(asn_set):
msg = {'timestamp': interval_end,
'interval_start': interval_start,
'peer': peer,
'dst': dst,
'features': dict()}
for feature, values in feature_values.items():
if peer in values and dst in values[peer]:
msg['features'][feature] = values[peer][dst]
if not msg['features']:
logging.error(f'No feature for pair {peer} {dst}')
yield msg
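

# Illustrative shape of a yielded message (field values are examples only):
#   {'timestamp': <interval_end>,
#    'interval_start': <interval_start>,
#    'peer': <peer ASN>,
#    'dst': <destination ASN>,
#    'features': {<feature name>: <extracted values>, ...}}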


def main() -> None:
parser = argparse.ArgumentParser()
parser.add_argument('config')
parser.add_argument('start',
help='Start timestamp (as UNIX epoch in seconds or '
'milliseconds, or in YYYY-MM-DDThh:mm format)')
parser.add_argument('stop',
help='Stop timestamp (as UNIX epoch in seconds or '
'milliseconds, or in YYYY-MM-DDThh:mm format)')
args = parser.parse_args()
log_fmt = '%(asctime)s %(levelname)s %(message)s'
logging.basicConfig(
format=log_fmt,
level=logging.INFO,
filename='aggregate_traceroute_features.log',
datefmt='%Y-%m-%d %H:%M:%S'
)
logging.info(f'Started: {sys.argv}')
config = check_config(args.config)
if not config.sections():
sys.exit(1)
start_ts_arg = args.start
start_ts = parse_timestamp_argument(start_ts_arg)
if start_ts == 0:
logging.error(f'Invalid start timestamp specified: {start_ts_arg}')
sys.exit(1)
logging.info(f'Starting read at timestamp: {start_ts} '
f'{datetime.utcfromtimestamp(start_ts).strftime("%Y-%m-%dT%H:%M")}')
end_ts_arg = args.stop
end_ts = parse_timestamp_argument(end_ts_arg)
if end_ts == 0:
logging.error(f'Invalid end timestamp specified: {end_ts_arg}')
sys.exit(1)
logging.info(f'Ending read at timestamp: {end_ts} '
f'{datetime.utcfromtimestamp(end_ts).strftime("%Y-%m-%dT%H:%M")}')
mode = config.get('options', 'mode')
enabled_features = config.getcsv('options', 'enabled_features')
feature_values = {feature: defaultdict(dict)
for feature in enabled_features}
input_topic = config.get('input', 'kafka_topic')
bootstrap_servers = config.get('kafka', 'bootstrap_servers')
reader = KafkaReader([input_topic],
bootstrap_servers,
start_ts * 1000,
end_ts * 1000)
with reader:
for msg in reader.read():
if AS_HOPS_FEATURE in enabled_features:
extract_as_hops(msg,
feature_values[AS_HOPS_FEATURE],
mode)
if IP_HOPS_FEATURE in enabled_features:
extract_ip_hops(msg,
feature_values[IP_HOPS_FEATURE],
mode)
if RTT_FEATURE in enabled_features:
extract_rtts(msg,
feature_values[RTT_FEATURE],
mode)
output_topic = config.get('output', 'kafka_topic')
writer = KafkaWriter(output_topic,
bootstrap_servers,
num_partitions=10,
# 6 months
config={'retention.ms': 15552000000})
msg_count = 0
with writer:
for msg in generate_messages(start_ts, end_ts, feature_values):
writer.write(msg['peer'],
msg,
end_ts * 1000)
msg_count += 1
logging.info(f'Generated {msg_count} messages.')


if __name__ == '__main__':
main()
sys.exit(0)