-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathlambda_function.py
163 lines (152 loc) · 6.54 KB
/
lambda_function.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
import json, email, requests, os, time
from email import policy
from sparkpost import SparkPost
from sparkpost.exceptions import SparkPostAPIException
# read config from env vars, where present
def getConfig():
cfg ={
'sparkpost_host': os.getenv('SPARKPOST_HOST', 'https://api.sparkpost.com'),
'sparkpost_api_key': os.getenv('SPARKPOST_API_KEY'),
'attachmentscanner_api_key': os.getenv('ATTACHMENTSCANNER_API_KEY'),
'from_email': os.getenv('FROM_EMAIL'),
'helpdesk_endpoint': os.getenv('HELPDESK_ENDPOINT'),
'max_attachment_size': int(os.getenv('MAX_ATTACHMENT_SIZE')) # TODO: error checking
}
if not cfg['sparkpost_host'].startswith('https://'):
cfg['sparkpost_host'] = 'https://' + cfg['sparkpost_host'] # Add schema
if cfg['sparkpost_host'].endswith('/'):
cfg['sparkpost_host'] = cfg['sparkpost_host'][:-1] # Strip /
for k, v in cfg.items():
if v == None:
print('Environment var {} not set - stopping'.format(k))
exit(1)
else:
return cfg
# start scan with payload p, appending the unique ID to the scanResource list
def startScan(fname, p, cfg):
headers = {
'accept': 'application/json',
'authorization': 'bearer '+cfg['attachmentscanner_api_key']
}
res = requests.post('https://beta.attachmentscanner.com/requests', headers=headers, files= {'file': (fname, p)})
if res.status_code == 200:
return res
else:
print('Unexpected return code', res.status_code)
exit(1)
def scanResults(r, cfg):
headers = {
'accept': 'application/json',
'authorization': 'bearer ' + cfg['attachmentscanner_api_key']
}
res = requests.get('https://beta.attachmentscanner.com/requests/'+r, headers=headers)
if res.status_code == 200:
return res
else:
print('Unexpected return code', res.status_code)
exit(1)
# Inject the messages into SparkPost for delivery to recipients, using the specified transmission parameters
def sendToRecips(sp, recipBatch, sendObj):
print(' To', str(len(recipBatch)).rjust(5, ' '),'recipients | campaign "'+sendObj['campaign']+'" | ', end='', flush=True)
# Compose in additional API-call parameters
sendObj.update({
'recipients': recipBatch,
})
startT = time.time()
try:
res = sp.transmissions.send(**sendObj) # Unpack for the call
endT = time.time()
if res['total_accepted_recipients'] != len(recipBatch):
print(res)
else:
print('OK - in', round(endT - startT, 3), 'seconds')
return res['total_accepted_recipients'], ''
except SparkPostAPIException as err:
errMsg = 'error code ' + str(err.status) + ' : ' + str(err.errors)
print(errMsg)
return 0, errMsg
# Handle SparkPost Inbound Relay Webhook carrying one or more messages
# TODO: check Authorisation in SparkPost inbound relay webhook
def lambda_handler(event, context):
# Collect config
cfg = getConfig()
print('Max attachment size: {} bytes'.format(cfg['max_attachment_size']))
for i in event:
m = i['msys']['relay_message']
print('From:', m['msg_from'])
print('To:', m['rcpt_to'])
c= m['content']
print('Subject:', c['subject'])
rxMail = c['email_rfc822']
msg = email.message_from_string(rxMail, policy=policy.default)
pendingScans = []
scanVerdict = {
'ok': 0,
'bad': 0,
'toobig': 0
}
for part in msg.walk():
# multipart/* are just containers
if part.get_content_maintype() == 'multipart':
continue
ct = part.get_content_type()
if ct == 'text/plain' or ct == 'text/html':
pass
else:
p = part.get_payload(decode=True)
fname = part.get_filename()
print('Attachment: {}, {} bytes'.format(fname, len(p)))
if len(p) > cfg['max_attachment_size']:
scanVerdict['toobig'] += 1
res = startScan(fname, p, cfg)
verdict = res.json()['status']
if verdict == 'ok':
scanVerdict['ok'] += 1
elif verdict == 'found':
scanVerdict['bad'] += 1
elif verdict == 'pending':
pendingScans.append(res.json()['id'])
elif verdict == 'failed':
print('Scan failed', res.text())
# Now collect in the pending results which take some time to appear
while(len(pendingScans) > 0):
time.sleep(0.4) # self rate-limiting towards scanner endpoint
for j in range(0, len(pendingScans)):
res = scanResults(pendingScans[j], cfg)
verdict = res.json()['status']
if verdict == 'ok':
scanVerdict['ok'] += 1
del pendingScans[j]
elif verdict == 'found':
scanVerdict['bad'] += 1
del pendingScans[j]
elif verdict == 'pending':
pass
elif verdict == 'failed':
print('Scan failed', res.text())
# All results now in
print('Scans OK: {}, Bad: {}, Too big: {}'.format(scanVerdict['ok'], scanVerdict['bad'], scanVerdict['toobig']))
sp = SparkPost(api_key=cfg['sparkpost_api_key'], base_uri=cfg['sparkpost_host'])
print('Opened connection to', sp.base_uri)
txObj = {
'campaign': 'helpdesk-filter-replies',
'track_opens': True,
'track_clicks': True,
'substitution_data': {'max_attachment_size': str(cfg['max_attachment_size'] / (1024 * 1024)) + ' MB'}
}
recipList = [msg['From']]
if scanVerdict['bad'] > 0 or scanVerdict['toobig'] > 0:
print('Verdict: reject')
txObj['template'] = 'reject-inbound'
done, err = sendToRecips(sp, recipList, txObj)
else:
print('Verdict: accept')
res = requests.post(cfg['helpdesk_endpoint'], json={'message': m, 'scanVerdict': scanVerdict})
print('Helpdesk endpoint {} replied: {}'.format(cfg['helpdesk_endpoint'], res.status_code))
txObj['template'] = 'accept-inbound'
done, err = sendToRecips(sp, recipList, txObj)
return 'Done'
# Test code
with open('rq.json') as f:
body = json.loads(f.read())
print(lambda_handler(body, None))