def _accept_new_connection(self, s):
# accepting the connection
clt_sock, clt_info = s.accept()
# Getting the service ability
new_abl = self.callback()
# Giving to the service ability the informations about the client
new_abl.set_opt(self.client_info_name, '{}:{}'.format(clt_info[0], clt_info[1]))
# Creating the pipes
in_pipe_in, in_pipe_out = multiprocessing.Pipe()
out_pipe_in, out_pipe_out = multiprocessing.Pipe()
new_abl.add_in_pipe(in_pipe_out)
new_abl.add_out_pipe(out_pipe_in)
# Starting the service ability
new_abl.start()
return clt_sock, in_pipe_in, out_pipe_out, new_abl
python类Pipe()的实例源码
def _accept_new_connection(self, s):
# accepting the connection
clt_sock, clt_info = s.accept()
# Getting the service ability
new_abl = self.callback()
# Giving to the service ability the information about the client
if not isinstance(self.client_info_name, type(None)):
new_abl.set_opt(self.client_info_name, '{}:{}'.format(clt_info[0], clt_info[1]))
# Creating the pipes
in_pipe_in, in_pipe_out = multiprocessing.Pipe()
out_pipe_in, out_pipe_out = multiprocessing.Pipe()
new_abl.add_in_pipe(in_pipe_out)
new_abl.add_out_pipe(out_pipe_in)
# Starting the service ability
new_abl.start()
return clt_sock, in_pipe_in, out_pipe_out, new_abl
def test_basic_usage(self):
""""""
pipp, pipc = Pipe()
pips, pipr = Pipe()
self.print_bar()
print('Test Task Interface')
ret_process = ProcessTask(id='test-1', target=testfun, args=(5,),
status_monitor_pipe=pipc,
result_pipe=pips,
result_hook_function=result_callback)
ret_process.start()
print('Test get threads status')
time.sleep(1)
#print(ret_process.subthreads_count)
threads_status = pipp.recv()
self.assertIsInstance(threads_status, dict)
#print pipr.recv()
#print pipr.recv()
#print pipr.recv()
#print pipr.recv()
self.print_end_bar()
def test_basic_usage(self):
""""""
pipp, pipc = Pipe()
pips, pipr = Pipe()
self.print_bar()
print('Test Task Interface')
ret_process = ProcessTask(id='test-1', target=testfun, args=(5,),
status_monitor_pipe=pipc,
result_pipe=pips,
result_hook_function=result_callback)
ret_process.start()
print('Test get threads status')
time.sleep(1)
#print(ret_process.subthreads_count)
threads_status = pipp.recv()
self.assertIsInstance(threads_status, dict)
#print pipr.recv()
#print pipr.recv()
#print pipr.recv()
#print pipr.recv()
self.print_end_bar()
def runpool():
parsecli()
try:
httpd = ThreadedTCPServer((LISTEN, PORT), Proxy)
except OSError as e:
print(e)
return
mainsock = httpd.socket
if hasattr(socket, "fromshare"):
workers = MAX_WORKERS
for i in range(workers-1):
(pipeout, pipein) = multiprocessing.Pipe()
p = multiprocessing.Process(target=start_worker, args=(pipeout,))
p.daemon = True
p.start()
while p.pid == None:
time.sleep(1)
pipein.send(mainsock.share(p.pid))
serve_forever(httpd)
def test_recursion(self):
rconn, wconn = self.Pipe(duplex=False)
self._test_recursion(wconn, [])
time.sleep(DELTA)
result = []
while rconn.poll():
result.append(rconn.recv())
expected = [
[],
[0],
[0, 0],
[0, 1],
[1],
[1, 0],
[1, 1]
]
self.assertEqual(result, expected)
def test_thousand(self):
if self.TYPE == 'manager':
self.skipTest('test not appropriate for {}'.format(self.TYPE))
passes = 1000
lock = self.Lock()
conn, child_conn = self.Pipe(False)
for j in range(self.N):
p = self.Process(target=self._test_thousand_f,
args=(self.barrier, passes, child_conn, lock))
p.start()
for i in range(passes):
for j in range(self.N):
self.assertEqual(conn.recv(), i)
#
#
#
def test_spawn_close(self):
# We test that a pipe connection can be closed by parent
# process immediately after child is spawned. On Windows this
# would have sometimes failed on old versions because
# child_conn would be closed before the child got a chance to
# duplicate it.
conn, child_conn = self.Pipe()
p = self.Process(target=self._echo, args=(child_conn,))
p.daemon = True
p.start()
child_conn.close() # this might complete before child initializes
msg = latin('hello')
conn.send_bytes(msg)
self.assertEqual(conn.recv_bytes(), msg)
conn.send_bytes(SENTINEL)
conn.close()
p.join()
def test_fd_transfer(self):
if self.TYPE != 'processes':
self.skipTest("only makes sense with processes")
conn, child_conn = self.Pipe(duplex=True)
p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
p.daemon = True
p.start()
self.addCleanup(test.support.unlink, test.support.TESTFN)
with open(test.support.TESTFN, "wb") as f:
fd = f.fileno()
if msvcrt:
fd = msvcrt.get_osfhandle(fd)
reduction.send_handle(conn, fd, p.pid)
p.join()
with open(test.support.TESTFN, "rb") as f:
self.assertEqual(f.read(), b"foo")
def test_dont_merge(self):
a, b = self.Pipe()
self.assertEqual(a.poll(0.0), False)
self.assertEqual(a.poll(0.1), False)
p = self.Process(target=self._child_dont_merge, args=(b,))
p.start()
self.assertEqual(a.recv_bytes(), b'a')
self.assertEqual(a.poll(1.0), True)
self.assertEqual(a.poll(1.0), True)
self.assertEqual(a.recv_bytes(), b'b')
self.assertEqual(a.poll(1.0), True)
self.assertEqual(a.poll(1.0), True)
self.assertEqual(a.poll(0.0), True)
self.assertEqual(a.recv_bytes(), b'cd')
p.join()
#
# Test of sending connection and socket objects between processes
#
def test_wait_timeout(self):
from multiprocessing.connection import wait
expected = 5
a, b = multiprocessing.Pipe()
start = time.time()
res = wait([a, b], expected)
delta = time.time() - start
self.assertEqual(res, [])
self.assertLess(delta, expected * 2)
self.assertGreater(delta, expected * 0.5)
b.send(None)
start = time.time()
res = wait([a, b], 20)
delta = time.time() - start
self.assertEqual(res, [a])
self.assertLess(delta, 0.4)
def test_ignore(self):
conn, child_conn = multiprocessing.Pipe()
try:
p = multiprocessing.Process(target=self._test_ignore,
args=(child_conn,))
p.daemon = True
p.start()
child_conn.close()
self.assertEqual(conn.recv(), 'ready')
time.sleep(0.1)
os.kill(p.pid, signal.SIGUSR1)
time.sleep(0.1)
conn.send(1234)
self.assertEqual(conn.recv(), 1234)
time.sleep(0.1)
os.kill(p.pid, signal.SIGUSR1)
self.assertEqual(conn.recv_bytes(), b'x'*(1024*1024))
time.sleep(0.1)
p.join()
finally:
conn.close()
def test_ignore_listener(self):
conn, child_conn = multiprocessing.Pipe()
try:
p = multiprocessing.Process(target=self._test_ignore_listener,
args=(child_conn,))
p.daemon = True
p.start()
child_conn.close()
address = conn.recv()
time.sleep(0.1)
os.kill(p.pid, signal.SIGUSR1)
time.sleep(0.1)
client = multiprocessing.connection.Client(address)
self.assertEqual(client.recv(), 'welcome')
p.join()
finally:
conn.close()
def __init__(self, maxsize=0):
if maxsize <= 0:
maxsize = _multiprocessing.SemLock.SEM_VALUE_MAX
self._maxsize = maxsize
self._reader, self._writer = Pipe(duplex=False)
self._rlock = Lock()
self._opid = os.getpid()
if sys.platform == 'win32':
self._wlock = None
else:
self._wlock = Lock()
self._sem = BoundedSemaphore(maxsize)
self._after_fork()
if sys.platform != 'win32':
register_after_fork(self, Queue._after_fork)
def listen_and_send(self, send_data):
'''Listen on socket and send data in response.
:param bytes send_data: data to send
'''
self.socket_pipe, child_pipe = multiprocessing.Pipe()
self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.socket.bind(os.path.join(self.socket_dir, 'sock'))
self.socket.listen(1)
def worker(sock, pipe, send_data_):
conn, addr = sock.accept()
pipe.send(conn.makefile('rb').read())
conn.sendall(send_data_)
conn.close()
self.proc = multiprocessing.Process(target=worker,
args=(self.socket, child_pipe, send_data))
self.proc.start()
self.socket.close()
def test_basic_usage(self):
""""""
pipp, pipc = Pipe()
pips, pipr = Pipe()
self.print_bar()
print('Test Task Interface')
ret_process = ProcessTask(id='test-1', target=testfun, args=(5,),
status_monitor_pipe=pipc,
result_pipe=pips,
result_hook_function=result_callback)
ret_process.start()
print('Test get threads status')
time.sleep(1)
#print(ret_process.subthreads_count)
threads_status = pipp.recv()
self.assertIsInstance(threads_status, dict)
#print pipr.recv()
#print pipr.recv()
#print pipr.recv()
#print pipr.recv()
self.print_end_bar()
def newproc(self):
global plock
self.timer_update()
self.pq, self.cq = Queue(1), Queue(1) # two queue needed
# self.pc, self.cc = Pipe()
self.p = Process(
target = standalone_headless_isolated,
args=(self.pq, self.cq, plock)
)
self.p.daemon = True
self.p.start()
self.reset_count = 0 # how many times has this instance been reset() ed
self.step_count = 0
self.timer_update()
return
# send x to the process
def blocking_run(self):
parent_conn, child_conn = Pipe()
q = Queue()
q.put(self.parameters)
self.p = Process(target=job_process, args=(self.job_id, self.job_class, q, child_conn, self.server_url, self.log_filename, ))
self.p.start()
while self.p.is_alive():
while parent_conn.poll():
self.output_recieved_from_job(parent_conn.recv())
time.sleep(1)
self.p.join()
while parent_conn.poll():
self.output_recieved_from_job(parent_conn.recv())
if self.terminated:
self.result = {'job_id':self.job_id, 'success':False, 'retcode':1, 'exception':'Terminated by server', 'progress':'terminated'}
else:
self.result = q.get()
self.result['progress'] = self.status
parent_conn.close()
return self.result
def __init__(self, conn, keepAlive=60):
self.conn = conn
self.connLock = threading.Lock()
self.requests = multiprocessing.Pipe(False)
self.keepAlive = keepAlive
self.resTable = {}
self.resTableLock = threading.Lock()
self.lastTaskId = 0
self.workers = []
self.lastRead = time.time()
self._stopping = False
self._stoppingLock = threading.Lock()
self.startWorker(self.sendWorker)
self.startWorker(self.recvWorker)
self.startWorker(self.mainWorker)
atexit.register(self.cancel)
def _pipe(self):
"""On Windows we use a pipe to emulate a Linux style character
buffer."""
if NIX:
return None
if not self.__pipe:
target_function = self._get_target_function()
if not target_function:
return None
self.__pipe, child_conn = Pipe(duplex=False)
self._listener = Process(target=target_function,
args=(child_conn,))
self._listener.start()
return self.__pipe