def handle_read(self):
    """Receive data from the socket in a loop and feed it to the IO buffer.

    Each received chunk is written to ``self._iobuf`` and then handed to
    ``process_io_buffer()``. The loop ends when:

    * ``recv``/``write`` raises ``socket.error`` -- the connection is marked
      defunct with that error; or
    * an empty chunk arrives (or the buffer position is zero) -- the
      connection is closed.
    """
    while True:
        try:
            chunk = self._socket.recv(self.in_buffer_size)
            self._iobuf.write(chunk)
        except socket.error as err:
            log.debug("Exception in read for %s: %s", self, err)
            self.defunct(err)
            return  # leave the read loop

        if not (chunk and self._iobuf.tell()):
            # Nothing usable was received: treat it as a remote close.
            log.debug("Connection %s closed by server", self)
            self.close()
            return

        self.process_io_buffer()
# python类error()的实例源码 (example source code for Python's error() class)
def _reconnect(self):
    """Try to replace the control connection, scheduling retries on failure.

    A successful ``_reconnect_internal()`` result is installed through
    ``_set_new_connection()``. If ``NoHostAvailable`` is raised, any current
    reconnection handler is cancelled and a new
    ``_ControlReconnectionHandler`` is started with a fresh schedule from the
    cluster's reconnection policy. Any other exception is logged at debug
    level and re-raised.
    """
    log.debug("[control connection] Attempting to reconnect")
    try:
        self._set_new_connection(self._reconnect_internal())
    except NoHostAvailable:
        # make a retry schedule (which includes backoff)
        retry_schedule = self._cluster.reconnection_policy.new_schedule()

        with self._reconnection_lock:
            # cancel existing reconnection attempts
            previous = self._reconnection_handler
            if previous:
                previous.cancel()

            # when a connection is successfully made, _set_new_connection
            # will be called with the new connection and then our
            # _reconnection_handler will be cleared out
            replacement = _ControlReconnectionHandler(
                self, self._cluster.scheduler, retry_schedule,
                self._get_and_set_reconnection_handler,
                new_handler=None)
            self._reconnection_handler = replacement
            self._reconnection_handler.start()
    except Exception:
        log.debug("[control connection] error reconnecting", exc_info=True)
        raise
def add_errback(self, fn, *args, **kwargs):
    """
    Like :meth:`.add_callback()`, but handles error cases.
    An Exception instance will be passed as the first positional argument
    to `fn`.

    `fn` is always registered; if a final exception is already set it is
    also invoked immediately (outside the callback lock). Returns `self`.
    """
    with self._callback_lock:
        # Always add fn to self._errbacks, even when we're about to execute
        # it, to prevent races with functions like start_fetching_next_page
        # that reset _final_exception
        self._errbacks.append((fn, args, kwargs))
        fire_immediately = bool(self._final_exception)

    if fire_immediately:
        fn(self._final_exception, *args, **kwargs)
    return self
def connect(self) -> bool:
    """ Establish a long running connection to EPMD, will not return until
        the connection has been established.

        Every failed attempt is reported and followed by a 5 second
        ``gevent.sleep`` before the next try.

        :return: True
    """
    while True:
        try:
            print("EPMD: Connecting %s:%d" % (self.host_, self.port_))
            endpoint = (self.host_, self.port_)
            self.sock_ = socket.create_connection(address=endpoint,
                                                  timeout=5.0)
        except socket.error as err:
            print("EPMD: connection error:", err)
            gevent.sleep(5)
        else:
            break  # the connect loop

    print("EPMD: Socket connected")
    return True
