def _prepare_all_queries(self, host):
if not self._prepared_statements or not self.reprepare_on_up:
return
log.debug("Preparing all known prepared statements against host %s", host)
connection = None
try:
connection = self.connection_factory(host.address)
statements = self._prepared_statements.values()
for keyspace, ks_statements in groupby(statements, lambda s: s.keyspace):
if keyspace is not None:
connection.set_keyspace_blocking(keyspace)
# prepare 10 statements at a time
ks_statements = list(ks_statements)
chunks = []
for i in range(0, len(ks_statements), 10):
chunks.append(ks_statements[i:i + 10])
for ks_chunk in chunks:
messages = [PrepareMessage(query=s.query_string) for s in ks_chunk]
# TODO: make this timeout configurable somehow?
responses = connection.wait_for_responses(*messages, timeout=5.0, fail_on_error=False)
for success, response in responses:
if not success:
log.debug("Got unexpected response when preparing "
"statement on host %s: %r", host, response)
log.debug("Done preparing all known prepared statements against host %s", host)
except OperationTimedOut as timeout:
log.warning("Timed out trying to prepare all statements on host %s: %s", host, timeout)
except (ConnectionException, socket.error) as exc:
log.warning("Error trying to prepare all statements on host %s: %r", host, exc)
except Exception:
log.exception("Error trying to prepare all statements on host %s", host)
finally:
if connection:
connection.close()
python类error()的实例源码
def execute_async(self, query, parameters=None, trace=False,
                  custom_payload=None, timeout=_NOT_SET,
                  execution_profile=EXEC_PROFILE_DEFAULT,
                  paging_state=None):
    """
    Execute the given query and return a :class:`~.ResponseFuture` object
    which callbacks may be attached to for asynchronous response
    delivery.  Calling :meth:`~.ResponseFuture.result()` on the returned
    future blocks synchronously for the results at any time.

    See :meth:`Session.execute` for parameter definitions.

    Example usage::

        >>> session = cluster.connect()
        >>> future = session.execute_async("SELECT * FROM mycf")

        >>> def log_results(results):
        ...     for row in results:
        ...         log.info("Results: %s", row)

        >>> def log_error(exc):
        ...     log.error("Operation failed: %s", exc)

        >>> future.add_callbacks(log_results, log_error)

    Async execution with blocking wait for results::

        >>> future = session.execute_async("SELECT * FROM mycf")
        >>> # do other stuff...

        >>> try:
        ...     results = future.result()
        ... except Exception:
        ...     log.exception("Operation failed:")

    """
    response_future = self._create_response_future(
        query, parameters, trace, custom_payload, timeout,
        execution_profile, paging_state)
    # the protocol handler is attached before the request is sent
    response_future._protocol_handler = self.client_protocol_handler
    self._on_request(response_future)
    response_future.send_request()
    return response_future
def prepare_on_all_hosts(self, query, excluded_host):
    """
    Prepare the given query on all hosts, excluding ``excluded_host``.
    Intended for internal use only.

    Hosts that are not up are skipped.  Failures are logged and never
    propagated to the caller.
    """
    in_flight = []
    for host in tuple(self._pools.keys()):
        if not (host != excluded_host and host.is_up):
            continue

        future = ResponseFuture(self, PrepareMessage(query=query), None, self.default_timeout)

        # we don't care about errors preparing against specific hosts,
        # since we can always prepare them as needed when the prepared
        # statement is used.  Just log errors and continue on.
        try:
            request_id = future._query(host)
        except Exception:
            log.exception("Error preparing query for host %s:", host)
            continue

        if request_id is None:
            # the error has already been logged by ResponseFuture
            log.debug("Failed to prepare query for host %s: %r",
                      host, future._errors.get(host))
            continue

        in_flight.append((host, future))

    # block on every request that was actually sent
    for host, future in in_flight:
        try:
            future.result()
        except Exception:
            log.exception("Error preparing query for host %s:", host)
