Skip to content
This repository was archived by the owner on Aug 10, 2024. It is now read-only.

Commit 817ed8f

Browse files
committed
feat: improve celery stability
1 parent d2abb22 commit 817ed8f

File tree

6 files changed, +36 −24 lines changed

app/views/views.py

+14-8
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@
22
import logging
33

44
from celery.exceptions import TimeoutError
5-
from celery.result import AsyncResult
6-
from celery.states import FAILURE, PENDING, SUCCESS
5+
# from celery.result import AsyncResult
6+
from celery.states import FAILURE, PENDING, STARTED, SUCCESS
77
from django.contrib.auth.decorators import login_required
88
from django.core import serializers
99
from django.http import (
@@ -22,13 +22,14 @@
2222

2323
from app.constants.str import PERMISSION_DENIED
2424
from app.models import Item
25-
from app.worker.app_celery import ATTEMPT_LIMIT, PROGRESS
25+
from app.worker.app_celery import ATTEMPT_LIMIT
2626
from app.worker.tasks import receiptor
2727
from app.worker.tasks.exporter import exporter
2828
from app.worker.tasks.importers import historical_data_importer
29+
from reboot.celery import app
2930

3031
logger = logging.getLogger(__name__)
31-
32+
tasks_cache = {}
3233

3334
@require_GET
3435
@login_required(login_url="/login")
@@ -121,14 +122,16 @@ def poll_state(request: HttpRequest):
121122
request=request,
122123
err_msg="The task_id query parameter of the request was omitted.")
123124

124-
task = AsyncResult(task_id)
125+
task = app.AsyncResult(task_id)
125126
res = JsonResponse(_poll_state(PENDING, 0, 200))
127+
print(f"!!! task id={task_id},state={task.state},successful={task.successful()},ready={task.ready()},failed={task.failed()}")
126128
if task.state == FAILURE or task.failed():
127129
res = JsonResponse(_poll_state(FAILURE, 0, 400))
128-
elif task.state == PROGRESS:
130+
elif task.state == STARTED:
129131
res = JsonResponse(task.result) if isinstance(
130132
task.result, dict) else HttpResponse(task.result)
131133
elif task.state == SUCCESS or task.successful() or task.ready():
134+
tasks_cache[task_id] = task
132135
res = HttpResponse(SUCCESS)
133136
return res
134137

@@ -142,12 +145,15 @@ def download_file(request: HttpRequest):
142145
task_id = request.GET.get("task_id")
143146
task_name = request.GET.get("task_name", "task")
144147
attempts = 0
148+
if tasks_cache[task_id]:
149+
result = tasks_cache[task_id].get(timeout=2)
145150
# CloudAMQP free tier is unstable and must be circuit breakered
146151
while (attempts < ATTEMPT_LIMIT):
147152
try:
148153
attempts += 1
149-
task = AsyncResult(task_id)
150-
result = task.get(timeout=0.5 * attempts)
154+
task = app.AsyncResult(task_id)
155+
print(f"!!! task id={task_id},state={task.state},successful={task.successful()},ready={task.ready()},failed={task.failed()}")
156+
result = task.get(timeout=1.0 * attempts)
151157
print(f"{task} {task_name} success #{attempts}: {result}")
152158
break
153159
except TimeoutError:

app/worker/app_celery.py

+8-8
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,33 @@
11
import traceback
2-
import celery
3-
from celery.states import SUCCESS, FAILURE
42
from http import HTTPStatus
53

6-
PROGRESS = 'PROGRESS'
4+
from celery.states import FAILURE, STARTED, SUCCESS
5+
6+
from reboot.celery import app
77

88
ATTEMPT_LIMIT = 5
99

1010

1111
def update_state(state, percent, http_status):
1212
print('{0!r} state: {1!r}, progress: {2!r}'.format(
13-
celery.current_task.request.id, state, percent))
14-
celery.current_task.update_state(state=state, meta={
13+
app.current_task.request.id, state, percent))
14+
app.current_task.update_state(state=state, meta={
1515
'state': state,
1616
'process_percent': percent,
1717
'status': http_status,
1818
})
1919

2020

2121
def update_percent(percent):
22-
update_state(PROGRESS, percent, HTTPStatus.ACCEPTED)
22+
update_state(STARTED, percent, HTTPStatus.ACCEPTED)
2323

2424

2525
def set_success():
2626
update_state(SUCCESS, 100, HTTPStatus.OK)
2727

2828

2929
def set_failure(e):
30-
celery.current_task.update_state(
30+
app.current_task.update_state(
3131
state=FAILURE,
3232
meta={
3333
'exc_type': type(e).__name__,
@@ -38,7 +38,7 @@ def set_failure(e):
3838
})
3939

4040

41-
class AppTask(celery.Task):
41+
class AppTask(app.Task):
4242
max_retries = 0
4343
# default_retry_delay = 10
4444

app/worker/tasks/__init__.py

+4-3
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
'''
22
Module for tasks to be sent on task queue
33
'''
4-
from celery import task
5-
64
from app.worker.app_celery import AppTask
5+
# from celery import task
6+
from reboot.celery import app
7+
78
from .create_receipt import Receiptor
89

910

10-
@task(bind=True, base=AppTask)
11+
@app.task(bind=True, base=AppTask)
1112
def receiptor(self, queryset, total_count):
1213
receiptor = Receiptor(queryset, total_count)
1314
return receiptor()

app/worker/tasks/exporter.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,17 @@
11
import csv
22

3-
from celery import task
3+
# from celery import task
44
from celery.utils.log import get_task_logger
55
from django.core import serializers
66
from django.db.models.query import QuerySet
77
from django.http import HttpResponse
88

99
from app.constants.field_names import CURRENT_FIELDS
1010
from app.worker.app_celery import AppTask, update_percent
11+
from reboot.celery import app
1112

1213

13-
@task(bind=True, base=AppTask)
14+
@app.task(bind=True, base=AppTask)
1415
def exporter(self, file_name, qs: QuerySet = None, total_count: int = 0):
1516
rows = serializers.deserialize('json', qs)
1617
csv_exporter = CsvExporter(file_name, rows, total_count)
+4-2
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
"""
22
Module for csv file importers to be sent to queue
33
"""
4-
from celery import task
4+
# from celery import task
55

66
from app.worker.app_celery import AppTask
7+
from reboot.celery import app
8+
79
from .historical_data_importer import HistoricalDataImporter
810

911

10-
@task(bind=True, base=AppTask)
12+
@app.task(bind=True, base=AppTask)
1113
def historical_data_importer(self, csvpath):
1214
importer = HistoricalDataImporter(csvpath)
1315
importer()

reboot/celeryconfig.py

+3-1
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,13 @@
88
broker_pool_limit = 1
99
event_queue_expires = 60
1010
worker_prefetch_multiplier = 1
11-
worker_concurrency = 10
11+
worker_concurrency = 1
1212
accept_content = ['json', 'pickle']
1313
result_backend = config("REDIS_URL")
1414
task_serializer = 'pickle'
1515
result_serializer = 'pickle'
16+
task_track_started = True
17+
task_ignore_result = False
1618

1719
# Use PROD settings if valid CLOUDAMQP_URl, else dev
1820
if config('CLOUDAMQP_URL', default=False):

0 commit comments

Comments (0)