def connect(self) -> bool:
    """ Open a long-lived connection to EPMD, retrying until it succeeds.

    Every attempt targets ``(self.host_, self.port_)`` with a 5 second
    connect timeout. When an attempt raises ``socket.error`` the error is
    printed and the loop sleeps 5 seconds through ``gevent.sleep`` before
    trying again. The connected socket is stored in ``self.sock_``.

    :return: True; the call only returns once a connection exists
    """
    while True:
        try:
            endpoint = (self.host_, self.port_)
            print("EPMD: Connecting %s:%d" % endpoint)
            self.sock_ = socket.create_connection(address=endpoint,
                                                  timeout=5.0)
        except socket.error as conn_err:
            print("EPMD: connection error:", conn_err)
            gevent.sleep(5)
        else:
            # Reached only when the attempt above did not raise
            print("EPMD: Socket connected")
            return True
python类create_connection()的实例源码
def _fire_forget_query(ip: str, query: bytes) -> bytes:
    """ Connect to node, fire the query, read the reply and disconnect.

    The query is sent prefixed with its length as encoded by
    ``util.to_u16``. The reply is then read until the peer closes its side
    of the connection. The socket is closed on every exit path, including
    when sending or receiving raises.

    :param ip: Host to connect to, on port ``EPMD_DEFAULT_PORT``
    :param query: Request body without the length prefix
    :return: All bytes received before the peer closed the connection
    """
    s = socket.create_connection(address=(ip, EPMD_DEFAULT_PORT),
                                 timeout=EPMD_REMOTE_DEFAULT_TIMEOUT)
    try:
        # sendall() keeps writing until the whole request is out, whereas
        # a plain send() is allowed to stop after a partial write
        s.sendall(util.to_u16(len(query)) + query)

        # Expect that after everything is received, the peer will close
        # the socket automatically, so we will too
        chunks = []
        while True:
            incoming = s.recv(4096)
            if not incoming:
                break
            chunks.append(incoming)
    finally:
        s.close()

    # Joining once avoids re-copying the growing buffer on every recv()
    return b''.join(chunks)
def get_server_certificate(addr, ssl_version=PROTOCOL_SSLv23, ca_certs=None):
    """Fetch the certificate of the server at *addr* as a PEM-encoded string.

    *addr* must unpack into exactly two items. When *ca_certs* is given it
    is passed as the CA file and the server certificate is required to
    validate against it; otherwise no validation is requested.
    *ssl_version* selects the protocol used for the connection attempt."""
    # Unpacking is only a shape check: anything that is not a pair fails here
    _host, _port = addr

    requirement = CERT_REQUIRED if ca_certs is not None else CERT_NONE
    context = _create_stdlib_context(ssl_version,
                                     cert_reqs=requirement,
                                     cafile=ca_certs)

    with closing(create_connection(addr)) as raw_sock, \
            closing(context.wrap_socket(raw_sock)) as tls_sock:
        der_bytes = tls_sock.getpeercert(True)
    return DER_cert_to_PEM_cert(der_bytes)
def get_server_certificate(addr, ssl_version=PROTOCOL_SSLv23, ca_certs=None):
    """Fetch the certificate of the server at *addr* as a PEM-encoded string.

    *addr* must unpack into exactly two items. When *ca_certs* is given it
    is passed as the CA file and the server certificate is required to
    validate against it; otherwise no validation is requested.
    *ssl_version* selects the protocol used for the connection attempt."""
    # Unpacking is only a shape check: anything that is not a pair fails here
    _host, _port = addr

    requirement = CERT_REQUIRED if ca_certs is not None else CERT_NONE
    context = _create_stdlib_context(ssl_version,
                                     cert_reqs=requirement,
                                     cafile=ca_certs)

    with closing(create_connection(addr)) as raw_sock, \
            closing(context.wrap_socket(raw_sock)) as tls_sock:
        der_bytes = tls_sock.getpeercert(True)
    return DER_cert_to_PEM_cert(der_bytes)
