@@ -7,6 +7,7 @@
"""
from argparse import ArgumentParser
from datetime import datetime
+ from datetime import timedelta
from datetime import timezone
from uuid import UUID

@@ -27,7 +28,10 @@
UUID_1_EPOCH = datetime(1582, 10, 15, tzinfo=timezone.utc)
UUID_TICKS = 10000000
UUID_VARIANT_1 = 0b1000000000000000
- ROWS_TO_COMMIT_AT_ONCE = 100000
+ ROWS_TO_COMMIT_AT_ONCE = 10000
+ MONTHS_TO_KEEP = 3
+ MIGRATION_TIME_FRAME = MONTHS_TO_KEEP * 30 * 24 * 60 * 60  # 3 months in seconds
+ MIGRATION_LIMIT = 10000000000  # mostly for testing purposes

Base = declarative_base()
session = None
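Note: the UUID constants above feed the script's convert_objectid_to_uuid() helper, which is defined earlier in the file and is not part of this diff. The sketch below only illustrates what such a conversion typically looks like (a version-1-style UUID timestamp built from the ObjectId's creation time); the function name and field layout here are assumptions, not the script's actual code.

    from datetime import datetime, timezone
    from uuid import UUID

    from bson import ObjectId  # ships with pymongo

    UUID_1_EPOCH = datetime(1582, 10, 15, tzinfo=timezone.utc)
    UUID_TICKS = 10000000  # 100-nanosecond ticks per second
    UUID_VARIANT_1 = 0b1000000000000000

    def objectid_to_uuid_sketch(oid: ObjectId) -> UUID:
        """Encode the ObjectId's creation time as a version-1-style UUID (illustrative only)."""
        # number of 100 ns ticks between the Gregorian epoch and the ObjectId's timestamp
        ticks = int((oid.generation_time - UUID_1_EPOCH).total_seconds()) * UUID_TICKS
        time_low = ticks & 0xFFFFFFFF
        time_mid = (ticks >> 32) & 0xFFFF
        time_hi_version = ((ticks >> 48) & 0x0FFF) | (1 << 12)  # version 1
        node = int(str(oid)[-12:], 16)  # reuse the tail of the ObjectId as the node
        return UUID(fields=(time_low, time_mid, time_hi_version,
                            UUID_VARIANT_1 >> 8, UUID_VARIANT_1 & 0xFF, node))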
@@ -36,9 +40,9 @@
TABLE_MAP = [
    ("groups", Group),
    ("projects", Project),
+     ("widgetConfigs", WidgetConfig),
    ("runs", Run),
    ("results", Result),
-     ("widgetConfigs", WidgetConfig),
]
FILE_MAP = [
    # only convert artifacts, the reports in the existing DB aren't particularly useful
@@ -60,6 +64,28 @@
    "run",
]

+ # indexes for the tables
+ INDEXES = {
+     "results": [
+         "CREATE INDEX results_env ON results(env);",
+         "CREATE INDEX results_component ON results(component);",
+         "CREATE INDEX results_project ON results(project_id);",
+         "CREATE INDEX results_start_time ON results(start_time);",
+         "CREATE INDEX results_run ON results(run_id);",
+     ],
+     "runs": [
+         "CREATE INDEX runs_env ON runs(env);",
+         "CREATE INDEX runs_component ON runs(component);",
+         "CREATE INDEX runs_project ON runs(project_id);",
+         "CREATE INDEX runs_start_time ON runs(start_time);",
+     ],
+     "widget_configs": ["CREATE INDEX widget_configs_project ON widget_configs(project_id);"],
+     "artifacts": [
+         "CREATE INDEX artifact_result ON artifacts(result_id);",
+         "CREATE INDEX artifact_upload_date ON artifacts(upload_date);",
+     ],
+ }
+


def is_uuid(candidate):
    """Determine if this is a uuid"""
@@ -112,15 +138,25 @@ def setup_postgres(postgres_url):
def migrate_table(collection, Model, vprint):
    """Migrate a collection from MongoDB into a table in PostgreSQL"""
    # TODO: update indexes once we know them
-     # indexes = collection.list_indexes()
-     # conn = Base.metadata.bind.connect()
-     # for idx in indexes:
-     #     if idx['name'] == '_id_':
-     #         continue
-     #     vprint('.', end='')
-     #     sql = convert_index(idx, Model.__tablename__)
-     #     conn.execute(sql)
-     for idx, row in enumerate(collection.find()):
+     conn = Base.metadata.bind.connect()
+     for sql_index in INDEXES.get(Model.__tablename__, []):
+         vprint(".", end="")
+         conn.execute(sql_index)
+
+     # for runs and results, sort by descending start_time
+     if Model.__tablename__ == "runs" or Model.__tablename__ == "results":
+         sort = [("start_time", -1)]
+         most_recent_record = collection.find_one(sort=sort)
+         most_recent_start_time = most_recent_record["start_time"]
+         # only include most recent runs and results
+         filter_ = {"start_time": {"$gt": most_recent_start_time - MIGRATION_TIME_FRAME}}
+     else:
+         sort = None
+         filter_ = None
+
+     for idx, row in enumerate(collection.find(filter_, sort=sort)):
+         if idx > MIGRATION_LIMIT:
+             break
        vprint(".", end="")
        mongo_id = row.pop("_id")
        # overwrite id with PSQL uuid
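Note: the three-month window is computed from the newest record rather than the wall clock, so a stale database still migrates its most recent data. A standalone sketch of the same query, assuming start_time is stored as a Unix timestamp in seconds (which subtracting MIGRATION_TIME_FRAME implies) and placeholder connection/database names:

    from pymongo import MongoClient

    MIGRATION_TIME_FRAME = 3 * 30 * 24 * 60 * 60  # ~90 days in seconds

    client = MongoClient("mongodb://localhost:27017")  # placeholder URL
    results = client["ibutsu"]["results"]  # database name assumed

    newest = results.find_one(sort=[("start_time", -1)])  # same sort the migration uses
    cutoff = newest["start_time"] - MIGRATION_TIME_FRAME
    in_window = results.count_documents({"start_time": {"$gt": cutoff}})
    print(f"{in_window} results fall inside the migration window")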
@@ -136,54 +172,71 @@ def migrate_table(collection, Model, vprint):

    # promote some metadata fields to the appropriate column
    for field in FIELDS_TO_PROMOTE:
-         if row.get("metadata") and field in row["metadata"]:
+         if row.get("metadata") and row["metadata"].get(field):
            row[field] = row["metadata"][field]
    # convert some ObjectId's to UUID's
    for field in ID_FIELDS:
-         if field == "project":
-             if row.get("metadata") and field in row["metadata"]:
+         if row.get("metadata") and row["metadata"].get(field):
+             if field == "project":
                row["project_id"] = convert_objectid_to_uuid(row["metadata"][field])
                # also update the metadata field
                row["metadata"][field] = row["project_id"]
-         elif field == "run":
-             if row.get("metadata") and field in row["metadata"]:
+             elif field == "run":
                row["run_id"] = convert_objectid_to_uuid(row["metadata"][field])
                # also update the metadata field
                row["metadata"][field] = row["run_id"]
-         elif field in ["result_id", "resultId"]:
-             if row.get("metadata") and field in row["metadata"]:
+             elif field in ["result_id", "resultId"]:
                row["result_id"] = convert_objectid_to_uuid(row["metadata"][field])
-         else:
-             if row.get("metadata") and field in row["metadata"]:
+             else:
                row["metadata"][field] = convert_objectid_to_uuid(row["metadata"][field])
+
        obj = Model.from_dict(**row)
        session.add(obj)
        if idx % ROWS_TO_COMMIT_AT_ONCE == 0:
            session.commit()
    session.commit()
+     # at the end of the session do a little cleanup
+     if Model.__tablename__ == "runs" or Model.__tablename__ == "results":
+         conn = Base.metadata.bind.connect()
+         # delete any results or runs without start_time
+         sql_delete = f"DELETE FROM {Model.__tablename__} WHERE start_time IS NULL;"
+         conn.execute(sql_delete)
    vprint(" done")
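Note: switching from 'field in row["metadata"]' to row["metadata"].get(field) means keys that are present but empty are no longer promoted or converted. A toy illustration of the promotion loop above, with an assumed FIELDS_TO_PROMOTE (the real list is defined earlier in the file and not shown in this diff):

    FIELDS_TO_PROMOTE = ["env", "component"]  # assumed values, for illustration only

    row = {"metadata": {"env": "staging", "component": ""}}
    for field in FIELDS_TO_PROMOTE:
        if row.get("metadata") and row["metadata"].get(field):
            row[field] = row["metadata"][field]

    # only the truthy value was promoted:
    # {'metadata': {'env': 'staging', 'component': ''}, 'env': 'staging'}
    print(row)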


def migrate_file(collection, Model, vprint):
    """Migrate a GridFS collection from MongoDB into a table in PostgreSQL"""
    # Access the underlying collection object
    # TODO: update indexes once we know them
-     # indexes = collection._collection.list_indexes()
-     # conn = Base.metadata.bind.connect()
-     # for idx in indexes:
-     #     if idx['name'] == '_id_':
-     #         continue
-     #     vprint('.', end='')
-     #     sql = convert_index(idx, Model.__tablename__)
-     #     conn.execute(sql)
-     for idx, row in enumerate(collection.find({})):
+     conn = Base.metadata.bind.connect()
+     for sql_index in INDEXES.get(Model.__tablename__, []):
+         vprint(".", end="")
+         conn.execute(sql_index)
+
+     # for artifacts, sort by descending _id
+     if Model.__tablename__ == "artifacts":
+         sort = [("_id", -1)]  # in MongoDB, sorting by _id is like sorting by a 'created' field
+         most_recent_record = [x for x in collection.find(limit=1, sort=sort)][0]
+         most_recent_upload_date = most_recent_record.upload_date
+         # only include the most recent artifacts
+         filter_ = {
+             "uploadDate": {"$gt": (most_recent_upload_date - timedelta(days=30 * MONTHS_TO_KEEP))}
+         }
+     else:
+         sort = None
+         filter_ = None
+
+     for idx, row in enumerate(collection.find(filter_, sort=sort)):
+         if idx > MIGRATION_LIMIT:
+             break
        vprint(".", end="")
        pg_id = convert_objectid_to_uuid(row._id)
-         data = {}
+         data = dict()
        data["metadata"] = row.metadata
        data["id"] = pg_id
        data["filename"] = row.filename
        data["content"] = row.read()
+         data["upload_date"] = row.upload_date
        for field in ID_FIELDS:
            if field == "resultId":
                data["result_id"] = convert_objectid_to_uuid(row.metadata[field])
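Note: GridFSBucket.find() yields GridOut objects, which is why the loop reads row.upload_date, row.filename and row.read() as attributes rather than dictionary keys. A standalone sketch of the artifact window query, with placeholder connection/database names:

    from datetime import timedelta

    from gridfs import GridFSBucket
    from pymongo import MongoClient

    client = MongoClient("mongodb://localhost:27017")  # placeholder URL
    bucket = GridFSBucket(client["ibutsu"], "artifacts")  # database name assumed

    # newest file first; assumes at least one artifact exists
    newest = next(bucket.find(sort=[("_id", -1)], limit=1))
    cutoff = newest.upload_date - timedelta(days=90)
    for grid_out in bucket.find({"uploadDate": {"$gt": cutoff}}, sort=[("_id", -1)]):
        print(grid_out.filename, grid_out.upload_date)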
@@ -198,14 +251,15 @@ def migrate_file(collection, Model, vprint):
    vprint(" done")


- def migrate_tables(mongo, vprint):
+ def migrate_tables(mongo, vprint, migrate_files=False):
    """Migrate all the tables"""
    for collection, model in TABLE_MAP:
        vprint("Migrating {} ".format(collection), end="")
        migrate_table(mongo[collection], model, vprint)
-     for collection, model in FILE_MAP:
-         vprint("Migrating {} ".format(collection), end="")
-         migrate_file(GridFSBucket(mongo, collection), model, vprint)
+     if migrate_files:
+         for collection, model in FILE_MAP:
+             vprint("Migrating {} ".format(collection), end="")
+             migrate_file(GridFSBucket(mongo, collection), model, vprint)


def build_mongo_connection(url):
@@ -225,6 +279,7 @@ def parse_args():
    parser.add_argument("mongo_url", help="URL to MongoDB database")
    parser.add_argument("postgres_url", help="URL to PostgreSQL database")
    parser.add_argument("-v", "--verbose", action="store_true", help="Say what I'm doing")
+     parser.add_argument("-f", "--files", action="store_true", help="Migrate artifact files")
    return parser.parse_args()


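Note: with the new flag, artifact files are skipped unless explicitly requested, e.g. (script name and URLs are placeholders):

    python mongo2postgres.py mongodb://user:pass@mongo.example.com/ibutsu postgresql://user:pass@postgres.example.com/ibutsu -v -f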
@@ -234,7 +289,7 @@ def main():
    mongo_url, database = build_mongo_connection(args.mongo_url)
    mongo = get_mongo(mongo_url, database)
    setup_postgres(args.postgres_url)
-     migrate_tables(mongo, vprint)
+     migrate_tables(mongo, vprint, args.files)


if __name__ == "__main__":