def _read_alive2_reply(self) -> int:
""" Read reply from ALIVE2 request, check the result code, read creation
:return: Creation value if all is well, connection remains on.
On error returns -1
"""
# Reply will be [121,0,Creation:16] for OK, otherwise [121,Error]
reply = self.sock_.recv(2)
if not reply:
print("EPMD: ALIVE2 Read error. Closed?", reply)
return -1
if reply[1] == 0:
cr = self.sock_.recv(2)
(creation,) = struct.unpack(">H", cr)
return creation
print("EPMD: ALIVE2 returned error", reply[1])
return -1
def _parse_for_errors(soap_response):
if soap_response.status == 500:
response_data = soap_response.read()
try:
err_dom = parseString(response_data)
err_code = _node_val(err_dom.getElementsByTagName('errorCode')[0])
err_msg = _node_val(
err_dom.getElementsByTagName('errorDescription')[0]
)
except Exception, err:
logging.error("Unable to parse SOAP error: {0}, response: {1}".format(err, response_data))
return False
logging.error('SOAP request error: {0} - {1}'.format(err_code, err_msg))
raise Exception(
'SOAP request error: {0} - {1}'.format(err_code, err_msg)
)
return False
else:
return True
def _real_connect(self, addr, connect_ex):
if self.server_side:
raise ValueError("can't connect in server-side mode")
# Here we assume that the socket is client-side, and not
# connected at the time of the call. We connect it, then wrap it.
if self._connected:
raise ValueError("attempt to connect already-connected SSLSocket!")
self._sslobj = self.context._wrap_socket(self._sock, False, self.server_hostname, ssl_sock=self)
try:
if connect_ex:
rc = socket.connect_ex(self, addr)
else:
rc = None
socket.connect(self, addr)
if not rc:
self._connected = True
if self.do_handshake_on_connect:
self.do_handshake()
return rc
except socket_error:
self._sslobj = None
raise
def _real_connect(self, addr, connect_ex):
if self.server_side:
raise ValueError("can't connect in server-side mode")
# Here we assume that the socket is client-side, and not
# connected at the time of the call. We connect it, then wrap it.
if self._connected:
raise ValueError("attempt to connect already-connected SSLSocket!")
self._sslobj = self.context._wrap_socket(self._sock, False, self.server_hostname)
try:
if connect_ex:
rc = socket.connect_ex(self, addr)
else:
rc = None
socket.connect(self, addr)
if not rc:
if self.do_handshake_on_connect:
self.do_handshake()
self._connected = True
return rc
except socket_error:
self._sslobj = None
raise
def update_environ(self):
    """
    Called before the first request is handled to fill in WSGI environment values.

    This includes getting the correct server name and port. Values already
    present in ``self.environ`` are kept. For a non-tuple address both
    entries default to the empty string.
    """
    bound = self.address
    if not isinstance(bound, tuple):
        self.environ.setdefault('SERVER_NAME', '')
        self.environ.setdefault('SERVER_PORT', '')
        return

    if 'SERVER_NAME' not in self.environ:
        try:
            server_name = socket.getfqdn(bound[0])
        except socket.error:
            # Fall back to the textual form of the bound host.
            server_name = str(bound[0])
        if PY3 and not isinstance(server_name, str):
            server_name = server_name.decode('ascii')
        self.environ['SERVER_NAME'] = server_name
    self.environ.setdefault('SERVER_PORT', str(bound[1]))
def _real_connect(self, addr, connect_ex):
if self.server_side:
raise ValueError("can't connect in server-side mode")
# Here we assume that the socket is client-side, and not
# connected at the time of the call. We connect it, then wrap it.
if self._connected:
raise ValueError("attempt to connect already-connected SSLSocket!")
self._sslobj = self._context._wrap_socket(self._sock, False, self.server_hostname, ssl_sock=self)
try:
if connect_ex:
rc = socket.connect_ex(self, addr)
else:
rc = None
socket.connect(self, addr)
if not rc:
self._connected = True
if self.do_handshake_on_connect:
self.do_handshake()
return rc
except socket_error:
self._sslobj = None
raise
def _real_connect(self, addr, connect_ex):
if self.server_side:
raise ValueError("can't connect in server-side mode")
# Here we assume that the socket is client-side, and not
# connected at the time of the call. We connect it, then wrap it.
if self._connected:
raise ValueError("attempt to connect already-connected SSLSocket!")
self._sslobj = self._context._wrap_socket(self._sock, False, self.server_hostname)
if self._session is not None: # 3.6
self._sslobj = SSLObject(self._sslobj, owner=self, session=self._session)
try:
if connect_ex:
rc = socket.connect_ex(self, addr)
else:
rc = None
socket.connect(self, addr)
if not rc:
if self.do_handshake_on_connect:
self.do_handshake()
self._connected = True
return rc
except socket_error:
self._sslobj = None
raise
def update_environ(self):
    """
    Called before the first request is handled to fill in WSGI environment values.

    This includes getting the correct server name and port. Values already
    present in ``self.environ`` are kept. For a non-tuple address both
    entries default to the empty string.
    """
    listen_address = self.address
    if not isinstance(listen_address, tuple):
        self.environ.setdefault('SERVER_NAME', '')
        self.environ.setdefault('SERVER_PORT', '')
        return

    if 'SERVER_NAME' not in self.environ:
        try:
            name = socket.getfqdn(listen_address[0])
        except socket.error:
            # Fall back to the textual form of the bound host.
            name = str(listen_address[0])
        if PY3 and not isinstance(name, str):
            name = name.decode('ascii')  # python 2 pylint:disable=redefined-variable-type
        self.environ['SERVER_NAME'] = name
    self.environ.setdefault('SERVER_PORT', str(listen_address[1]))
