码迷,mamicode.com
首页 > 编程语言 > 详细

Python并行实例

时间:2019-03-12 09:13:05      阅读:127      评论:0      收藏:0      [点我收藏+]

标签:pre   进程   star   eth   start   use   rgs   pen   queue   

任务

def single():
    """Reference task: sum sqrt(i) for i in [1, N) in one process and one thread."""
    total = 0
    # Accumulate left to right so the floating-point result matches a plain loop.
    for root in map(math.sqrt, range(1, N)):
        total += root
    return total

结论

  • Python多线程受GIL限制,无法利用多核执行CPU密集型计算
  • Python多进程可以利用多核
  • Numpy速度远超并行的Python代码
  • twisted的线程池同样是Python线程,也无法利用多核

实现:

import math
import multiprocessing
import threading
import timeit

import numpy as np
from twisted.internet import reactor
import time

# Problem size shared by the benchmark functions below: each one sums
# sqrt(i) over a range bounded by N.
N = 10000000


def single():
    """Baseline: sum sqrt(i) for i in [1, N) using one process and one thread.

    Returns:
        The accumulated sum (the int 0 when the range is empty).
    """
    total = 0
    # Accumulate left to right so the floating-point result matches a plain loop.
    for root in map(math.sqrt, range(1, N)):
        total += root
    return total


def useThread():
    """Sum sqrt(i) for i in [0, N) by splitting the range across four threads.

    Each thread writes its partial sum into its own list slot, so there is no
    shared read-modify-write on a single accumulator (``x += y`` on a shared
    variable is not atomic, even under the GIL). The partial sums are added
    in chunk order after all threads have finished, which also makes the
    floating-point result independent of thread completion order.

    Returns:
        The total of all partial sums.
    """
    thread_count = 4
    per = math.ceil(N / thread_count)
    partial_sums = [0] * thread_count

    def go(index, beg, end):
        # Worker: sum sqrt(i) over [beg, end) and publish it in slot `index`.
        s = 0
        for i in range(beg, end):
            s += math.sqrt(i)
        partial_sums[index] = s

    thread_list = []
    for i in range(thread_count):
        beg = i * per
        # Clamp so the last chunk never runs past N when N is not a
        # multiple of thread_count.
        end = min(beg + per, N)
        th = threading.Thread(target=go, args=(i, beg, end))
        thread_list.append(th)
        th.start()
    for th in thread_list:
        th.join()

    total_sum = 0
    for s in partial_sums:
        total_sum += s
    return total_sum


def _sqrt_sum_worker(q, beg, end):
    """Put the sum of sqrt(i) for i in [beg, end) on queue ``q``.

    Defined at module level so it can be pickled as a Process target under
    the ``spawn`` start method as well as ``fork``.
    """
    s = 0
    for i in range(beg, end):
        s += math.sqrt(i)
    q.put(s)


def useMultiprocess():
    """Sum sqrt(i) for i in [0, N) by splitting the range across four processes.

    Every worker puts exactly one partial sum on a shared queue. The parent
    reads one result per worker *before* joining, following the
    multiprocessing guideline that queued items should be consumed before
    the producing process is joined.

    Returns:
        The total of all partial sums.
    """
    process_count = 4
    per = math.ceil(N / process_count)

    q = multiprocessing.Queue()
    process_list = []
    for i in range(process_count):
        beg = i * per
        # Clamp so the last chunk never runs past N when N is not a
        # multiple of process_count.
        end = min(beg + per, N)
        proc = multiprocessing.Process(target=_sqrt_sum_worker, args=(q, beg, end))
        process_list.append(proc)
        proc.start()

    total_sum = 0
    # One blocking get per worker: no polling and no swallowed exceptions.
    for _ in range(process_count):
        total_sum += q.get()

    for proc in process_list:
        proc.join()
    return total_sum


