This repository was archived by the owner on Oct 12, 2018. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathroutes.py
247 lines (197 loc) · 7.86 KB
/
routes.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
import time
import os
import shutil
import json
from os.path import join, exists
from datetime import timedelta
import docker
from django.template.defaultfilters import slugify
from channels import Group
from channels.routing import route
from channels.auth import (
http_session_user,
channel_session_user,
channel_session_user_from_http
)
from .models import (
TestRun,
TestRunDetail,
Test
)
from .exceptions import NucleusException
# Docker image (and tag) that is pulled and run for every test run.
REPOSITORY = 'registry.gitlab.com/devine-industries/nucleus-tests'
TAG = 'latest'

CURRENT_DIRECTORY = os.getcwd()

if os.environ.get('NUCLEUS_IN_CONTAINER', False):
    # Running inside the compose configuration: the dind container is
    # instructed to write the results to the /nucleus folder.
    OUTPUT_DIRECTORY_WIN = '/nucleus'
    OUTPUT_DIRECTORY_DOCKER = '/nucleus'
else:
    # Keep the path in the regular Windows format for local file access.
    OUTPUT_DIRECTORY_WIN = join(CURRENT_DIRECTORY, 'results')
    # Docker expects Windows paths in the form: /c/Users/Batman/...
    OUTPUT_DIRECTORY_DOCKER = (
        OUTPUT_DIRECTORY_WIN
        .replace('C:\\', '/c/')
        .replace('\\', '/')
    )

# Bind the output directory read-write at /nucleus/results in the container.
VOLUMES = {
    OUTPUT_DIRECTORY_DOCKER: {'bind': '/nucleus/results', 'mode': 'rw'},
}

# Docker client configured from the process environment.
client = docker.from_env()
@channel_session_user_from_http
def ws_connect(message):
    '''
    Accepts an incoming websocket connection and subscribes it
    to the group named after the connecting user's email.

    A 'Connected' status message is then sent to that group so
    the new socket gets confirmation that it is listening.
    '''
    message.reply_channel.send({'accept': True})

    # Join the group *before* greeting it, otherwise the socket that
    # just connected is not yet a member and never sees the greeting.
    Group(slugify(message.user.email)).add(message.reply_channel)

    # There is no TestRun associated with a bare connection, so the
    # ``run`` argument of _send_message is explicitly None.
    _send_message(None, message.user.email, 'Connected',
                  'Socket connection established...')
@channel_session_user
def ws_disconnect(message):
    '''
    Removes the closing socket's reply channel from the group
    named after the user's email.
    '''
    user_group = Group(slugify(message.user.email))
    user_group.discard(message.reply_channel)
def run_tests(message):
    '''
    Runs the test suite and outputs the results
    in the results folder.

    You can run this as below, passing the
    id of a TestRun model instance.

    ```python
    from channels import Channel
    Channel('run-tests').send({
        'id': 2 # change to new instance id
    })
    ```

    Progress is streamed to the websocket group of the student
    that owns the TestRun. Any exception raised while running is
    caught here: the run is marked as 'Error' and the error text
    is sent to the same group.
    '''
    # This consumer is not wrapped in an auth decorator and is triggered
    # by a plain channel message, so ``message.user`` may not exist.
    # Only use it when it happens to be attached.
    requester_email = getattr(getattr(message, 'user', None), 'email', None)

    # Try and load the test run model instance
    # that this was provided with.
    try:
        run = TestRun.objects.get(id=message.content.get('id'))
    except TestRun.DoesNotExist:
        error = ':: Could not find test run instance.'
        if requester_email:
            _send_message(None, requester_email, 'Error', error)
        else:
            # Nobody to notify over a websocket; log to the console.
            print(error)
        return

    if run.status == 'Complete':
        _send_message(run, requester_email or run.student.email,
                      'Complete', ':: Test Run already complete.')
        return

    # Bound up-front so the error handler below can always refer to it,
    # even when the failure happens before the email has been read.
    student_email = None

    try:
        # Get details.
        student_email = run.student.email
        repository_url = run.repository_url

        # Send status to websocket.
        _send_message(run, student_email, 'Starting',
                      ':: Loading test run details..')

        # Update status.
        run.status = 'Running'
        run.save()

        # Log the start time.
        start = time.time()

        # Set up our environment variables and
        # volumes for running the container.
        environment = {
            'TESTS_STUDENT': student_email,
            'TESTS_REPO_URL': repository_url
        }

        # Remove previous results
        _send_message(run, student_email, 'Running',
                      ':: Removing previous results directory..')
        student_directory = join(OUTPUT_DIRECTORY_WIN, student_email)
        if exists(student_directory):
            shutil.rmtree(student_directory)

        # Send status to websocket.
        _send_message(run, student_email, 'Running',
                      ':: Starting container...')

        # Run the container.
        image = client.images.pull(REPOSITORY, tag=TAG, auth_config={
            'username': os.environ['NUCLEUS_REGISTRY_USERNAME'],
            'password': os.environ['NUCLEUS_REGISTRY_PASSWORD']
        })
        container = client.containers.run(image.id, environment=environment,
                                          volumes=VOLUMES, detach=True)

        # As the logs come in, stream them back to the
        # listening websocket, if it exists.
        for line in container.logs(stream=True):
            # Undecodable bytes are replaced rather than aborting the run.
            _send_message(run, student_email, 'Running',
                          line.decode('utf-8', errors='replace'))

        # Get the end time.
        end = time.time()

        # The docker client hands the log back as bytes; store it as text
        # so later code can safely append plain strings to ``run.log``.
        full_log = container.logs()
        if isinstance(full_log, bytes):
            full_log = full_log.decode('utf-8', errors='replace')

        # Update the details of the run instance
        # with the details from the container.
        run.log = full_log
        # Still 'Running': the results have not been collected yet.
        run.status = 'Running'
        run.test_version = 'N/A'
        run.time_taken = timedelta(seconds=end - start)
        run.save()

        # Send status to websocket.
        _send_message(run, student_email, 'Running', ':: Gathering results..')
        _collect_results(student_email, run)
    except Exception as e:
        # Top-level boundary for the whole run: record the failure on the
        # model and report it instead of letting the consumer crash.
        run.status = 'Error'
        run.save()
        recipient = student_email or requester_email
        if recipient:
            _send_message(run, recipient, 'Error', str(e))
        else:
            print(':: Test Run ({}): Error: {}'.format(run.id, e))