def result(self):
    """
    Return the final result or raise an Exception if errors were
    encountered.  If the final result or error has not been set
    yet, this method will block until it is set, or the timeout
    set for the request expires.

    Timeout is specified in the Session request execution functions.
    If the timeout is exceeded, an :exc:`cassandra.OperationTimedOut` will be raised.
    This is a client-side timeout. For more information
    about server-side coordinator timeouts, see :class:`.policies.RetryPolicy`.

    Example usage::

        >>> future = session.execute_async("SELECT * FROM mycf")
        >>> # do other stuff...

        >>> try:
        ...     rows = future.result()
        ...     for row in rows:
        ...         ... # process results
        ... except Exception:
        ...     log.exception("Operation failed:")

    """
    self._event.wait()
    # _NOT_SET marks "no result was stored"; in that case the stored
    # exception is raised instead
    if self._final_result is _NOT_SET:
        raise self._final_exception
    return ResultSet(self, self._final_result)
def add_callbacks(self, callback, errback,
                  callback_args=(), callback_kwargs=None,
                  errback_args=(), errback_kwargs=None):
    """
    A convenient combination of :meth:`.add_callback()` and
    :meth:`.add_errback()`.

    ``callback`` is registered first with ``callback_args`` /
    ``callback_kwargs``, then ``errback`` with ``errback_args`` /
    ``errback_kwargs``.  A missing or empty kwargs value is treated as
    an empty dict.

    Example usage::

        >>> session = cluster.connect()
        >>> query = "SELECT * FROM mycf"
        >>> future = session.execute_async(query)

        >>> def log_results(results, level='debug'):
        ...     for row in results:
        ...         log.log(level, "Result: %s", row)

        >>> def log_error(exc, query):
        ...     log.error("Query '%s' failed: %s", query, exc)

        >>> future.add_callbacks(
        ...     callback=log_results, callback_kwargs={'level': 'info'},
        ...     errback=log_error, errback_args=(query,))

    """
    success_kwargs = callback_kwargs if callback_kwargs else {}
    failure_kwargs = errback_kwargs if errback_kwargs else {}
    self.add_callback(callback, *callback_args, **success_kwargs)
    self.add_errback(errback, *errback_args, **failure_kwargs)
gevent_chat_server.py 文件源码
项目:Software-Architecture-with-Python
作者: PacktPublishing
项目源码
文件源码
阅读 19
收藏 0
点赞 0
评论 0
def new_chat_channel(conn, address):
    """ New chat channel for a given connection

    Registers ``conn`` in the module-level ``participants`` collection,
    then relays every ``user:message`` packet read from it to all other
    participants until ``conn.recv`` returns an empty value.  The
    connection is always unregistered again, even when an error
    interrupts the loop.

    ``address`` is accepted for the handler signature but not used.

    Raises ``ValueError`` when a packet contains no ``:`` separator and
    re-raises any ``socket.error`` from ``send`` other than a broken pipe.
    """
    import errno  # local import: the file's import block is not visible here

    participants.add(conn)
    user = ''
    try:
        data = conn.recv(1024)
        while data:
            print("Chat:", data.strip())

            # Decode and parse once per packet.  Doing this inside the
            # recipient loop re-decoded an already decoded string as soon
            # as there was a second recipient.
            text = data.decode('utf-8')
            # split on the first colon only, so messages may contain ':'
            user, msg = text.split(':', 1)
            if msg != '<handshake>':
                data_s = '\n#[' + user + ']>>> says ' + msg
            else:
                data_s = '(User %s connected)\n' % user
            payload = bytearray(data_s, 'utf-8')

            # iterate over a snapshot so that participants joining or
            # leaving during a send cannot invalidate the iteration
            for p in list(participants):
                if p is conn:
                    continue
                try:
                    p.send(payload)
                except socket.error as e:
                    # ignore broken pipes, they just mean the participant
                    # closed its connection already
                    if e.errno != errno.EPIPE:
                        raise

            data = conn.recv(1024)
    finally:
        participants.discard(conn)

    print("Participant %s left chat." % user)
