-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy path: set_edge_weights.py
156 lines (132 loc) · 5.96 KB
/
set_edge_weights.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
#!/usr/bin/env python3
import argparse
import csv
import logging
import os
import sys
import typing
from typing import Dict, Iterator, Optional, Tuple
from common import IgnoresConfiguration, IncludeChange
from filter_include_changes import filter_changes
from include_analysis import ParseError, parse_raw_include_analysis_output
from utils import (
GENERATED_FILE_PREFIX_REGEX,
get_include_analysis_edges_centrality,
get_include_analysis_edge_expanded_sizes,
get_include_analysis_edge_prevalence,
get_include_analysis_edge_sizes,
load_config,
)
def set_edge_weights(
    changes_file: typing.TextIO,
    edge_weights: Dict[str, Dict[str, int]],
    filter_third_party: bool = False,
    ignores: Optional[IgnoresConfiguration] = None,
    header_mappings: Optional[Dict[str, str]] = None,
) -> Iterator[Tuple[IncludeChange, int, str, str, Optional[int]]]:
    """Set edge weights in the include changes output.

    Filters the CSV rows from changes_file, then yields each change row,
    appending the edge weight from edge_weights to REMOVE rows whose
    (filename, include) edge is found. Rows whose edge cannot be located
    are still yielded, just without a weight, and a warning is logged.

    Args:
        changes_file: Open text stream of the include-changes CSV.
        edge_weights: Mapping of filename -> {include -> weight}.
        filter_third_party: Forwarded to filter_changes.
        ignores: Optional ignore configuration forwarded to filter_changes.
        header_mappings: Optional header-renaming map forwarded to filter_changes.
    """
    change_type_value: str
    filtered_changes = filter_changes(
        csv.reader(changes_file),
        filter_third_party=filter_third_party,
        ignores=ignores,
        header_mappings=header_mappings,
    )
    for change_type_value, line, filename, include, *_ in filtered_changes:
        change_type = IncludeChange.from_value(change_type_value)
        # Built before prefix-stripping so the yielded row preserves the
        # original (possibly generated-prefixed) paths.
        change = (line, filename, include)
        if change_type is IncludeChange.REMOVE:
            # Strip off the path prefix for generated file includes so matching will work
            filename = GENERATED_FILE_PREFIX_REGEX.match(filename).group(1)
            include = GENERATED_FILE_PREFIX_REGEX.match(include).group(1)
            # For now, only removes have edge weights
            if filename not in edge_weights:
                # Fixed: message previously printed the literal "(unknown)"
                # instead of interpolating the filename.
                logging.warning(f"Skipping filename not found in weights, file may be removed: {filename}")
            elif include not in edge_weights[filename]:
                # Include may be a relative filename
                absolute_path = os.path.join(os.path.dirname(filename), include)
                if absolute_path not in edge_weights[filename]:
                    # Fixed: message previously printed the literal "(unknown)"
                    # instead of the filename half of the edge.
                    logging.warning(f"Skipping edge not found in weights: {filename},{include}")
                else:
                    # Fixed: the resolved edge previously lost its weight —
                    # append it exactly like the direct-match case below.
                    include = absolute_path
                    change = change + (edge_weights[filename][include],)
            else:
                change = change + (edge_weights[filename][include],)
        elif change_type is IncludeChange.ADD:
            # TODO - Some metric for how important they are to add, if there
            # is one? Maybe something like the ratio of occurrences to
            # direct includes, suggesting it's used a lot, but has lots
            # of missing includes? That metric wouldn't really work well
            # since leaf headers of commonly included headers would end
            # up with a high ratio, despite not really being important to
            # add anywhere. Maybe there's no metric here and instead an
            # analysis is done at the end to rank headers by how many
            # suggested includes there are for that file.
            pass
        full_change: Tuple[IncludeChange, int, str, str, Optional[int]] = (change_type_value, *change)
        yield full_change
def main():
    """CLI entry point: annotate an include-changes CSV with edge weights.

    Reads the include analysis output, selects the weighting metric from
    --metric, and streams the weighted rows to stdout as CSV. Returns 2 on
    a parse failure, 0 on success; exits 1 on a broken stdout pipe.
    """
    parser = argparse.ArgumentParser(description="Set edge weights in include changes output")
    parser.add_argument(
        "changes_file",
        type=argparse.FileType("r"),
        help="CSV of include changes to set edge weights for.",
    )
    parser.add_argument(
        "include_analysis_output",
        type=argparse.FileType("r"),
        help="The include analysis output to use.",
    )
    parser.add_argument(
        "--metric",
        choices=["centrality", "expanded_size", "input_size", "prevalence"],
        default="input_size",
        help="Metric to use for edge weights.",
    )
    parser.add_argument("--config", help="Name of config file to use.")
    parser.add_argument("--filter-third-party", action="store_true", help="Filter out third_party/ (excluding blink) and v8.")
    parser.add_argument("--no-filter-ignores", action="store_true", help="Don't filter out ignores.")
    parser.add_argument("--verbose", action="store_true", default=False, help="Enable verbose logging.")
    args = parser.parse_args()

    try:
        include_analysis = parse_raw_include_analysis_output(args.include_analysis_output.read())
    except ParseError as e:
        message = str(e)
        print("error: Could not parse include analysis output file")
        if message:
            print(message)
        return 2

    if args.verbose:
        logging.basicConfig(level=logging.DEBUG)

    config = load_config(args.config) if args.config else None
    ignores = config.ignores if config and not args.no_filter_ignores else None
    include_dirs = config.includeDirs if config else None

    # Flat dispatch table instead of an if/elif chain; argparse's choices=
    # guarantees args.metric is one of these keys.
    metric_functions = {
        "input_size": get_include_analysis_edge_sizes,
        "expanded_size": get_include_analysis_edge_expanded_sizes,
        "centrality": get_include_analysis_edges_centrality,
        "prevalence": get_include_analysis_edge_prevalence,
    }
    edge_weights = metric_functions[args.metric](include_analysis, include_dirs)

    csv_writer = csv.writer(sys.stdout)

    try:
        for row in set_edge_weights(
            args.changes_file,
            edge_weights,
            filter_third_party=args.filter_third_party,
            ignores=ignores,
            header_mappings=config.headerMappings if config else None,
        ):
            csv_writer.writerow(row)
            sys.stdout.flush()
    except BrokenPipeError:
        # Downstream consumer (e.g. head) closed the pipe; redirect stdout
        # to devnull so the interpreter's shutdown flush doesn't complain.
        devnull = os.open(os.devnull, os.O_WRONLY)
        os.dup2(devnull, sys.stdout.fileno())
        sys.exit(1)

    return 0
if __name__ == "__main__":
    # Script entry point: propagate main()'s return code as the process
    # exit status, but swallow Ctrl-C without a traceback.
    try:
        exit_code = main()
    except KeyboardInterrupt:
        pass  # Don't show the user anything
    else:
        sys.exit(exit_code)