본문으로 바로가기

소개

 

큐(queue) 를 이용해서 독립적으로 실행되는 쓰레드의 상호간 데이터 통신에 대해 알아보겠습니다.

 

 

예제 1

 

큐의 put() 과 get() 을 사용한 예제입니다.

import time
import threading
from queue import Queue

def sender(q):
    data = [2020, 8, 12, 1, 55]
    while data:
        d = data.pop(0)
        q.put(d)
        print(f'sender : {d}')
        time.sleep(1)
    q.put(None)
    print('sender done')

def receiver(q):
    while True:
        data = q.get()
        if data is None:
            break
        print(f'receiver : {data}')
    print('receiver done')

if __name__ == '__main__':
    q = Queue()
    t1 = threading.Thread(target=sender, args=(q,))
    t2 = threading.Thread(target=receiver, args=(q,))
    t1.start()
    t2.start()
sender : 2020
receiver : 2020
sender : 8
receiver : 8
sender : 12
receiver : 12
sender : 1
receiver : 1
sender : 55
receiver : 55
sender done
receiver done

 

큐 객체를 생성합니다.

q = Queue()

 

큐에 데이터를 삽입합니다.

d = data.pop(0)
q.put(d)

 

큐가 비어있는 상태에서 get() 함수는 데이터가 삽입되기까지 대기합니다.

data = q.get()

 

https://docs.python.org/ko/3.7/library/queue.html

 

이해를 돕기위한 get() 함수에 대한 예제입니다.

timeout 파라미터에 대기시간을 지정할 수 있습니다.

from queue import Queue

#1
q = Queue()
q.put(1)
q.put(2)
print(q.get())
print(q.get())

#2
q = Queue()
try:
    q.get(timeout=1)
except Exception as e:
    print('Queue is empty')
    print(str(e))

#3
# q = Queue()
# print(q.get(False))
# print(q.get_nowait())
1
2
Queue is empty
get raise Empty
queue.Empty

 

 

예제 2

 

데이터를 전송하는 쪽과 데이터를 받는 쪽에서의 처리 속도가 다를 때를 비교해 봅니다.

import time
import threading
from queue import Queue

def sender(q):
    data = [2020, 8, 12, 1, 55]
    while data:
        d = data.pop(0)
        q.put(d)
        print(f'sender : {d}')
        # time.sleep(1)
    q.put(None)
    print('sender done')

def receiver(q):
    while True:
        data = q.get()
        if data is None:
            break
        print(f'receiver : {data}')
        time.sleep(1)
    print('receiver done')

if __name__ == '__main__':
    q = Queue()
    t1 = threading.Thread(target=sender, args=(q,))
    t2 = threading.Thread(target=receiver, args=(q,))
    t1.start()
    t2.start()
sender : 2020
sender : 8
sender : 12
receiver : 2020
sender : 1
sender : 55
sender done
receiver : 8
receiver : 12
receiver : 1
receiver : 55
receiver done

 

 

받는 쪽에서는 큐에 있는 데이터를 하나씩 꺼내서 처리하면 되니까 문제는 없습니다만

순차적인 처리가 필요할 때도 있습니다.

 

이런 경우에는 join()task_done() 을 사용합니다.

import time
import threading
from queue import Queue

def sender(q):
    data = [2020, 8, 12, 1, 55]
    while data:
        d = data.pop(0)
        q.put(d)
        print(f'* sender : {d}')
        print('* sender waiting ...')
        q.join()

    q.put(None)
    print('* sender done')

def receiver(q):
    while True:
        data = q.get()
        if data is None:
            break
        print(f'** receiver : {data}')
        time.sleep(2)
        q.task_done()
    print('* receiver done')

if __name__ == '__main__':
    q = Queue()
    t1 = threading.Thread(target=sender, args=(q,))
    t2 = threading.Thread(target=receiver, args=(q,))
    t1.start()
    t2.start()
* sender : 2020
* sender waiting ...
** receiver : 2020
* sender : 8
* sender waiting ...
** receiver : 8
* sender : 12
* sender waiting ...
** receiver : 12
* sender : 1
* sender waiting ...
** receiver : 1
* sender : 55
* sender waiting ...
** receiver : 55
* sender done
* receiver done

 

join() 은 큐의 모든 항목이 꺼내서 처리할 때까지 대기합니다.

def sender(q):
    data = [2020, 8, 12, 1, 55]
    while data:
        d = data.pop(0)
        q.put(d)
        print(f'* sender : {d}')
        print('* sender waiting ...')
        q.join()

    q.put(None)
    print('* sender done')

 

task_done() 은 받는 쪽에서 사용하며, 대기중인 큐의 모든 항목이 처리되었으면 재개합니다.

def receiver(q):
    while True:
        data = q.get()
        if data is None:
            break
        print(f'** receiver : {data}')
        time.sleep(2)
        q.task_done()
    print('* receiver done')