728x90
소개
큐(queue) 를 이용해서 독립적으로 실행되는 쓰레드의 상호간 데이터 통신에 대해 알아보겠습니다.
예제 1
큐의 put() 과 get() 을 사용한 예제입니다.
import time
import threading
from queue import Queue
def sender(q):
data = [2020, 8, 12, 1, 55]
while data:
d = data.pop(0)
q.put(d)
print(f'sender : {d}')
time.sleep(1)
q.put(None)
print('sender done')
def receiver(q):
while True:
data = q.get()
if data is None:
break
print(f'receiver : {data}')
print('receiver done')
if __name__ == '__main__':
q = Queue()
t1 = threading.Thread(target=sender, args=(q,))
t2 = threading.Thread(target=receiver, args=(q,))
t1.start()
t2.start()
sender : 2020
receiver : 2020
sender : 8
receiver : 8
sender : 12
receiver : 12
sender : 1
receiver : 1
sender : 55
receiver : 55
sender done
receiver done
큐 객체를 생성합니다.
q = Queue()
큐에 데이터를 삽입합니다.
d = data.pop(0)
q.put(d)
큐가 비어있는 상태에서 get() 함수는 데이터가 삽입되기까지 대기합니다.
data = q.get()
이해를 돕기위한 get() 함수에 대한 예제입니다.
timeout 파라미터에 대기시간을 지정할 수 있습니다.
from queue import Queue
#1
q = Queue()
q.put(1)
q.put(2)
print(q.get())
print(q.get())
#2
q = Queue()
try:
q.get(timeout=1)
except Exception as e:
print('Queue is empty')
print(str(e))
#3
# q = Queue()
# print(q.get(False))
# print(q.get_nowait())
1
2
Queue is empty
get raise Empty
queue.Empty
예제 2
데이터를 전송하는 쪽과 데이터를 받는 쪽에서의 처리 속도가 다를 때를 비교해 봅니다.
import time
import threading
from queue import Queue
def sender(q):
data = [2020, 8, 12, 1, 55]
while data:
d = data.pop(0)
q.put(d)
print(f'sender : {d}')
# time.sleep(1)
q.put(None)
print('sender done')
def receiver(q):
while True:
data = q.get()
if data is None:
break
print(f'receiver : {data}')
time.sleep(1)
print('receiver done')
if __name__ == '__main__':
q = Queue()
t1 = threading.Thread(target=sender, args=(q,))
t2 = threading.Thread(target=receiver, args=(q,))
t1.start()
t2.start()
sender : 2020
sender : 8
sender : 12
receiver : 2020
sender : 1
sender : 55
sender done
receiver : 8
receiver : 12
receiver : 1
receiver : 55
receiver done
받는 쪽에서는 큐에 있는 데이터를 하나씩 꺼내서 처리하면 되니까 문제는 없습니다만
순차적인 처리가 필요할 때도 있습니다.
이런 경우에는 join() 과 task_done() 을 사용합니다.
import time
import threading
from queue import Queue
def sender(q):
data = [2020, 8, 12, 1, 55]
while data:
d = data.pop(0)
q.put(d)
print(f'* sender : {d}')
print('* sender waiting ...')
q.join()
q.put(None)
print('* sender done')
def receiver(q):
while True:
data = q.get()
if data is None:
break
print(f'** receiver : {data}')
time.sleep(2)
q.task_done()
print('* receiver done')
if __name__ == '__main__':
q = Queue()
t1 = threading.Thread(target=sender, args=(q,))
t2 = threading.Thread(target=receiver, args=(q,))
t1.start()
t2.start()
* sender : 2020
* sender waiting ...
** receiver : 2020
* sender : 8
* sender waiting ...
** receiver : 8
* sender : 12
* sender waiting ...
** receiver : 12
* sender : 1
* sender waiting ...
** receiver : 1
* sender : 55
* sender waiting ...
** receiver : 55
* sender done
* receiver done
join() 은 큐의 모든 항목이 꺼내서 처리할 때까지 대기합니다.
def sender(q):
data = [2020, 8, 12, 1, 55]
while data:
d = data.pop(0)
q.put(d)
print(f'* sender : {d}')
print('* sender waiting ...')
q.join()
q.put(None)
print('* sender done')
task_done() 은 받는 쪽에서 사용하며, 대기중인 큐의 모든 항목이 처리되었으면 재개합니다.
def receiver(q):
while True:
data = q.get()
if data is None:
break
print(f'** receiver : {data}')
time.sleep(2)
q.task_done()
print('* receiver done')
'Language > Python' 카테고리의 다른 글
파이썬 엘라스틱서치(Elasticsearch) 연동 -3 : 노리(Nori) 형태소 분석기 설치 (0) | 2020.08.13 |
---|---|
파이썬 쓰레드(Thread) 알아보기 - 4 : 쓰레드 동작 순서 제어를 위한 Event (0) | 2020.08.12 |
파이썬 쓰레드(Thread) 알아보기 - 2 : 뮤텍스(Lock) (2) | 2020.08.12 |
파이썬 쓰레드(Thread) 알아보기 - 1 (0) | 2020.08.12 |
파이썬 엘라스틱서치(Elasticsearch) 연동 -2 : CRUD (0) | 2020.08.11 |