How it works...

In order to use Celery, the first thing to do is to run the RabbitMQ service, and then execute the Celery worker server (that is, the addTask.py script) by typing the following:

C:\>celery -A addTask worker --loglevel=info

The output is as follows:

Microsoft Windows [Version 10.0.17134.648]
(c) 2018 Microsoft Corporation. All rights reserved.

C:\Users\Giancarlo>cd C:\Users\Giancarlo\Desktop\Python Parallel Programming CookBook 2nd edition\Python Parallel Programming NEW BOOK\chapter_6 - Distributed Python\esempi

C:\Users\Giancarlo\Desktop\Python Parallel Programming CookBook 2nd edition\Python Parallel Programming NEW BOOK\chapter_6 - Distributed Python\esempi>celery -A addTask worker --loglevel=info

 -------------- celery@pc-giancarlo v4.2.2 (windowlicker)
---- **** -----
--- * ***  * -- Windows-10.0.17134 2019-04-01 21:32:37
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         tasks:0x1deb8f46940
- ** ---------- .> transport:   amqp://guest:**@localhost:5672//
- ** ---------- .> results:     disabled://
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery

[tasks]
  . addTask.add

[2019-04-01 21:32:37,650: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
[2019-04-01 21:32:37,745: INFO/MainProcess] mingle: searching for neighbors
[2019-04-01 21:32:39,353: INFO/MainProcess] mingle: all alone
[2019-04-01 21:32:39,479: INFO/SpawnPoolWorker-2] child process 10712 calling self.run()
[2019-04-01 21:32:39,512: INFO/SpawnPoolWorker-3] child process 10696 calling self.run()
[2019-04-01 21:32:39,536: INFO/MainProcess] celery@pc-giancarlo ready.
[2019-04-01 21:32:39,551: INFO/SpawnPoolWorker-1] child process 6084 calling self.run()
[2019-04-01 21:32:39,615: INFO/SpawnPoolWorker-4] child process 2080 calling self.run()

Then, the second script is launched using Python:

C:\>python addTask_main.py

Finally, the result should be as follows in the first Command Prompt:

[2019-04-01 21:33:00,451: INFO/MainProcess] Received task: addTask.add[6fc350a9-e925-486c-bc41-c239ebd96041]
[2019-04-01 21:33:00,452: INFO/SpawnPoolWorker-2] Task addTask.add[6fc350a9-e925-486c-bc41-c239ebd96041] succeeded in 0.0s: 10

As you can see, the result is 10. Let's focus on the first script, addTask.py: in the first two lines of code, we create a Celery application instance that uses the RabbitMQ service as the broker:

from celery import Celery
app = Celery('addTask', broker='amqp://guest@localhost//')

The first argument of the Celery function is the name of the current module (addTask.py), and the second is the broker keyword argument; this indicates the URL that is used to connect to the broker (RabbitMQ).
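If we also wanted to read the result back in the client script, we could pass a result backend when creating the application instance. The following sketch (not part of the original recipe) assumes Celery's rpc:// backend, which sends results back to the caller as AMQP messages:

from celery import Celery

# A variant of the original app with a result backend added
# (an assumption for illustration, not part of the original recipe):
# with rpc://, results travel back to the caller over AMQP, so
# result.get() works in the client script.
app = Celery('addTask',
             broker='amqp://guest@localhost//',
             backend='rpc://')

With the backend disabled, as in our recipe, the result is only visible in the worker's log output.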

Now, let's introduce the task to be accomplished.

Each task must be marked with the @app.task decorator; the decorator helps Celery identify which functions can be scheduled in the task queue.

After the decorator, we create the task that the workers can execute: this will be a simple function that performs the sum of two numbers:

@app.task
def add(x, y):
    return x + y

In the second script, addTask_main.py, we call our task by using the delay() method:

import addTask

if __name__ == '__main__':
    result = addTask.add.delay(5, 5)

Remember that this method is a shortcut for the apply_async() method, which gives us greater control over task execution.
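For example, the following sketch (the countdown and expires values are illustrative, not part of the original scripts) schedules the same task with extra execution options:

# Equivalent to addTask.add.delay(5, 5), but with extra options:
# start the task no earlier than 10 seconds from now, and discard
# it if it has not started within 60 seconds.
result = addTask.add.apply_async((5, 5), countdown=10, expires=60)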