def connect_with(protocol_class, host_port: tuple,
                 args: list, kwargs: dict):
    """ Open a connection to ``host_port`` and attach a protocol handler.

    The handler is built as ``protocol_class(*args, **kwargs)`` and told
    about the new socket through ``on_connected(sock, host_port)``. A
    greenlet running ``_handle_socket_read(handler, sock)`` is then
    spawned. An exception raised while spawning is printed together with
    its traceback and the pair is still returned.

    :rtype: tuple(protocol_class, gevent.socket)
    :type protocol_class: class
    :param protocol_class: A handler class which has handler functions like
        on_connected, consume, and on_connection_lost
    :param host_port: (host,port) tuple where to connect
    :param args: Args to pass to the handler class constructor
    :param kwargs: Keyword args to pass to the handler class constructor
    """
    conn = socket.create_connection(address=host_port)

    proto = protocol_class(*args, **kwargs)
    proto.on_connected(conn, host_port)
    print("Connection to %s established" % str(host_port))

    try:
        reader = gevent.spawn(_handle_socket_read, proto, conn)
        reader.start()
    except Exception as exc:
        print("\nException: %s" % exc)
        traceback.print_exc()
        print()

    return proto, conn
def patch_socket(dns=True, aggressive=True):
    """Swap objects in the standard :mod:`socket` module for gevent's
    cooperative versions.

    ``socket``, ``SocketType`` and ``create_connection`` are always
    replaced; ``socketpair`` and ``fromfd`` only when :mod:`gevent.socket`
    has them. ``ssl`` and ``sslerror`` are copied over when they can be
    imported from :mod:`gevent.socket`; when they cannot and *aggressive*
    is true, an existing ``ssl`` attribute on the standard module is
    deleted instead.

    If *dns* is true, also patch dns functions in :mod:`socket`.
    """
    from gevent import socket as green_socket
    std_socket = __import__('socket')

    # Names every gevent.socket is expected to provide
    for attr in ('socket', 'SocketType', 'create_connection'):
        setattr(std_socket, attr, getattr(green_socket, attr))

    # Names that are only patched when gevent.socket offers them
    for attr in ('socketpair', 'fromfd'):
        if hasattr(green_socket, attr):
            setattr(std_socket, attr, getattr(green_socket, attr))

    try:
        from gevent.socket import ssl, sslerror
    except ImportError:
        if aggressive:
            try:
                del std_socket.ssl
            except AttributeError:
                pass
    else:
        std_socket.ssl = ssl
        std_socket.sslerror = sslerror

    if dns:
        patch_dns()
def patch_socket(dns=True, aggressive=True):
    """Swap objects in the standard :mod:`socket` module for gevent's
    cooperative versions.

    ``socket``, ``SocketType`` and ``create_connection`` are always
    replaced; ``socketpair`` and ``fromfd`` only when :mod:`gevent.socket`
    has them. ``ssl`` and ``sslerror`` are copied over when they can be
    imported from :mod:`gevent.socket`; when they cannot and *aggressive*
    is true, an existing ``ssl`` attribute on the standard module is
    deleted instead.

    If *dns* is true, also patch dns functions in :mod:`socket`.
    """
    from gevent import socket as green_socket
    std_socket = __import__('socket')

    # Names every gevent.socket is expected to provide
    for attr in ('socket', 'SocketType', 'create_connection'):
        setattr(std_socket, attr, getattr(green_socket, attr))

    # Names that are only patched when gevent.socket offers them
    for attr in ('socketpair', 'fromfd'):
        if hasattr(green_socket, attr):
            setattr(std_socket, attr, getattr(green_socket, attr))

    try:
        from gevent.socket import ssl, sslerror
    except ImportError:
        if aggressive:
            try:
                del std_socket.ssl
            except AttributeError:
                pass
    else:
        std_socket.ssl = ssl
        std_socket.sslerror = sslerror

    if dns:
        patch_dns()
