
Commit 0cc9631

Author: Kenny Ng
Parent: 7ebb2e9
Message: implemented dockerfile and docker compose file

11 files changed (+143, -50 lines)

.gitignore (+1, -1)

@@ -157,4 +157,4 @@ cython_debug/
 # be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
 # and can be added to the global gitignore or merged into this file. For a more nuclear
 # option (not recommended) you can uncomment the following to ignore the entire idea folder.
-#.idea/
+.idea/

README.md (+32, -6)

@@ -1,8 +1,34 @@
-# celery-fastapi-integration
+# FastAPI Celery Integration Example
+
+## Foreword
+I created this repository to show a bare-bones implementation of integrating FastAPI with Celery.
+Unlike other repositories on GitHub demonstrating the same idea,
+Flower is not used here, as I wanted to keep the example as simple as possible.
+
+In this example, RabbitMQ is used as the message broker and Redis as the result backend.
+
+I also added an example of how an async function can be used with Celery.
 
 ## Project Setup
-1. Run `pip install -r requirements` to install required libraries
-2. Run `docker run -d -p 5672:5672 rabbitmq` to run rabbitmq on docker, in which we will use it as the message broker
-3. Run `docker run -d -p 6379:6379 redis` to start redis, in which we use as the result backend for celery
-3. Run `celery -A tasks worker --loglevel=INFO` to start celery
-4. Run `uvicorn api.main:app --reload` to start FastAPI
+### Docker Build
+You can host the project by running the following commands:
+```
+# build the image for the Celery worker
+docker build . -f worker.Dockerfile -t worker
+# build the image for the API
+docker build . -f api.Dockerfile -t api
+# start the Celery, FastAPI, RabbitMQ and Redis containers together
+docker-compose up -d --build
+```
+After the containers are up, open http://localhost:8000/docs for the Swagger UI, or call the API directly.
+
+## Usage
+Triggering the `POST example_task` endpoint starts a task that takes 10 seconds. A task ID is issued for tracking progress and fetching the result.
+
+`GET status` can be used to track the status of a task by its ID.
+
+`GET result` can be used to get the result of the task.
+
+## Contact
+Please give the repo a star if you like it!
+This app is written by Kenny Ng ([email protected]).
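
The Usage section added above describes the three endpoints in prose. As a rough illustration only, here is a small client sketch using just the Python standard library. It assumes the docker-compose stack is running with the API on http://localhost:8000 and uses the endpoint paths and the `message`/`task_id` fields defined in api/main.py below; the exact shape of the `/status` response body is not shown in this diff, so it is simply printed.

```python
import json
import urllib.parse
import urllib.request

BASE = "http://localhost:8000"  # assumed: the api service published by docker-compose.yml

# POST /example_task with the JSON body expected by ExampleInputModel
req = urllib.request.Request(
    f"{BASE}/example_task",
    data=json.dumps({"message": "hello"}).encode(),
    headers={"Content-Type": "application/json"},
)
task_id = json.loads(urllib.request.urlopen(req).read())["task_id"]

# GET /status -- the handler builds its response from AsyncResult, so the body
# should reflect the task's Celery state (the full dict is outside the hunk shown below)
qs = urllib.parse.urlencode({"task_id": task_id})
print(json.loads(urllib.request.urlopen(f"{BASE}/status?{qs}").read()))

# GET /result blocks until the worker finishes (about 10 seconds), because the
# handler calls AsyncResult.get(); the response echoes back the submitted message
print(json.loads(urllib.request.urlopen(f"{BASE}/result?{qs}").read()))
```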

api.Dockerfile (+11)

@@ -0,0 +1,11 @@
+FROM python:3.8-slim
+
+WORKDIR /app
+
+COPY requirements.txt ./
+
+RUN pip install -r requirements.txt
+
+COPY . .
+
+CMD ["uvicorn", "api.main:app", "--host", "0.0.0.0", "--port", "8000"]

api/main.py (+9, -9)

@@ -1,14 +1,14 @@
 from typing import Union
-from task_queue import tasks
+from worker import tasks
 from celery.result import AsyncResult
 from fastapi import FastAPI
 from pydantic import BaseModel
-from task_queue.tasks import wait_10_sec
+from worker.tasks import example_task
 
 app = FastAPI()
 
 
-class InputDto(BaseModel):
+class ExampleInputModel(BaseModel):
     message: str = ''
 
 
@@ -17,14 +17,14 @@ def read_root():
     return {"Hello": "World"}
 
 
-@app.post("/run_task")
-def run_task(input_dto: InputDto):
+@app.post("/example_task")
+def run_task(input_dto: ExampleInputModel):
     json_body = input_dto.dict()
-    result = wait_10_sec.delay(json_body)
+    result = example_task.delay(json_body)
     return {'task_id': result.id}
 
 
-@app.get("/task_status")
+@app.get("/status")
 def get_task_status(task_id):
     task_result = AsyncResult(id=task_id)
     res = {
@@ -34,11 +34,11 @@ def get_task_status(task_id):
     return res
 
 
-@app.get("/tasK_result")
+@app.get("/result")
 def get_result(task_id):
     task_result = AsyncResult(task_id)
     result = {
         "id": task_id,
-        "task_result": task_result.result
+        "task_result": task_result.get()
     }
     return result
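
One behavioral note on the last hunk: swapping `task_result.result` for `task_result.get()` turns `/result` from a non-blocking read (which returns `None` while the task is still running) into a call that waits for the task to finish. A minimal sketch of the distinction, assuming `worker.tasks` is importable so its `app.set_default()` call configures the default Celery app (api/main.py itself relies on the same import); the task ID is a placeholder:

```python
# Sketch only -- "some-task-id" stands in for an ID returned by POST /example_task.
from celery.result import AsyncResult

from worker import tasks  # noqa: F401  (importing it runs app.set_default())

res = AsyncResult("some-task-id")
print(res.status)            # PENDING / STARTED / SUCCESS; returns immediately
print(res.result)            # currently stored value; None until the task has finished
print(res.get(timeout=30))   # blocks until the result is ready (raises on timeout)
```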

docker-compose.yml (+29)

@@ -0,0 +1,29 @@
+version: '3'
+
+services:
+  redis:
+    image: redis
+    ports:
+      - 6379:6379
+
+  rabbitmq:
+    image: rabbitmq
+    ports:
+      - 5672:5672
+      - 15672:15672
+
+  api:
+    image: api:latest
+    ports:
+      - 8000:8000
+    depends_on:
+      - rabbitmq
+      - redis
+
+  worker:
+    image: worker:latest
+    ports:
+      - 5555:5555
+    depends_on:
+      - rabbitmq
+      - redis

requirements.txt (+26, -4)

@@ -1,4 +1,26 @@
-celery
-asyncio
-asgiref
-fastapi
+amqp==5.1.1
+anyio==3.6.2
+asgiref==3.6.0
+async-timeout==4.0.2
+asyncio==3.4.3
+billiard==3.6.4.0
+celery==5.2.7
+click==8.1.3
+click-didyoumean==0.3.0
+click-plugins==1.1.1
+click-repl==0.2.0
+fastapi==0.95.2
+h11==0.14.0
+idna==3.4
+kombu==5.2.4
+prompt-toolkit==3.0.38
+pydantic==1.10.7
+pytz==2023.3
+redis==4.5.5
+six==1.16.0
+sniffio==1.3.0
+starlette==0.27.0
+typing_extensions==4.5.0
+uvicorn==0.22.0
+vine==5.0.0
+wcwidth==0.2.6

task_queue/tasks.py (-29)

This file was deleted.

test.py (-1)

This file was deleted.

worker.Dockerfile (+11)

@@ -0,0 +1,11 @@
+FROM python:3.8-slim
+
+WORKDIR /app
+
+COPY requirements.txt ./
+
+RUN pip install -r requirements.txt
+
+COPY . .
+
+CMD celery -A worker.tasks worker --loglevel=INFO

File renamed without changes.

worker/tasks.py (+24)

@@ -0,0 +1,24 @@
+from celery import Celery
+from asgiref.sync import async_to_sync
+import asyncio
+
+# Initializing celery app
+app = Celery('tasks', backend='redis://redis:6379', broker='pyamqp://rabbitmq:5672')
+app.set_default()
+app.conf.update(task_track_started=True)
+app.conf.update(result_persistent=True)
+app.conf.update(worker_send_task_events=False)
+
+
+# A dummy async function which emulates the behavior of computations
+async def wait_10_sec_and_return(user_input):
+    await asyncio.sleep(10)
+    return user_input['message']
+
+
+@app.task
+def example_task(user_input):
+    # Turning async function into a sync one, as async functions are not supported in celery
+    async_service_to_sync = async_to_sync(wait_10_sec_and_return)
+    return async_service_to_sync(user_input)
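
The core trick in worker/tasks.py is wrapping a coroutine with `asgiref.sync.async_to_sync` so Celery can execute it as an ordinary synchronous task. The same pattern in isolation, as a sketch with made-up function names, using only `asgiref` and the standard library:

```python
# Standalone illustration of the async_to_sync pattern used in example_task above.
# add_later is a made-up stand-in for real async work.
import asyncio

from asgiref.sync import async_to_sync


async def add_later(a, b):
    await asyncio.sleep(1)   # pretend to wait on I/O or another service
    return a + b


sync_add = async_to_sync(add_later)  # a plain blocking callable wrapping the coroutine
print(sync_add(2, 3))                # prints 5 after roughly one second
```

When called from a synchronous context, `async_to_sync` runs the coroutine to completion on an event loop and returns its result, which is why `example_task` can simply return the wrapped call.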
