def connect_with(protocol_class, host_port: tuple,
                 args: list, kwargs: dict):
    """ Helper which creates a new connection and feeds the data stream into
        a protocol handler class.

        The handler is constructed and notified through ``on_connected``
        before the reader greenlet is spawned. If either of those steps
        raises, the freshly opened socket is closed and the exception is
        re-raised, so a failed setup does not leak a connection.

        :rtype: tuple(protocol_class, gevent.socket)
        :type protocol_class: class
        :param protocol_class: A handler class which has handler functions like
                on_connected, consume, and on_connection_lost
        :param kwargs: Keyword args to pass to the handler class constructor
        :param args: Args to pass to the handler class constructor
        :param host_port: (host,port) tuple where to connect
        :return: the handler instance and the connected socket
    """
    sock = socket.create_connection(address=host_port)
    try:
        handler = protocol_class(*args, **kwargs)
        handler.on_connected(sock, host_port)
    except BaseException:
        # Nobody else owns the socket yet; release it before propagating.
        sock.close()
        raise

    print("Connection to %s established" % str(host_port))

    try:
        # gevent.spawn() already schedules the greenlet, so no explicit
        # start() call is required afterwards.
        gevent.spawn(_handle_socket_read, handler, sock)
    except Exception as e:
        print("\nException: %s" % e)
        traceback.print_exc()
        print()

    return handler, sock
python类socket()的实例源码
subscription_manager.py 文件源码
项目:graphql-python-subscriptions
作者: hballard
项目源码
文件源码
阅读 21
收藏 0
点赞 0
评论 0
def __init__(self, host='localhost', port=6379, *args, **kwargs):
    """Create the Redis client, its pubsub object and empty bookkeeping.

    :param host: Redis host passed to ``redis.StrictRedis``
    :param port: Redis port passed to ``redis.StrictRedis``
    :param args: extra positional arguments forwarded to ``StrictRedis``
    :param kwargs: extra keyword arguments forwarded to ``StrictRedis``
    """
    # Replace the socket module used by redis.connection with gevent's
    # before the client is created.
    redis.connection.socket = gevent.socket

    client = redis.StrictRedis(host, port, *args, **kwargs)
    self.redis = client
    self.pubsub = client.pubsub()

    # Bookkeeping starts out empty; nothing in this method fills it.
    self.subscriptions = {}
    self.sub_id_counter = 0
    self.greenlet = None
def test_getaddrinfo_mp(self):
    """Resolve a name in the parent, then require a child to exit cleanly.

    Per the original note: this scenario used to make the kill of gevent's
    hub threadpool (on hub destruction in the child) block forever; gipc
    works around that by killing the threadpool more forcefully.
    """
    from gevent import socket
    # Perform a getaddrinfo() call in the parent before starting the child.
    socket.getaddrinfo("localhost", 21)
    child = start_process(target=complchild_test_getaddrinfo_mp)
    child.join(timeout=1)
    assert child.exitcode == 0
def test_wsgi_scenario(self):
    """Serve one WSGI request whose body is produced in a child process.

    A WSGI server runs in a greenlet of this process. For each request the
    application starts a child process that sends the response through a
    pipe. A second child process acts as the HTTP client. Both children
    must exit with code 0.
    """
    try:
        from gevent.wsgi import WSGIServer
    except ImportError:
        # gevent.wsgi was a deprecated alias that newer gevent releases
        # removed; gevent.pywsgi provides the WSGIServer class there.
        from gevent.pywsgi import WSGIServer

    def serve(http_server):
        http_server.serve_forever()

    def hello_world(environ, start_response):
        # Generate response in child process.
        with pipe() as (reader, writer):
            start_response('200 OK', [('Content-Type', 'text/html')])
            rg = start_process(
                target=complchild_test_wsgi_scenario_respgen,
                args=(writer, ))
            response = reader.get()
            rg.join()
            assert rg.exitcode == 0
        return [response]

    # Port 0 is requested here; the loop below waits for a non-zero port.
    http_server = WSGIServer(('localhost', 0), hello_world)
    servelet = gevent.spawn(serve, http_server)
    # Wait for server being bound to socket.
    while http_server.address[1] == 0:
        gevent.sleep(0.05)
    client = start_process(
        target=complchild_test_wsgi_scenario_client,
        args=(http_server.address, ))
    client.join()
    assert client.exitcode == 0
    servelet.kill()
    servelet.get()  # get() is join and re-raises Exception.
