data_connection.py
"""
Created a data connection object to make it easier to reimplement without
affecting the main api app code.
Originally started to implement a mongodb connector out of curiousity but
spent more time debugging the connection and decided to do a simulated
data implementation instead.
"""
import json
import os
import sys


class dataconn(object):
    def __init__(self, csv_file=None, json_quickload=None):
        # Simulate persistence: reload the last committed dump if it exists.
        # NOTE: json_quickload is currently unused; kept as a placeholder for
        # a planned quick-load path.
        self.DATASET = {}
        self.persistent_path = "data_fulldump.json"
        if os.path.exists(self.persistent_path):
            with open(self.persistent_path, "r") as fh:
                self.DATASET = json.load(fh)
        # TODO: Implement a way to digest errors;
        # for now this just stores the offending rows.
        self.error_list = {}
        if csv_file is not None:
            self._csv_parse(csv_file)
    def write_data_row(self, dict_rowdata):
        """
        #### Input:
        - Dictionary data row
        #### Desc:
        Simulates writing a row into a database.
        If the id already exists, should we update or report a collision?
        Went with reporting a collision error.
        """
        # TODO: Implement proper data verification to reduce junk data.
        if "id" not in dict_rowdata:
            return False, "Missing Fields"
        if dict_rowdata["id"] in self.DATASET:
            self.error_list[dict_rowdata["id"]] = dict_rowdata
            self.error_list[dict_rowdata["id"]]["error"] = "doc_id collision"
            return False, "doc_id collision"
        self.DATASET[dict_rowdata["id"]] = dict_rowdata
        return True, dict_rowdata["id"]
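    # A rough sketch of the expected call contract (hypothetical caller code,
    # not part of this module): the boolean tells the caller whether the row
    # was stored, and the second value is either the new id or an error hint.
    #
    #     ok, result = dbc.write_data_row({"id": "123", "name": "example"})
    #     if not ok:
    #         handle_error(result)  # e.g. "Missing Fields" / "doc_id collision"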
    def commit(self):
        # Primitive approach: rewrite the whole dump file to "save".
        with open(self.persistent_path, "w") as fh:
            json.dump(self.DATASET, fh)
    def get_data_row_iter(self):
        """
        TODO: Implement a real iterator instead of building a new list.
        #### Output:
        - Should return an iterator that walks all the rows; for now it
          returns a list of every row.
        """
        return list(self.DATASET.values())
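    # One way the TODO above could go (a sketch only, not wired in anywhere;
    # the method name is hypothetical): yielding rows lazily avoids copying
    # the whole dataset into a list.
    #
    #     def iter_data_rows(self):
    #         for row in self.DATASET.values():
    #             yield row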
    def get_data_row_by_id(self, id):
        # If we need to search a real DB by document ID, do it here.
        return self.DATASET[id]
    def _get_data_from_line(self, str_line, len_fieldnames):
        """
        #### Input:
        - The string to parse
        - Expected number of field separators (commas) in a complete row
        #### Output:
        - List of values
        #### Description:
        The location description field contains commas, which throws off a
        plain split(","). Parsing with the standard-library csv module also
        failed because the description can include newlines (I might be
        doing something wrong there), so this is a manual line parser that
        works through the line piece by piece.
        There should be a better way to decide when the name string is
        complete; strings with special characters are wrapped in double
        quotes, so the quote positions could be used instead.
        """
        # The name can throw off the split, so grab the first element (id),
        # pop the trailing columns off the end, and rejoin whatever is left
        # as the name.
        lst_dataline = str_line.rstrip().split(",")
        data_list = []
        data_list.append(lst_dataline.pop(0))
        int_num_of_cols_look = len_fieldnames - 1
        list_data_hold = []
        for x in range(int_num_of_cols_look):
            list_data_hold.append(lst_dataline.pop(-1))
        # Whatever is left should be the "name".
        # TODO: better optimization, this is just quick and dirty.
        list_data_hold.append(",".join(lst_dataline))
        # Because we worked backwards, reverse the list before extending.
        data_list.extend(reversed(list_data_hold))
        return data_list
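    # Worked example (an assumed AB_NYC_2019-style layout trimmed to five
    # columns for illustration; values are made up):
    #
    #   header: id,name,latitude,longitude,price  ->  len_fieldnames == 4
    #   line:   '2539,"Clean apt, near park",40.64,-73.97,149'
    #
    # pop(0) takes the id '2539', three pops from the end take '149',
    # '-73.97', '40.64', and the leftover pieces rejoin into the name
    # '"Clean apt, near park"' (quotes are kept as-is), giving
    # ['2539', '"Clean apt, near park"', '40.64', '-73.97', '149'].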
    def _csv_parse(self, csv_file_path):
        """
        #### Input:
        - CSV file path
        #### Desc:
        Parse the CSV file line by line and write each row into the
        simulated DATASET on the object.
        """
        header_row = None
        expected_commas = 0
        len_col = 0
        build_string = ""
        with open(csv_file_path, "r", newline='') as fileread_csv:
            for data_line in fileread_csv:
                # Assume the first row is the header row; it gives us the
                # column names.
                if header_row is None:
                    header_row = data_line.rstrip().split(",")
                    len_col = len(header_row)
                    expected_commas = len_col - 1
                    continue
                # A data row can span multiple physical lines, so keep
                # accumulating until we have seen enough commas.
                build_string += data_line
                if build_string.count(",") < expected_commas:
                    # Not enough column values yet; keep reading.
                    continue
                # We now have enough column values to work with.
                lst_rowdata = self._get_data_from_line(
                    build_string, expected_commas)
                dict_rowdata = {}
                for x in range(len_col):
                    dict_rowdata[header_row[x]] = lst_rowdata[x]
                # latitude and longitude are required fields; try to convert
                # them to floats. If that fails, save the row in error_list
                # so we can surface an error message later.
                try:
                    dict_rowdata['latitude'] = float(
                        dict_rowdata['latitude'])
                    dict_rowdata['longitude'] = float(
                        dict_rowdata['longitude'])
                    self.write_data_row(dict_rowdata)
                except ValueError:
                    self.error_list[dict_rowdata["id"]] = dict_rowdata
                    self.error_list[dict_rowdata["id"]]["error"] = (
                        "lat long float conv error")
                # The accumulated row has been processed, so reset the
                # buffer before reading the next one.
                build_string = ""
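    # Example of the multi-line accumulation above (made-up row, four
    # columns assumed: id,name,latitude,longitude):
    #
    #   physical line 1: '2539,"Quiet room\n'            -> 1 comma, keep reading
    #   physical line 2: 'near the park",40.64,-73.97\n' -> 3 commas total, parse
    #
    # build_string then holds the full logical row and is handed to
    # _get_data_from_line with expected_commas == 3.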


if __name__ == "__main__":
    # E.g.: python3 data_connection.py AB_NYC_2019.csv
    if len(sys.argv) < 2:
        sys.exit("Usage: python3 data_connection.py <csv_file>")
    data_file = sys.argv[1]
    dbc = dataconn(csv_file=data_file)
    # Dump the dataset to JSON so it can be quick-loaded with json.load().
    dbc.commit()
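
# How the API app side might consume this class (a hypothetical sketch; the
# actual app code lives elsewhere and is not part of this file):
#
#     dbc = dataconn()                      # reloads data_fulldump.json if present
#     rows = dbc.get_data_row_iter()        # all rows as a list
#     one = dbc.get_data_row_by_id("2539")  # lookup by document id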