-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtask.py
82 lines (69 loc) · 2.62 KB
/
task.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
import sys
import collections.abc
from dbruntime import UserNamespaceInitializer
def parse_args():
    """
    Parse ``--key=value`` arguments from ``sys.argv`` into a dictionary.

    A leading ``--`` is stripped from each key. The value is everything after
    the first ``=``, so values may themselves contain ``=``. A key given once
    maps to its string value. A key given more than once maps to a list of its
    values in command-line order.

    For example, ``--foo=bar --foo=party --bar=foo`` produces::

        {"foo": ["bar", "party"], "bar": "foo"}

    Returns:
        dict mapping each key to a ``str`` or, for repeated keys, a ``list`` of ``str``.

    Raises:
        ValueError: if an argument does not contain ``=``.
    """
    args = {}
    for arg in sys.argv[1:]:
        key, sep, val = arg.partition("=")
        if not sep:
            raise ValueError(
                f"Expected an argument of the form --key=value, got {arg!r}"
            )
        if key.startswith("--"):
            key = key[2:]
        if key not in args:
            args[key] = val
        elif isinstance(args[key], list):
            args[key].append(val)
        else:
            # Second occurrence: promote the single string to a list.
            # Checking for ``list`` rather than ``collections.abc.Sequence``
            # matters because ``str`` is itself a Sequence, which would skip
            # the promotion and then fail on ``str.append``.
            args[key] = [args[key], val]
    return args
def main():
    """
    Materialize a securable into a Delta-format storage location.

    Command-line arguments (parsed by ``parse_args``):
      --securable
          Securable to read with ``spark.read.table``. Required.
      --projection_selection_clause
          Optional. Passed to ``DataFrame.select`` to choose columns.
      --row_selection_clause
          Optional. Passed to ``DataFrame.filter`` to select rows.
      --location
          Location the materialization is written to. Required.

    The securable is read, the projection and row selection clauses are
    applied if present, and the result is saved in Delta format to
    ``--location``.

    Raises:
        KeyError: if ``--securable`` or ``--location`` is not supplied.
    """
    args = parse_args()

    # Look up both required arguments before any Spark work, so a missing
    # --location fails immediately instead of after the table has been read.
    securable = args["securable"]
    location = args["location"]

    user_namespace_initializer = UserNamespaceInitializer.getOrCreate()
    spark = user_namespace_initializer.namespace_globals["spark"]

    print(f"Attempting to materialize {securable} to {location}")
    # Do not print dbutils credentials or the Spark conf here. Both can
    # contain secrets, and this script's stdout ends up in job logs.

    # Read the securable
    print(f"Reading {securable}")
    data_frame = spark.read.table(securable)

    # Apply projection selection clause if specified.
    # NOTE(review): passed to select() as-is. A comma-separated string such as
    # "a, b" would be treated as a single column name. Confirm the expected
    # format with the caller.
    if "projection_selection_clause" in args:
        projection_selection_clause = args["projection_selection_clause"]
        print(f"Applying {projection_selection_clause} to dataframe")
        data_frame = data_frame.select(projection_selection_clause)

    # Apply row selection clause if specified
    if "row_selection_clause" in args:
        row_selection_clause = args["row_selection_clause"]
        print(f"Applying {row_selection_clause} to dataframe")
        data_frame = data_frame.filter(row_selection_clause)

    # Save the materialization. No save mode is set, so Spark's default
    # ("errorifexists") applies: the write fails if data already exists there.
    print("Saving materialization...")
    data_frame.write.format("delta").save(location)
# Entry point: run the materialization only when executed as a script,
# not when this module is imported.
if __name__ == "__main__":
    main()