博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Python多进程multiprocessing使用示例
阅读量:4320 次
发布时间:2019-06-06

本文共 9348 字,大约阅读时间需要 31 分钟。

mutilprocess简介

像线程一样管理进程,这个是mutilprocess的核心,他与threading很是相像,对多核CPU的利用率会比threading好的多。

import multiprocessingdef worker(num):    """thread worker function"""    print 'Worker:', num    returnif __name__ == '__main__':    jobs = []    for i in range(5):        p = multiprocessing.Process(target=worker, args=(i,))        jobs.append(p)        p.start()
简单的创建进程

确定当前的进程,即是给进程命名,方便标识区分,跟踪

import multiprocessingimport timedef worker():    name = multiprocessing.current_process().name    print name, 'Starting'    time.sleep(2)    print name, 'Exiting'def my_service():    name = multiprocessing.current_process().name    print name, 'Starting'    time.sleep(3)    print name, 'Exiting'if __name__ == '__main__':    service = multiprocessing.Process(name='my_service',                                      target=my_service)    worker_1 = multiprocessing.Process(name='worker 1',                                       target=worker)    worker_2 = multiprocessing.Process(target=worker) # default name    worker_1.start()    worker_2.start()    service.start()
View Code

守护进程就是不阻挡主程序退出,自己干自己的 mutilprocess.setDaemon(True)就这句等待守护进程退出,要加上join,join可以传入浮点数值,等待n久就不等了

import multiprocessingimport timeimport sysdef daemon():    name = multiprocessing.current_process().name    print 'Starting:', name    time.sleep(2)    print 'Exiting :', namedef non_daemon():    name = multiprocessing.current_process().name    print 'Starting:', name    print 'Exiting :', nameif __name__ == '__main__':    d = multiprocessing.Process(name='daemon',                                target=daemon)    d.daemon = True    n = multiprocessing.Process(name='non-daemon',                                target=non_daemon)    n.daemon = False    d.start()    n.start()    d.join(1)    print 'd.is_alive()', d.is_alive()    n.join()
守护进程

最好使用 poison pill,强制的使用terminate()注意 terminate之后要join,使其可以更新状态

import multiprocessingimport timedef slow_worker():    print 'Starting worker'    time.sleep(0.1)    print 'Finished worker'if __name__ == '__main__':    p = multiprocessing.Process(target=slow_worker)    print 'BEFORE:', p, p.is_alive()    p.start()    print 'DURING:', p, p.is_alive()    p.terminate()    print 'TERMINATED:', p, p.is_alive()    p.join()    print 'JOINED:', p, p.is_alive()
终止进程
  1. == 0 未生成任何错误  
  2. 0 进程有一个错误,并以该错误码退出
  3. < 0 进程由一个-1 * exitcode信号结束
import multiprocessingimport sysimport timedef exit_error():    sys.exit(1)def exit_ok():    returndef return_value():    return 1def raises():    raise RuntimeError('There was an error!')def terminated():    time.sleep(3)if __name__ == '__main__':    jobs = []    for f in [exit_error, exit_ok, return_value, raises, terminated]:        print 'Starting process for', f.func_name        j = multiprocessing.Process(target=f, name=f.func_name)        jobs.append(j)        j.start()    jobs[-1].terminate()    for j in jobs:        j.join()        print '%15s.exitcode = %s' % (j.name, j.exitcode)
进程的退出状态

方便的调试,可以用logging

import multiprocessingimport loggingimport sysdef worker():    print 'Doing some work'    sys.stdout.flush()if __name__ == '__main__':    multiprocessing.log_to_stderr()    logger = multiprocessing.get_logger()    logger.setLevel(logging.INFO)    p = multiprocessing.Process(target=worker)    p.start()    p.join()
日志

利用class来创建进程,定制子类

import multiprocessingclass Worker(multiprocessing.Process):    def run(self):        print 'In %s' % self.name        returnif __name__ == '__main__':    jobs = []    for i in range(5):        p = Worker()        jobs.append(p)        p.start()    for j in jobs:        j.join()
派生进程
import multiprocessingclass MyFancyClass(object):    def __init__(self, name):        self.name = name    def do_something(self):        proc_name = multiprocessing.current_process().name        print 'Doing something fancy in %s for %s!' % \            (proc_name, self.name)def worker(q):    obj = q.get()    obj.do_something()if __name__ == '__main__':    queue = multiprocessing.Queue()    p = multiprocessing.Process(target=worker, args=(queue,))    p.start()    queue.put(MyFancyClass('Fancy Dan'))    # Wait for the worker to finish    queue.close()    queue.join_thread()    p.join()import multiprocessingimport timeclass Consumer(multiprocessing.Process):    def __init__(self, task_queue, result_queue):        multiprocessing.Process.__init__(self)        self.task_queue = task_queue        self.result_queue = result_queue    def run(self):        proc_name = self.name        while True:            next_task = self.task_queue.get()            if next_task is None:                # Poison pill means shutdown                print '%s: Exiting' % proc_name                self.task_queue.task_done()                break            print '%s: %s' % (proc_name, next_task)            answer = next_task()            self.task_queue.task_done()            self.result_queue.put(answer)        returnclass Task(object):    def __init__(self, a, b):        self.a = a        self.b = b    def __call__(self):        time.sleep(0.1) # pretend to take some time to do the work        return '%s * %s = %s' % (self.a, self.b, self.a * self.b)    def __str__(self):        return '%s * %s' % (self.a, self.b)if __name__ == '__main__':    # Establish communication queues    tasks = multiprocessing.JoinableQueue()    results = multiprocessing.Queue()    # Start consumers    num_consumers = multiprocessing.cpu_count() * 2    print 'Creating %d consumers' % num_consumers    consumers = [ Consumer(tasks, results)                  for i in xrange(num_consumers) ]    for w in consumers:        w.start()    # Enqueue jobs    num_jobs = 10    for i in xrange(num_jobs):        tasks.put(Task(i, i))    # Add a poison pill for each consumer    for i in xrange(num_consumers):        tasks.put(None)    # Wait for all of the tasks to finish    tasks.join()    # Start printing results    while num_jobs:        result = results.get()        print 'Result:', result        num_jobs -= 1
python进程间传递消息

