
Django에서 Celery 이용하기 두번째

해당 게시물은 위 주소의 사이트의 내용을 기반으로 합니다.





앞에 글에서 현재 프로젝트 구조를 보겠습니다.




from celery import Celery

app = Celery('config',

# Optional configuration, see the application user guide.

if __name__ == '__main__':


앞에 글에서 Celery 인스턴스(Django에서는 앱이라고 불리죠)를 생성하였습니다.

허나 broker가 없어서 에러가 떳었죠. 위와같이 broker 설정을 추가 해 줍시다.




  • Celery에 들어갈 변수 목록들은 Celery를 시작 할 때 가져올 목록들입니다.
  • 필요한게 있으시면 직접 추가해주세요.
  • 첫번째 변수에는 사용할 앱의 이름을 지정해줍니다. 여기서는 config겠네요
  • broker 변수는 는 저희가 쓸 broker url을 지정해 줍니다.
  • backend 변수는 result backend를 명시해줍니다. (작업상태및 결과를 추적하는데 사용)
  • include에는 백그라운드에서 돌릴 tasks 파일의 경로를 입력해줍시다.




어떤 broker를 쓸지에 대한 자세한 내용은 하기 링크에 나와있습니다.


rabbitMQ는 쉽고, 안정적인 broker입니다. 개발환경에서 rabbitMQ를 이용하는건 최고의 선택입니다.


설치방법은 아래와 같습니다.




$ sudo apt-get install rabbitmq-server


도커 사용시

$ docker run -d -p 5672:5672 rabbitmq




Redis는 또한 강력한 기능을 가진 broker이며 갑작스런 서버종료가 발생할 때 데이터를 보호하는 능력이 뛰어난 broker 입니다.


도커사용시 아래와 같이 실행이 가능합니다.

$ docker run -d -p 6379:6379 redis





저는 rabbitMQ를 선택하도록 하겠습니다.


Celery는 기본 broker로 rabbitMQ를 채택하고 있고요.







자 이제 를 작성해 봅시다.



from .celery import app

def add(x, y):
    return x + y

def mul(x, y):
    return x * y

def xsum(numbers):
    return sum(numbers)






- woker 가동하기


먼저 worker 가동전에 rabbitmq를 설치해주셨다면 rabbitmq 서버를 가동해줍시다.


$ sudo rabbitmq-server


첫번째 글에서 쓴것처럼 worker는 하기 커맨드로 실행이 가능합니다.


$ celery -A 앱이름 worker -l 로그레벨


이제 worker를 실행해봅시다. 위 프로젝트 레이아웃에서는


$ celery -A config worker -l INFO


이렇게 입력하시면 되겠죠.





가동하면 아래와 같은 결과가 나옵니다.


 -------------- celery@DESKTOP-TF0CCFF v5.0.5 (singularity)
--- ***** -----
-- ******* ---- Linux-5.4.72-microsoft-standard-WSL2-x86_64-with-glibc2.29 2021-03-08 20:49:22
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app:         config:0x7f212d8a4790
- ** ---------- .> transport:   amqp://guest:**@localhost:5672//
- ** ---------- .> results:     rpc://
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery

가이드 페이지와 모양이 조금 다르네요. 가이드 페이지가 조금 오래된 Django와 Celery 버전을 쓰기때메 그런거로 추정됩니다.


  • transport는 앞서 저희가 명시해준 broker url 입니다. 커맨드라인에서 -b 옵션으로 다른 borker url을 지정 해 주실수도 있습니다.
  • concurrency는 tasks를 수행할 prefork worker process의 숫자를 나타냅니다. 모든 prefork worker가 busy상태이면 새로운 task는 prefork worker의 작업이 끝날 때 까지 기다려야 됩니다.
  • task events는 celery에게 worker에서 발생하는 작업에 대한 모니터링 메세지를 보내도록 하는 옵션입니다. 모니터링 프로그램으로는 celery events나 Flower가 있습니다.
  • queues는 worker가 사용할 queue의 리스트를 나타내 줍니다.

셀러리에게 작업자에게 발생하는 작업에 대한 모니터링 메시지(이벤트)를 보내도록 하는 옵션입니다.


concurrency의 기본값은 cpu코어의 숫자에 의해 결정됩니다. celery worker -c 옵션으로 사용할 코어 숫자를 따로 정해줄 수 있습니다.




- woker 멈추기

간단하게 ctrl+c로 worker의 작업을 멈출 수 있습니다.



- woker 백그라운드로 작업하기

worker를 백그라운드로 작업하기 위해서는 하기 링크를 일단 참조 바랍니다.


여러개의 celery를 daemon화 시키는 스크립트는 celery multi 커맨드를 이용합니다.


$ celery multi start w1 -A proj -l INFO
celery multi v4.0.0 (latentcall)
> Starting nodes...
    > w1.halcyon.local: OK


restart 하는 방법은 아래와 같습니다.

$ celery  multi restart w1 -A proj -l INFO
celery multi v4.0.0 (latentcall)
> Stopping nodes...
    > w1.halcyon.local: TERM -> 64024
> Waiting for 1 node.....
    > w1.halcyon.local: OK
> Restarting node w1.halcyon.local: OK
celery multi v4.0.0 (latentcall)
> Stopping nodes...
    > w1.halcyon.local: TERM -> 64052


stop하는 방법은 아래와 같습니다.

$ celery multi stop w1 -A proj -l INFO

허나 위의 방법은 async하게 stop하는 방법이라 worker가 작업중인 상태이면 작업하는걸 그대로 shutdown 해버립니다.


하기의 방법으로 worker가 작업이 끝날 때 까지 기다리다가 stop하려면 stopwait를 이용해주세요.

$ celery multi stopwait w1 -A proj -l INFO



- Task Calling하기

자 앞서 작업했던 tasks에 있는 함수를 calling 해 봅시다.


먼저 config 앱에 관한 worker를 가동해주세요.


celery -A config worker -l INFO


앱이름이 config가 아니고 mySampleApp 이런식이면 celery -A mySampleApp worker -l INFO 이런식으로 해주시면 되겠죠?



자 워커를 가동하였다면 이번에는 task를 call 해 봅시다.


delay라는 함수를 이용하여 add라는 task를 call 하여 2+2를 백그라운드에서 계산해봅시다.


Python 3.8.5 (default, Jan 27 2021, 15:41:15)
[GCC 9.3.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from config.tasks import add
>>> add.delay(2,2)
<AsyncResult: 6ef39f03-974a-4d0a-8c40-61bf735a2325>

파이썬 인터프리터로 tasks에 정의한 add를 call 하였더니 해시형태로 결과값이 출력되네요.

여기서 worker에서 뜨는 로그를 보면


  . config.tasks.add
  . config.tasks.mul
  . config.tasks.xsum

[2021-03-08 21:49:12,470: INFO/MainProcess] Connected to amqp://guest:**@
[2021-03-08 21:49:12,477: INFO/MainProcess] mingle: searching for neighbors
[2021-03-08 21:49:13,508: INFO/MainProcess] mingle: all alone
[2021-03-08 21:49:13,525: INFO/MainProcess] celery@DESKTOP-TF0CCFF ready.
[2021-03-08 21:51:09,879: INFO/MainProcess] Received task: config.tasks.add[6ef39f03-974a-4d0a-8c40-61bf735a2325]
[2021-03-08 21:51:09,892: INFO/ForkPoolWorker-8] Task config.tasks.add[6ef39f03-974a-4d0a-8c40-61bf735a2325] succeeded in 0.011422700000366603s: 4

succeeded in 0.011422700000366603s: 4


4가 출력되는걸 보실 수 있습니다.





여기서 delay 함수는 apply_async의 shortcut 버전입니다.

위에서 add.delay(2,2)를 해주는 것은 아래와 같습니다.

>>> add.apply_async((2, 2))

아래와 같은 옵션으로 여러 옵션을 지정 해 줄 수 있습니다.

>>> add.apply_async((2, 2), queue='lopri', countdown=10)

2+2를 백그라운드로 작업하되 queue로 lopri를 사용하고 10초뒤에 계산해라 라는 의미입니다.



task를 call 하는 방법은 아래의 링크를 참고해주세요


위에도 보셨겠지만 delay와 apply_async 함수를 호출하면 AsyncResult 인스턴스를 돌려받게 됩니다. 이 AsyncResult로 task의 수행과정을 추적할 수 있습니다. 단 resultBackend를 enable 해 주어야 합니다. 그래야 수행과정이 어딘가 저장될 수 있도록 해주니깐요.







앞서 2+2의 수행결과를 인터프리터에서는 확인하지 못하였고 worker의 로그에서 확인하였습니다.

이번에는 AsyncResult 인스턴스를 이용하여 인터프리터에서 백그라운드 수행결과를 확인 해 보겠습니다.

>>> asyncResult = add.delay(2,2)  // AsyncResult 인스턴스를 받아옴
>>> asyncResult.get(timeout=1)    // get 함수로 결과물을 확인



자 그런데 아까 AsyncResult의 task id를 확인 할 수 있었는데 위에 커맨드로는 확인이 안되네요.

task id 확인 방법은 AsyncResult에 id라는 attribute로 확인하실 수 있습니다.


celery worker 로그에서 위 결과와 일치하는게 보이는군요.


 Task config.tasks.add[83c5ef44-0c5e-4c4d-91a4-80d3787b7afa] succeeded in 0.0005424999999377178s: 4





Signature 이용하기


자 이번에는 signature라는걸 이용해봅시다. 앞어 task를 calling 하는걸 배웠습니다.

signature는 이렇게 task를 calling 하는것을 다른 함수나 프로세스에 변수로 전달해주는 wrapper입니다.


add라는 태스크를 다른 함수로 전달하기위해 signature를 생성해 봅시다.

>>> add.signature((2, 2), countdown=10)
config.tasks.add(2, 2)

2+2 를 수행하고 카운트다운이 10인 signature가 생성되었습니다.



signature는 s로 줄여서 사용하실 수도 있습니다.


>>> add.s((2, 2), countdown=10)
config.tasks.add((2, 2), countdown=10)



자 그럼 signature에서는 task calling을 어떻게 할까요?

방법은 delay() 함수를 이용하는 겁니다.

>>> s1 = add.s(2,2)
>>> s1
config.tasks.add(2, 2)
>>> result = s1.delay()
>>> result.get()

앞서 task calling 할때 delay()와의 차이점은 signature는 delay 함수에 들어갈 인풋을 이미 가지고 있다는 점에서 차이가 있습니다.



어 그럼 signature에서 원하는 인풋으로 task를 calling 하고 싶으면 어떻게 하나요?


정답은 signature 를 incompleted 하게 만들면 됩니다.




>>> s1 = add.s(2) # add(?,2) 형태의 signature가 만들어집니다.
>>> result = s1.delay(8) add(8, 2)
>>> result.get()
>>> s2 = add.s() # add(?,?) 형태의 signature가 만들어집니다.
>>> result = s2.delay(10, 10) # add(10, 10)
>>> result.get()




The Primitives

celery에는 아래와 같은 Primitive들이 있습니다.



  • group
  • chain
  • chord
  • map
  • starmap
  • chunks

여기서 많이 쓰이는 group과 chain에 대해서 알아보겠습니다.




group은 여러가지 task를 동시에 병렬적으로 처리하도록 해줍니다. 

>>> from celery import group
>>> from config.tasks import add
>>> group(add.s(i, i) for i in range(10))().get()
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]


앞서배운 incompleted signature를 생성하여 task를 calling 할 수도 있습니다.

>>> g = group(add.s(i) for i in range(10))
>>> g(10).get()
[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]



chain은 한 task가 끝나면 다른 task를 연속적으로 calling 해 줍니다.



>>> from celery import chain
>>> from config.tasks import add, mul
>>> chain(add.s(4,4) | mul.s(2))().get()


imcompletd signature 사용 예제는 아래와 같습니다.

(?+4)*2  , ? = 10


>>> from celery import chain
>>> from config.tasks import add, mul
>>> c = chain(add.s(4) | mul.s(2))
>>> c(10).get()


chain은 또한 아래와 같이 즉각적으로 사용 할 수도 있습니다.

>>> (add.s(4, 4) | mul.s(8))().get()





celery에서는 간단하게 task를 어떤 큐에서 사용할지 명시적으로 지정해 줄 수 있습니다.


아래는 celery.py에서 task_routes를 사용한예제입니다.



    task_routes = {
        'proj.tasks.add': {'queue': 'hipri'},

또는 apply_async에서 직접 지정 할 수도 있습니다.

>>> from proj.tasks import add
>>> add.apply_async((2, 2), queue='hipri')

아니면 커맨드 라인에서 worker에서 -Q 옵션을 이용해 지정할 수도 있습니다.

$ celery -A config worker -Q hipri





Remote Control

하기의 커맨드로 worker가 어떤 작업을 하고 있는지 확인 할 수 있습니다.

$ celery -A proj inspect active



하기와 같이 timezone으로 지역시간 설정이 가능합니다.

app.conf.timezone = 'Europe/London'