multiprocessing
파이썬은 인터프리터 언어로서 기본적으로 single thread에서 순차적으로 동작한다. 그렇다면 많은 양의 처리를 나누어 처리하여 시간과 자원의 효율을 높이는 병렬처리를 하기 위해서는 어떻게 해야할까? 바로 지금 소개할 multiprocessing 모듈을 사용하는 것이다.
multiprocessing 모듈로 부모 프로세스가 운영체제에 요청하여 자식 프로세스를 새로 만들어내는 과정인 프로세스 스포닝을 쉽게 수행할 수 있는데, 이를 통해 병렬로 cpu 작업을 할 수 있고 분산 처리 프로그래밍도 구현 가능하다.
하지만 프로세스는 각자가 고유한 메모리 영역을 가지기 때문에 쓰레드에 비하면 메모리 사용이 늘어난다는 단점도 있다.
공식 문서 https://docs.python.org/ko/3/library/multiprocessing.html
Process
multiprocessing에서는 Process 객체를 생성하여 start() 메소드를 호출하여 프로세스 스포닝을 시작한다. 이 때 생성될 자식 프로세스의 이름과 하고자 하는 일(함수)을 전달한다.
import multiprocessing as mp
def func():
proc = mp.current_process()
print("Process name: "+proc.name)
print("Process pid: "+str(proc.pid))
print("===SubProcess End===")
if __name__ == "__main__":
# main process
proc = mp.current_process()
print("Process name: "+proc.name)
print("Process pid: "+str(proc.pid))
# process spawning
p = mp.Process(name="SubProcess", target=func)
p.start()
print("===MainProcess End===")
output >>>
Process name: MainProcess
Process pid: 21436
===MainProcess End===
Process name: SubProcess
Process pid: 15508
===SubProcess End===
join
join() 메서드가 호출한 프로세스가 종료될 때까지 차단된다. 즉, 메인 프로세스는 서브 프로세스가 종료될 때까지 기다린다.
import multiprocessing as mp
def func():
proc = mp.current_process()
print("Process name: "+proc.name)
print("Process pid: "+str(proc.pid))
print("===SubProcess End===")
if __name__ == "__main__":
# main process
proc = mp.current_process()
print("Process name: "+proc.name)
print("Process pid: "+str(proc.pid))
# process spawning
p = mp.Process(name="SubProcess", target=func)
p.start()
p.join()
print("===MainProcess End===")
output >>>
Process name: MainProcess
Process pid: 17704
Process name: SubProcess
Process pid: 404
===SubProcess End===
===MainProcess End===
daemon
프로세스가 종료할 때 모든 데몬 자식 프로세스를 강제 종료한다. 즉, 메인 프로세스가 종료하면 서브 프로세스도 종료한다.
import multiprocessing as mp
def func():
proc = mp.current_process()
print("Process name: "+proc.name)
print("Process pid: "+str(proc.pid))
print("===SubProcess End===")
if __name__ == "__main__":
# main process
proc = mp.current_process()
print("Process name: "+proc.name)
print("Process pid: "+str(proc.pid))
# process spawning
p = mp.Process(name="SubProcess", target=func, daemon=True)
p.start()
print("===MainProcess End===")
output >>>
Process name: MainProcess
Process pid: 6140
===MainProcess End===
여러 개의 프로세스 실행
from multiprocessing import Process
def func(num):
print([n**2 for n in range(num)])
if __name__ == '__main__':
processes = []
for num in [2, 4, 8, 16, 32]:
p = Process(target=func, args=(num,))
processes.append(p)
p.start()
for p in processes:
p.join()
output >>>
[0, 1]
[0, 1, 4, 9, 16, 25, 36, 49]
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225]
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361, 400, 441, 484, 529, 576, 625, 676, 729, 784, 841, 900, 961]
[0, 1, 4, 9]
Pool
worker process 풀 개체이다. 함수 실행을 병렬로 처리하고 입력 데이터를 프로세스에 분산하는 데이터 병렬 처리가 가능하다.
from multiprocessing import Pool
def func(x):
print(x)
if __name__ == "__main__":
pool = Pool(4)
data = range(1, 20)
pool.map(func, data)
output >>>
1
3
4
5
6
7
8
9
10
11
12
2
13
14
15
16
19
17
18
프로세스 간 자원 공유
프로세스내의 스레드는 한 프로세스에 할당된 자원을 공유하지만 ,프로세스와 프로세스는 자원을 공유하지 않는다. 따라서 프로세스와 프로세스 사이에서 자원을 전달하려면 Queue 또는 Pipe를 사용하여야 한다.
Queue
우리가 아는 FIFO(First In First Out)의 성질을 가지고 있는 자료구조 queue와 같은 역할을 한다.
from multiprocessing import Process, Queue
def func(q):
q.put(42)
q.put(None)
q.put('hello')
if __name__ == '__main__':
q = Queue()
p = Process(target=func, args=(q,))
p.start()
print(q.get())
print(q.get())
print(q.get())
p.join()
output >>>
42
None
hello
Pipe
파이프 모양처럼, 파이프의 양 끝을 한 쌍의 연결 객체인 (conn1, conn2)로 반환한다.
from multiprocessing import Process, Pipe
def func(conn):
conn.send(42)
conn.send(None)
conn.send('hello')
if __name__ == '__main__':
parent_conn, child_conn = Pipe()
p = Process(target=func, args=(child_conn,))
p.start()
print(parent_conn.recv())
print(parent_conn.recv())
print(parent_conn.recv())
p.join()
output >>>
42
None
hello
Manager
파이썬의 객체를 유지하고, 다른 프로세스가 proxy를 사용하여 객체를 조작할 수 있도록 하는 server process를 제어한다.
각 프로세스들이 결과를 하나의 객체(리스트, 큐, 딕셔너리 등)에 저장하는 방법이라고 생각하면 된다.
Manager는 임의의 객체 형을 지원하도록 만들 수 있으므로 공유 메모리 객체를 사용하는 것보다 융통성 있고, 단일 Manage를 네트워크를 통해 서로 다른 컴퓨터의 프로세스에서 공유 할 수 있다. 그러나 공유 메모리를 사용할 때보다 느리다는 단점이 있다.
from multiprocessing import Process, Manager
def func(d, l):
d[1] = '1'
d['2'] = 2
d[0.25] = None
l.reverse()
if __name__ == '__main__':
with Manager() as manager:
d = manager.dict()
l = manager.list(range(10))
p = Process(target=func, args=(d, l))
p.start()
p.join()
print(d)
print(l)
output >>>
{1: '1', '2': 2, 0.25: None}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
Pool + Manager
pool에 map되는 함수가 여러가지 arguments를 인자로 받으려면 Pool.map() 대신 Pool.starmap()을 사용해야한다.
from multiprocessing import Pool, Manager
def func1():
return list(range(20))
def func2(result_list, data):
result_list.append(data**2)
if __name__ == '__main__':
p = Pool(4)
m = Manager()
result_list = m.list()
datas = func1()
p.starmap(func2, [(result_list, data) for data in datas])
p.close()
p.join()
print(result_list)
output >>>
[16, 0, 25, 1, 4, 9, 36, 49, 64, 81, 100, 121, 144, 169, 196, 256, 225, 289, 324, 361]
참고 문서 https://wikidocs.net/82575
https://zzaebok.github.io/python/python-multiprocessing/
'Python' 카테고리의 다른 글
[python] datetime hour 추출 후 zfill (0) | 2023.08.17 |
---|---|
[python] anaconda 설정 (0) | 2023.08.16 |
[python] 동적 변수명 - globals(), locals() (0) | 2023.08.11 |
[python error] pandas.read_excel TypeError: got an unexpected keyword argument (1) | 2023.08.08 |
[python] datetime resample, groupby with Grouper (0) | 2023.08.07 |