def handle_slave_send(socket, address, req):
    """Queue a message forwarded by a master and reply with the outcome.

    The message in ``req['data']`` is flagged with ``to_slave`` and handed to
    ``send_funcs['message_send_enqueue']``. A msgpack-encoded ``'OK'`` or
    ``'FAIL'`` is always written back to ``socket``.

    :param socket: connection to reply on (must provide ``sendall``).
    :param address: peer address, used only for logging.
    :param req: request mapping whose ``'data'`` entry is the message dict.
    """
    message = req['data']
    message_id = message.get('message_id', '?')
    message['to_slave'] = True

    # Bound before the try so the failure log can reference it even when the
    # enqueue call itself is what raised.
    runtime = None
    try:
        runtime = send_funcs['message_send_enqueue'](message)
        response = 'OK'
        access_logger.info('Message (ID %s) from master %s queued successfully', message_id, address)
    except Exception:
        response = 'FAIL'
        logger.exception('Queueing message (ID %s) from master %s failed.', message_id, address)
        access_logger.error('Failed queueing message (ID %s) from master %s: %s', message_id, address, runtime)
        metrics.incr('slave_message_send_fail_cnt')

    socket.sendall(msgpack.packb(response))
def _real_connect(self, addr, connect_ex):
if self.server_side:
raise ValueError("can't connect in server-side mode")
# Here we assume that the socket is client-side, and not
# connected at the time of the call. We connect it, then wrap it.
if self._connected:
raise ValueError("attempt to connect already-connected SSLSocket!")
self._sslobj = self.context._wrap_socket(self._sock, False, self.server_hostname, ssl_sock=self)
try:
if connect_ex:
rc = socket.connect_ex(self, addr)
else:
rc = None
socket.connect(self, addr)
if not rc:
self._connected = True
if self.do_handshake_on_connect:
self.do_handshake()
return rc
except socket_error:
self._sslobj = None
raise
def get_jvm_option(self, java_bin):
    """Pick the JVM option set that matches the version of ``java_bin``.

    Runs ``<java_bin> -version`` through a shell pipeline, extracts the
    dotted version string and compares its first two components.

    :param java_bin: path of the java executable.
    :return: ``JSTATUS['new_jvm_option']`` when major > 1 or minor >= 8,
        otherwise ``JSTATUS['old_jvm_option']``.
    :raises JvmCollectorExcept: when the version cannot be determined.
    """
    # NOTE(review): java_bin is interpolated into a shell command line, so it
    # must only ever come from trusted configuration.
    cmd = '%s -version 2>&1 | grep \' version \' | awk -F"[\\"_]" \'{print $2}\'' % java_bin
    popen = subprocess.Popen(cmd, shell=True, close_fds=True,
                             stdout=subprocess.PIPE, universal_newlines=True)
    # communicate() drains and closes the pipe and reaps the child process;
    # universal_newlines keeps the output as text on every Python version.
    output, _ = popen.communicate()
    excute_data = output.splitlines(True)
    try:
        version = excute_data[0].strip('\n')
        # Version strings are not always three components long (e.g. "17"
        # or "11.0.9.1"); only major and minor are needed.
        parts = [int(x) for x in version.split('.')]
        major = parts[0]
        minor = parts[1] if len(parts) > 1 else 0
        if major > 1 or minor >= 8:
            self.logger.info("using jdk version: {0}, "
                             "set JVM option with MetaSpace parameter".format(excute_data))
            return JSTATUS['new_jvm_option']
        else:
            self.logger.info("using jdk version: {0}, "
                             "set JVM option with PermSpace parameter".format(excute_data))
            return JSTATUS['old_jvm_option']
    except Exception as msg:
        self.logger.error("failed to decide the java version from excute_data: {0}, "
                          "exception msg: {1}".format(excute_data, msg))
        self.logger.error(traceback.format_exc())
        raise JvmCollectorExcept('receive java version error')
