当前位置:博客首页 > Python > 正文

第17课 多进程编程Process、multiprocessing.Pool

作者: Jarvan 分类: Python 发布时间: 2019-06-03 15:43 百度已收录

一、进程和程序的概念

进程:计算机资源分配(CPU、内存)的最小单位

  • 每个进程使用的CPU和内存是分开的,互不影响
  • 孤儿进程:父进程已经死掉,但是子进程还活着,没人认领,就成为了孤儿
  • 僵尸进程:子进程已经死掉了,但是父进程没有对子进程进行资源回收

线程:计算机资源调度的最小单位(就是程序运行的时候,运行的是线程)

协程:轻量级的线程,但是仍然是在线程中运行的

关系:线程是在进程里面执行的,协程是在线程中执行

  1. 编写完毕的代码,在没有运行之前称之为程序。
  2. 运行中的程序称之为进程,每一个运行的程序都是一个进程。
  3. 进程除了包含执行的代码还需要运行的环境,比如内存空间等。
  4. 操作系统主要是通过时间片切换的方式来进行进程间运行切换的。
  5. 每个进程都有自己独立的内存空间,跟其它进程的空间是分开的,这就使得进程间共享数据成为一个难题
from multiprocessing import Process
import os
import time
# 目标执行函数
def work(name):
    print("子进程运行中,我的名字是%s, 我的进程id是%s" % (name, os.getpid()))
    time.sleep(3)
if __name__ == "__main__":
    print("主进程开始执行,进程id=%s" % os.getpid()) # os.getpid() 获取当前进程号
    subp = Process(target=work, args=("john",))
    subp.start()
    subp.join()
    print("子进程已结束")

二、进程的创建

2.1 使用multiprocessing模块创建进程

每个程序一开始运行的时候就会创建一个进程(也叫主进程或主线程),因为此时只有一个进程和线程,所以他们代表的是一样的东西。

multiprocessing模块提供了一个Process类来代表一个进程对象,我们来看下使用Process类来创建一个进程的案例:

程序说明

1. 创建子进程时,只需要创建一个Process实例对象,然后传入一个执行函数和函数的参数(如果需要),接着调用实例对象的start方法启动进程,最后调用join方法,让主进程等待子进程执行完毕然后再继续执行主进程。

2. join()方法可以使得主进程等待子进程结束后再继续往下执行,主要用于进程间的同步, 如果不写join那么主进程会继续往下执行,而不等待子进程执行完毕。

from multiprocessing import Process
import os
import time

def work(name,delay):
    print("我是子进程{},我的进程ID是{}".format(name,os.getpid()))
    time.sleep(delay) # 让程序执行到这里的时候,睡眠30秒

if __name__=='__main__':
    print('我是主进程,我的pid是{}'.format(os.getpid()))
    p = Process(target=work,args=("Jarvan",20))
    p.start()
    p.join()  # 可以阻塞主进程
    print('主进程执行到这里啦....')

Process使用说明
Process([group [, target [, name [, args [, kwargs]]]]])target:表示这个进程实例所调用的目标执行函数;
args:表示调用对象的位置参数元组;
kwargs:表示调用对象的关键字参数字典;
name:当前进程实例的别名;
group:进程所属组,目前值必须为None否则会引发断言异常;

Process类常用方法:
is_alive():判断进程实例是否还在执行(存活);
join([timeout]):等待进程实例执行结束,如果给定timeout参数,那么将会等待给定的秒数;
start():启动进程实例(创建子进程);
run():如果没有给定target参数,对这个对象调用start()方法时,就将执行对象中的run()方法, 通常会使用继承的方式来重写该方法;
terminate():不管任务是否完成,立即终止;is_alive显示仍然活着,但是被强制退出了

Process类常用属性:
name:当前进程实例别名,默认为Process-N,N为从1开始递增的整数;
pid:当前进程实例的PID值

2.2 多进程修改全局变量的问题

主要是通过这个案例来说明每个进程的空间都是独立的互不影响的

num = 0
def addnum():
    global num
    for i in range(10):
        num += 1
    print(num)
if __name__ == "__main__":
    p1 = Process(target=addnum)
    p2 = Process(target=addnum)
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print(num)

多进程中,每个进程中所有数据(包括全局变量)都各有拥有一份,互不影响

三、进程使用案例

3.1 手动创建多个进程

如果有很多的url,每个url都要提取标题该怎么使用多进程来执行呢?我们当然可以使用一个for循环来创建跟url数量一样多的进程数,但是如果有几千几万的url的时候,这样的进程数量反而会拖垮系统,而且效率非常的低。

3.2 进程池的使用

这时候就需要使用到multiprocessing提供的进程池(Pool)类来解决问题了。

初始化Pool类实例时, 可以指定最大的进程数量,当有新的请求提交到Pool中时,如果池还没满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到指定的最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来执行。

multiprocessing.Pool常用函数解析:

