具有共享变量(值)的Python多处理Pool.apply_async

发布于 2021-01-29 16:02:49

在我的大学项目中,我试图开发一个基于python的流量生成器。我在vmware上创建了2台CentOS计算机,我将1台用作客户端,将1台用作服务器。我已经使用IP别名技术来增加客户端数量,并且仅使用一台客户端/服务器计算机就可以进行服务。到目前为止,我已经在客户端计算机上创建了50个IP别名,并在服务器计算机上创建了10个IP别名。我还使用多处理模块来同时生成从所有50个客户端到所有10个服务器的流量。我还在服务器上(由于使用Apache服务器,所以在/
var / www /
html目录中)开发了一些配置文件(1kb,10kb,50kb,100kb,500kb,1mb),并且我使用urllib2从这些配置文件发送请求到这些配置文件我的客户端计算机。我正在使用httplib
+ urllib2
首先绑定到任何一个源别名IP,然后使用urllib2从该IP发送请求。在这里,为了增加TCP连接数,我尝试使用multiprocessing.Pool.apply_async模块。但是我在运行脚本时收到此错误“
RuntimeError:同步的对象仅应通过继承在进程之间共享”。经过一些调试后,我发现此错误是由于使用multiprocessing.Value引起的。但是我想在进程之间共享一些变量,并且我还想增加TCP连接的数量。在这里可以使用什么其他模块(multiprocessing.Value除外)来共享一些公共变量?否则此查询还有其他解决方案吗?

'''
Traffic Generator Script:

 Here I have used IP Aliasing to create multiple clients on single vm machine. 
 Same I have done on server side to create multiple servers. I have around 50 clients and 10 servers
'''
import multiprocessing
import urllib2
import random
import myurllist    #list of all destination urls for all 10 servers
import time
import socbindtry   #script that binds various virtual/aliased client ips to the script
m=multiprocessing.Manager()
response_time=m.list()    #some shared variables
error_count=multiprocessing.Value('i',0)
def send_request3():    #function to send requests from alias client ip 1
    opener=urllib2.build_opener(socbindtry.BindableHTTPHandler3)    #bind to alias client ip1
    try:
        tstart=time.time()
        for i in range(myurllist.url):
            x=random.choice(myurllist.url[i])
            opener.open(x).read()
            print "file downloaded:",x
            response_time.append(time.time()-tstart)
    except urllib2.URLError, e:
        error_count.value=error_count.value+1
def send_request4():    #function to send requests from alias client ip 2
    opener=urllib2.build_opener(socbindtry.BindableHTTPHandler4)    #bind to alias client ip2
    try:
        tstart=time.time()
        for i in range(myurllist.url):
            x=random.choice(myurllist.url[i])
            opener.open(x).read()
            print "file downloaded:",x
            response_time.append(time.time()-tstart)
    except urllib2.URLError, e:
        error_count.value=error_count.value+1
#50 such functions are defined here for 50 clients
def func():
    pool=multiprocessing.Pool(processes=750)
    for i in range(5):
        pool.apply_async(send_request3)
        pool.apply_async(send_request4)
        pool.apply_async(send_request5)
#append 50 functions here
    pool.close()
    pool.join()
    print"All work Done..!!"
    return
start=float(time.time())
func()
end=float(time.time())-start
print end
关注者
0
被浏览
561
1 个回答
  • 面试哥
    面试哥 2021-01-29
    为面试而生,有面试问题,就找面试哥。

    如错误消息所述,您无法multiprocessing.Value通过泡菜。但是,您可以使用multiprocessing.Manager().Value

    import multiprocessing
    import urllib2
    import random
    import myurllist    #list of all destination urls for all 10 servers
    import time
    import socbindtry   #script that binds various virtual/aliased client ips to the script
    
    def send_request3(response_time, error_count):    #function to send requests from alias client ip 1
        opener=urllib2.build_opener(socbindtry.BindableHTTPHandler3)    #bind to alias client ip1
        try:
            tstart=time.time()
            for i in range(myurllist.url):
                x=random.choice(myurllist.url[i])
                opener.open(x).read()
                print "file downloaded:",x
                response_time.append(time.time()-tstart)
        except urllib2.URLError, e:
            with error_count.get_lock():
                error_count.value += 1
    
    def send_request4(response_time, error_count):    #function to send requests from alias client ip 2
        opener=urllib2.build_opener(socbindtry.BindableHTTPHandler4)    #bind to alias client ip2
        try:
            tstart=time.time()
            for i in range(myurllist.url):
                x=random.choice(myurllist.url[i])
                opener.open(x).read()
                print "file downloaded:",x
                response_time.append(time.time()-tstart)
        except urllib2.URLError, e:
            with error_count.get_lock():
                error_count.value += 1
    
    #50 such functions are defined here for 50 clients
    
    def func(response_time, error_count):
        pool=multiprocessing.Pool(processes=2*multiprocessing.cpu_count())
        args = (response_time, error_count)
        for i in range(5):
            pool.apply_async(send_request3, args=args)
            pool.apply_async(send_request4, args=args)
    #append 50 functions here
        pool.close()
        pool.join()
        print"All work Done..!!"
        return
    
    if __name__ == "__main__":
        m=multiprocessing.Manager()
        response_time=m.list()    #some shared variables
        error_count=m.Value('i',0)
    
        start=float(time.time())
        func(response_time, error_count)
        end=float(time.time())-start
        print end
    

    其他一些注意事项:

    1. 使用Pool750个流程不是一个好主意。除非您使用具有数百个CPU内核的服务器,否则这将使您的计算机不堪重负。这样可以更快,更省力地使用更少的进程。更像2 * multiprocessing.cpu_count()
    2. 最佳做法是,应将需要使用的所有共享参数明确传递给子进程,而不要使用全局变量。这增加了在Windows上运行代码的机会。
    3. 看起来您的所有send_request*函数几乎都做同样的事情。为什么不仅仅执行一个函数并使用一个变量来决定socbindtry.BindableHTTPHandler使用哪个呢?这样可以避免 大量 的代码重复。
    4. 您递增的error_count方式不是进程/线程安全的,并且容易受到竞争条件的影响。您需要使用锁来保护增量(就像我在上面的示例代码中所做的那样)。


知识点
面圈网VIP题库

面圈网VIP题库全新上线,海量真题题库资源。 90大类考试,超10万份考试真题开放下载啦

去下载看看