def _collect_results(student_email, run):
    '''
    Reads the results written for a student and stores one
    TestRunDetail per test against the given run.

    Expects a folder named after the student's email inside the
    output directory, holding a results.json with a 'tests' list
    and a 'version'. Missing files are reported via _check_path,
    which raises. On success the run is marked 'Complete'.
    '''
    # The student's results folder has to exist...
    _check_path(
        student_email=student_email,
        path=join(OUTPUT_DIRECTORY_WIN, student_email),
        error="Can't find output directory for {}".format(student_email),
        run=run,
    )

    # ...and it has to contain the primary results.json file.
    results_file = join(OUTPUT_DIRECTORY_WIN, student_email, 'results.json')
    _check_path(
        student_email=student_email,
        path=results_file,
        error="Can't find results.json for {}".format(student_email),
        run=run,
    )

    # Send status to websocket.
    _send_message(run, student_email, 'Running', ':: Processing results..')

    with open(results_file) as results_handle:
        data = json.load(results_handle)

    for entry in data['tests']:
        # Test rows are created on demand, so a test is registered
        # automatically the first time it shows up in a results file.
        test, _created = Test.objects.get_or_create(test=entry['test'],
                                                    case=entry['case'])

        passed = entry['passed']
        failure_log = ''

        # Only failed tests come with an error output file to load.
        if not passed:
            missing = "Can't find error output for {} test for {}".format(
                str(test), student_email)
            error_path = join(OUTPUT_DIRECTORY_WIN, student_email,
                              entry['error'])

            _check_path(student_email=student_email, path=error_path,
                        error=missing, run=run)

            with open(error_path) as error_handle:
                failure_log = error_handle.read()

        TestRunDetail(record=run, test=test, passed=passed,
                      log=failure_log).save()

    # Send status to websocket.
    _send_message(run, student_email, 'Complete', 'Finished testing.')

    # Record the version reported by the results file and finish the run.
    run.test_version = data['version']
    run.status = 'Complete'
    run.save()
def _check_path(student_email, path, error, run):
    '''
    Makes sure that `path` exists on disk.

    Does nothing when the path exists. Otherwise the error is sent
    to the student's websocket group, appended to the run's log,
    the run is saved with status 'Error' and a NucleusException
    carrying `error` is raised.
    '''
    if exists(path):
        return

    _send_message(run, student_email, 'Error', ':: {}'.format(error))

    # The log may be empty/None or may still hold the raw bytes returned
    # by the docker client; normalise it to text before appending, since
    # adding a str to bytes (or to None) raises a TypeError.
    current_log = run.log or ''
    if isinstance(current_log, bytes):
        current_log = current_log.decode('utf-8', errors='replace')

    run.log = '{}\n{}\n'.format(current_log, error)
    run.status = 'Error'
    run.save()

    raise NucleusException(error)
def _send_message(run, student_email, status, message):
    '''
    Logs a status update to the console and sends it to the
    group named after the slugified student email.

    `run` may be None when there is no TestRun to refer to; it
    is only used to include the run id in the console output.
    '''
    # Every message goes out terminated by a newline.
    text = message if message.endswith('\n') else message + '\n'

    # Log to the console.
    if run:
        console_line = ':: Test Run for {} ({}): {}: {}'.format(
            student_email, run.id, status, text)
    else:
        console_line = ':: Test Run for {}: {}: {}'.format(
            student_email, status, text)
    print(console_line)

    # We can only send a dict with text, accept, close and bytes
    # so the payload is dumped to a json string and sent in the
    # text field.
    payload = json.dumps({'status': status, 'message': text})
    Group(slugify(student_email)).send({'text': payload})
# Maps each channel name onto the consumer that handles it.
channel_routing = [
    route(channel_name, consumer)
    for channel_name, consumer in (
        ('run-tests', run_tests),
        ('websocket.connect', ws_connect),
        ('websocket.disconnect', ws_disconnect),
    )
]