def get_environ(self):
    """Return the parent handler's environ plus two extra keys.

    Adds ``'gunicorn.sock'`` (set to ``self.socket``) and ``'RAW_URI'``
    (set to ``self.path``) to the mapping built by the parent class.
    """
    environ = super(PyWSGIHandler, self).get_environ()
    environ['gunicorn.sock'] = self.socket
    environ['RAW_URI'] = self.path
    return environ
def socket(self, *args, **kwargs):
    """Return whatever ``utils.create_tcp_socket`` builds for ``socket``.

    ``socket`` in the body is the module-level name, not this method.
    The positional and keyword arguments are accepted but not forwarded.
    """
    tcp_socket = utils.create_tcp_socket(socket)
    return tcp_socket
def create_connection(self, *args, **kwargs):
    """Delegate to ``utils.create_tcp_connection``.

    The module-level ``socket`` name is passed first, followed by all
    positional and keyword arguments unchanged.
    """
    connection = utils.create_tcp_connection(socket, *args, **kwargs)
    return connection
def create_socket_pair(self):
    """Return the result of ``utils.create_socket_pair`` for ``socket``.

    ``socket`` is the module-level name in scope where this is defined.
    """
    pair = utils.create_socket_pair(socket)
    return pair
def close(self):
    """Close the listener, or abort the process if it is already closed.

    When ``self.closed`` is already true, this raises ``SystemExit`` via
    ``sys.exit`` with an explanatory message. Otherwise it logs and
    delegates to ``StreamServer.close``.
    """
    if self.closed:
        # sys.exit raises, so nothing below runs on this path.
        sys.exit('Multiple exit signals received - aborting.')

    log('Closing listener socket')
    StreamServer.close(self)
def get_environ(self):
    """Return the parent handler's environ plus two extra keys.

    Adds ``'gunicorn.sock'`` (set to ``self.socket``) and ``'RAW_URI'``
    (set to ``self.path``) to the mapping built by the parent class.
    """
    environ = super(PyWSGIHandler, self).get_environ()
    environ['gunicorn.sock'] = self.socket
    environ['RAW_URI'] = self.path
    return environ
def get_environ(self):
    """Return the parent handler's environ plus two extra keys.

    Adds ``'gunicorn.sock'`` (set to ``self.socket``) and ``'RAW_URI'``
    (set to ``self.path``) to the mapping built by the parent class.
    """
    environ = super(PyWSGIHandler, self).get_environ()
    environ['gunicorn.sock'] = self.socket
    environ['RAW_URI'] = self.path
    return environ
def get_environ(self):
    """Return the parent handler's environ plus two extra keys.

    Adds ``'gunicorn.sock'`` (set to ``self.socket``) and ``'RAW_URI'``
    (set to ``self.path``) to the mapping built by the parent class.
    """
    environ = super(PyWSGIHandler, self).get_environ()
    environ['gunicorn.sock'] = self.socket
    environ['RAW_URI'] = self.path
    return environ
def get_environ(self):
    """Return the parent handler's environ plus two extra keys.

    Adds ``'gunicorn.sock'`` (set to ``self.socket``) and ``'RAW_URI'``
    (set to ``self.path``) to the mapping built by the parent class.
    """
    environ = super(PyWSGIHandler, self).get_environ()
    environ['gunicorn.sock'] = self.socket
    environ['RAW_URI'] = self.path
    return environ