def useTwisted():
    """Sum sqrt(i) for i in [0, N) on the Twisted reactor's thread pool.

    The reactor is a per-process singleton that owns a pool of threads.
    The whole computation runs in a child process, which starts the reactor,
    waits until all chunks have reported, stops the reactor and sends the
    total back through a queue.
    NOTE(review): running in a child process is presumably needed because a
    stopped reactor cannot be started again in the same process - confirm.
    NOTE(review): ``process_work`` is a closure, so this relies on the
    ``fork`` start method (closures cannot be pickled for ``spawn``).

    Returns:
        The total computed in the child process.
    """
    total_sum = 0
    ok_count = 0
    thread_count = 4

    def go(beg, end):
        # Runs in a pool thread: compute one chunk, then hand the result to
        # the reactor thread so all accumulation happens on a single thread.
        s = 0
        for i in range(beg, end):
            s += math.sqrt(i)
        reactor.callFromThread(accumulate, s)

    def accumulate(s):
        # Runs in the reactor thread; stops the reactor after the last chunk.
        nonlocal total_sum
        nonlocal ok_count
        total_sum += s
        ok_count += 1
        if ok_count == thread_count:
            reactor.stop()

    def process_work(q):
        # Child-process entry point: queue the chunks, run the reactor until
        # accumulate() stops it, then report the total to the parent.
        reactor.suggestThreadPoolSize(thread_count)
        per = math.ceil(N / thread_count)
        for i in range(thread_count):
            beg = i * per
            # Clamp so the last chunk never runs past N.
            reactor.callInThread(go, beg, min(beg + per, N))
        reactor.run()
        q.put(total_sum)

    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=process_work, args=(q,))
    p.start()
    # Read the result before joining: queued items should be consumed before
    # the process that produced them is joined.
    result = q.get()
    p.join()
    return result


def useTwisted2():
    """Queue the sqrt-sum chunks on the current process's reactor thread pool.

    Unlike useTwisted, this does not start the reactor and does not return
    the sum: it only schedules the work. When the last chunk reports, the
    elapsed time since this call and the total are printed.

    Returns:
        None.
    """
    total_sum = 0
    thread_count = 4
    ok_count = 0
    beg_time = time.time()

    def go(beg, end):
        # Runs in a pool thread: compute one chunk, then hand the result to
        # the reactor thread for accumulation.
        s = 0
        for i in range(beg, end):
            s += math.sqrt(i)
        reactor.callFromThread(accumulate, s)

    def accumulate(s):
        # Runs in the reactor thread; reports once every chunk has arrived.
        nonlocal total_sum
        nonlocal ok_count
        total_sum += s
        ok_count += 1
        if ok_count == thread_count:
            print(time.time() - beg_time, "value", total_sum)

    reactor.suggestThreadPoolSize(thread_count)
    per = math.ceil(N / thread_count)
    for i in range(thread_count):
        beg = i * per
        # Clamp so the last chunk never runs past N when N is not a
        # multiple of thread_count.
        reactor.callInThread(go, beg, min(beg + per, N))


def useNumpy():
    """Sum sqrt(i) for i in [1, N) with vectorized NumPy operations.

    Uses ``np.arange(1, N)`` so the summed range is 1 .. N-1, the same range
    as ``single()``. (``np.linspace(1, N, N)`` would also include N itself
    and add an extra sqrt(N) to the result.)

    Returns:
        The sum as a NumPy float64 scalar (0.0 when the range is empty).
    """
    values = np.arange(1, N, dtype=np.float64)
    return np.sum(np.sqrt(values))


def main():
    """Run every benchmark once for its result, then time ten more runs.

    For each method, prints its name, the value returned by one direct call,
    and the total time of ten calls measured with timeit. Afterwards the
    reactor is started in this process.
    NOTE(review): nothing visible here stops the reactor, so the script
    presumably keeps running after the useTwisted2 output appears - confirm.
    """
    benchmarks = (single, useThread, useMultiprocess, useNumpy, useTwisted, useTwisted2)
    for method in benchmarks:
        # Call once for the result first, then time it (same order as a
        # left-to-right evaluation of the print arguments).
        result = method()
        elapsed = timeit.timeit(method, number=10)
        print(method.__name__, "result", result, "time", elapsed)
    reactor.run()


if __name__ == '__main__':
    main()

twisted无法利用多核

from twisted.internet import threads, reactor
import time
import math

# Script start timestamp; go() prints its finish time relative to this.
beg_time = time.time()


def go():
    """CPU-bound job: sum sqrt(1) .. sqrt(10000000) and print timing markers.

    Prints "go start" on entry and, when done, "go over" followed by the
    seconds elapsed since the module-level ``beg_time``. The sum itself is
    discarded.
    """
    print("go start")
    total = 0
    for root in map(math.sqrt, range(1, 10000001)):
        total += root
    print("go over", time.time() - beg_time)


import timeit

# Suggest a pool size of 8 threads to the reactor.
reactor.suggestThreadPoolSize(8)
# Baseline: time one call of go() made directly from this thread.
print(timeit.timeit(go, number=1))
# Queue ten runs of go() on the reactor's thread pool, then start the reactor.
# Comparing the "go over" timestamps with the baseline shows whether the
# pooled runs overlap on multiple cores.
for i in range(10):
    reactor.callInThread(go)
reactor.run()

Python并行实例

标签:pre   进程   star   eth   start   use   rgs   pen   queue   

原文地址:https://www.cnblogs.com/weiyinfu/p/10514432.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有 京ICP备13008772号-2
迷上了代码!