def handle(self, source, address): # pylint:disable=method-hidden
    """Relay traffic between an accepted connection and ``self.dest``.

    Logs the accepted peer, then connects to ``self.dest``. When that
    connect raises ``IOError`` the failure is logged and the method
    returns. Otherwise two ``forward`` greenlets are spawned, one per
    direction, and the method waits for both of them to finish.

    :param source: the accepted client socket
    :param address: peer address; its first two items are logged
    """
    log('%s:%s accepted', *address[:2])

    try:
        upstream = create_connection(self.dest)
    except IOError as err:
        log('%s:%s failed to connect to %s:%s: %s',
            address[0], address[1], self.dest[0], self.dest[1], err)
        return

    to_upstream = gevent.spawn(forward, source, upstream, self)
    to_client = gevent.spawn(forward, upstream, source, self)
    gevent.joinall((to_upstream, to_client))
def _connect(self):
    """Connect to ``(self.host, self.port)`` and start a background reader.

    A greenlet is spawned that repeatedly calls ``_recv_obj`` on the new
    socket and places each result on ``self.command_queue`` with
    ``put_nowait``. When that loop fails with an ``Exception`` the socket
    is closed and the reader ends.

    :return: the connected socket
    """
    sock = socket.create_connection((self.host, self.port))

    def handle():
        try:
            while True:
                self.command_queue.put_nowait(_recv_obj(sock))
        except Exception:
            # Any failure ends the reader. Closing is best-effort, but only
            # ordinary errors are suppressed so that BaseException-derived
            # signals (KeyboardInterrupt, SystemExit, greenlet kills) are
            # not swallowed the way a bare ``except:`` would.
            try:
                sock.close()
            except Exception:
                pass

    gevent.spawn(handle)
    return sock
def patch_socket(dns=True, aggressive=True):
    """Swap objects in the standard :mod:`socket` module for gevent's
    cooperative versions.

    ``socket``, ``SocketType`` and ``create_connection`` are always
    replaced; ``socketpair`` and ``fromfd`` only when :mod:`gevent.socket`
    has them. ``ssl`` and ``sslerror`` are copied over when they can be
    imported from :mod:`gevent.socket`; when they cannot and *aggressive*
    is true, an existing ``ssl`` attribute on the standard module is
    deleted instead.

    If *dns* is true, also patch dns functions in :mod:`socket`.
    """
    from gevent import socket as green_socket
    std_socket = __import__('socket')

    # Names every gevent.socket is expected to provide
    for attr in ('socket', 'SocketType', 'create_connection'):
        setattr(std_socket, attr, getattr(green_socket, attr))

    # Names that are only patched when gevent.socket offers them
    for attr in ('socketpair', 'fromfd'):
        if hasattr(green_socket, attr):
            setattr(std_socket, attr, getattr(green_socket, attr))

    try:
        from gevent.socket import ssl, sslerror
    except ImportError:
        if aggressive:
            try:
                del std_socket.ssl
            except AttributeError:
                pass
    else:
        std_socket.ssl = ssl
        std_socket.sslerror = sslerror

    if dns:
        patch_dns()
def send_message_to_slave(message, address):
    """Send *message* to the node at *address* and report whether it accepted it.

    The message is encoded with ``generate_msgpack_message_payload``,
    written to a fresh connection, and a single reply is read back with
    ``msgpack_unpack_msg_from_socket``. The connection is closed on every
    path, including when sending or reading the reply raises. Each outcome
    increments either ``rpc_message_pass_success_cnt`` or
    ``rpc_message_pass_fail_cnt``.

    :param message: mapping to send; its ``message_id`` key, when present,
        is used in log lines
    :param address: address handed to ``socket.create_connection``; it is
        formatted as ``host:port`` for logging
    :return: True when the reply equals ``'OK'``; False when encoding
        fails, a socket error occurs, or the reply is anything else
    """
    try:
        payload = generate_msgpack_message_payload(message)
    except TypeError:
        logger.exception('Failed encoding message %s as msgpack', message)
        metrics.incr('rpc_message_pass_fail_cnt')
        return False

    pretty_address = '%s:%s' % address
    message_id = message.get('message_id', '?')

    try:
        s = socket.create_connection(address)
        try:
            # sendall() writes the whole payload; send() may stop part-way
            s.sendall(payload)
            sender_resp = msgpack_unpack_msg_from_socket(s)
        finally:
            # Release the socket even when the exchange above fails
            s.close()
    except socket.error:
        # Logged through the module logger like every other path here
        logger.exception('Failed connecting to %s to send message (ID %s)',
                         pretty_address, message_id)
        metrics.incr('rpc_message_pass_fail_cnt')
        return False

    if sender_resp != 'OK':
        logger.error('Failed sending message (ID %s) through %s: %s',
                     message_id, pretty_address, sender_resp)
        metrics.incr('rpc_message_pass_fail_cnt')
        return False

    access_logger.info('Successfully passed message (ID %s) to %s for sending',
                       message_id, pretty_address)
    metrics.incr('rpc_message_pass_success_cnt')
    return True
