본문 바로가기
Django/Celery

Django에서 Celery 이용하기 두번째

by 붕어사랑 티스토리 2021. 2. 28.
반응형

docs.celeryproject.org/en/stable/getting-started/next-steps.html#next-steps

 

Next Steps — Celery 5.0.5 documentation

This document describes the current stable version of Celery (5.0). For development docs, go here. Next Steps The First Steps with Celery guide is intentionally minimal. In this guide I’ll demonstrate what Celery offers in more detail, including how to a

docs.celeryproject.org

해당 게시물은 위 주소의 사이트의 내용을 기반으로 합니다. 일부 영양가 없는 내용은 제외되었습니다.

 

 

 

 

앞에 글에서 현재 프로젝트 구조를 보겠습니다.

 

proj/
    manage.py
    config/
        __init__.py
        celery.py
        tasks.py

config/celery.py

from celery import Celery

app = Celery('config',
             broker='amqp://',
             backend='rpc://',
             include=['config.tasks'])

# Optional configuration, see the application user guide.
app.conf.update(
    result_expires=3600,
)

if __name__ == '__main__':
    app.start()

 

앞에 글에서 Celery 인스턴스(Django에서는 앱이라고 불리죠)를 생성하였습니다.

허나 broker가 없어서 에러가 떳었죠. 위와같이 broker 설정을 추가 해 줍시다.

 

 

 

  • Celery에 들어갈 변수 목록들은 Celery를 시작 할 때 가져올 목록들입니다.
  • 필요한게 있으시면 직접 추가해주세요.
  • 첫번째 변수에는 사용할 앱의 이름을 지정해줍니다. 여기서는 config겠네요
  • broker 변수는 는 저희가 쓸 broker url을 지정해 줍니다.
  • backend 변수는 result backend를 명시해줍니다. (작업상태및 결과를 추적하는데 사용)
  • include에는 백그라운드에서 돌릴 tasks 파일의 경로를 입력해줍시다.

 

 

 

어떤 broker를 쓸지에 대한 자세한 내용은 하기 링크에 나와있습니다.

docs.celeryproject.org/en/stable/getting-started/first-steps-with-celery.html#celerytut-broker

 

First Steps with Celery — Celery 5.0.5 documentation

This document describes the current stable version of Celery (5.0). For development docs, go here. First Steps with Celery Celery is a task queue with batteries included. It’s easy to use so that you can get started without learning the full complexities

docs.celeryproject.org

RabbitMQ

rabbitMQ는 쉽고, 안정적인 broker입니다. 개발환경에서 rabbitMQ를 이용하는건 최고의 선택입니다.

 

설치방법은 아래와 같습니다.

 

 

우분투/데비안 

$ sudo apt-get install rabbitmq-server

 

도커 사용시

$ docker run -d -p 5672:5672 rabbitmq

 

 

Redis

Redis는 또한 강력한 기능을 가진 broker이며 갑작스런 서버종료가 발생할 때 데이터를 보호하는 능력이 뛰어난 broker 입니다.

 

도커사용시 아래와 같이 실행이 가능합니다.

$ docker run -d -p 6379:6379 redis

 

 

 

 

저는 rabbitMQ를 선택하도록 하겠습니다.

 

docs.celeryproject.org/en/stable/getting-started/brokers/rabbitmq.html#broker-rabbitmq

 

Using RabbitMQ — Celery 5.0.5 documentation

This document describes the current stable version of Celery (5.0). For development docs, go here. Using RabbitMQ RabbitMQ is the default broker so it doesn’t require any additional dependencies or initial configuration, other than the URL location of th

docs.celeryproject.org

Celery는 기본 broker로 rabbitMQ를 채택하고 있고요.

 

 

 

 

 

 

자 이제 tasks.py 를 작성해 봅시다.

 

config/tasks.py

from .celery import app


@app.task
def add(x, y):
    return x + y


@app.task
def mul(x, y):
    return x * y


@app.task
def xsum(numbers):
    return sum(numbers)

 

 

 

 

 

- woker 가동하기

 

먼저 worker 가동전에 rabbitmq를 설치해주셨다면 rabbitmq 서버를 가동해줍시다.

 

$ sudo rabbitmq-server

 

