2
2
import logging
3
3
4
4
from celery .exceptions import TimeoutError
5
- from celery .result import AsyncResult
6
- from celery .states import FAILURE , PENDING , SUCCESS
5
+ # from celery.result import AsyncResult
6
+ from celery .states import FAILURE , PENDING , STARTED , SUCCESS
7
7
from django .contrib .auth .decorators import login_required
8
8
from django .core import serializers
9
9
from django .http import (
22
22
23
23
from app .constants .str import PERMISSION_DENIED
24
24
from app .models import Item
25
- from app .worker .app_celery import ATTEMPT_LIMIT , PROGRESS
25
+ from app .worker .app_celery import ATTEMPT_LIMIT
26
26
from app .worker .tasks import receiptor
27
27
from app .worker .tasks .exporter import exporter
28
28
from app .worker .tasks .importers import historical_data_importer
29
+ from reboot .celery import app
29
30
30
31
logger = logging .getLogger (__name__ )
31
-
32
+ tasks_cache = {}
33
+ results_cache = {}
32
34
33
35
@require_GET
34
36
@login_required (login_url = "/login" )
@@ -121,14 +123,21 @@ def poll_state(request: HttpRequest):
121
123
request = request ,
122
124
err_msg = "The task_id query parameter of the request was omitted." )
123
125
124
- task = AsyncResult (task_id )
126
+ task = app . AsyncResult (task_id )
125
127
res = JsonResponse (_poll_state (PENDING , 0 , 200 ))
128
+ print (f"!!! task id={ task_id } ,state={ task .state } ,successful={ task .successful ()} ,ready={ task .ready ()} ,failed={ task .failed ()} " )
126
129
if task .state == FAILURE or task .failed ():
127
130
res = JsonResponse (_poll_state (FAILURE , 0 , 400 ))
128
- elif task .state == PROGRESS :
131
+ elif task .state == STARTED :
129
132
res = JsonResponse (task .result ) if isinstance (
130
133
task .result , dict ) else HttpResponse (task .result )
131
134
elif task .state == SUCCESS or task .successful () or task .ready ():
135
+ tasks_cache [task_id ] = task
136
+ try :
137
+ results_cache [task_id ] = task .get (timeout = 5 )
138
+ print ("!!! saved" , results_cache [task_id ], task .result )
139
+ except Exception as e :
140
+ print (f"!!! error" , e )
132
141
res = HttpResponse (SUCCESS )
133
142
return res
134
143
@@ -142,13 +151,22 @@ def download_file(request: HttpRequest):
142
151
task_id = request .GET .get ("task_id" )
143
152
task_name = request .GET .get ("task_name" , "task" )
144
153
attempts = 0
145
- # CloudAMQP free tier is unstable and must be circuit breakered
154
+ if task_id in results_cache :
155
+ return results_cache [task_id ]
146
156
while (attempts < ATTEMPT_LIMIT ):
147
157
try :
148
158
attempts += 1
149
- task = AsyncResult (task_id )
150
- result = task .get (timeout = 0.5 * attempts )
159
+ # if tasks_cache[task_id]:
160
+ # task = tasks_cache[task_id]
161
+ # del tasks_cache[task_id]
162
+ # else:
163
+ # task = app.AsyncResult(task_id)
164
+ task = tasks_cache [task_id ] if task_id in tasks_cache else app .AsyncResult (task_id )
165
+ print (f"!!! task id={ task_id } ,state={ task .state } ,successful={ task .successful ()} ,ready={ task .ready ()} ,failed={ task .failed ()} " )
166
+ result = task .get (timeout = 1.0 * attempts )
151
167
print (f"{ task } { task_name } success #{ attempts } : { result } " )
168
+ if task_id in tasks_cache :
169
+ del tasks_cache [task_id ]
152
170
break
153
171
except TimeoutError :
154
172
print (f"{ task } { task_name } failed #{ attempts } " )
@@ -158,6 +176,7 @@ def download_file(request: HttpRequest):
158
176
err_msg = "Download exceeded max attempts" )
159
177
return result
160
178
except Exception as e :
179
+ print (f"!!! error" , e )
161
180
return _error (request = request , err_msg = f"Failed to download file: { e } " )
162
181
163
182
0 commit comments