def start_jstatus_process(self, java_bin):
    """Launch the jstatus jar in the background with ``nohup``.

    The JVM option is resolved through ``get_jvm_option(java_bin)`` and
    stored on ``self.jstatus_jvm_option``. After a successful launch the
    method sleeps 10 seconds via ``gevent.sleep``.

    :param java_bin: path of the java executable.
    :raises JvmCollectorExcept: when the launching shell exits non-zero.
    """
    self.jstatus_jvm_option = self.get_jvm_option(java_bin)
    launch_args = (
        java_bin,
        self.jstatus_jvm_option,
        self.jstatus_path,
        self.jstatus_data,
        self.jstatus_log_level,
        self.jstatus_log,
        self.jstatus_port,
    )
    cmd = '%s %s -jar %s -D %s -L %s -l %s -P %s start' % launch_args
    cmd = 'nohup %s &' % cmd

    launcher = subprocess.Popen(cmd, shell=True, close_fds=True)
    launcher.communicate()
    if launcher.returncode:
        raise JvmCollectorExcept('start jstatus error')

    self.logger.info('start jstatus success')
    gevent.sleep(10)
def _parse_data(self, rvalue):
try:
rvalue = json.loads(rvalue)
except ValueError, e:
raise JvmCollectorExcept('return value should be json, bug found %s' % rvalue)
# ???????????
if rvalue.get('code'):
if "Can't match any instances with instance id" in rvalue.get('msg') and not self.already_reload:
self.already_reload = True
# ????????
self.reload_instance(self.inst_id)
return 1, {}
else:
raise JvmCollectorExcept('jstatus return error, %s' % rvalue.get('msg'))
return 0, self._parse_dimension_value(rvalue.get('data', {}))
def _real_connect(self, addr, connect_ex):
if self.server_side:
raise ValueError("can't connect in server-side mode")
# Here we assume that the socket is client-side, and not
# connected at the time of the call. We connect it, then wrap it.
if self._connected:
raise ValueError("attempt to connect already-connected SSLSocket!")
self._sslobj = self._context._wrap_socket(self._sock, False, self.server_hostname, ssl_sock=self)
try:
if connect_ex:
rc = socket.connect_ex(self, addr)
else:
rc = None
socket.connect(self, addr)
if not rc:
self._connected = True
if self.do_handshake_on_connect:
self.do_handshake()
return rc
except socket_error:
self._sslobj = None
raise
def _real_connect(self, addr, connect_ex):
if self.server_side:
raise ValueError("can't connect in server-side mode")
# Here we assume that the socket is client-side, and not
# connected at the time of the call. We connect it, then wrap it.
if self._connected:
raise ValueError("attempt to connect already-connected SSLSocket!")
self._sslobj = self._context._wrap_socket(self._sock, False, self.server_hostname)
if self._session is not None: # 3.6
self._sslobj = SSLObject(self._sslobj, owner=self, session=self._session)
try:
if connect_ex:
rc = socket.connect_ex(self, addr)
else:
rc = None
socket.connect(self, addr)
if not rc:
if self.do_handshake_on_connect:
self.do_handshake()
self._connected = True
return rc
except socket_error:
self._sslobj = None
raise
def update_environ(self):
    """
    Called before the first request is handled to fill in WSGI environment values.

    This includes getting the correct server name and port. Values already
    present in ``self.environ`` are kept. For a non-tuple address both
    entries default to the empty string.
    """
    addr = self.address
    if not isinstance(addr, tuple):
        self.environ.setdefault('SERVER_NAME', '')
        self.environ.setdefault('SERVER_PORT', '')
        return

    if 'SERVER_NAME' not in self.environ:
        try:
            fqdn = socket.getfqdn(addr[0])
        except socket.error:
            # Fall back to the textual form of the bound host.
            fqdn = str(addr[0])
        if PY3 and not isinstance(fqdn, str):
            fqdn = fqdn.decode('ascii')  # python 2 pylint:disable=redefined-variable-type
        self.environ['SERVER_NAME'] = fqdn
    self.environ.setdefault('SERVER_PORT', str(addr[1]))