Event提供一种简单的方法,可以在进程间传递状态信息。事件可以切换设置和未设置状态。通过使用一个可选的超时值,时间对象的用户可以等待其状态从未设置变为设置。

import multiprocessingimport timedef wait_for_event(e):    """Wait for the event to be set before doing anything"""    print 'wait_for_event: starting'    e.wait()    print 'wait_for_event: e.is_set()->', e.is_set()def wait_for_event_timeout(e, t):    """Wait t seconds and then timeout"""    print 'wait_for_event_timeout: starting'    e.wait(t)    print 'wait_for_event_timeout: e.is_set()->', e.is_set()if __name__ == '__main__':    e = multiprocessing.Event()    w1 = multiprocessing.Process(name='block',                                  target=wait_for_event,                                 args=(e,))    w1.start()    w2 = multiprocessing.Process(name='nonblock',                                  target=wait_for_event_timeout,                                  args=(e, 2))    w2.start()    print 'main: waiting before calling Event.set()'    time.sleep(3)    e.set()    print 'main: event is set'
进程间信号传递

Python多进程,一般的情况是Queue来传递。

from multiprocessing import Process, Queuedef f(q):    q.put([42, None, 'hello'])if __name__ == '__main__':    q = Queue()    p = Process(target=f, args=(q,))    p.start()    print q.get()    # prints "[42, None, 'hello']"    p.join()
Queue
import Queueimport threadingimport timeexitFlag = 0class myThread (threading.Thread):    def __init__(self, threadID, name, q):        threading.Thread.__init__(self)        self.threadID = threadID        self.name = name        self.q = q    def run(self):        print "Starting " + self.name        process_data(self.name, self.q)        print "Exiting " + self.namedef process_data(threadName, q):    while not exitFlag:        queueLock.acquire()        if not workQueue.empty():            data = q.get()            queueLock.release()            print "%s processing %s" % (threadName, data)        else:            queueLock.release()        time.sleep(1)threadList = ["Thread-1", "Thread-2", "Thread-3"]nameList = ["One", "Two", "Three", "Four", "Five"]queueLock = threading.Lock()workQueue = Queue.Queue(10)threads = []threadID = 1# Create new threadsfor tName in threadList:    thread = myThread(threadID, tName, workQueue)    thread.start()    threads.append(thread)    threadID += 1# Fill the queuequeueLock.acquire()for word in nameList:    workQueue.put(word)queueLock.release()# Wait for queue to emptywhile not workQueue.empty():    pass# Notify threads it's time to exitexitFlag = 1# Wait for all threads to completefor t in threads:    t.join()print "Exiting Main Thread"
多线程优先队列Queue

多进程使用Queue通信的例子

import timefrom multiprocessing import Process,QueueMSG_QUEUE = Queue(5)def startA(msgQueue):    while True:        if msgQueue.empty() > 0:            print ('queue is empty %d' % (msgQueue.qsize()))        else:            msg = msgQueue.get()            print( 'get msg %s' % (msg,))        time.sleep(1)def startB(msgQueue):    while True:        msgQueue.put('hello world')        print( 'put hello world queue size is %d' % (msgQueue.qsize(),))        time.sleep(3)if __name__ == '__main__':    processA = Process(target=startA,args=(MSG_QUEUE,))    processB = Process(target=startB,args=(MSG_QUEUE,))    processA.start()    print( 'processA start..')
View Code

主进程定义了一个Queue类型的变量,并作为Process的args参数传给子进程processA和processB,两个进程一个向队列中写数据,一个读数据。

转载于:https://www.cnblogs.com/IPYQ/p/5573628.html

你可能感兴趣的文章
BZOJ1015: [JSOI2008]星球大战starwar【并查集】【傻逼题】
查看>>
HUT-XXXX Strange display 容斥定理,线性规划
查看>>
mac修改用户名
查看>>
一道关于员工与部门查询的SQL笔试题
查看>>
Canvas基础
查看>>
[Hive - LanguageManual] Alter Table/Partition/Column
查看>>
可持久化数组
查看>>
去除IDEA报黄色/灰色的重复代码的下划波浪线
查看>>
Linux发送qq、网易邮件服务配置
查看>>
几道面试题
查看>>
【转】使用 WebGL 进行 3D 开发,第 1 部分: WebGL 简介
查看>>
js用正则表达式控制价格输入
查看>>
chromium浏览器开发系列第三篇:chromium源码目录结构
查看>>
java开发操作系统内核:由实模式进入保护模式之32位寻址
查看>>
第五讲:单例模式
查看>>
Python编程语言的起源
查看>>
Azure ARMTemplate模板,VM扩展命令
查看>>
在腾讯云上创建您的SQL Cluster(4)
查看>>
linux ping命令
查看>>
Activiti源码浅析:Activiti的活动授权机制
查看>>