def handle_events(self):
    '''
    Gets and dispatches events in an endless loop using gevent spawn.

    The loop ends when ``self.connected`` becomes false or when one of
    LimitExceededError, ConnectError, socket.error or GreenletExit is
    raised. Any other ``Exception`` is traced and the loop continues.
    After the loop the transport socket is closed (close errors are
    traced, not raised), ``self.connected`` is cleared and pending
    commands are flushed.
    '''
    self.trace("handle_events started")
    while True:
        # Gets event and dispatches to handler.
        try:
            self.get_event()
            # Zero-length sleep between events.
            gevent.sleep(0)
            if not self.connected:
                self.trace("Not connected !")
                break
        except (LimitExceededError, ConnectError, socket.error, GreenletExit):
            # Each of these ends event handling.
            break
        except Exception as ex:
            # Anything else is reported and the loop keeps running.
            self.trace("handle_events error => %s" % str(ex))
    self.trace("handle_events stopped now")

    try:
        self.trace("handle_events socket.close")
        self.transport.sockfd.close()
        self.trace("handle_events socket.close success")
    except Exception as e:
        self.trace("handle_eventssocket.close ERROR: %s" % e)

    self.connected = False
    # prevent any pending request to be stuck
    self._flush_commands()
    return
def disconnect(self):
    '''
    Mark the connection as down, stop the event handler and flush commands.

    Waits up to two seconds on ``self._g_handler.get``. If that call
    raises anything at all, the handler is killed instead.
    '''
    self.connected = False
    self.trace("releasing ...")
    try:
        # Wait a bounded time for the handler to finish.
        self._g_handler.get(block=True, timeout=2.0)
    except:
        # Deliberately bare: presumably a gevent timeout is expected here,
        # and it may not derive from Exception -- confirm before narrowing.
        self.trace("releasing forced")
        self._g_handler.kill()
    self.trace("releasing done")
    # Flush so that no pending request stays stuck.
    self._flush_commands()
def get_environ(self):
    """Return the parent handler's environ plus two extra keys.

    Adds ``'gunicorn.sock'`` (set to ``self.socket``) and ``'RAW_URI'``
    (set to ``self.path``) to the mapping built by the parent class.
    """
    environ = super(PyWSGIHandler, self).get_environ()
    environ['gunicorn.sock'] = self.socket
    environ['RAW_URI'] = self.path
    return environ
def main():
    """Connect to a seed node, exchange version messages, then ping forever.

    Opens a socket with unbuffered binary reader/writer files, resolves
    the seed host name, performs the version/verack exchange and then
    sends a ping and a getaddr message every five seconds until a
    KeyboardInterrupt arrives. All three resources are closed on exit.
    """
    with contextlib.closing(socket.socket()) as sock, \
            contextlib.closing(sock.makefile('wb', 0)) as writer, \
            contextlib.closing(sock.makefile('rb', 0)) as reader:
        # Per the original note, the seed name resolves to a random
        # testnet node.
        peer_ip = socket.gethostbyname("testnet-seed.bitcoin.schildbach.de")
        print("Connecting to:", peer_ip)
        local_ip = "127.0.0.1"
        sock.connect((peer_ip, PORT))
        incoming = msg_stream(reader)

        # Handshake step 1: send our Version packet.
        send(writer, version_pkt(local_ip, peer_ip))

        # Handshake step 2: read the peer's Version.
        peer_version = next(incoming)
        print('Received:', peer_version)

        # Handshake step 3: acknowledge it (Verack).
        send(writer, msg_verack())

        # Fork off a handler, but keep a tee of the stream.
        incoming = tee_and_handle(writer, incoming)

        # Consume the peer's Verack; its value is not needed.
        next(incoming)

        # Keep pinging and asking for addresses until interrupted.
        try:
            while True:
                send(writer, msg_ping())
                send(writer, msg_getaddr())
                gevent.sleep(5)
        except KeyboardInterrupt:
            pass