def handle( self, source, address ):
    """Proxy an accepted connection to one entry of ``currentEndpoints``.

    Returns immediately when ``currentEndpoints`` is empty. Otherwise
    TCP keepalive is requested on the source socket (best-effort), one
    endpoint is picked with ``random.sample`` and connected to, and a
    header is sent to it: the msgpack-encoded *address* preceded by its
    length packed with the ``'!I'`` struct format. Two ``forward``
    greenlets, one per direction, are then spawned and joined.

    The destination socket, once connected, and the source socket are
    always closed before returning.

    :param source: the accepted client socket
    :param address: address of the client; forwarded in the header
    """
    global currentEndpoints

    def _enable_keepalive( sock, label ):
        # Best-effort: a failure is reported and the connection proceeds.
        # Only ordinary errors are suppressed so that BaseException-derived
        # signals are not swallowed the way a bare ``except:`` would.
        try:
            sock.setsockopt( socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1 )
            sock.setsockopt( socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 5 )
            sock.setsockopt( socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 10 )
            sock.setsockopt( socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 2 )
        except Exception:
            print( "Failed to set keepalive on %s connection" % label )

    try:
        if not currentEndpoints: return

        print( "Connection from %s" % str( address ) )

        _enable_keepalive( source, 'source' )

        try:
            dest = create_connection( random.sample( currentEndpoints, 1 )[ 0 ] )
        except Exception:
            print( "Failed to connect to EndpointProcessor" )
        else:
            try:
                _enable_keepalive( dest, 'dest' )

                # Send a small connection header that contains the original
                # source of the connection.
                connectionHeaders = msgpack.packb( address )
                dest.sendall( struct.pack( '!I', len( connectionHeaders ) ) )
                dest.sendall( connectionHeaders )

                gevent.joinall( ( gevent.spawn( forward, source, dest, address, self ),
                                  gevent.spawn( forward, dest, source, address, self ) ) )
            finally:
                dest.close()
    finally:
        source.close()
def patch_socket(dns=True, aggressive=True):
    """Swap objects in the standard :mod:`socket` module for gevent's
    cooperative versions.

    ``socket``, ``SocketType`` and ``create_connection`` are always
    replaced; ``socketpair`` and ``fromfd`` only when :mod:`gevent.socket`
    has them. ``ssl`` and ``sslerror`` are copied over when they can be
    imported from :mod:`gevent.socket`; when they cannot and *aggressive*
    is true, an existing ``ssl`` attribute on the standard module is
    deleted instead.

    If *dns* is true, also patch dns functions in :mod:`socket`.
    """
    from gevent import socket as green_socket
    std_socket = __import__('socket')

    # Names every gevent.socket is expected to provide
    for attr in ('socket', 'SocketType', 'create_connection'):
        setattr(std_socket, attr, getattr(green_socket, attr))

    # Names that are only patched when gevent.socket offers them
    for attr in ('socketpair', 'fromfd'):
        if hasattr(green_socket, attr):
            setattr(std_socket, attr, getattr(green_socket, attr))

    try:
        from gevent.socket import ssl, sslerror
    except ImportError:
        if aggressive:
            try:
                del std_socket.ssl
            except AttributeError:
                pass
    else:
        std_socket.ssl = ssl
        std_socket.sslerror = sslerror

    if dns:
        patch_dns()