Async Task Management

Goal: many tasks in a web application take a long time to complete. It is better to submit such tasks to a queue and let the application carry on handling requests, while checking the status of the tasks periodically.
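The submit-then-poll idea can be sketched with nothing but the standard library. This is only an illustration of the pattern, not the Celery setup used in the rest of this note: `concurrent.futures.ThreadPoolExecutor` stands in for the task queue, and `slow_reverse` and `submit_and_poll` are made-up names.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_reverse(text, delay=0.2):
    """Hypothetical slow task: sleep, then return the reversed string."""
    time.sleep(delay)
    return text[::-1]


def submit_and_poll(text, poll_interval=0.05):
    """Submit the task, then check its status until it has finished."""
    checks = 0
    with ThreadPoolExecutor(max_workers=1) as pool:
        future = pool.submit(slow_reverse, text)  # returns immediately
        while not future.done():                  # periodic status check
            checks += 1
            time.sleep(poll_interval)             # the caller could do other work here
        return future.result(), checks


result, checks = submit_and_poll("abul")
print(result)  # luba
```

With Celery the shape is the same, except that the work runs in a separate worker process and the status is read back through the result backend.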


Start Redis, which serves as both the Celery message broker and the result backend. This step requires Docker to be installed.

$ docker run -it -p 6379:6379 --name redis-db redis

1:C 18 Aug 08:12:55.246 # oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo

1:C 18 Aug 08:12:55.246 # Redis version=4.0.11, bits=64, commit=00000000, modified=0, pid=1, just started

1:C 18 Aug 08:12:55.246 # Warning: no config file specified, using the default config. In order to specify a config file use redis-server /path/to/redis.conf

[Redis ASCII-art startup banner: Redis 4.0.11 (00000000/0) 64 bit, Running in standalone mode, Port: 6379, PID: 1, http://redis.io]


1:M 18 Aug 08:12:55.250 # WARNING: The TCP backlog setting of 511 cannot be enforced because /proc/sys/net/core/somaxconn is set to the lower value of 128.

1:M 18 Aug 08:12:55.250 # Server initialized

1:M 18 Aug 08:12:55.250 # WARNING you have Transparent Huge Pages (THP) support enabled in your kernel. This will create latency and memory usage issues with Redis. To fix this issue run the command 'echo never > /sys/kernel/mm/transparent_hugepage/enabled' as root, and add it to your /etc/rc.local in order to retain the setting after a reboot. Redis must be restarted after THP is disabled.

1:M 18 Aug 08:12:55.251 * Ready to accept connections

...
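Before moving on, it can help to confirm that the container is reachable on port 6379. The following is a minimal sketch using only the standard library: it sends the inline form of the Redis `PING` command over a raw socket and expects a `+PONG` reply. The function name `redis_ping` and the default host and timeout are assumptions made for illustration.

```python
import socket


def redis_ping(host="localhost", port=6379, timeout=1.0):
    """Return True if a server at host:port answers PING with +PONG."""
    try:
        with socket.create_connection((host, port), timeout=timeout) as conn:
            conn.sendall(b"PING\r\n")   # inline command form of the Redis protocol
            reply = conn.recv(16)
    except OSError:                      # refused, timed out, unreachable, ...
        return False
    return reply.startswith(b"+PONG")


if __name__ == "__main__":
    print("redis reachable:", redis_ping())
```

If `redis-cli` is installed, `redis-cli ping` performs the same check.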


Create a Flask application in app.py

from flask import Flask
from celery import Celery


def make_celery(app):
    # Create a Celery instance that takes its broker and result backend from the Flask config.
    celery = Celery(
        app.import_name,
        backend=app.config['CELERY_RESULT_BACKEND'],
        broker=app.config['CELERY_BROKER_URL']
    )
    celery.conf.update(app.config)

    class ContextTask(celery.Task):
        # Run every task inside the Flask application context.
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask
    return celery


app = Flask(__name__)
app.config.update(
    CELERY_BROKER_URL='redis://localhost:6379',
    CELERY_RESULT_BACKEND='redis://localhost:6379'
)

celery = make_celery(app)


@app.route("/")
def index():
    return "Hello world"


# The task is registered under the explicit name "app.celery".
@celery.task(name="app.celery")
def reverse(name):
    return name[::-1]


@app.route("/reverse/<name>")
def process(name):
    # delay() enqueues the task and returns immediately with an AsyncResult handle.
    result = reverse.delay(name)
    return "Task submitted"


if __name__ == "__main__":
    app.run(debug=True, host="0.0.0.0", port=5000)
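The `/reverse/<name>` route submits the task but discards the handle, so the caller has no way to check on it later. One possible way to close that loop is sketched below. It is an assumption about how the app might be extended, not part of the original code. `task_status` is a hypothetical helper that relies only on attributes that Celery's `AsyncResult` provides (`id`, `state`, `ready()`, `successful()`, `result`). `FakeResult` is a simplified stand-in so that the sketch runs without a broker.

```python
class FakeResult:
    """Simplified stand-in exposing the AsyncResult attributes used below (illustration only)."""

    def __init__(self, task_id, state, result=None):
        self.id = task_id
        self.state = state
        self.result = result

    def ready(self):
        return self.state in ("SUCCESS", "FAILURE")

    def successful(self):
        return self.state == "SUCCESS"


def task_status(result):
    """Build a JSON-serializable status dict from an AsyncResult-like object."""
    payload = {"task_id": result.id, "state": result.state}
    if result.ready():
        if result.successful():
            payload["result"] = result.result
        else:
            payload["error"] = str(result.result)  # on failure, .result holds the exception
    return payload


print(task_status(FakeResult("demo-id", "PENDING")))
# {'task_id': 'demo-id', 'state': 'PENDING'}
print(task_status(FakeResult("demo-id", "SUCCESS", "luba")))
# {'task_id': 'demo-id', 'state': 'SUCCESS', 'result': 'luba'}
```

In the Flask app, `process` could return `result.id` to the client, and an extra route such as `/status/<task_id>` (hypothetical) could return `jsonify(task_status(celery.AsyncResult(task_id)))`, with `jsonify` imported from `flask`.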



Start the Flask app

$ python app.py

* Running on http://0.0.0.0:5000/ (Press CTRL+C to quit)

* Restarting with stat

* Debugger is active!

* Debugger PIN: 763-063-315

* Detected change in '/Users/abulbasar/workspace/python/celery-flask/app.py', reloading

* Restarting with stat

* Debugger is active!

* Debugger PIN: 763-063-315

^C[celery-flask]$ python app.py

* Running on http://0.0.0.0:5000/ (Press CTRL+C to quit)

* Restarting with stat

* Debugger is active!

* Debugger PIN: 763-063-315

127.0.0.1 - - [21/Aug/2018 01:04:02] "GET / HTTP/1.1" 200 -

127.0.0.1 - - [21/Aug/2018 01:04:08] "GET /reverse/abul HTTP/1.1" 200 -

^C[celery-flask]$

[celery-flask]$ python app.py

* Running on http://0.0.0.0:5000/ (Press CTRL+C to quit)

* Restarting with stat

* Debugger is active!

* Debugger PIN: 763-063-315

127.0.0.1 - - [21/Aug/2018 01:05:16] "GET /reverse/abul HTTP/1.1" 200 -

127.0.0.1 - - [21/Aug/2018 01:05:32] "GET /reverse/abul HTTP/1.1" 200 -

127.0.0.1 - - [21/Aug/2018 01:05:33] "GET /reverse/abul HTTP/1.1" 200 -

127.0.0.1 - - [21/Aug/2018 01:10:06] "GET /reverse/abul HTTP/1.1" 200 -
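The GET requests in the log can also be issued from a small script. The sketch below uses only `urllib` from the standard library. The function names `reverse_url` and `submit_reverse` and the default base URL are assumptions; the URL has to match wherever the Flask app is actually listening.

```python
from urllib.parse import quote
from urllib.request import urlopen


def reverse_url(name, base_url="http://localhost:5000"):
    """Build the URL of the /reverse/<name> route, escaping unsafe characters."""
    return f"{base_url.rstrip('/')}/reverse/{quote(name, safe='')}"


def submit_reverse(name, base_url="http://localhost:5000", timeout=5):
    """Call the route and return (HTTP status, response body as text)."""
    with urlopen(reverse_url(name, base_url), timeout=timeout) as response:
        return response.status, response.read().decode("utf-8")


if __name__ == "__main__":
    print(reverse_url("abul"))  # http://localhost:5000/reverse/abul
```

With the app running, `submit_reverse("abul")` should return the status code together with the "Task submitted" body.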



Start the Celery worker

$ celery -A app.celery worker --loglevel=info

-------------- celery@einext01 v4.2.1 (windowlicker)

---- **** -----

--- * *** * -- Darwin-17.5.0-x86_64-i386-64bit 2018-08-21 01:09:56

-- * - **** ---

- ** ---------- [config]

- ** ---------- .> app: app:0x10819e7b8

- ** ---------- .> transport: redis://localhost:6379//

- ** ---------- .> results: redis://localhost:6379/

- *** --- * --- .> concurrency: 4 (prefork)

-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)

--- ***** -----

-------------- [queues]

.> celery exchange=celery(direct) key=celery


[tasks]

. app.celery


[2018-08-21 01:09:56,662: INFO/MainProcess] Connected to redis://localhost:6379//

[2018-08-21 01:09:56,677: INFO/MainProcess] mingle: searching for neighbors

[2018-08-21 01:09:57,723: INFO/MainProcess] mingle: all alone

[2018-08-21 01:09:57,753: INFO/MainProcess] celery@einext01 ready.

[2018-08-21 01:10:06,990: INFO/MainProcess] Received task: app.celery[252d279c-7a2e-492e-a25c-f771698d3d6b]

[2018-08-21 01:10:07,036: INFO/ForkPoolWorker-2] Task app.celery[252d279c-7a2e-492e-a25c-f771698d3d6b] succeeded in 0.039711610006634146s: 'luba'


Deploy the Flask app using Gunicorn

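Gunicorn is a WSGI (Web Server Gateway Interface) HTTP server. The Flask `app` object in app.py is the WSGI entry point, and the `app:app` argument below tells Gunicorn which module and object to load. A reverse proxy such as Nginx can then be placed in front of Gunicorn.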
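To make "WSGI entry point" concrete: a WSGI application is just a callable that takes `environ` and `start_response` and returns an iterable of bytes, and a Flask application object is such a callable. The sketch below is a hand-written stand-in, not the Flask app from this note, and it is exercised with the standard library's `wsgiref` helpers rather than Gunicorn. The names `application` and `call_wsgi` are chosen here for illustration.

```python
from wsgiref.util import setup_testing_defaults


def application(environ, start_response):
    """Minimal WSGI callable: answer every request with a plain-text body."""
    body = b"Hello world"
    start_response("200 OK", [("Content-Type", "text/plain"),
                              ("Content-Length", str(len(body)))])
    return [body]


def call_wsgi(app, path="/"):
    """Invoke a WSGI app the way a server would; return (status, body)."""
    environ = {}
    setup_testing_defaults(environ)      # fills in the required WSGI keys
    environ["PATH_INFO"] = path
    captured = {}

    def start_response(status, headers, exc_info=None):
        captured["status"] = status

    body = b"".join(app(environ, start_response))
    return captured["status"], body


print(call_wsgi(application))  # ('200 OK', b'Hello world')
```

A WSGI server performs the same call for every incoming HTTP request and writes the returned bytes back to the client.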

$ cat gunicorn.sh

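#!/usr/bin/env bash
# nproc is not available on every platform (e.g. macOS), so fall back to getconf.
PROCESSOR_COUNT=$(nproc 2>/dev/null || getconf _NPROCESSORS_ONLN)
# Common rule of thumb: two workers per core, plus one.
GUNICORN_WORKER_COUNT=$((PROCESSOR_COUNT * 2 + 1))
gunicorn -w "${GUNICORN_WORKER_COUNT}" -b 0.0.0.0:5000 app:app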
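The same settings can also live in a Python configuration file, which Gunicorn loads with its `-c` option. This is a sketch, assuming a file named `gunicorn.conf.py`; the file is not part of the original setup. `bind` and `workers` are standard Gunicorn settings.

```python
# gunicorn.conf.py (hypothetical file, not part of the original setup)
import multiprocessing

# Same address and port as in the shell script.
bind = "0.0.0.0:5000"

# Same rule as GUNICORN_WORKER_COUNT: two workers per core, plus one.
workers = multiprocessing.cpu_count() * 2 + 1
```

It would be started with `gunicorn -c gunicorn.conf.py app:app`. Because the count comes from Python's `multiprocessing` module, this variant does not depend on `nproc` being installed.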


$ bash gunicorn.sh

[2018-08-21 10:45:08 +0530] [45175] [INFO] Starting gunicorn 19.9.0

[2018-08-21 10:45:08 +0530] [45175] [INFO] Listening at: http://0.0.0.0:5000 (45175)

[2018-08-21 10:45:08 +0530] [45175] [INFO] Using worker: sync

[2018-08-21 10:45:08 +0530] [45178] [INFO] Booting worker with pid: 45178

^C[2018-08-21 10:48:06 +0530] [45175] [INFO] Handling signal: int

[2018-08-21 10:48:06 +0530] [45178] [INFO] Worker exiting (pid: 45178)

[2018-08-21 10:48:06 +0530] [45175] [INFO] Shutting down: Master