def _connect( self ):
    """Open the TLS connection, send the handshake and start the workers.

    Returns True once the handshake frame is sent and the receive/sync
    greenlets are scheduled. Any failure is logged with its traceback and
    reported as False.
    """
    try:
        # The server certificate is not verified (CERT_NONE).
        plainSocket = gevent.socket.socket( gevent.socket.AF_INET,
                                            gevent.socket.SOCK_STREAM )
        self._socket = gevent.ssl.wrap_socket( plainSocket,
                                               cert_reqs = gevent.ssl.CERT_NONE )
        self._socket.connect( ( self._destServer, self._destPort ) )
        self._log( "Connected" )

        headers = rSequence()

        agentId = AgentId( ( self._oid, self._iid, self._sid, self._plat, self._arch ) )
        headers.addSequence( Symbols.base.HCP_IDENT, agentId.toJson() )

        # The reported host name is the MD5 hex digest of the sensor id.
        hostNameHash = hashlib.md5( str( self._sid ) ).hexdigest()
        headers.addStringA( Symbols.base.HOST_NAME, hostNameHash )

        # The reported IP address is four random octets in 0..254.
        randomIp = "%d.%d.%d.%d" % tuple( random.randint( 0, 254 ) for _ in range( 4 ) )
        headers.addIpv4( Symbols.base.IP_ADDRESS, randomIp )

        if self._enrollmentToken is not None:
            headers.addBuffer( Symbols.hcp.ENROLLMENT_TOKEN, self._enrollmentToken )

        self._sendFrame( HcpModuleId.HCP, [ headers ], timeout = 30, isNotHbs = True )
        self._log( "Handshake sent" )

        # The receiver starts now; the sync workers and the connected
        # event are scheduled with the delays given below.
        self._threads.add( gevent.spawn( self._recvThread ) )
        self._threads.add( gevent.spawn_later( 1, self._syncHcpThread ) )
        self._threads.add( gevent.spawn_later( 10, self._syncHbsThread ) )
        self._threads.add( gevent.spawn_later( 2, lambda: self._connectedEvent.set() ) )
    except:
        self._log( "Failed to connect over TLS: %s" % traceback.format_exc() )
        return False
    return True
def handle( self, source, address ):
    """Relay one incoming connection to a randomly picked endpoint.

    Does nothing but close ``source`` when ``currentEndpoints`` is empty.
    Otherwise keepalive is enabled on both sockets (best effort), a
    length-prefixed msgpack header carrying ``address`` is sent to the
    endpoint, and two forwarding greenlets run until both finish.
    ``source`` is always closed; the endpoint socket is closed once
    it was opened.
    """
    try:
        if len( currentEndpoints ) == 0:
            return

        print( "Connection from %s" % str( address ) )

        try:
            source.setsockopt( socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1 )
            source.setsockopt( socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 5 )
            source.setsockopt( socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 10 )
            source.setsockopt( socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 2 )
        except:
            print( "Failed to set keepalive on source connection" )

        try:
            upstream = create_connection( random.sample( currentEndpoints, 1 )[ 0 ] )
        except:
            print( "Failed to connect to EndpointProcessor" )
            # The enclosing finally still closes the source socket.
            return

        try:
            try:
                upstream.setsockopt( socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1 )
                upstream.setsockopt( socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 5 )
                upstream.setsockopt( socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 10 )
                upstream.setsockopt( socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 2 )
            except:
                print( "Failed to set keepalive on dest connection" )

            # Tell the endpoint where the connection originally came
            # from: a 4-byte big-endian length, then the packed address.
            headerBlob = msgpack.packb( address )
            upstream.sendall( struct.pack( '!I', len( headerBlob ) ) )
            upstream.sendall( headerBlob )

            # One greenlet per direction; wait for both.
            inbound = gevent.spawn( forward, source, upstream, address, self )
            outbound = gevent.spawn( forward, upstream, source, address, self )
            gevent.joinall( ( inbound, outbound ) )
        finally:
            upstream.close()
    finally:
        source.close()
def get_environ(self):
    """Return the parent handler's environ plus two extra keys.

    Adds ``'gunicorn.sock'`` (set to ``self.socket``) and ``'RAW_URI'``
    (set to ``self.path``) to the mapping built by the parent class.
    """
    environ = super(PyWSGIHandler, self).get_environ()
    environ['gunicorn.sock'] = self.socket
    environ['RAW_URI'] = self.path
    return environ