def patch_socket(dns=True, aggressive=True):
    """Swap the standard :mod:`socket` primitives for gevent's cooperative ones.

    The ``socket``, ``SocketType`` and ``create_connection`` attributes are
    always replaced; ``socketpair`` and ``fromfd`` are replaced only when
    gevent's socket module provides them.

    When ``ssl``/``sslerror`` can be imported from :mod:`gevent.socket` they
    are installed too.  If that import fails and *aggressive* is true, the
    ``ssl`` attribute is removed from the standard module instead.

    If *dns* is true, ``patch_dns()`` is called as the final step.
    """
    from gevent import socket as green_socket
    # Standard-library socket module, fetched by name.
    std_socket = __import__('socket')

    # Attributes that must exist on gevent's socket module.
    for attr in ('socket', 'SocketType', 'create_connection'):
        setattr(std_socket, attr, getattr(green_socket, attr))

    # Attributes that are copied only when gevent's module has them.
    for attr in ('socketpair', 'fromfd'):
        if hasattr(green_socket, attr):
            setattr(std_socket, attr, getattr(green_socket, attr))

    try:
        from gevent.socket import ssl as green_ssl, sslerror as green_sslerror
    except ImportError:
        if aggressive:
            # No cooperative ssl available: drop the standard one, if any.
            try:
                del std_socket.ssl
            except AttributeError:
                pass
    else:
        std_socket.ssl = green_ssl
        std_socket.sslerror = green_sslerror

    if dns:
        patch_dns()
# Example source code for Python's fromfd()
def patch_socket(dns=True, aggressive=True):
    """Monkey-patch the standard :mod:`socket` module with gevent's sockets.

    :param dns: when true, ``patch_dns()`` is invoked after the socket
        attributes have been replaced.
    :param aggressive: when true and ``ssl``/``sslerror`` cannot be imported
        from :mod:`gevent.socket`, the ``ssl`` attribute is deleted from the
        standard module (a missing attribute is ignored).
    """
    from gevent import socket as cooperative
    # Standard-library socket module, fetched by name.
    stdlib = __import__('socket')

    stdlib.socket = cooperative.socket
    stdlib.SocketType = cooperative.SocketType
    stdlib.create_connection = cooperative.create_connection

    # These two are only patched when gevent's module offers them.
    if hasattr(cooperative, 'socketpair'):
        stdlib.socketpair = cooperative.socketpair
    if hasattr(cooperative, 'fromfd'):
        stdlib.fromfd = cooperative.fromfd

    try:
        from gevent.socket import ssl, sslerror
    except ImportError:
        if aggressive:
            try:
                del stdlib.ssl
            except AttributeError:
                # Nothing to remove.
                pass
    else:
        stdlib.ssl = ssl
        stdlib.sslerror = sslerror

    if dns:
        patch_dns()
def patch_socket(dns=True, aggressive=True):
    """Install gevent's cooperative socket objects into :mod:`socket`.

    ``socket``, ``SocketType`` and ``create_connection`` are copied
    unconditionally; ``socketpair`` and ``fromfd`` are copied when gevent's
    socket module defines them.  ``ssl`` and ``sslerror`` are copied when they
    can be imported from :mod:`gevent.socket`; otherwise, if *aggressive* is
    true, ``socket.ssl`` is deleted when present.

    If *dns* is true, ``patch_dns()`` runs last.
    """
    from gevent import socket as gsocket
    # Standard-library socket module, fetched by name.
    target = __import__('socket')

    def copy_over(name, optional=False):
        # Copy one attribute from gevent's module onto the standard module,
        # skipping optional names that gevent's module does not define.
        if optional and not hasattr(gsocket, name):
            return
        setattr(target, name, getattr(gsocket, name))

    copy_over('socket')
    copy_over('SocketType')
    copy_over('create_connection')
    copy_over('socketpair', optional=True)
    copy_over('fromfd', optional=True)

    try:
        from gevent.socket import ssl as gssl, sslerror as gsslerror
    except ImportError:
        if aggressive:
            try:
                del target.ssl
            except AttributeError:
                pass
    else:
        target.ssl = gssl
        target.sslerror = gsslerror

    if dns:
        patch_dns()
def patch_socket(dns=True, aggressive=True):
    """Make the standard :mod:`socket` module use gevent's cooperative sockets.

    Attributes are replaced in a fixed order: ``socket``, ``SocketType`` and
    ``create_connection`` always, then ``socketpair`` and ``fromfd`` if
    gevent's socket module has them.  Afterwards ``ssl``/``sslerror`` are
    taken from :mod:`gevent.socket` when importable; if not, and *aggressive*
    is true, any existing ``socket.ssl`` attribute is removed.

    If *dns* is true, also call ``patch_dns()``.
    """
    from gevent import socket as gevent_socket
    # Standard-library socket module, fetched by name.
    builtin_socket = __import__('socket')

    # (attribute name, must gevent's module provide it?)
    replacements = (
        ('socket', True),
        ('SocketType', True),
        ('create_connection', True),
        ('socketpair', False),
        ('fromfd', False),
    )
    for attr_name, required in replacements:
        if required or hasattr(gevent_socket, attr_name):
            setattr(builtin_socket, attr_name,
                    getattr(gevent_socket, attr_name))

    try:
        from gevent.socket import ssl as coop_ssl, sslerror as coop_sslerror
    except ImportError:
        if aggressive:
            try:
                del builtin_socket.ssl
            except AttributeError:
                pass
    else:
        builtin_socket.ssl = coop_ssl
        builtin_socket.sslerror = coop_sslerror

    if dns:
        patch_dns()
def get_raw_socket(cls, env):  #pragma: no cover
    """Try to obtain the connection socket from a WSGI environ mapping.

    Lookup order:

    1. If ``env['uwsgi.version']`` is truthy, build a socket from
       ``uwsgi.connection_fd()``; any exception during this step is ignored.
    2. Otherwise, if ``env['gunicorn.socket']`` is truthy, use that value.
    3. If nothing was found so far, inspect ``env['wsgi.input']`` for a
       ``_sock``, ``raw`` or ``rfile`` attribute and take the socket from it.

    Returns the socket that was found, or ``None`` when none of the lookups
    produced one.
    """
    found = None

    if env.get('uwsgi.version'):
        try:
            import uwsgi
            conn_fd = uwsgi.connection_fd()
            found = socket.fromfd(conn_fd, socket.AF_INET, socket.SOCK_STREAM)
        except Exception:
            # Best effort only: fall through to the wsgi.input lookup.
            pass
    elif env.get('gunicorn.socket'):
        found = env['gunicorn.socket']

    if found:
        return found

    # attempt to find socket from wsgi.input
    wsgi_input = env.get('wsgi.input')
    if not wsgi_input:
        return found

    if hasattr(wsgi_input, '_sock'):
        return socket.socket(_sock=wsgi_input._sock)

    if hasattr(wsgi_input, 'raw'):
        return wsgi_input.raw._sock

    if hasattr(wsgi_input, 'rfile'):
        rfile = wsgi_input.rfile
        # PY3 exposes the socket via rfile.raw, PY2 directly on rfile
        return rfile.raw._sock if hasattr(rfile, 'raw') else rfile._sock

    return found
# ============================================================================