# -*- coding: utf-8 -*-
"""
Created on Fri Aug 6 10:10:44 2021
"""
import time

import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt


class SimpleTimer:
    """Context manager that measures elapsed wall-clock time in nanoseconds."""

    def __init__(self):
        self.start = None
        self.end = None
        self.elapsed = None

    def __enter__(self):
        self.start = time.perf_counter_ns()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.end = time.perf_counter_ns()
        self.elapsed = self.end - self.start
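
# A minimal usage sketch for SimpleTimer (illustrative; `do_work` is a
# hypothetical stand-in for the timed workload):
#
#     with SimpleTimer() as timer:
#         do_work()
#     print(f"elapsed: {timer.elapsed / 1e9:.3f} s")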


class ResultsLogger(object):
    """Accumulates per-run metric rows and writes them to a CSV file."""

    def __init__(self, path):
        self.path = path
        self.run_logs = []

    def log(self, row: dict):
        self.run_logs.append(row)

    def write(self):
        # to_csv also writes the dataframe index; the plotting helpers
        # below strip the resulting 'Unnamed:' column on read.
        df = pd.DataFrame(self.run_logs)
        df.to_csv(self.path)
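
# Usage sketch (the path and row values are assumptions):
#
#     logger = ResultsLogger("results.csv")
#     logger.log({"n_workers": 2, "overall": 1.23})
#     logger.write()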


def scale_workers(client, n_workers, timeout=300):
    """Scale the Dask cluster to `n_workers` and block until all workers
    are connected, raising RuntimeError after `timeout` seconds."""
    client.cluster.scale(n_workers)
    m = len(client.has_what().keys())
    start = end = time.perf_counter_ns()
    while (m != n_workers) and (((end - start) / 1e9) < timeout):
        time.sleep(5)
        m = len(client.has_what().keys())
        end = time.perf_counter_ns()
    if ((end - start) / 1e9) >= timeout:
        raise RuntimeError(f"Failed to rescale cluster in {timeout} sec. "
                           "Try increasing timeout for very large containers, "
                           "and verify available compute resources.")


def visualize_data_cuml(path, size=(12, 8)):
    """Read the cuML latency CSV at `path`, melt it into long form, and
    draw a boxplot of per-stage latencies grouped by worker count.
    Returns the raw and melted dataframes."""
    perf_df = pd.read_csv(path)
    # Drop the unnamed index column that ResultsLogger.write() emits.
    perf_df = perf_df.loc[:, ~perf_df.columns.str.contains('^Unnamed:')]
    dd = pd.melt(perf_df,
                 id_vars=['n_workers'],
                 value_vars=['overall', 'data_read', 'data_preprocessing',
                             'hashing_vectorizer', 'tfidf_transformer'],
                 var_name='latency')
    plt.figure(figsize=size, dpi=100, facecolor='w', edgecolor='k')
    sns.boxplot(x='latency', y='value', data=dd, orient="v", hue="n_workers")
    plt.xlabel("Overall latency and latencies of different stages")
    plt.ylabel("Latency in seconds")
    plt.show()
    return perf_df, dd
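
# Usage sketch (the CSV path is an assumption; the file is expected to be
# one written by ResultsLogger):
#
#     perf_df, melted = visualize_data_cuml("cuml_results.csv")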


def visualize_data(path, size=(12, 8)):
    """Read the Spark or scikit-learn latency CSV at `path`, melt it into
    long form, and draw a boxplot of per-stage latencies.
    Returns the raw and melted dataframes."""
    perf_df = pd.read_csv(path)
    perf_df = perf_df.loc[:, ~perf_df.columns.str.contains('^Unnamed:')]
    dd = pd.melt(perf_df,
                 id_vars=['n_workers'],
                 value_vars=['overall', 'data_read', 'data_preprocessing',
                             'hashing_vectorizer', 'tfidf_transformer'],
                 var_name='latency')
    plt.figure(figsize=size, dpi=100, facecolor='w', edgecolor='k')
    sns.boxplot(x='latency', y='value', data=dd, orient="v")
    plt.xlabel("Overall latency and latencies of different stages")
    plt.ylabel("Latency in seconds")
    plt.show()
    return perf_df, dd


def visualize_data_spark_adjusted(path):
    """Convert Spark's cumulative stage latencies into per-stage latencies
    and plot them as in visualize_data.
    Returns the adjusted and melted dataframes."""
    perf_df = pd.read_csv(path)
    perf_df = perf_df.loc[:, ~perf_df.columns.str.contains('^Unnamed:')]
    # The Spark timings are cumulative, so subtract stages in reverse
    # order to recover the latency of each individual stage.
    perf_df["tfidf_transformer"] = perf_df["tfidf_transformer"] - \
        perf_df["hashing_vectorizer"]
    perf_df["hashing_vectorizer"] = perf_df["hashing_vectorizer"] - \
        perf_df["data_preprocessing"]
    perf_df["data_preprocessing"] = perf_df["data_preprocessing"] - \
        perf_df["data_read"]
    plt.figure(figsize=(12, 8), dpi=100, facecolor='w', edgecolor='k')
    dd = pd.melt(perf_df,
                 id_vars=['n_workers'],
                 value_vars=['overall', 'data_read', 'data_preprocessing',
                             'hashing_vectorizer', 'tfidf_transformer'],
                 var_name='latency')
    sns.boxplot(x='latency', y='value', data=dd, orient="v")
    plt.xlabel("Overall latency and latencies of different stages")
    plt.ylabel("Latency in seconds")
    plt.show()
    return perf_df, dd
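

if __name__ == "__main__":
    # Minimal self-contained demo (a sketch, not part of the original
    # benchmark flow): time a trivial workload, log one row using the
    # column names the plotting helpers expect, and plot it. The file
    # name "demo_results.csv" is an assumption.
    with SimpleTimer() as t:
        total = sum(range(1_000_000))
    print(f"sum={total}, elapsed={t.elapsed / 1e9:.4f} s")
    logger = ResultsLogger("demo_results.csv")
    logger.log({"n_workers": 1,
                "overall": t.elapsed / 1e9,
                "data_read": 0.0,
                "data_preprocessing": 0.0,
                "hashing_vectorizer": 0.0,
                "tfidf_transformer": 0.0})
    logger.write()
    visualize_data("demo_results.csv")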