(.)(.) dev thoughts ...

Distributed architecture with celery
2018-03-25 22:03:00

For a side project I am working on, I have a problem with long-running tasks that block requests inside the HTTP server for 1 to 30 seconds, depending on the task.
I decided to use Flask for this project since it has a nice blueprint feature, and each blueprint could be turned into a microservice when necessary.
To scale my HTTP servers and distribute the workload among other machines, I need a broker / publish-subscribe system / some sort of queue.
Since my whole backend stack is in Python for rapid development, I decided to use Celery with Redis.
Redis, because I can also use it as a session store and as a cache layer, so I don't need to introduce another component or language, as I would if I tried e.g. RabbitMQ.
I also decided to store my task results in PostgreSQL, since I like keeping important data in an SQL database, and I can use one of the nice Flask plugins, Flask-SQLAlchemy.
Later I can also move my database to a cloud solution such as the great CockroachDB, created by former Google employees, without sacrificing code, since it works with the PostgreSQL driver.
That covers almost the whole stack; the last part I want to mention is HAProxy, which will be my load balancer of choice.
I drew the whole setup in UMLet, a great open-source UML plugin/standalone tool. UML champions, please forgive me for my clumsy diagram.
[Diagram: Celery workers schema]
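The point of this architecture is that the HTTP handler never runs the slow work itself: it enqueues the job, answers immediately with an id, and the client polls for the result. A minimal in-process sketch of that request flow, using the standard library's ThreadPoolExecutor as a stand-in for Redis plus the Celery workers (the handler names, the 202 status and the response shapes are illustrative assumptions, not Flask or Celery APIs):

```python
import time
import uuid
from concurrent.futures import ThreadPoolExecutor

# In-process stand-in for "broker + Celery workers" (assumption, illustration only).
executor = ThreadPoolExecutor(max_workers=4)
jobs = {}  # task id -> Future


def slow_task(n):
    """Pretend long-running job (1 to 30 seconds in the real project)."""
    time.sleep(0.1)
    return n * 2


def start_handler(n):
    """What an HTTP handler does: enqueue the job and answer right away."""
    task_id = str(uuid.uuid4())
    jobs[task_id] = executor.submit(slow_task, n)
    return 202, {"task_id": task_id}  # 202 Accepted: queued, not finished


def status_handler(task_id):
    """What a polling endpoint does: report the state, plus the result once ready."""
    future = jobs[task_id]
    if not future.done():
        return 200, {"state": "PENDING"}
    if future.exception() is not None:
        return 200, {"state": "FAILURE", "error": repr(future.exception())}
    return 200, {"state": "SUCCESS", "result": future.result()}
```

In the real setup, executor.submit() becomes task.delay(), and the status lookup becomes app.AsyncResult(task_id) read from the result backend.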
Ok, so now to Celery itself (it's healthy to eat, after all).
First, I want all my workers to be classes so I can define some internal methods if I want to.
Obviously I am not building the distributed calculator from the examples but an actual application (I hope so).
So instead of this:

  
@app.task
def add(a, b):  
    return a+b

My actual task looks like this:

  
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import celery

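class Hello(celery.Task):  
    serializer = 'json'
    # must match the name the worker lists under [tasks] (test.hello)
    name = 'test.hello'
    def run(self, *args, **kwargs):
        # run() is what the worker executes; other methods can serve as helpers
        return 'hello {0}'.format(*args)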

It's located in a file called test.py in the package foo.
My Celery task runner, runner.py, which besides this also checks that the results database exists, looks like this:

  
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import celery  
import foo.test

app = celery.Celery("hello")  
app.config_from_object('foo.conf')  
# register the instance and bind it to this app (Celery 4 no longer auto-registers Task subclasses)
app.register_task(foo.test.Hello())  

Let me also paste the contents of my config file, conf.py, located in the foo package:

  
#!/usr/bin/env python
# -*- coding: utf-8 -*-

task_serializer = 'json'  
result_serializer = 'json'  
accept_content = ['json']

timezone = 'Europe/London'  
enable_utc = True

broker_url = 'redis://localhost:6379/1'

result_backend = 'db+sqlite:///result.db'  
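The config above stores results in SQLite. For the PostgreSQL setup described at the beginning, only result_backend changes; a sketch of that variant, where the user, password and database name are placeholders and a PostgreSQL driver such as psycopg2 is assumed to be installed:

```python
# foo/conf.py, PostgreSQL variant (sketch): credentials and database name are placeholders
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']

broker_url = 'redis://localhost:6379/1'
# "db+" selects Celery's SQLAlchemy database backend; the rest is a SQLAlchemy URL
result_backend = 'db+postgresql://user:password@localhost/celery_results'
```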

With that in place, I can start my worker from the command line using:
celery --app=runner worker --loglevel=info
It produces nice log output where I can check that my task test.hello exists:

 -------------- celery@<hostname> v4.1.0 (latentcall)
---- **** ----- 
--- * ***  * -- bla-bla-bla-64bit 2018-03-25 22:08:17
-- * - **** --- 
- ** ---------- [config]
- ** ---------- .> app:         hello:0x10e81b080
- ** ---------- .> transport:   redis://localhost:6379/1
- ** ---------- .> results:     sqlite:///result.db
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery


[tasks]
  . test.hello

[2018-03-25 22:08:18,403: INFO/MainProcess] Connected to redis://localhost:6379/1
[2018-03-25 22:08:18,424: INFO/MainProcess] mingle: searching for neighbors
[2018-03-25 22:08:19,497: INFO/MainProcess] mingle: all alone
[2018-03-25 22:08:19,518: INFO/MainProcess] celery@<hostname> ready.

If I wanted to use a database other than SQLite, I would first need to check that the connection works and the database exists; see the Celery documentation on the database result backend for details.
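Such a check can be sketched with SQLAlchemy, which Celery's database backend already depends on. This hypothetical helper (an assumption about how the check could look, not the author's code) strips Celery's db+ prefix, opens a connection and runs SELECT 1. For PostgreSQL a failed connection covers a server that is down, wrong credentials, and a database that does not exist:

```python
from sqlalchemy import create_engine, text
from sqlalchemy.exc import OperationalError


def result_db_reachable(result_backend):
    """Hypothetical helper: True if the database behind a Celery 'db+...' URL accepts connections."""
    # Celery's database result backend takes a SQLAlchemy URL prefixed with "db+".
    prefix = "db+"
    url = result_backend[len(prefix):] if result_backend.startswith(prefix) else result_backend
    engine = create_engine(url)
    try:
        with engine.connect() as conn:
            conn.execute(text("SELECT 1"))
        return True
    except OperationalError:
        # e.g. server down, wrong credentials, or missing database
        return False
    finally:
        engine.dispose()
```

For SQLite the database file is simply created when its directory exists, so result_db_reachable('db+sqlite:///result.db') only fails if that directory cannot be opened.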
Ok, so now I can run my test_celery.py, located in the test directory:

  
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import celery  
import foo.test

app = celery.Celery('hello')  
app.config_from_object('foo.conf')

def hello_test():  
    # bind the task to this configured app so delay() uses its broker and backend
    t = app.register_task(foo.test.Hello())
    result = t.delay('siema')
    res = result.get(timeout=1)
    print(res)

if __name__ == '__main__':  
    hello_test()

From the command line:
PYTHONPATH=./ python3 test/test_celery.py
prints the response hello siema, while in the Celery console I got:

  
[2018-03-25 22:16:39,838: INFO/MainProcess] Received task: test.hello[c49e0bac-5587-48da-8c01-72ae4a53f93e]  
[2018-03-25 22:16:39,974: INFO/ForkPoolWorker-2] Task test.hello[c49e0bac-5587-48da-8c01-72ae4a53f93e] succeeded in 0.13112432399066165s: 'hello siema'

I can also see my persisted result inside result.db using the great open-source sqlitebrowser (DB Browser for SQLite).
There should be one entry in the celery_taskmeta table, and if you double-click the result cell, you should see hello siema inside that blob.
[Screenshot: sqlite celery task table]
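Instead of sqlitebrowser, the same table can be read with the standard library. A sketch assuming the Celery 4.x database backend layout, where celery_taskmeta has task_id, status, result and date_done columns and result is a SQLAlchemy PickleType, i.e. a pickled blob (which is why the string is visible inside it). read_task_results is a hypothetical helper:

```python
import pickle
import sqlite3


def read_task_results(db_path="result.db"):
    """Return (task_id, status, result) rows from Celery's celery_taskmeta table."""
    conn = sqlite3.connect(db_path)
    try:
        rows = conn.execute(
            "SELECT task_id, status, result FROM celery_taskmeta ORDER BY date_done"
        ).fetchall()
    finally:
        conn.close()
    # result holds a pickled Python object; only unpickle data you trust.
    return [
        (task_id, status, pickle.loads(blob) if blob is not None else None)
        for task_id, status, blob in rows
    ]
```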
So that's it: I can now spread workers across multiple files and use them as I want. I can also run 2 workers in 2 terminals to see which one picks up a task and how the tasks are distributed.
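The worker banner shows a single celery queue bound to a direct exchange, so two workers started on the same app compete for the same messages, and each task is normally handled by just one of them. A toy model of that competing-consumers behaviour with stdlib threads (not Celery code; run_workers is a hypothetical helper):

```python
import queue
import threading


def run_workers(tasks, n_workers=2):
    """Toy model: n workers pull from one shared queue; each message goes to one worker."""
    q = queue.Queue()
    for t in tasks:
        q.put(t)
    handled = []  # (worker name, task) pairs
    lock = threading.Lock()

    def worker(name):
        while True:
            try:
                task = q.get_nowait()
            except queue.Empty:
                return
            with lock:
                handled.append((name, task))
            q.task_done()

    threads = [
        threading.Thread(target=worker, args=("worker{0}".format(i + 1),))
        for i in range(n_workers)
    ]
    for th in threads:
        th.start()
    for th in threads:
        th.join()
    return handled
```

To try it for real, give each worker its own node name, e.g. celery --app=runner worker --loglevel=info -n worker1@%h and the same with -n worker2@%h. Expect an uneven split for small batches, since real workers prefetch several messages at a time (worker_prefetch_multiplier, 4 by default).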
All the code from this celery example is available here.
enjoy