apply_async(func[, args[, kwds]]) :使用非阻塞方式调用func(并行执行,堵塞方式必须等待上一个进程退

出才能执行下一个进程),args为传递给func的参数列表,kwds为传递给func的关键字参数列表;

apply(func[, args[, kwds]]):使用阻塞方式调用func

close():关闭Pool,使其不再接受新的任务;

terminate():不管任务是否完成,立即终止;

join():主进程阻塞,等待子进程的退出, 必须在close或terminate之后使用

# -*- coding: utf-8 -*-
"""
https://blog.csdn.net/ 爬虫
"""
from urllib import request, error
from multiprocessing import Pool
import re
import time

header = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                  'AppleWebKit/537.36 (KHTML, like Gecko) '
                  'Chrome/66.0.3359.170 Safari/537.36'
}


def download(url):
    try:
        req = request.Request(url, headers=header)
        resp = request.urlopen(req, timeout=5)
    except (error.HTTPError, error.URLError):
        html = None
    else:
        html = resp.read().decode('utf-8')
    return html


def extract(html):
    title = re.search(r'<title>([^<]+)</title>', html)
    return title.group(1) if title else ""


def spider(url):
    source = download(url)
    if source:
        print(extract(source))
    else:
        print('download err:', url)


if __name__ == '__main__':
    start = time.time()  # 记录开始时间
    pool = Pool(15)
    for link in open('links.txt', encoding='utf-8'):
        p = pool.apply_async(spider, args=(link,))

    pool.close()  # 把池子关闭
    pool.join()  # 阻塞等待池子中的任务全部完成
    print('consume:', time.time() - start)

3.3 通过继承的方式重写Process的run方法创建进程

创建新的进程还能够使用类继承的方式,可以自定义一个类,继承Process类,然后重写run方法,每次实例化这个类的时候,就等同于实例化一个进程对象。

# -*- coding: utf-8 -*-
from urllib import request, error
from multiprocessing import Process
import re


class Spider(Process):
    def __init__(self, url):
        super().__init__()
        self.url = url
        self.title = ""
        self.header = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                          'AppleWebKit/537.36 (KHTML, like Gecko) '
                          'Chrome/66.0.3359.170 Safari/537.36'
        }

    def run(self):
        print('downloading:', self.url)
        source = self.download()
        if source:
            self.extract(source)
        else:
            print('download err:', self.url)
        print(self.title)

    def download(self):
        try:
            req = request.Request(self.url, headers=self.header)
            resp = request.urlopen(req, timeout=5)
        except (error.HTTPError, error.URLError):
            html = None
        else:
            html = resp.read().decode('utf-8')
        return html

    def extract(self, html):
        title = re.search(r'<title>([^<]+)</title>', html)
        self.title = title.group(1) if title else ""


if __name__ == '__main__':
    s = Spider('https://blog.csdn.net/yH0VLDe8VG8ep9VGe/article/details/81267571')
    s.start()

四、进程间通信

python给进程间的通信提供了好几种解决方法,主要有管道Pipe和队列Queue,这里我们只介绍如果使用队列来实现进程间通信。关于管道的使用如果感兴趣的同学可以自己看下相关的文档。实际的开发工作中队列的方式更常用。

4.1 multiprocessing中Queue的使用

初始化Queue()对象时(例如:q=Queue()),若括号中没有指定最大可接收的消息数量,或数量为负值,那么就代表可接受的消息数量没有上限(直到内存的尽头);
Queue.qsize():返回当前队列包含的消息数量;
Queue.empty():如果队列为空,返回True,反之False ;
Queue.full():如果队列满了,返回True,反之False;
Queue.get([block[, timeout]]):获取队列中的一条消息,然后将其从列队中移除,block默认值为True默认阻塞;
1)如果block使用默认值,且没有设置timeout(单位秒),消息列队如果为空,此时程序将被阻塞(停在读取状态),直到从消息列队读到消息为止,如果设置了timeout,则会等待timeout秒,若还没读取到任何消息,则出”Queue.Empty”异常;
2)如果block值为False,消息列队如果为空,则会立刻抛出”Queue.Empty”异常;
Queue.get_nowait():相当Queue.get(False);
Queue.put(item,[block[, timeout]]):将item消息写入队列,block默认值为True;
1)如果block使用默认值,且没有设置timeout(单位秒),消息列队如果已经没有空间可写入,此时程序将被阻塞(停在写入状态),直到从消息列队腾出空间为止,如果设置了timeout,则会等待timeout秒,若还没空间,则抛出”Queue.Full”异常;
2)如果block值为False,消息列队如果没有空间可写入,则会立刻抛出”Queue.Full”异常;
Queue.put_nowait(item):相当Queue.put(item, False);

进程池实现生产者消费者模式

如果要使用Pool创建进程,就需要使用multiprocessing.Manager()中的Queue(),而不是multiprocessing.Queue(),否则可能会得到一条如下的错误信息或直接无反应:
RuntimeError: Queue objects should only be shared between processes through inheritance.

发表评论