Tuesday, August 18, 2015

[How to] Create worker processes in a Python multiprocessing application

When processing a large amount of data, multi-processing or multi-threading is a good way to speed up the whole job. In Python, some libraries are not thread-safe, so this simple application uses multi-processing as the example.

To keep things simple, just look at the code:

"""
Author: Samuel Fok (foksanho@gmail.com)

No license: Free to Use and no warranty

Tested on the following Environment:
Python 2.7.10
Windows XP SP3

Version
20150818 - Initial Version

"""
import time

import multiprocessing

def process_task(task):
    # Simulate a time-consuming job by sleeping for `task` seconds
    time.sleep(task)
    return task * 10

def multi_process_worker(task_queue, result_queue, target):
    while True:
        next_task = task_queue.get()
        if next_task is None:
            task_queue.task_done()
            break
        answer = target(next_task)
        task_queue.task_done()
        # Alternative: report which tasks succeeded instead of their results.
        # if answer:
        #     result_queue.put(next_task)
        # else:
        #     result_queue.put(None)
        result_queue.put(answer)
    return

def multi_process_task(task_list, target, num_task_per_core=4):
    tasks = multiprocessing.JoinableQueue()  # queue of pending tasks
    results = multiprocessing.Queue()  # queue of finished results
    p_list = []
    result_list = []

    #TODO: adjust number of workers yourself
    #depends on your hardware
    num_workers = multiprocessing.cpu_count() * num_task_per_core

    if len(task_list) < num_workers:
        num_workers = len(task_list)

    for i in range(num_workers):
        p = multiprocessing.Process(target=multi_process_worker, args=(tasks,results, target))
        p_list.append(p)

    for p in p_list:
        p.start()

    for task in task_list:
        tasks.put(task)

    # Queue one None sentinel per worker so that every process
    # exits gracefully once there are no more tasks
    for i in range(num_workers):
        tasks.put(None)

    tasks.join()
   
    for task in task_list:
        result = results.get()
        if result is not None:
            result_list.append(result)

    for p in p_list:
        p.join()

    return result_list


if __name__ == '__main__':
    #INPUT
    task_list = range(0,20)

    #PROCESS
    result_list = multi_process_task(task_list, process_task, 4)

    #OUTPUT
    for result in result_list:
        print (result)
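
One thing to keep in mind with the script above: the workers put each result on the result queue as soon as it is finished, so result_list comes back in completion order, not in the order of task_list. If the order matters, one possible approach is to tag every task with its index and sort the results at the end. The following is only a minimal sketch of that idea, not part of the original script. The names tag_with_index, indexed_worker, restore_order and ordered_multi_process_task, and the default of 2 workers, are made up for illustration, and process_task is simplified so it does not sleep.

```python
import multiprocessing


def process_task(task):
    # Stand-in for real work (assumption: same return value as the script above)
    return task * 10


def tag_with_index(task_list):
    # Pair every task with its position, e.g. [5, 7] -> [(0, 5), (1, 7)]
    return list(enumerate(task_list))


def restore_order(indexed_results):
    # Sort the (index, result) pairs by index and drop the index again
    ordered = sorted(indexed_results, key=lambda pair: pair[0])
    return [result for _, result in ordered]


def indexed_worker(task_queue, result_queue, target):
    # Same loop as multi_process_worker, but every item is an (index, task) pair
    while True:
        item = task_queue.get()
        if item is None:
            task_queue.task_done()
            break
        index, task = item
        result_queue.put((index, target(task)))
        task_queue.task_done()


def ordered_multi_process_task(task_list, target, num_workers=2):
    # Hypothetical variant of multi_process_task that keeps the input order
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()
    indexed_tasks = tag_with_index(task_list)
    num_workers = min(num_workers, len(indexed_tasks))

    workers = [
        multiprocessing.Process(target=indexed_worker,
                                args=(tasks, results, target))
        for _ in range(num_workers)
    ]
    for p in workers:
        p.start()

    for item in indexed_tasks:
        tasks.put(item)
    for _ in workers:
        tasks.put(None)  # one sentinel per worker

    tasks.join()
    indexed_results = [results.get() for _ in indexed_tasks]

    for p in workers:
        p.join()

    return restore_order(indexed_results)
```

To try it, call ordered_multi_process_task(range(5), process_task) from under an if __name__ == '__main__': guard, the same way the script above calls multi_process_task. The sort at the end costs very little compared with the work done in the processes, and the worker loop stays almost identical to the original one.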