첫번째 글에서 쓴것처럼 worker는 하기 커맨드로 실행이 가능합니다.

 

$ celery -A 앱이름 worker -l 로그레벨

 

이제 worker를 실행해봅시다. 위 프로젝트 레이아웃에서는

 

$ celery -A config worker -l INFO

 

이렇게 입력하시면 되겠죠.

 

 

 

 

가동하면 아래와 같은 결과가 나옵니다.

 

 -------------- celery@DESKTOP-TF0CCFF v5.0.5 (singularity)
--- ***** -----
-- ******* ---- Linux-5.4.72-microsoft-standard-WSL2-x86_64-with-glibc2.29 2021-03-08 20:49:22
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app:         config:0x7f212d8a4790
- ** ---------- .> transport:   amqp://guest:**@localhost:5672//
- ** ---------- .> results:     rpc://
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery

가이드 페이지와 모양이 조금 다르네요. 가이드 페이지가 조금 오래된 Django와 Celery 버전을 쓰기때메 그런거로 추정됩니다.

 

  • transport는 앞서 저희가 명시해준 broker url 입니다. 커맨드라인에서 -b 옵션으로 다른 borker url을 지정 해 주실수도 있습니다.
  • concurrency는 tasks를 수행할 prefork worker process의 숫자를 나타냅니다. 모든 prefork worker가 busy상태이면 새로운 task는 prefork worker의 작업이 끝날 때 까지 기다려야 됩니다.
  • task events는 celery에게 worker에서 발생하는 작업에 대한 모니터링 메세지를 보내도록 하는 옵션입니다. 모니터링 프로그램으로는 celery events나 Flower가 있습니다.
  • queues는 worker가 사용할 queue의 리스트를 나타내 줍니다.

셀러리에게 작업자에게 발생하는 작업에 대한 모니터링 메시지(이벤트)를 보내도록 하는 옵션입니다.

셀러리에게 작업자에게 발생하는 작업에 대한 모니터링 메시지(이벤트)를 보내도록 하는 옵션입니다.

 

concurrency의 기본값은 cpu코어의 숫자에 의해 결정됩니다. celery worker -c 옵션으로 사용할 코어 숫자를 따로 정해줄 수 있습니다.

 

 

 

- woker 멈추기

간단하게 ctrl+c로 worker의 작업을 멈출 수 있습니다.

 

 

- woker 백그라운드로 작업하기

worker를 백그라운드로 작업하기 위해서는 하기 링크를 일단 참조 바랍니다.

docs.celeryproject.org/en/stable/userguide/daemonizing.html#daemonizing

 

Daemonization — Celery 5.0.5 documentation

This document describes the current stable version of Celery (5.0). For development docs, go here. Daemonization Most Linux distributions these days use systemd for managing the lifecycle of system and user services. You can check if your Linux distributio

docs.celeryproject.org

여러개의 celery를 daemon화 시키는 스크립트는 celery multi 커맨드를 이용합니다.

 

$ celery multi start w1 -A proj -l INFO
celery multi v4.0.0 (latentcall)
> Starting nodes...
    > w1.halcyon.local: OK

 

restart 하는 방법은 아래와 같습니다.

$ celery  multi restart w1 -A proj -l INFO
celery multi v4.0.0 (latentcall)
> Stopping nodes...
    > w1.halcyon.local: TERM -> 64024
> Waiting for 1 node.....
    > w1.halcyon.local: OK
> Restarting node w1.halcyon.local: OK
celery multi v4.0.0 (latentcall)
> Stopping nodes...
    > w1.halcyon.local: TERM -> 64052

 

stop하는 방법은 아래와 같습니다.

$ celery multi stop w1 -A proj -l INFO

허나 위의 방법은 async하게 stop하는 방법이라 worker가 작업중인 상태이면 작업하는걸 그대로 shutdown 해버립니다.

 

하기의 방법으로 worker가 작업이 끝날 때 까지 기다리다가 stop하려면 stopwait를 이용해주세요.

$ celery multi stopwait w1 -A proj -l INFO

 

 

- Task Calling하기

자 앞서 작업했던 tasks에 있는 함수를 calling 해 봅시다.

 

먼저 config 앱에 관한 worker를 가동해주세요.

 

celery -A config worker -l INFO

 