def _m_search_ssdp(local_ip):
    """
    Send a UDP SSDP M-SEARCH packet and return the response.

    The request searches for an InternetGatewayDevice and is sent from
    ``(local_ip, 10000)`` to ``239.255.255.250:1900``.  The reply is
    awaited for 1 second when ``local_ip`` is ``"127.0.0.1"`` and for
    5 seconds otherwise.

    Returns the raw reply (at most 2048 bytes), or ``False`` when
    receiving fails with a ``socket.error``.  Errors from ``bind`` or
    ``sendto`` propagate to the caller.  The socket is closed in every
    case.
    """
    search_target = "urn:schemas-upnp-org:device:InternetGatewayDevice:1"

    ssdp_request = ''.join(
        ['M-SEARCH * HTTP/1.1\r\n',
         'HOST: 239.255.255.250:1900\r\n',
         'MAN: "ssdp:discover"\r\n',
         'MX: 2\r\n',
         'ST: {0}\r\n'.format(search_target),
         '\r\n']
    )

    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        sock.bind((local_ip, 10000))
        # sendto() needs bytes on Python 3; the request is pure ASCII
        sock.sendto(ssdp_request.encode('ascii'), ('239.255.255.250', 1900))
        sock.settimeout(1 if local_ip == "127.0.0.1" else 5)

        try:
            return sock.recv(2048)
        except socket.error as err:
            # no reply from IGD, possibly no IGD on LAN
            logging.debug("UDP SSDP M-SEARCH send error using ip %s: %s", local_ip, err)
            return False
    finally:
        # the socket was previously left open on every path
        sock.close()
def run_application(self):
    """
    Register the connection as an event-stream client and stream to it.

    Writes an empty chunk first when a status is already set but headers
    were not sent.  A ``Client`` is then added to ``self.server.clients``,
    the ``text/event-stream`` response is started, and ``client.stream()``
    is processed as the result.  The client is removed from
    ``self.server.clients`` when streaming ends for any reason.

    A ``socket.error`` whose errno is ``EPIPE`` is swallowed; any other
    ``socket.error`` is re-raised.
    """
    import errno  # local import: the file's import block is not visible here

    if self.status and not self.headers_sent:
        self.write('')

    client = Client(self.client_address, self)
    self.server.clients.add(client)
    try:
        # single parenthesized argument: prints the same on Python 2 and 3
        print("[%s] Client connected (%s clients total)" % (
            self.environ['REMOTE_ADDR'], len(self.server.clients)))

        origin = self.environ.get('HTTP_ORIGIN', '')
        self.start_response("200 OK", [
            ('Content-Type', 'text/event-stream'),
            ('Cache-Control', 'no-cache'),
            ('Connection', 'keep-alive'),
            ('Access-Control-Allow-Origin', origin),
        ])
        self.result = client.stream()
        self.process_result()
        super(EventSourceHandler, self).run_application()
    except socket.error as exc:
        # a broken pipe is ignored here (presumably the client went away)
        if exc.errno != errno.EPIPE:
            raise
    finally:
        self.server.clients.remove(client)
def handle(self):
    """
    Serve requests on this connection, then close the socket.

    ``handle_one_request`` is called repeatedly while it returns ``True``.
    ``None`` ends the loop; any other value is unpacked as
    ``(status, response_body)``, the body is sent, the request is logged
    and the loop ends.  On exit the socket (if any) is drained and
    closed, and the ``socket`` and ``rfile`` instance attributes are
    dropped.
    """
    try:
        while self.socket is not None:
            self.time_start = time.time()
            self.time_finish = 0

            outcome = self.handle_one_request()
            if outcome is True:
                continue
            if outcome is not None:
                self.status, response_body = outcome
                self.socket.sendall(response_body)
                if self.time_finish == 0:
                    self.time_finish = time.time()
                self.log_request()
            break
    finally:
        if self.socket is not None:
            try:
                # read out request data to prevent error: [Errno 104] Connection reset by peer
                try:
                    self.socket._sock.recv(16384)
                finally:
                    self.socket._sock.close()  # do not rely on garbage collection
                    self.socket.close()
            except socket.error:
                pass
        for attr in ('socket', 'rfile'):
            self.__dict__.pop(attr, None)
def _sendall(self, data):
try:
self.socket.sendall(data)
except socket.error, ex:
self.status = 'socket error: %s' % ex
if self.code > 0:
self.code = -self.code
raise
self.response_length += len(data)
def update_environ(self):
    """
    Fill in ``SERVER_NAME`` and ``SERVER_PORT`` in ``self.environ``.

    Existing values are never overwritten.  For a tuple address the name
    comes from ``socket.getfqdn`` of the first element (falling back to
    its string form on ``socket.error``) and the port from the second
    element.  For any other address both keys default to ``''``.
    """
    environ = self.environ
    address = self.address

    if not isinstance(address, tuple):
        environ.setdefault('SERVER_NAME', '')
        environ.setdefault('SERVER_PORT', '')
        return

    if 'SERVER_NAME' not in environ:
        try:
            server_name = socket.getfqdn(address[0])
        except socket.error:
            server_name = str(address[0])
        environ['SERVER_NAME'] = server_name
    environ.setdefault('SERVER_PORT', str(address[1]))
