def create_connection(dest_pair, proxy_type=None, proxy_addr=None,
proxy_port=None, proxy_username=None,
proxy_password=None, timeout=None):
"""create_connection(dest_pair, *[, timeout], **proxy_args) -> socket object
Like socket.create_connection(), but connects to proxy
before returning the socket object.
dest_pair - 2-tuple of (IP/hostname, port).
**proxy_args - Same args passed to socksocket.set_proxy().
timeout - Optional socket timeout value, in seconds.
"""
sock = socksocket()
if isinstance(timeout, (int, float)):
sock.settimeout(timeout)
sock.set_proxy(proxy_type, proxy_addr, proxy_port,
proxy_username, proxy_password)
sock.connect(dest_pair)
return sock
python类create_connection()的实例源码
def connect(self, host='', port=0, timeout=-999):
'''Connect to host. Arguments are:
- host: hostname to connect to (string, default previous host)
- port: port to connect to (integer, default previous port)
'''
if host != '':
self.host = host
if port > 0:
self.port = port
if timeout != -999:
self.timeout = timeout
self.sock = socket.create_connection((self.host, self.port), self.timeout)
self.af = self.sock.family
self.file = self.sock.makefile('rb')
self.welcome = self.getresp()
return self.welcome
def _new_conn(self):
""" Establish a socket connection and set nodelay settings on it
:return: a new socket connection
"""
try:
conn = socket.create_connection(
(self.host, self.port),
self.timeout,
self.source_address,
)
except AttributeError: # Python 2.6
conn = socket.create_connection(
(self.host, self.port),
self.timeout,
)
conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY,
self.tcp_nodelay)
return conn
def connect(self):
sock = socket.create_connection(
(self.host, self.port), getattr(self, 'source_address', None)
)
# Handle the socket if a (proxy) tunnel is present
if hasattr(self, '_tunnel') and getattr(self, '_tunnel_host', None):
self.sock = sock
self._tunnel()
self.sock = ssl.wrap_socket(
sock, cert_reqs=ssl.CERT_REQUIRED, ca_certs=self.ca_bundle
)
try:
match_hostname(self.sock.getpeercert(), self.host)
except CertificateError:
self.sock.shutdown(socket.SHUT_RDWR)
self.sock.close()
raise
def create_connection(dest_pair, proxy_type=None, proxy_addr=None,
proxy_port=None, proxy_username=None,
proxy_password=None, timeout=None):
"""create_connection(dest_pair, *[, timeout], **proxy_args) -> socket object
Like socket.create_connection(), but connects to proxy
before returning the socket object.
dest_pair - 2-tuple of (IP/hostname, port).
**proxy_args - Same args passed to socksocket.set_proxy().
timeout - Optional socket timeout value, in seconds.
"""
sock = socksocket()
if isinstance(timeout, (int, float)):
sock.settimeout(timeout)
sock.set_proxy(proxy_type, proxy_addr, proxy_port,
proxy_username, proxy_password)
sock.connect(dest_pair)
return sock
def connect(self, host='', port=0, timeout=-999):
'''Connect to host. Arguments are:
- host: hostname to connect to (string, default previous host)
- port: port to connect to (integer, default previous port)
'''
if host != '':
self.host = host
if port > 0:
self.port = port
if timeout != -999:
self.timeout = timeout
self.sock = socket.create_connection((self.host, self.port), self.timeout)
self.af = self.sock.family
self.file = self.sock.makefile('rb')
self.welcome = self.getresp()
return self.welcome
def get_server_certificate(addr, ssl_version=PROTOCOL_SSLv23, ca_certs=None):
"""Retrieve the certificate from the server at the specified address,
and return it as a PEM-encoded string.
If 'ca_certs' is specified, validate the server cert against it.
If 'ssl_version' is specified, use it in the connection attempt."""
host, port = addr
if ca_certs is not None:
cert_reqs = CERT_REQUIRED
else:
cert_reqs = CERT_NONE
context = _create_stdlib_context(ssl_version,
cert_reqs=cert_reqs,
cafile=ca_certs)
with closing(create_connection(addr)) as sock:
with closing(context.wrap_socket(sock)) as sslsock:
dercert = sslsock.getpeercert(True)
return DER_cert_to_PEM_cert(dercert)
def wait_net_port(ip, port, timeout, try_interval=2):
LOG.debug("Waiting for {0}:{1} to become available.".format(ip, port))
end = time.time() + timeout
while time.time() < end:
try:
s = socket.create_connection((ip, port), timeout=5)
except socket.timeout:
# cannot connect after timeout
continue
except socket.error as ex:
# cannot connect immediately (e.g. no route)
# wait timeout before next try
LOG.debug("Wait cycle msg: {0}".format(repr(ex)))
time.sleep(try_interval)
continue
else:
# success!
s.close()
return
raise PublicPortWaitTimeoutException()
def connect(self):
"Connect to a host on a given (SSL) port."
if hasattr(self, "timeout"):
sock = socket.create_connection((self.host, self.port), self.timeout)
else:
sock = socket.create_connection((self.host, self.port))
msg = "wrapping ssl socket; "
if self.ca_certs:
msg += "CA certificate file=%s" % self.ca_certs
else:
msg += "using system provided SSL certs"
boto.log.debug(msg)
self.sock = ssl.wrap_socket(sock, keyfile=self.key_file,
certfile=self.cert_file,
cert_reqs=ssl.CERT_REQUIRED,
ca_certs=self.ca_certs)
cert = self.sock.getpeercert()
hostname = self.host.split(':', 0)[0]
if not ValidateCertificateHostname(cert, hostname):
raise InvalidCertificateException(hostname,
cert,
'remote hostname "%s" does not match '
'certificate' % hostname)
def _create_connection(host, port, connect_timeout):
"""
Connect to a 2-tuple (host, port) and return
the socket object.
Args:
2-tuple (host, port) and connection timeout
Returns:
Socket object
"""
if sys.version_info < (2, 6):
(family, _) = (_convert_host_to_ip(host))[0]
connect_socket = socket.socket(family, socket.SOCK_STREAM)
connect_socket.settimeout(connect_timeout)
connect_socket.connect( (host, port) )
else:
connect_socket = socket.create_connection( (host, port), connect_timeout)
return connect_socket
def connect(self):
"Connect to a host on a given (SSL) port."
if hasattr(self, 'source_address'):
sock = socket.create_connection((self.host, self.port), self.timeout, self.source_address)
else:
sock = socket.create_connection((self.host, self.port), self.timeout)
server_hostname = self.host
# Note: self._tunnel_host is not available on py < 2.6 but this code
# isn't used on py < 2.6 (lack of create_connection)
if self._tunnel_host:
self.sock = sock
self._tunnel()
server_hostname = self._tunnel_host
if HAS_SSLCONTEXT:
self.sock = self.context.wrap_socket(sock, server_hostname=server_hostname)
elif HAS_URLLIB3_SNI_SUPPORT:
self.sock = ssl_wrap_socket(sock, keyfile=self.key_file, cert_reqs=ssl.CERT_NONE, certfile=self.cert_file, ssl_version=PROTOCOL,
server_hostname=server_hostname)
else:
self.sock = ssl.wrap_socket(sock, keyfile=self.key_file, certfile=self.cert_file, ssl_version=PROTOCOL)
def connect(self):
sock = socket.create_connection((self.host, self.port), self.timeout)
if getattr(self, '_tunnel_host', False):
self.sock = sock
self._tunnel()
if not hasattr(ssl, 'SSLContext'):
# For 2.x
if self.ca_certs:
cert_reqs = ssl.CERT_REQUIRED
else:
cert_reqs = ssl.CERT_NONE
self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file,
cert_reqs=cert_reqs,
ssl_version=ssl.PROTOCOL_SSLv23,
ca_certs=self.ca_certs)
else: # pragma: no cover
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.options |= ssl.OP_NO_SSLv2
if self.cert_file:
context.load_cert_chain(self.cert_file, self.key_file)
kwargs = {}
if self.ca_certs:
context.verify_mode = ssl.CERT_REQUIRED
context.load_verify_locations(cafile=self.ca_certs)
if getattr(ssl, 'HAS_SNI', False):
kwargs['server_hostname'] = self.host
self.sock = context.wrap_socket(sock, **kwargs)
if self.ca_certs and self.check_domain:
try:
match_hostname(self.sock.getpeercert(), self.host)
logger.debug('Host verified: %s', self.host)
except CertificateError: # pragma: no cover
self.sock.shutdown(socket.SHUT_RDWR)
self.sock.close()
raise
def connect(self):
sock = socket.create_connection(
(self.host, self.port), getattr(self, 'source_address', None)
)
# Handle the socket if a (proxy) tunnel is present
if hasattr(self, '_tunnel') and getattr(self, '_tunnel_host', None):
self.sock = sock
self._tunnel()
# http://bugs.python.org/issue7776: Python>=3.4.1 and >=2.7.7
# change self.host to mean the proxy server host when tunneling is
# being used. Adapt, since we are interested in the destination
# host for the match_hostname() comparison.
actual_host = self._tunnel_host
else:
actual_host = self.host
self.sock = ssl.wrap_socket(
sock, cert_reqs=ssl.CERT_REQUIRED, ca_certs=self.ca_bundle
)
try:
match_hostname(self.sock.getpeercert(), actual_host)
except CertificateError:
self.sock.shutdown(socket.SHUT_RDWR)
self.sock.close()
raise
def connect(self):
sock = socket.create_connection((self.host, self.port), self.timeout)
if getattr(self, '_tunnel_host', False):
self.sock = sock
self._tunnel()
if not hasattr(ssl, 'SSLContext'):
# For 2.x
if self.ca_certs:
cert_reqs = ssl.CERT_REQUIRED
else:
cert_reqs = ssl.CERT_NONE
self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file,
cert_reqs=cert_reqs,
ssl_version=ssl.PROTOCOL_SSLv23,
ca_certs=self.ca_certs)
else: # pragma: no cover
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.options |= ssl.OP_NO_SSLv2
if self.cert_file:
context.load_cert_chain(self.cert_file, self.key_file)
kwargs = {}
if self.ca_certs:
context.verify_mode = ssl.CERT_REQUIRED
context.load_verify_locations(cafile=self.ca_certs)
if getattr(ssl, 'HAS_SNI', False):
kwargs['server_hostname'] = self.host
self.sock = context.wrap_socket(sock, **kwargs)
if self.ca_certs and self.check_domain:
try:
match_hostname(self.sock.getpeercert(), self.host)
logger.debug('Host verified: %s', self.host)
except CertificateError: # pragma: no cover
self.sock.shutdown(socket.SHUT_RDWR)
self.sock.close()
raise
def connect(self):
sock = socket.create_connection(
(self.host, self.port), getattr(self, 'source_address', None)
)
# Handle the socket if a (proxy) tunnel is present
if hasattr(self, '_tunnel') and getattr(self, '_tunnel_host', None):
self.sock = sock
self._tunnel()
# http://bugs.python.org/issue7776: Python>=3.4.1 and >=2.7.7
# change self.host to mean the proxy server host when tunneling is
# being used. Adapt, since we are interested in the destination
# host for the match_hostname() comparison.
actual_host = self._tunnel_host
else:
actual_host = self.host
self.sock = ssl.wrap_socket(
sock, cert_reqs=ssl.CERT_REQUIRED, ca_certs=self.ca_bundle
)
try:
match_hostname(self.sock.getpeercert(), actual_host)
except CertificateError:
self.sock.shutdown(socket.SHUT_RDWR)
self.sock.close()
raise
def _connect(self):
"Create a TCP socket connection"
# we want to mimic what socket.create_connection does to support
# ipv4/ipv6, but we want to set options prior to calling
# socket.connect()
err = None
for res in socket.getaddrinfo(self.host, self.port, 0,
socket.SOCK_STREAM):
family, socktype, proto, canonname, socket_address = res
sock = None
try:
sock = socket.socket(family, socktype, proto)
# TCP_NODELAY
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
# TCP_KEEPALIVE
if self.socket_keepalive:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
for k, v in iteritems(self.socket_keepalive_options):
sock.setsockopt(socket.SOL_TCP, k, v)
# set the socket_connect_timeout before we connect
sock.settimeout(self.socket_connect_timeout)
# connect
sock.connect(socket_address)
# set the socket_timeout now that we're connected
sock.settimeout(self.socket_timeout)
return sock
except socket.error as _:
err = _
if sock is not None:
sock.close()
if err is not None:
raise err
raise socket.error("socket.getaddrinfo returned an empty list")
def connect(self):
"""Connect to the host and port specified in __init__."""
self.sock = socket.create_connection((self.host,self.port),
self.timeout, self.source_address)
if self._tunnel_host:
self._tunnel()
def connect(self):
"Connect to a host on a given (SSL) port."
sock = socket.create_connection((self.host, self.port),
self.timeout, self.source_address)
if self._tunnel_host:
self.sock = sock
self._tunnel()
self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file)
def open(self, host = '', port = IMAP4_PORT):
"""Setup connection to remote server on "host:port"
(default: localhost:standard IMAP4 port).
This connection will be used by the routines:
read, readline, send, shutdown.
"""
self.host = host
self.port = port
self.sock = socket.create_connection((host, port))
self.file = self.sock.makefile('rb')
def open(self, host = '', port = IMAP4_SSL_PORT):
"""Setup connection to remote server on "host:port".
(default: localhost:standard IMAP4 SSL port).
This connection will be used by the routines:
read, readline, send, shutdown.
"""
self.host = host
self.port = port
self.sock = socket.create_connection((host, port))
self.sslobj = ssl.wrap_socket(self.sock, self.keyfile, self.certfile)
self.file = self.sslobj.makefile('rb')