Automate processing of data logger files from an FTP file
If you set the dataloggerfile description for a datalogger file to an FTP or HTTP file, a script will be generated that automatically checks the file once an hour for new values and ingests them into the database.
Miguel Leon committed Apr 11, 2019
1 parent 4f75bc8 commit 464fec9
Showing 7 changed files with 162 additions and 55 deletions.
10 changes: 10 additions & 0 deletions managecli.py
@@ -0,0 +1,10 @@
#!/usr/bin/env python
import os
import sys

# if __name__ == "__main__":
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "templatesAndSettings.settings.development")

from django.core.management import execute_from_command_line

execute_from_command_line(sys.argv)
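
The new managecli.py hard-codes the development settings module so management commands can run outside manage.py, for example from the hourly check described in the commit message. A minimal invocation sketch; the command's positional arguments are defined in ProcessDataLoggerFile.add_arguments and are only partly visible in this diff, so they are elided here:

# Hedged sketch: run the ProcessDataLoggerFile management command through the
# new entry point, e.g. from an hourly cron job. The positional arguments
# (data logger file link, file id, and so on) are omitted because only part
# of add_arguments() appears in this diff.
import subprocess

subprocess.run(["python", "managecli.py", "ProcessDataLoggerFile"])  # plus command arguments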
87 changes: 71 additions & 16 deletions odm2admin/management/commands/ProcessDataLoggerFile.py
@@ -68,10 +68,13 @@ def getEndDate(results):
#EndDateProperty = Extensionproperties.objects.get(propertyname__icontains="end date")
#enddate = Resultextensionpropertyvalues.objects.filter(resultid=results.resultid).filter(
# propertyid=EndDateProperty).get()
enddate = Timeseriesresultvalues.objects.filter(resultid=results.resultid.resultid).annotate(
Max('valuedatetime')). \
order_by('-valuedatetime')[0].valuedatetime.strftime('%Y-%m-%d %H:%M:%S.%f')

enddate = None
try:
enddate = Timeseriesresultvalues.objects.filter(resultid=results.resultid.resultid).annotate(
Max('valuedatetime')). \
order_by('-valuedatetime')[0].valuedatetime.strftime('%Y-%m-%d %H:%M:%S.%f')
except IndexError:
return None
return enddate
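
With this change getEndDate returns the timestamp of the newest stored value, or None when a result has no values yet; the broadened except (ObjectDoesNotExist, TypeError) further down appears to absorb the resulting comparison against None. A small usage sketch, where ts_result stands in for a hypothetical Timeseriesresults row:

def plan_ingest(ts_result):
    # ts_result: hypothetical Timeseriesresults row mapped to a file column
    last = getEndDate(ts_result)
    if last is None:
        return 'no stored values yet - ingest the whole file'
    return 'only ingest rows newer than ' + last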


@@ -116,13 +119,19 @@ def add_arguments(self, parser):
' file locking does not occur allowing for repeated execution' +
' in a cron job or other automation')
parser.add_argument('reversed', nargs=1, type=str, default=False)

parser.add_argument('emailaddress', nargs='?', type=str, default='')

def handle(self, *args, **options): # (f,fileid, databeginson,columnheaderson, cmd):
# cmdline = bool(options['cmdline'][0])
filename = str(options['dataloggerfilelink'][0])
file = str(settings.MEDIA_ROOT) + filename # args[0].name
fileid = int(options['dataloggerfileid'][0])
try:
print('email address')
emailaddress = str(options['emailaddress'])
print(emailaddress)
except IndexError:
emailaddress = ''
fileid = Dataloggerfiles.objects.filter(dataloggerfileid=fileid).get()
check_dates = False
reversed = False
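
The new optional 'emailaddress' positional uses argparse's nargs='?', so the command keeps working when no address is supplied. A standalone illustration of that behaviour, mirroring the two add_argument calls above with made-up values:

import argparse

p = argparse.ArgumentParser()
p.add_argument('reversed', nargs=1, type=str, default=False)
p.add_argument('emailaddress', nargs='?', type=str, default='')

print(p.parse_args(['False']))                    # emailaddress defaults to ''
print(p.parse_args(['False', 'me@example.org']))  # emailaddress is 'me@example.org'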
@@ -139,6 +148,7 @@ def handle(self, *args, **options): # (f,fileid, databeginson,columnheaderson,
rowColumnMap = list()
bulktimeseriesvalues = []
bulkcount = 0
valuesadded =0
upper_bound_quality_type = CvDataqualitytype.objects.get(name='Physical limit upper bound')
lower_bound_quality_type = CvDataqualitytype.objects.get(name='Physical limit lower bound')
result_lower_bound = None
@@ -149,6 +159,7 @@ def handle(self, *args, **options): # (f,fileid, databeginson,columnheaderson,
exceldatetime = False
emailtext = ""
dateTimeColNum = 0

pdlf = ProcessDataloggerfile.objects.get(dataloggerfileid=fileid)
if not pdlf.processingCode == 'done' and not pdlf.processingCode=='locked':
if not cmdline:
@@ -160,6 +171,7 @@ def handle(self, *args, **options): # (f,fileid, databeginson,columnheaderson,
tolist.append(admin['email'])
try:
with io.open(file, 'rt', encoding='ascii') as f:
print('begin processing ' + str(pdlf))
# reader = csv.reader(f)
columnsinCSV = None
reader, reader2 = itertools.tee(csv.reader(f))
@@ -217,6 +229,8 @@ def handle(self, *args, **options): # (f,fileid, databeginson,columnheaderson,
elif i >= databeginson:
rawdt = row[dateTimeColNum].strip()
# assume date is first column for the moment
dateT = None
datestr = ''
try:
dateT = time.strptime(rawdt, "%m/%d/%Y %H:%M") # '1/1/2013 0:10
datestr = time.strftime("%Y-%m-%d %H:%M:%S", dateT)
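
For reference, the timestamp normalization happening here turns logger dates like '1/1/2013 0:10' (per the inline comment) into the '%Y-%m-%d %H:%M:%S' strings used elsewhere in the command; a quick illustration:

import time

dateT = time.strptime('1/1/2013 0:10', "%m/%d/%Y %H:%M")
print(time.strftime("%Y-%m-%d %H:%M:%S", dateT))  # 2013-01-01 00:10:00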
@@ -265,6 +279,11 @@ def handle(self, *args, **options): # (f,fileid, databeginson,columnheaderson,
# mrs = Results.objects.filter(
# resultid__in=DataloggerfilecolumnSet.values("resultid"))
# mrvs = Timeseriesresultvalues.objects.filter(resultid__in=mrs)
for dlfc in rowColumnMap:
if len(row) < dlfc.columnnum:
print('bad row ' +str(row))
continue

for colnum in rowColumnMap:
if stop_reading_reversed:
break
@@ -317,7 +336,19 @@ def handle(self, *args, **options): # (f,fileid, databeginson,columnheaderson,
'each column. Both results and time series ' +
'results are needed.')
# only one measurement result is allowed per result
value = row[colnum.columnnum]
# print('error')
# print(str(row))
# print(str(colnum.columnlabel))
# print(str(colnum.columnnum))
try:
value = row[colnum.columnnum]
except IndexError:
continue
if value == '':
# print("error")
# skip blank value
continue

# print("value to save")
# print(value)
censorcode = CvCensorcode.objects.filter(name="Not censored").get()
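
The guards added in this hunk skip columns that fall off the end of a short row and skip blank cells instead of erroring out; a self-contained illustration of that control flow, with a made-up row and column indexes:

row = ['2013-01-01 00:10:00', '12.5', '']   # hypothetical data row
for columnnum in (1, 2, 3):
    try:
        value = row[columnnum]
    except IndexError:
        continue  # row is shorter than the mapped column: nothing to store
    if value == '':
        continue  # blank cell: skip rather than save an empty value
    print(columnnum, value)  # only column 1 ('12.5') is kept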
@@ -332,9 +363,7 @@ def handle(self, *args, **options): # (f,fileid, databeginson,columnheaderson,
tsvrbulk = True
# print(mresults)
try:
if value == '':
# print("error")
raise IntegrityError

if check_dates:
try:
enddatestr = getEndDate(mresults)
@@ -348,7 +377,7 @@ def handle(self, *args, **options): # (f,fileid, databeginson,columnheaderson,
if reversed:
stop_reading_reversed = True
break
except ObjectDoesNotExist:
except (ObjectDoesNotExist, TypeError) as e:
pass
try:
newdatavalue = float(row[colnum.columnnum])
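
Around this hunk each cell is parsed to a float, and non-numeric cells are either stored as NaN or skipped; a hedged sketch of that fallback, where use_nan_for_bad_values is a hypothetical stand-in for whatever option the collapsed context actually checks:

def parse_cell(raw, use_nan_for_bad_values=True):
    # Hypothetical stand-in for the numeric-parsing fallback visible here.
    try:
        return float(raw)
    except ValueError:
        if use_nan_for_bad_values:
            return float('NaN')  # keep the timestamp with a NaN value
        return None              # caller skips this cell entirely

print(parse_cell('7.25'))   # 7.25
print(parse_cell('error'))  # nan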
@@ -357,6 +386,9 @@ def handle(self, *args, **options): # (f,fileid, databeginson,columnheaderson,
newdatavalue = float('NaN')
else:
continue
if datestr == '':
print('bad date on row: ' +str(row))
raise IntegrityError
qualitycode = qualitycodegood
# print(newdatavalue)
if dataqualitybool:
@@ -387,6 +419,7 @@ def handle(self, *args, **options): # (f,fileid, databeginson,columnheaderson,
)
annotation.save()
tsvr.save()
valuesadded +=1
tsvrbulk = False
tsrva = Timeseriesresultvalueannotations(valueid=tsvr,
annotationid=annotation).save()
@@ -420,6 +453,7 @@ def handle(self, *args, **options): # (f,fileid, databeginson,columnheaderson,
)
annotation.save()
tsvr.save()
valuesadded += 1
tsvrbulk = False
tsrva = Timeseriesresultvalueannotations(valueid=tsvr,
annotationid=annotation).save()
@@ -465,6 +499,7 @@ def handle(self, *args, **options): # (f,fileid, databeginson,columnheaderson,
)
annotation.save()
tsvr.save()
valuesadded += 1
tsvrbulk = False
tsrva = Timeseriesresultvalueannotations(valueid=tsvr,
annotationid=annotation).save()
@@ -511,6 +546,7 @@ def handle(self, *args, **options): # (f,fileid, databeginson,columnheaderson,
)
annotation.save()
tsvr.save()
valuesadded += 1
tsvrbulk = False
# print(tsvr)
tsrva = Timeseriesresultvalueannotations(valueid=tsvr,
@@ -546,11 +582,13 @@ def handle(self, *args, **options): # (f,fileid, databeginson,columnheaderson,
if tsvrbulk:
bulktimeseriesvalues.append(tsvr)
bulkcount +=1
valuesadded += 1
if bulkcount > 20000:
Timeseriesresultvalues.objects.bulk_create(bulktimeseriesvalues)
del bulktimeseriesvalues[:]
tsvr = None
bulkcount = 0
# print("saved value")
# print("saved value - bulk create")
if tsvr is None and not tsvrbulk:
tsvr = Timeseriesresultvalues(
resultid=mresults,
@@ -565,6 +603,7 @@ def handle(self, *args, **options): # (f,fileid, databeginson,columnheaderson,
.intendedtimespacingunitsid
)
tsvr.save()
valuesadded += 1
except IntegrityError:
pass
# Timeseriesresultvalues.delete()
@@ -575,11 +614,13 @@ def handle(self, *args, **options): # (f,fileid, databeginson,columnheaderson,
break
# Timeseriesresults.objects.raw("SELECT odm2.\
# "TimeseriesresultValsToResultsCountvalue\"()")
# print('last bulk create')
Timeseriesresultvalues.objects.bulk_create(bulktimeseriesvalues)
bulktimeseriesvalues = None
del bulktimeseriesvalues[:]

except IndexError:
raise ValidationError('encountered a problem with row ' + str(i) for i in row)
# raise ValidationError('encountered a problem with row ' + str(i) for i in row)
pass
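
The valuesadded counter and the bulkcount > 20000 flush above implement a simple chunked bulk insert so that large files do not keep every Timeseriesresultvalues row in memory; a standalone sketch of that pattern, where the unsaved_rows iterable is hypothetical:

def flush_in_batches(unsaved_rows, batch_size=20000):
    # Sketch of the chunked bulk insert used above; unsaved_rows is any
    # iterable of unsaved Timeseriesresultvalues instances.
    pending = []
    for tsvr in unsaved_rows:
        pending.append(tsvr)
        if len(pending) > batch_size:
            Timeseriesresultvalues.objects.bulk_create(pending)
            del pending[:]
    # flush the remainder, mirroring the final bulk_create after the loop
    Timeseriesresultvalues.objects.bulk_create(pending)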
bulkpropertyvals = []
for colnum in rowColumnMap:
results = Timeseriesresults.objects.filter(resultid=colnum.resultid)
@@ -598,9 +639,23 @@ def handle(self, *args, **options): # (f,fileid, databeginson,columnheaderson,
# bulkpropertyvals.append(repvend)
# will bulk create or update the property values
# Resultextensionpropertyvalues.objects.bulk_create(bulkpropertyvals)
#print('email?')
# print('email?')
# pdlf.processingCode = 'done'
# pdlf.save()
if sendemail:
email = EmailMessage(emailtitle, emailtext, settings.EMAIL_FROM_ADDRESS, tolist)
print('email')
print(emailtext)
# print('email')
# print(emailtext)
email.send()
emailtitle = 'ODM2 Admin - file processing complete for: ' + str(pdlf)
emailtext = 'ODM2 Admin - file processing complete for: ' + str(pdlf) + ' \n'
emailtext += 'total time series result values add: ' + str(valuesadded) + ' \n'
emailtext += 'These time series were updated: \n'
tolist.append(emailaddress)
for colnum in rowColumnMap:
results = Timeseriesresults.objects.filter(resultid=colnum.resultid)
for result in results:
emailtext += str(result)
email2 = EmailMessage(emailtitle, emailtext, settings.EMAIL_FROM_ADDRESS, tolist)
email2.send()
# print('email sent')
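
The closing hunk assembles a second notification e-mail summarizing the run (file name, total values added, and the updated time series); a condensed, self-contained sketch with hypothetical stand-in values, assuming EmailMessage comes from django.core.mail:

from django.conf import settings
from django.core.mail import EmailMessage

pdlf_name = 'logger_file_1.csv'                                 # hypothetical file name
valuesadded = 1234                                              # hypothetical count
updated_results = ['water level result', 'temperature result']  # hypothetical time series
tolist = ['admin@example.org', 'user@example.org']

emailtitle = 'ODM2 Admin - file processing complete for: ' + pdlf_name
emailtext = emailtitle + ' \n'
emailtext += 'total time series result values added: ' + str(valuesadded) + ' \n'
emailtext += 'These time series were updated: \n'
for result in updated_results:
    emailtext += str(result) + '\n'
EmailMessage(emailtitle, emailtext, settings.EMAIL_FROM_ADDRESS, tolist).send()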