def __init__(self, sock, keyfile=None, certfile=None,
             server_side=False, cert_reqs=CERT_NONE,
             ssl_version=PROTOCOL_SSLv23, ca_certs=None,
             do_handshake_on_connect=True,
             suppress_ragged_eofs=True,
             ciphers=None):
    """
    Wrap ``sock`` and create the SSL object if it is already connected.

    When only ``certfile`` is given it is also used as ``keyfile``.
    Connection state is probed with ``getpeername``: ``ENOTCONN`` leaves
    ``_sslobj`` as ``None``, any other socket error is re-raised.  For a
    connected socket the SSL object is created through ``_ssl.sslwrap``
    (``ciphers`` is passed only when it is not ``None``) and the
    handshake is performed if ``do_handshake_on_connect`` is true.

    All configuration arguments are stored on the instance.
    """
    socket.__init__(self, _sock=sock)

    if certfile and not keyfile:
        keyfile = certfile
    # see if it's connected
    try:
        socket.getpeername(self)
    except socket_error as e:
        # .args[0] instead of e[0]: exception indexing is Python 2 only
        if e.args[0] != errno.ENOTCONN:
            raise
        # no, no connection yet
        self._sslobj = None
    else:
        # yes, create the SSL object
        if ciphers is None:
            self._sslobj = _ssl.sslwrap(self._sock, server_side,
                                        keyfile, certfile,
                                        cert_reqs, ssl_version, ca_certs)
        else:
            self._sslobj = _ssl.sslwrap(self._sock, server_side,
                                        keyfile, certfile,
                                        cert_reqs, ssl_version, ca_certs,
                                        ciphers)
        if do_handshake_on_connect:
            self.do_handshake()
    self.keyfile = keyfile
    self.certfile = certfile
    self.cert_reqs = cert_reqs
    self.ssl_version = ssl_version
    self.ca_certs = ca_certs
    self.ciphers = ciphers
    self.do_handshake_on_connect = do_handshake_on_connect
    self.suppress_ragged_eofs = suppress_ragged_eofs
    self._makefile_refs = 0
def _sendall(self, data):
try:
self.socket.sendall(data)
except socket.error, ex:
self.status = 'socket error: %s' % ex
if self.code > 0:
self.code = -self.code
raise
self.response_length += len(data)
def update_environ(self):
    """
    Fill in ``SERVER_NAME`` and ``SERVER_PORT`` in ``self.environ``.

    Existing values are never overwritten.  For a tuple address the name
    comes from ``socket.getfqdn`` of the first element (falling back to
    its string form on ``socket.error``) and the port from the second
    element.  For any other address both keys default to ``''``.
    """
    environ = self.environ
    address = self.address

    if not isinstance(address, tuple):
        environ.setdefault('SERVER_NAME', '')
        environ.setdefault('SERVER_PORT', '')
        return

    if 'SERVER_NAME' not in environ:
        try:
            server_name = socket.getfqdn(address[0])
        except socket.error:
            server_name = str(address[0])
        environ['SERVER_NAME'] = server_name
    environ.setdefault('SERVER_PORT', str(address[1]))