앱이름이 config가 아니고 mySampleApp 이런식이면 celery -A mySampleApp worker -l INFO 이런식으로 해주시면 되겠죠?

 

 

자 워커를 가동하였다면 이번에는 task를 call 해 봅시다.

 

delay라는 함수를 이용하여 add라는 task를 call 하여 2+2를 백그라운드에서 계산해봅시다.

 

Python 3.8.5 (default, Jan 27 2021, 15:41:15)
[GCC 9.3.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from config.tasks import add
>>> add.delay(2,2)
<AsyncResult: 6ef39f03-974a-4d0a-8c40-61bf735a2325>

파이썬 인터프리터로 tasks에 정의한 add를 call 하였더니 해시형태로 결과값이 출력되네요.

여기서 worker에서 뜨는 로그를 보면

 

[tasks]
  . config.tasks.add
  . config.tasks.mul
  . config.tasks.xsum

[2021-03-08 21:49:12,470: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
[2021-03-08 21:49:12,477: INFO/MainProcess] mingle: searching for neighbors
[2021-03-08 21:49:13,508: INFO/MainProcess] mingle: all alone
[2021-03-08 21:49:13,525: INFO/MainProcess] celery@DESKTOP-TF0CCFF ready.
[2021-03-08 21:51:09,879: INFO/MainProcess] Received task: config.tasks.add[6ef39f03-974a-4d0a-8c40-61bf735a2325]
[2021-03-08 21:51:09,892: INFO/ForkPoolWorker-8] Task config.tasks.add[6ef39f03-974a-4d0a-8c40-61bf735a2325] succeeded in 0.011422700000366603s: 4

succeeded in 0.011422700000366603s: 4

 

4가 출력되는걸 보실 수 있습니다.

 

 

 

 

여기서 delay 함수는 apply_async의 shortcut 버전입니다.

위에서 add.delay(2,2)를 해주는 것은 아래와 같습니다.

>>> add.apply_async((2, 2))

아래와 같은 옵션으로 여러 옵션을 지정 해 줄 수 있습니다.

>>> add.apply_async((2, 2), queue='lopri', countdown=10)

2+2를 백그라운드로 작업하되 queue로 lopri를 사용하고 10초뒤에 계산해라 라는 의미입니다.

 

 

task를 call 하는 방법은 아래의 링크를 참고해주세요

docs.celeryproject.org/en/stable/userguide/calling.html#guide-calling

 

Calling Tasks — Celery 5.0.5 documentation

This document describes the current stable version of Celery (5.0). For development docs, go here. Calling Tasks This document describes Celery’s uniform “Calling API” used by task instances and the canvas. The API defines a standard set of execution

docs.celeryproject.org

 

 

 

 

 

 

위에도 보셨겠지만 delay와 apply_async 함수를 호출하면 AsyncResult 인스턴스를 돌려받게 됩니다. 이 AsyncResult로 task의 수행과정을 추적할 수 있습니다. 단 resultBackend를 enable 해 주어야 합니다. 그래야 수행과정이 어딘가 저장될 수 있도록 해주니깐요.

 

 

 

 

 

 

앞서 2+2의 수행결과를 인터프리터에서는 확인하지 못하였고 worker의 로그에서 확인하였습니다.

이번에는 AsyncResult 인스턴스를 이용하여 인터프리터에서 백그라운드 수행결과를 확인 해 보겠습니다.

>>> asyncResult = add.delay(2,2)  // AsyncResult 인스턴스를 받아옴
>>> asyncResult.get(timeout=1)    // get 함수로 결과물을 확인
4

 

 

자 그런데 아까 AsyncResult의 task id를 확인 할 수 있었는데 위에 커맨드로는 확인이 안되네요.

task id 확인 방법은 AsyncResult에 id라는 attribute로 확인하실 수 있습니다.

>>> asyncResult.id
'83c5ef44-0c5e-4c4d-91a4-80d3787b7afa'

celery worker 로그에서 위 결과와 일치하는게 보이는군요.

 

 Task config.tasks.add[83c5ef44-0c5e-4c4d-91a4-80d3787b7afa] succeeded in 0.0005424999999377178s: 4

 

 

 

 

Signature 이용하기

 

자 이번에는 signature라는걸 이용해봅시다. 앞어 task를 calling 하는걸 배웠습니다.

signature는 이렇게 task를 calling 하는것을 다른 함수나 프로세스에 변수로 전달해주는 wrapper입니다.

 

add라는 태스크를 다른 함수로 전달하기위해 signature를 생성해 봅시다.

>>> add.signature((2, 2), countdown=10)
config.tasks.add(2, 2)

2+2 를 수행하고 카운트다운이 10인 signature가 생성되었습니다.

 

 

signature는 s로 줄여서 사용하실 수도 있습니다.

 

>>> add.s((2, 2), countdown=10)
config.tasks.add((2, 2), countdown=10)

 

 

자 그럼 signature에서는 task calling을 어떻게 할까요?

방법은 delay() 함수를 이용하는 겁니다.

>>> s1 = add.s(2,2)
>>> s1
config.tasks.add(2, 2)
>>> result = s1.delay()
>>> result.get()
4

앞서 task calling 할때 delay()와의 차이점은 signature는 delay 함수에 들어갈 인풋을 이미 가지고 있다는 점에서 차이가 있습니다.

 

 

어 그럼 signature에서 원하는 인풋으로 task를 calling 하고 싶으면 어떻게 하나요?

 

정답은 signature 를 incompleted 하게 만들면 됩니다.

 

 

 

>>> s1 = add.s(2) # add(?,2) 형태의 signature가 만들어집니다.
>>> result = s1.delay(8) add(8, 2)
>>> result.get()
10
>>> s2 = add.s() # add(?,?) 형태의 signature가 만들어집니다.
>>> result = s2.delay(10, 10) # add(10, 10)
>>> result.get()
20

 

 

 

The Primitives

celery에는 아래와 같은 Primitive들이 있습니다.

 

 

  • group
  • chain
  • chord
  • map
  • starmap
  • chunks

여기서 많이 쓰이는 group과 chain에 대해서 알아보겠습니다.

 

Group

 

group은 여러가지 task를 동시에 병렬적으로 처리하도록 해줍니다. 

>>> from celery import group
>>> from config.tasks import add
>>> group(add.s(i, i) for i in range(10))().get()
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

 

앞서배운 incompleted signature를 생성하여 task를 calling 할 수도 있습니다.

>>> g = group(add.s(i) for i in range(10))
>>> g(10).get()
[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]

 

Chain

chain은 한 task가 끝나면 다른 task를 연속적으로 calling 해 줍니다.

 

(4+4)*2

>>> from celery import chain
>>> from config.tasks import add, mul
>>> chain(add.s(4,4) | mul.s(2))().get()
16

 

imcompletd signature 사용 예제는 아래와 같습니다.

(?+4)*2  , ? = 10

 

>>> from celery import chain
>>> from config.tasks import add, mul
>>> c = chain(add.s(4) | mul.s(2))
>>> c(10).get()
28

 

chain은 또한 아래와 같이 즉각적으로 사용 할 수도 있습니다.

>>> (add.s(4, 4) | mul.s(8))().get()
64

 

 

 

라우팅

celery에서는 간단하게 task를 어떤 큐에서 사용할지 명시적으로 지정해 줄 수 있습니다.

 

아래는 celery.py에서 task_routes를 사용한예제입니다.

 

config/celery.py

app.conf.update(
    task_routes = {
        'proj.tasks.add': {'queue': 'hipri'},
    },
)

또는 apply_async에서 직접 지정 할 수도 있습니다.

>>> from proj.tasks import add
>>> add.apply_async((2, 2), queue='hipri')

아니면 커맨드 라인에서 worker에서 -Q 옵션을 이용해 지정할 수도 있습니다.

$ celery -A config worker -Q hipri

 

 

 

 

Remote Control

하기의 커맨드로 worker가 어떤 작업을 하고 있는지 확인 할 수 있습니다.

$ celery -A proj inspect active

 

TimeZone

하기와 같이 timezone으로 지역시간 설정이 가능합니다.

app.conf.timezone = 'Europe/London'
반응형

'Django > Celery' 카테고리의 다른 글

Celery beat으로 주기적으로 tasks 수행하기  (0) 2021.03.16
Django에 Celery 적용하기 첫번째  (4) 2021.02.28

댓글