def waitForIPC(ipcPort, timeout, request):
"""Timeout is in seconds."""
#time.sleep(2)
# assume 127.0.0.1 for now
address = "127.0.0.1"
port = int(ipcPort)
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# overall timeout in microseconds
timeout_us = timeout * 1000000
# set socket timeout (10 ms per attempt)
sec = 0
usec = 10000
timeval = struct.pack('ll', sec, usec)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVTIMEO, timeval)
count = 0
maxCount = timeout_us / (sec * 1000000 + usec)
response = False
while not response and count < maxCount:
sock.sendto(request, (address, port))
try:
data, addr = sock.recvfrom(65536)
except socket.error as serr:
if serr.errno != errno.EAGAIN: # EAGAIN == Resource Temporarily Unavailable
raise serr
else:
response = len(data) > 0
count += 1
sock.close()
return response
python类SO_RCVTIMEO的实例源码
def _monitor_socket(self):
"""Monitor the local socket for connections.
AE.start(): Monitors the local socket to see if anyone tries to connect
and if so, creates a new association. Separated out from start() to
enable better unit testing
"""
# FIXME: this needs to be dealt with properly
try:
read_list, _, _ = select.select([self.local_socket], [], [], 0)
except (socket.error, ValueError):
return
# If theres a connection
if read_list:
client_socket, _ = self.local_socket.accept()
client_socket.setsockopt(socket.SOL_SOCKET,
socket.SO_RCVTIMEO,
pack('ll', 10, 0))
# Create a new Association
# Association(local_ae, local_socket=None, max_pdu=16382)
assoc = Association(self,
client_socket,
max_pdu=self.maximum_pdu_size,
acse_timeout=self.acse_timeout,
dimse_timeout=self.dimse_timeout)
assoc.start()
self.active_associations.append(assoc)
def connect(self):
"""
Connect to an SSL port using the OpenSSL library and apply
per-connection parameters.
"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
if self.timeout is not None:
# '0' microseconds
sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVTIMEO,
struct.pack('LL', self.timeout, 0))
self.sock = OpenSSLConnectionDelegator(self.context, sock)
self.sock.connect((self.host, self.port))
def new_accept(orgin_method, self, *args, **kwds):
while True:
return_value = orgin_method(*args, **kwds)
self_socket = return_value[0]
client_ip, client_port = return_value[1][:2]
server_addrs = self._server_addrs
client_list = self._all_client_list.get(server_addrs, {})
if len(client_list) < self._limit_clients_num or client_ip in client_list:
self_socket._server_addrs = self._server_addrs
self_socket.close = self_socket.new_close
logging.debug("[socket] add client %s:%d" %(client_ip, client_port))
if client_list.get(client_ip, None) == None:
client_list.update({client_ip : {"client_num":0, "last_up_time":0}})
client_list[client_ip]["client_num"] += 1
self._all_client_list[server_addrs].update(client_list)
if set_close_timeout:
# set recv_timeout and send_timeout , struct.pack("II", some_num_secs, some_num_microsecs)
self_socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVTIMEO, struct.pack("II", recv_timeout, 0))
self_socket.setsockopt(socket.SOL_SOCKET, socket.SO_SNDTIMEO, struct.pack("II", send_timeout, 0))
return return_value
else:
for k,v in self._all_client_list[server_addrs].copy().items():
last_up_time = v["last_up_time"]
if time.time() - last_up_time > recvfrom_timeout and v["client_num"] < 1:
if set_close_timeout:
self_socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVTIMEO, struct.pack("II", recv_timeout, 0))
self_socket.setsockopt(socket.SOL_SOCKET, socket.SO_SNDTIMEO, struct.pack("II", send_timeout, 0))
logging.info("[socket] remove the client %s" % (k))
del client_list[k]
if client_list.get(client_ip, None) == None:
client_list.update({client_ip : {"client_num":0, "last_up_time":0}})
client_list[client_ip]["client_num"] += 1
self._all_client_list[server_addrs].update(client_list)
self_socket._server_addrs = self._server_addrs
self_socket.close = self_socket.new_close
return return_value
if time.time() - self.last_log_time[0] > 10:
logging.error("[socket] the server_addrs %s client more than %d" % (server_addrs, self._limit_clients_num))
self.last_log_time[0] = time.time()
self_socket.close()
# ??Udp??
def test_scp_assoc_a_abort_reply(self):
"""Test the SCP sending an A-ABORT instead of an A-ASSOCIATE response"""
class DummyAE(threading.Thread, AE):
"""Dummy AE used for testing"""
def __init__(self, scp_sop_class, port):
"""Initialise the class"""
AE.__init__(self, scp_sop_class=scp_sop_class, port=port)
threading.Thread.__init__(self)
self.daemon = True
def run(self):
"""The thread run method"""
self.start_scp()
def start_scp(self):
"""new runner"""
self._bind_socket()
while True:
try:
if self._quit:
break
self._monitor_socket()
self.cleanup_associations()
except KeyboardInterrupt:
self.stop()
def _monitor_socket(self):
"""Override the normal method"""
try:
read_list, _, _ = select.select([self.local_socket], [], [], 0)
except (socket.error, ValueError):
return
# If theres a connection
if read_list:
client_socket, _ = self.local_socket.accept()
client_socket.setsockopt(socket.SOL_SOCKET,
socket.SO_RCVTIMEO,
pack('ll', 10, 0))
# Create a new Association
# Association(local_ae, local_socket=None, max_pdu=16382)
assoc = Association(self,
client_socket,
max_pdu=self.maximum_pdu_size,
acse_timeout=self.acse_timeout,
dimse_timeout=self.dimse_timeout)
# Set the ACSE to abort association requests
assoc._a_abort_assoc_rq = True
assoc.start()
self.active_associations.append(assoc)
scp = DummyAE(scp_sop_class=[VerificationSOPClass], port=11112)
scp.start()
ae = AE(scu_sop_class=[VerificationSOPClass])
assoc = ae.associate('localhost', 11112)
self.assertFalse(assoc.is_established)
scp.stop()
def test_scp_assoc_ap_abort_reply(self):
"""Test the SCP sending an A-ABORT instead of an A-ASSOCIATE response"""
class DummyAE(threading.Thread, AE):
"""Dummy AE used for testing"""
def __init__(self, scp_sop_class, port):
"""Initialise the class"""
AE.__init__(self, scp_sop_class=scp_sop_class, port=port)
threading.Thread.__init__(self)
self.daemon = True
def run(self):
"""The thread run method"""
self.start_scp()
def start_scp(self):
"""new runner"""
self._bind_socket()
while True:
try:
if self._quit:
break
self._monitor_socket()
self.cleanup_associations()
except KeyboardInterrupt:
self.stop()
def _monitor_socket(self):
"""Override the normal method"""
try:
read_list, _, _ = select.select([self.local_socket], [], [], 0)
except ValueError:
return
# If theres a connection
if read_list:
client_socket, _ = self.local_socket.accept()
client_socket.setsockopt(socket.SOL_SOCKET,
socket.SO_RCVTIMEO,
pack('ll', 10, 0))
# Create a new Association
# Association(local_ae, local_socket=None, max_pdu=16382)
assoc = Association(self,
client_socket,
max_pdu=self.maximum_pdu_size,
acse_timeout=self.acse_timeout,
dimse_timeout=self.dimse_timeout)
# Set the ACSE to abort association requests
assoc._a_p_abort_assoc_rq = True
assoc.start()
self.active_associations.append(assoc)
scp = DummyAE(scp_sop_class=[VerificationSOPClass], port=11112)
scp.start()
ae = AE(scu_sop_class=[VerificationSOPClass])
assoc = ae.associate('localhost', 11112)
self.assertFalse(assoc.is_established)
scp.stop()