def __init__(self, sock, keyfile=None, certfile=None,
             server_side=False, cert_reqs=CERT_NONE,
             ssl_version=PROTOCOL_SSLv23, ca_certs=None,
             do_handshake_on_connect=True,
             suppress_ragged_eofs=True,
             ciphers=None):
    """
    Wrap ``sock`` and create the SSL object if it is already connected.

    When only ``certfile`` is given it is also used as ``keyfile``.
    Connection state is probed with ``getpeername``: ``ENOTCONN`` leaves
    ``_sslobj`` as ``None``, any other socket error is re-raised.  For a
    connected socket the SSL object is created through ``_ssl.sslwrap``
    (``ciphers`` is passed only when it is not ``None``) and the
    handshake is performed if ``do_handshake_on_connect`` is true.

    All configuration arguments are stored on the instance.
    """
    socket.__init__(self, _sock=sock)

    if certfile and not keyfile:
        keyfile = certfile
    # see if it's connected
    try:
        socket.getpeername(self)
    except socket_error as e:
        # .args[0] instead of e[0]: exception indexing is Python 2 only
        if e.args[0] != errno.ENOTCONN:
            raise
        # no, no connection yet
        self._sslobj = None
    else:
        # yes, create the SSL object
        if ciphers is None:
            self._sslobj = _ssl.sslwrap(self._sock, server_side,
                                        keyfile, certfile,
                                        cert_reqs, ssl_version, ca_certs)
        else:
            self._sslobj = _ssl.sslwrap(self._sock, server_side,
                                        keyfile, certfile,
                                        cert_reqs, ssl_version, ca_certs,
                                        ciphers)
        if do_handshake_on_connect:
            self.do_handshake()
    self.keyfile = keyfile
    self.certfile = certfile
    self.cert_reqs = cert_reqs
    self.ssl_version = ssl_version
    self.ca_certs = ca_certs
    self.ciphers = ciphers
    self.do_handshake_on_connect = do_handshake_on_connect
    self.suppress_ragged_eofs = suppress_ragged_eofs
    self._makefile_refs = 0
def handle(self):
    """
    Serve requests on this connection, then close the socket.

    ``handle_one_request`` is called repeatedly while it returns ``True``.
    ``None`` ends the loop; any other value is unpacked as
    ``(status, response_body)``, the body is sent, the request is logged
    and the loop ends.  On exit the socket (if any) is closed, with
    ``socket.error`` ignored, and the ``socket``, ``rfile`` and ``_wfile``
    instance attributes are dropped.
    """
    try:
        while self.socket is not None:
            self.time_start = time.time()
            self.time_finish = 0

            outcome = self.handle_one_request()
            if outcome is True:
                continue
            if outcome is not None:
                self.status, response_body = outcome
                self.socket.sendall(response_body)
                if self.time_finish == 0:
                    self.time_finish = time.time()
                self.log_request()
            break
    finally:
        if self.socket is not None:
            try:
                self.socket._sock.close()  # do not rely on garbage collection
                self.socket.close()
            except socket.error:
                pass
        # '_wfile': XXX remove once wfile property is gone
        for attr in ('socket', 'rfile', '_wfile'):
            self.__dict__.pop(attr, None)
def update_environ(self):
    """
    Fill in ``SERVER_NAME`` and ``SERVER_PORT`` in ``self.environ``.

    Existing values are never overwritten.  For a tuple address the name
    comes from ``socket.getfqdn`` of the first element (falling back to
    its string form on ``socket.error``) and the port from the second
    element.  For any other address both keys default to ``''``.
    """
    environ = self.environ
    address = self.address

    if not isinstance(address, tuple):
        environ.setdefault('SERVER_NAME', '')
        environ.setdefault('SERVER_PORT', '')
        return

    if 'SERVER_NAME' not in environ:
        try:
            server_name = socket.getfqdn(address[0])
        except socket.error:
            server_name = str(address[0])
        environ['SERVER_NAME'] = server_name
    environ.setdefault('SERVER_PORT', str(address[1]))
def __init__(self, sock, keyfile=None, certfile=None,
             server_side=False, cert_reqs=CERT_NONE,
             ssl_version=PROTOCOL_SSLv23, ca_certs=None,
             do_handshake_on_connect=True,
             suppress_ragged_eofs=True,
             ciphers=None):
    """
    Wrap ``sock`` and create the SSL object if it is already connected.

    When only ``certfile`` is given it is also used as ``keyfile``.
    Connection state is probed with ``getpeername``: ``ENOTCONN`` leaves
    ``_sslobj`` as ``None``, any other socket error is re-raised.  For a
    connected socket the SSL object is created through ``_ssl.sslwrap``
    (``ciphers`` is passed only when it is not ``None``) and the
    handshake is performed if ``do_handshake_on_connect`` is true.

    All configuration arguments are stored on the instance.
    """
    socket.__init__(self, _sock=sock)

    if certfile and not keyfile:
        keyfile = certfile
    # see if it's connected
    try:
        socket.getpeername(self)
    except socket_error as e:
        # .args[0] instead of e[0]: exception indexing is Python 2 only
        if e.args[0] != errno.ENOTCONN:
            raise
        # no, no connection yet
        self._sslobj = None
    else:
        # yes, create the SSL object
        if ciphers is None:
            self._sslobj = _ssl.sslwrap(self._sock, server_side,
                                        keyfile, certfile,
                                        cert_reqs, ssl_version, ca_certs)
        else:
            self._sslobj = _ssl.sslwrap(self._sock, server_side,
                                        keyfile, certfile,
                                        cert_reqs, ssl_version, ca_certs,
                                        ciphers)
        if do_handshake_on_connect:
            self.do_handshake()
    self.keyfile = keyfile
    self.certfile = certfile
    self.cert_reqs = cert_reqs
    self.ssl_version = ssl_version
    self.ca_certs = ca_certs
    self.ciphers = ciphers
    self.do_handshake_on_connect = do_handshake_on_connect
    self.suppress_ragged_eofs = suppress_ragged_eofs
    self._makefile_refs = 0
