2
2
import logging
3
3
4
4
from celery .exceptions import TimeoutError
5
- from celery .result import AsyncResult
6
- from celery .states import FAILURE , PENDING , SUCCESS
5
+ # from celery.result import AsyncResult
6
+ from celery .states import FAILURE , PENDING , STARTED , SUCCESS
7
7
from django .contrib .auth .decorators import login_required
8
8
from django .core import serializers
9
9
from django .http import (
22
22
23
23
from app .constants .str import PERMISSION_DENIED
24
24
from app .models import Item
25
- from app .worker .app_celery import ATTEMPT_LIMIT , PROGRESS
25
+ from app .worker .app_celery import ATTEMPT_LIMIT
26
26
from app .worker .tasks import receiptor
27
27
from app .worker .tasks .exporter import exporter
28
28
from app .worker .tasks .importers import historical_data_importer
29
+ from reboot .celery import app
29
30
30
31
logger = logging .getLogger (__name__ )
31
-
32
+ tasks_cache = {}
32
33
33
34
@require_GET
34
35
@login_required (login_url = "/login" )
@@ -121,14 +122,16 @@ def poll_state(request: HttpRequest):
121
122
request = request ,
122
123
err_msg = "The task_id query parameter of the request was omitted." )
123
124
124
- task = AsyncResult (task_id )
125
+ task = app . AsyncResult (task_id )
125
126
res = JsonResponse (_poll_state (PENDING , 0 , 200 ))
127
+ print (f"!!! task id={ task_id } ,state={ task .state } ,successful={ task .successful ()} ,ready={ task .ready ()} ,failed={ task .failed ()} " )
126
128
if task .state == FAILURE or task .failed ():
127
129
res = JsonResponse (_poll_state (FAILURE , 0 , 400 ))
128
- elif task .state == PROGRESS :
130
+ elif task .state == STARTED :
129
131
res = JsonResponse (task .result ) if isinstance (
130
132
task .result , dict ) else HttpResponse (task .result )
131
133
elif task .state == SUCCESS or task .successful () or task .ready ():
134
+ tasks_cache [task_id ] = task
132
135
res = HttpResponse (SUCCESS )
133
136
return res
134
137
@@ -142,12 +145,16 @@ def download_file(request: HttpRequest):
142
145
task_id = request .GET .get ("task_id" )
143
146
task_name = request .GET .get ("task_name" , "task" )
144
147
attempts = 0
145
- # CloudAMQP free tier is unstable and must be circuit breakered
146
148
while (attempts < ATTEMPT_LIMIT ):
147
149
try :
148
150
attempts += 1
149
- task = AsyncResult (task_id )
150
- result = task .get (timeout = 0.5 * attempts )
151
+ if tasks_cache [task_id ]:
152
+ task = tasks_cache [task_id ]
153
+ del tasks_cache [task_id ]
154
+ else :
155
+ task = app .AsyncResult (task_id )
156
+ print (f"!!! task id={ task_id } ,state={ task .state } ,successful={ task .successful ()} ,ready={ task .ready ()} ,failed={ task .failed ()} " )
157
+ result = task .get (timeout = 1.0 * attempts )
151
158
print (f"{ task } { task_name } success #{ attempts } : { result } " )
152
159
break
153
160
except TimeoutError :
@@ -158,6 +165,7 @@ def download_file(request: HttpRequest):
158
165
err_msg = "Download exceeded max attempts" )
159
166
return result
160
167
except Exception as e :
168
+ print (f"!!! error" , e )
161
169
return _error (request = request , err_msg = f"Failed to download file: { e } " )
162
170
163
171
0 commit comments