多进程或者多线程等并行加速目前已经不是什么难事了,相信很多读者都体验过。一般来说,我们会有这样的结论:多进程的加速比很难达到1。换句话说,当你用10进程去并行跑一个任务时,一般只能获得不到10倍的加速,而且进程越多,这个加速比往往就越低。

要注意,我们刚才说“很难达到1”,说明我们的潜意识里就觉得加速比最多也就是1。理论上确实是的,难不成用10进程还能获得20倍的加速?这不是天上掉馅饼吗?不过我前几天确实碰到了一个加速比远大于1的例子,所以在这里跟大家分享一下。

词频统计 #

我的原始任务是统计词频:我有很多文章,然后我们要对这些文章进行分词,最后汇总出一个词频表出来。一般的写法是这样的:

tokens = {}

for text in read_texts():
    for token in tokenize(text):
        tokens[token] = tokens.get(token, 0) + 1

这种写法在我统计THUCNews全部文章的词频时,大概花了20分钟。

多进程版本 #

然后,我们来比较一下多进程版的。多进程的写法我在《Python的多进程编程技巧》一文已经介绍过了,为了方便重复使用,我就将其封装为一个函数了:

def parallel_apply(func,
                   iterable,
                   workers,
                   max_queue_size,
                   callback=None,
                   dummy=False):
    """多进程或多线程地将func应用到iterable的每个元素中。
    注意这个apply是异步且无序的,也就是说依次输入a,b,c,但是
    输出可能是func(c), func(a), func(b)。
    参数:
        dummy: False是多进程/线性,True则是多线程/线性;
        callback: 处理单个输出的回调函数;
    """
    if dummy:
        from multiprocessing.dummy import Pool, Queue
    else:
        from multiprocessing import Pool, Queue
    from six.moves import queue

    in_queue, out_queue = Queue(max_queue_size), Queue()

    def worker_step(in_queue, out_queue):
        # 单步函数包装成循环执行
        while True:
            d = in_queue.get()
            r = func(d)
            out_queue.put(r)

    # 启动多进程/线程
    pool = Pool(workers, worker_step, (in_queue, out_queue))

    if callback is None:
        results = []

    # 后处理函数
    def process_out_queue():
        out_count = 0
        for _ in range(out_queue.qsize()):
            d = out_queue.get()
            out_count += 1
            if callback is None:
                results.append(d)
            else:
                callback(d)
        return out_count

    # 存入数据,取出结果
    in_count, out_count = 0, 0
    for d in iterable:
        in_count += 1
        while True:
            try:
                in_queue.put(d, block=False)
                break
            except queue.Full:
                out_count += process_out_queue()
        if in_count % max_queue_size == 0:
            out_count += process_out_queue()

    while out_count != in_count:
        out_count += process_out_queue()

    pool.terminate()

    if callback is None:
        return results

调用这个函数来多进程统计词频,大致代码如下:

def _batch_texts():
    texts = []
    for text in read_texts():
        texts.append(text)
        if len(texts) == 1000:
            yield texts
            texts = []
    if texts:
        yield texts

def _tokenize_and_count(texts):
    tokens = {}
    for text in texts:
        for token in tokenize(text):
            tokens[token] = tokens.get(token, 0) + 1
    return tokens

tokens = {}
def _total_count(result):
    for k, v in result.items()
        tokens[k] = tokens.get(k, 0) + v

# 10进程来完成词频统计
parallel_apply(
    func=_tokenize_and_count,
    iterable=_batch_texts(),
    workers=10,
    max_queue_size=200,
    callback=_total_count,
)

整个流程是:_batch_texts将文本按批划分,每批为1000个文本;_tokenize_and_count用来对每一批样本进行统计;_total_count对每一批样本的结果进行汇总;最后parallel_apply用10进程实现这个过程。

这个用时多少呢?结果是55秒!这意味着加速20倍,加速比是2!

原理分析 #

为什么能实现大于1的加速比呢?其实,原因在于最开始的单进程实现中,tokens[token] = tokens.get(token, 0) + 1一句会越来越慢,因为随着统计的推进,tokens里边的元素越来越多,对tokens的增删改查就会越来越慢。

而在多进程版本中,tokens[token] = tokens.get(token, 0) + 1一句只对不超过1000个样本执行,显然会一直保持很快的速度,最后的合并统计结果虽然对tokens的读写也很频繁,但远比不上原始实现的读写频率,因此也是很快的。所以多进程版本就可以实现20倍的加速,而不仅仅是理论上的极限10倍。

当然,读者可能已经感觉到,这并不是真正地让加速比超过了1,而是原始的单进程版写得不好的表象,换成下述代码就好了:

count = 0
tokens = {}
_tokens = {}

for text in read_texts():
    for token in tokenize(text):
        _tokens[token] = _tokens.get(token, 0) + 1
    count += 1
    if count == 1000:
        for k, v in _tokens.items():
            tokens[k] = tokens.get(k, 0) + v
        count = 0
        _tokens = {}

for k, v in _tokens.items():
    tokens[k] = tokens.get(k, 0) + v

也就还是分批统计再汇总的做法,只不过这是单进程的,看上去这种写法很迂回,很不直观,但事实上只用了8分钟,大约只是原来版本的三分之一!由此可见,实际上的加速比大约是0.8。

本文小结 #

文本简单讨论了一下Python的多进程问题,给出了一个看上去加速比可以大于1的例子,然后分析了其原因。从侧面来看,这其实也给我们写类似的代码提了个醒:哪怕在单进程的情况下,分批计算然后在汇总的效率,也通常会高于一整批一次性计算。

转载到请包括本文地址:http://hvdu.cn/archives/7031

更详细的转载事宜请参考:《大发红黑大战-大发棋牌玩法空间FAQ》

如果您还有什么疑惑或建议,欢迎在下方评论区继续讨论。

如果您觉得本文还不错,欢迎分享/打赏本文。打赏并非要从中获得收益,而是希望知道大发红黑大战-大发棋牌玩法空间获得了多少读者的真心关注。当然,如果你无视它,也不会影响你的阅读。再次表示欢迎和感谢!

如果您需要引用本文,请参考:

苏剑林. (2019, Oct 27). 《什么时候多进程的加速比可以大于1? 》[Blog post]. Retrieved from http://hvdu.cn/archives/7031