def read(self, len=1024):
    """Read up to LEN bytes and return them.
    Return zero-length string on EOF.

    The read is retried after waiting for the descriptor to become
    readable (``SSL_ERROR_WANT_READ``) or writable
    (``SSL_ERROR_WANT_WRITE``).  With a timeout of ``0.0`` those errors
    are re-raised instead of waited on.  ``SSL_ERROR_EOF`` yields ``''``
    when ``suppress_ragged_eofs`` is set, as does ``EBADF`` raised while
    waiting.  Every other error is re-raised.
    """
    while True:
        try:
            return self._sslobj.read(len)
        except SSLError as ex:
            if ex.args[0] == SSL_ERROR_EOF and self.suppress_ragged_eofs:
                return ''
            elif ex.args[0] == SSL_ERROR_WANT_READ:
                if self.timeout == 0.0:
                    raise
                sys.exc_clear()
                try:
                    wait_read(self.fileno(), timeout=self.timeout, timeout_exc=_SSLErrorReadTimeout, event=self._read_event)
                except socket_error as wait_exc:
                    # separate name so the outer ``ex`` is not rebound;
                    # .args[0] replaces Python 2-only exception indexing
                    if wait_exc.args[0] == EBADF:
                        return ''
                    raise
            elif ex.args[0] == SSL_ERROR_WANT_WRITE:
                if self.timeout == 0.0:
                    raise
                sys.exc_clear()
                try:
                    # note: using _SSLErrorReadTimeout rather than _SSLErrorWriteTimeout below is intentional
                    wait_write(self.fileno(), timeout=self.timeout, timeout_exc=_SSLErrorReadTimeout, event=self._write_event)
                except socket_error as wait_exc:
                    if wait_exc.args[0] == EBADF:
                        return ''
                    raise
            else:
                raise
def write(self, data):
    """Write DATA to the underlying SSL channel.  Returns
    number of bytes of DATA actually transmitted.

    The write is retried after waiting for the descriptor to become
    readable (``SSL_ERROR_WANT_READ``) or writable
    (``SSL_ERROR_WANT_WRITE``).  With a timeout of ``0.0`` those errors
    are re-raised instead of waited on.  ``EBADF`` raised while waiting
    makes the call return ``0``.  Every other error is re-raised.
    """
    while True:
        try:
            return self._sslobj.write(data)
        except SSLError as ex:
            if ex.args[0] == SSL_ERROR_WANT_READ:
                if self.timeout == 0.0:
                    raise
                sys.exc_clear()
                try:
                    wait_read(self.fileno(), timeout=self.timeout, timeout_exc=_SSLErrorWriteTimeout, event=self._read_event)
                except socket_error as wait_exc:
                    # separate name so the outer ``ex`` is not rebound;
                    # .args[0] replaces Python 2-only exception indexing
                    if wait_exc.args[0] == EBADF:
                        return 0
                    raise
            elif ex.args[0] == SSL_ERROR_WANT_WRITE:
                if self.timeout == 0.0:
                    raise
                sys.exc_clear()
                try:
                    wait_write(self.fileno(), timeout=self.timeout, timeout_exc=_SSLErrorWriteTimeout, event=self._write_event)
                except socket_error as wait_exc:
                    if wait_exc.args[0] == EBADF:
                        return 0
                    raise
            else:
                raise