def _real_connect(self, addr, connect_ex):
if self.server_side:
raise ValueError("can't connect in server-side mode")
# Here we assume that the socket is client-side, and not
# connected at the time of the call. We connect it, then wrap it.
if self._connected:
raise ValueError("attempt to connect already-connected SSLSocket!")
self._sslobj = self.context._wrap_socket(self._sock, False, self.server_hostname, ssl_sock=self)
try:
if connect_ex:
rc = socket.connect_ex(self, addr)
else:
rc = None
socket.connect(self, addr)
if not rc:
self._connected = True
if self.do_handshake_on_connect:
self.do_handshake()
return rc
except socket_error:
self._sslobj = None
raise
def _real_connect(self, addr, connect_ex):
if self.server_side:
raise ValueError("can't connect in server-side mode")
# Here we assume that the socket is client-side, and not
# connected at the time of the call. We connect it, then wrap it.
if self._connected:
raise ValueError("attempt to connect already-connected SSLSocket!")
self._sslobj = self.context._wrap_socket(self._sock, False, self.server_hostname)
try:
if connect_ex:
rc = socket.connect_ex(self, addr)
else:
rc = None
socket.connect(self, addr)
if not rc:
if self.do_handshake_on_connect:
self.do_handshake()
self._connected = True
return rc
except socket_error:
self._sslobj = None
raise
def update_environ(self):
    """
    Called before the first request is handled to fill in WSGI environment values.

    This includes getting the correct server name and port. Values already
    present in ``self.environ`` are kept. For a non-tuple address both
    entries default to the empty string.
    """
    server_address = self.address
    if not isinstance(server_address, tuple):
        self.environ.setdefault('SERVER_NAME', '')
        self.environ.setdefault('SERVER_PORT', '')
        return

    if 'SERVER_NAME' not in self.environ:
        try:
            resolved = socket.getfqdn(server_address[0])
        except socket.error:
            # Fall back to the textual form of the bound host.
            resolved = str(server_address[0])
        if PY3 and not isinstance(resolved, str):
            resolved = resolved.decode('ascii')
        self.environ['SERVER_NAME'] = resolved
    self.environ.setdefault('SERVER_PORT', str(server_address[1]))
def handle_read(self):
    """Receive data from the socket in a loop and feed it to the IO buffer.

    Each received chunk is written to ``self._iobuf`` and then handed to
    ``process_io_buffer()``. The loop ends when:

    * ``recv``/``write`` raises ``socket.error`` -- the connection is marked
      defunct with that error; or
    * an empty chunk arrives (or the buffer position is zero) -- the
      connection is closed.
    """
    while True:
        try:
            received = self._socket.recv(self.in_buffer_size)
            self._iobuf.write(received)
        except socket.error as err:
            log.debug("Exception in read for %s: %s", self, err)
            self.defunct(err)
            return  # leave the read loop

        has_data = received and self._iobuf.tell()
        if not has_data:
            # Nothing usable was received: treat it as a remote close.
            log.debug("Connection %s closed by server", self)
            self.close()
            return

        self.process_io_buffer()
def _reconnect(self):
    """Try to replace the control connection, scheduling retries on failure.

    A successful ``_reconnect_internal()`` result is installed through
    ``_set_new_connection()``. If ``NoHostAvailable`` is raised, any current
    reconnection handler is cancelled and a new
    ``_ControlReconnectionHandler`` is started with a fresh schedule from the
    cluster's reconnection policy. Any other exception is logged at debug
    level and re-raised.
    """
    log.debug("[control connection] Attempting to reconnect")
    try:
        self._set_new_connection(self._reconnect_internal())
    except NoHostAvailable:
        # make a retry schedule (which includes backoff)
        backoff_schedule = self._cluster.reconnection_policy.new_schedule()

        with self._reconnection_lock:
            # cancel existing reconnection attempts
            current = self._reconnection_handler
            if current:
                current.cancel()

            # when a connection is successfully made, _set_new_connection
            # will be called with the new connection and then our
            # _reconnection_handler will be cleared out
            self._reconnection_handler = _ControlReconnectionHandler(
                self,
                self._cluster.scheduler,
                backoff_schedule,
                self._get_and_set_reconnection_handler,
                new_handler=None,
            )
            self._reconnection_handler.start()
    except Exception:
        log.debug("[control connection] error reconnecting", exc_info=True)
        raise
def add_errback(self, fn, *args, **kwargs):
    """
    Like :meth:`.add_callback()`, but handles error cases.
    An Exception instance will be passed as the first positional argument
    to `fn`.

    `fn` is always registered; if a final exception is already set it is
    also invoked immediately (outside the callback lock). Returns `self`.
    """
    entry = (fn, args, kwargs)
    with self._callback_lock:
        # Always add fn to self._errbacks, even when we're about to execute
        # it, to prevent races with functions like start_fetching_next_page
        # that reset _final_exception
        self._errbacks.append(entry)
        already_failed = True if self._final_exception else False

    if already_failed:
        fn(self._final_exception, *args, **kwargs)
    return self
def can_read(self, timeout=0.0):
    """
    Checks if there is data that can be read from the
    socket (if open). Returns True if there is data and
    False if not.

    It returns None if there is no socket, or if something very bad
    happens such as a dead connection (bad file descriptor); in the latter
    case the connection is closed first. Any other select/socket error is
    re-raised to the caller.
    """
    # rs = Read Sockets
    # es = Error Sockets
    if self.socket is None:
        # no socket or no connection
        return None

    try:
        rs, _, es = select([self.socket], [], [], timeout)
    except (SelectError, socket.error) as e:
        if e.args and e.args[0] == errno.EBADF:
            # Bad File Descriptor... hmm
            self.close()
            return None
        # Any other failure leaves rs/es unset, so propagate the real error
        # instead of carrying on with undefined results.
        raise

    if len(es) > 0:
        # Bad File Descriptor
        self.close()
        return None

    return len(rs) > 0
def can_write(self, timeout=0):
    """
    Checks if there is data that can be written to the
    socket (if open). Returns True if writing is possible and
    False if not.

    It returns None if there is no socket, or if something very bad
    happens such as a dead connection (bad file descriptor); in the latter
    case the connection is closed first. Any other select/socket error is
    re-raised to the caller.
    """
    # ws = Write Sockets
    # es = Error Sockets
    if self.socket is None:
        # no socket or no connection
        return None

    try:
        _, ws, es = select([], [self.socket], [], timeout)
    except (SelectError, socket.error) as e:
        if e.args and e.args[0] == errno.EBADF:
            # Bad File Descriptor... hmm
            self.close()
            return None
        # Any other failure leaves ws/es unset, so propagate the real error
        # instead of carrying on with undefined results.
        raise

    if len(es) > 0:
        # Bad File Descriptor
        self.close()
        return None

    return len(ws) > 0
def handle_write(self):
    """Send queued messages over the socket until a socket error occurs.

    Messages are taken one at a time from ``self._write_queue`` and sent
    with ``sendall``. On ``socket.error`` the connection is marked defunct
    with that error and the loop ends.
    """
    while True:
        try:
            outgoing = self._write_queue.get()
            self._socket.sendall(outgoing)
        except socket.error as send_error:
            log.debug("Exception in send for %s: %s", self, send_error)
            self.defunct(send_error)
            break
def _on_up_future_completed(self, host, futures, results, lock, finished_future):
with lock:
futures.discard(finished_future)
try:
results.append(finished_future.result())
except Exception as exc:
results.append(exc)
if futures:
return
try:
# all futures have completed at this point
for exc in [f for f in results if isinstance(f, Exception)]:
log.error("Unexpected failure while marking node %s up:", host, exc_info=exc)
self._cleanup_failed_on_up_handling(host)
return
if not all(results):
log.debug("Connection pool could not be created, not marking node %s up", host)
self._cleanup_failed_on_up_handling(host)
return
log.info("Connection pools established for node %s", host)
# mark the host as up and notify all listeners
host.set_up()
for listener in self.listeners:
listener.on_up(host)
finally:
with host.lock:
host._currently_handling_node_up = False
# see if there are any pools to add or remove now that the host is marked up
for session in self.sessions:
session.update_created_pools()