def get(self, url, timeout=None):
    """Issue an HTTP GET request and decode the server reply.

    Args:
        url (str): The web location we want to retrieve.
        timeout (Optional[int|float]): Optional read timeout that overrides
            the one configured on the connection pool at creation time.

    Returns:
        A JSON object.
    """
    kwargs = {}
    if timeout is not None:
        # Per-call read timeout; connect timeout stays as configured.
        kwargs['timeout'] = Timeout(read=timeout, connect=self._connect_timeout)
    return self._parse(self._request_wrapper('GET', url, **kwargs))
# Example source code using the Timeout() class (collected from various projects).
def __connect(self):
    """Create the urllib3 ``PoolManager`` used for all Pilosa requests.

    Builds protobuf default headers, a combined connect/read timeout and
    (unless TLS verification is disabled) certificate-validation options,
    then stores the resulting client on ``self.__client``.
    """
    # Ceiling division: enough pools so every route gets its share.
    # (The previous ``float(...) / ...`` handed urllib3 a non-integer
    # num_pools; PoolManager expects an int pool count.)
    num_pools = -(-self.pool_size_total // self.pool_size_per_route)
    headers = {
        'Content-Type': 'application/x-protobuf',
        'Accept': 'application/x-protobuf',
        'User-Agent': 'python-pilosa/' + VERSION,
    }
    timeout = urllib3.Timeout(connect=self.connect_timeout, read=self.socket_timeout)
    client_options = {
        "num_pools": num_pools,
        "maxsize": self.pool_size_per_route,
        "block": True,
        "headers": headers,
        "timeout": timeout,
        "retries": self.retry_count,
    }
    if not self.tls_skip_verify:
        # Verify server certificates against the configured CA bundle.
        client_options["cert_reqs"] = "CERT_REQUIRED"
        client_options["ca_certs"] = self.tls_ca_certificate_path
    client = urllib3.PoolManager(**client_options)
    self.__client = client
def __http_pool(self):
    """
    Build and return a plain-HTTP connection pool.
    :raise HttpRequestError: if the pool cannot be created
    :return: urllib3.HTTPConnectionPool
    """
    cfg = self.__cfg
    try:
        pool = HTTPConnectionPool(cfg.host,
                                  port=cfg.port,
                                  maxsize=cfg.threads,
                                  timeout=Timeout(cfg.timeout, read=cfg.timeout),
                                  block=True)
        if self._HTTP_DBG_LEVEL <= self.__debug.level:
            self.__debug.debug_connection_pool('http_pool_start', pool)
        return pool
    except Exception as error:
        # Surface any setup failure as the scanner's own exception type.
        raise HttpRequestError(str(error))
def __https_pool(self):
    """
    Build and return a TLS (HTTPS) connection pool.
    :raise HttpsRequestError: if the pool cannot be created
    :return: urllib3.HTTPSConnectionPool
    """
    cfg = self.__cfg
    try:
        pool = HTTPSConnectionPool(host=cfg.host,
                                   port=cfg.port,
                                   maxsize=cfg.threads,
                                   timeout=Timeout(cfg.timeout, read=cfg.timeout),
                                   block=True)
        if self._HTTP_DBG_LEVEL <= self.__debug.level:
            self.__debug.debug_connection_pool('https_pool_start', pool)
        return pool
    except Exception as error:
        # Surface any setup failure as the scanner's own exception type.
        raise HttpsRequestError(str(error))
def post(self, url, data, timeout=None):
    """Issue an HTTP POST request and decode the server reply.

    Args:
        url (str): The web location we want to retrieve.
        data (dict[str, str]): A dict of key/value pairs. Note: On py2.7 value is unicode.
        timeout (Optional[int|float]): Optional read timeout that overrides
            the one configured on the connection pool at creation time.

    Returns:
        A JSON object.
    """
    kwargs = {}
    if timeout is not None:
        # Per-call read timeout; connect timeout stays as configured.
        kwargs['timeout'] = Timeout(read=timeout, connect=self._connect_timeout)
    if InputFile.is_inputfile(data):
        # File upload: let InputFile build the multipart body and headers.
        payload = InputFile(data)
        result = self._request_wrapper('POST', url,
                                       body=payload.to_form(),
                                       headers=payload.headers,
                                       **kwargs)
    else:
        # Ordinary payload: send it as a JSON document.
        result = self._request_wrapper('POST', url,
                                       body=json.dumps(data).encode(),
                                       headers={'Content-Type': 'application/json'},
                                       **kwargs)
    return self._parse(result)
def retrieve(self, url, timeout=None):
    """Fetch the raw contents of a file by its URL.

    Args:
        url (str): The web location we want to retrieve.
        timeout (Optional[int|float]): Optional read timeout that overrides
            the one configured on the connection pool at creation time.
    """
    kwargs = {}
    if timeout is not None:
        # Per-call read timeout; connect timeout stays as configured.
        kwargs['timeout'] = Timeout(read=timeout, connect=self._connect_timeout)
    return self._request_wrapper('GET', url, **kwargs)
def __proxy_pool(self):
    """
    Create Proxy connection pool
    :raise ProxyRequestError
    :return: urllib3.HTTPConnectionPool
    """
    try:
        # Either a fixed standalone proxy or a random one from the list.
        self.__server = self.__cfg.proxy if True is self.__cfg.is_standalone_proxy else self.__get_random_proxy()
        request_timeout = Timeout(self.__cfg.timeout, read=self.__cfg.timeout)
        if self.__get_proxy_type(self.__server) == 'socks':
            disable_warnings(InsecureRequestWarning)
            # NOTE: the previous cache check ``hasattr(self, '__pm')`` could
            # never match inside a class, because ``self.__pm = ...`` is
            # name-mangled to ``_<Class>__pm`` while the literal string is
            # not.  Import unconditionally instead: import_module() is
            # already cached via sys.modules, so this is cheap.
            package_module = importlib.import_module('urllib3.contrib.socks')
            self.__pm = getattr(package_module, 'SOCKSProxyManager')
            pool = self.__pm(self.__server,
                             num_pools=self.__cfg.threads,
                             timeout=request_timeout,
                             block=True)
        else:
            pool = ProxyManager(self.__server,
                                num_pools=self.__cfg.threads,
                                timeout=request_timeout,
                                block=True)
        return pool
    except (DependencyWarning, ProxySchemeUnknown, ImportError) as error:
        raise ProxyRequestError(error)
def _conn_request(self, conn, request_uri, method, body, headers):
    """Send the request through the urllib3 pool and map the result back
    to an httplib2-style ``(response, content)`` pair."""
    # Rebuild the absolute URI from the connection object.
    if isinstance(conn, httplib2.HTTPSConnectionWithTimeout):
        scheme = 'https'
    else:
        scheme = 'http'
    host = conn.host
    if _is_ipv6(host):
        # IPv6 literals must be bracketed inside a URI authority.
        host = '[{}]'.format(host)
    full_uri = '{}://{}:{}{}'.format(scheme, host, conn.port, request_uri)
    # HEAD replies carry no body, so skip content decoding for them.
    decode = method != 'HEAD'
    try:
        urllib3_response = self.pool.request(method,
                                             full_uri,
                                             body=body,
                                             headers=headers,
                                             redirect=False,
                                             retries=urllib3.Retry(total=False, redirect=0),
                                             timeout=urllib3.Timeout(total=self.timeout),
                                             decode_content=decode)
        response = _map_response(urllib3_response, decode=decode)
        content = urllib3_response.data
    except Exception as e:
        # Translate urllib3 errors into their httplib2 equivalents.
        raise _map_exception(e)
    return response, content
def __init__(self,
             etcd_addrs,
             key_to_poll,
             etcd_scheme="http",
             etcd_key=None,
             etcd_cert=None,
             etcd_ca=None,
             poll_timeout=10,
             connect_timeout=5):
    """Long-poll watcher for a single etcd subtree."""
    super(EtcdWatcher, self).__init__(etcd_addrs,
                                      etcd_scheme=etcd_scheme,
                                      etcd_key=etcd_key,
                                      etcd_cert=etcd_cert,
                                      etcd_ca=etcd_ca)
    # Separate connect/read limits: a poll may block up to poll_timeout.
    self.etcd_timeout = Timeout(connect=connect_timeout, read=poll_timeout)
    self.key_to_poll = key_to_poll
    self.next_etcd_index = None
    # Set (safe from another thread) to force a resync after the current
    # poll; automatically reset to False once the resync is triggered.
    self.resync_after_current_poll = False
    # One-way flag telling the watcher to stop after the current poll.
    self._stopped = False
    self.dispatcher = PathDispatcher()
def delete_empty_parents(client, child_dir, root_key, timeout=DEFAULT_TIMEOUT):
    """
    Best-effort removal of child_dir and any now-empty parent directories.

    Gives up at the first failed delete.  Safe even when another client is
    writing to the tree: etcd refuses to delete a directory that has
    become non-empty.

    :param client: EtcdClient
    :param child_dir: Key to delete, along with its parents; should be a
        directory.
    :param root_key: Prefix of child_dir to stop at.  Will not be deleted.
    :param timeout: Timeout to use on the etcd delete operation.
    """
    _log.debug("Deleting empty directories from %s down to %s", child_dir,
               root_key)
    path_segments = child_dir.strip("/").split("/")
    root_path_segments = root_key.strip("/").split("/")
    if path_segments[:len(root_path_segments)] != root_path_segments:
        raise ValueError("child_key %r must start with root key %r" %
                         (child_dir, root_key))
    # Walk upwards, dropping one trailing segment per step, stopping just
    # short of the root key itself.
    segments = list(path_segments)
    while len(segments) > len(root_path_segments):
        key_to_delete = "/".join(segments)
        try:
            client.delete(key_to_delete, dir=True, timeout=timeout)
        except etcd.EtcdKeyNotFound:
            _log.debug("Key %s already deleted", key_to_delete)
        except etcd.EtcdDirNotEmpty:
            _log.debug("Directory %s not empty, giving up", key_to_delete)
            break
        except etcd.EtcdException as e:
            _log.warning("Failed to delete %s (%r), skipping.",
                         key_to_delete, e)
            break
        segments.pop()
def __init__(self,
             etcd_addrs,
             key_to_poll,
             etcd_scheme="http",
             etcd_key=None,
             etcd_cert=None,
             etcd_ca=None,
             poll_timeout=10,
             connect_timeout=5):
    """Long-poll watcher for a single etcd subtree."""
    super(EtcdWatcher, self).__init__(etcd_addrs,
                                      etcd_scheme=etcd_scheme,
                                      etcd_key=etcd_key,
                                      etcd_cert=etcd_cert,
                                      etcd_ca=etcd_ca)
    # Separate connect/read limits: a poll may block up to poll_timeout.
    self.etcd_timeout = Timeout(connect=connect_timeout, read=poll_timeout)
    self.key_to_poll = key_to_poll
    self.next_etcd_index = None
    # Set (safe from another thread) to force a resync after the current
    # poll; automatically reset to False once the resync is triggered.
    self.resync_after_current_poll = False
    # One-way flag telling the watcher to stop after the current poll.
    self._stopped = False
    self.dispatcher = PathDispatcher()
def delete_empty_parents(client, child_dir, root_key, timeout=DEFAULT_TIMEOUT):
    """
    Best-effort removal of child_dir and any now-empty parent directories.

    Gives up at the first failed delete.  Safe even when another client is
    writing to the tree: etcd refuses to delete a directory that has
    become non-empty.

    :param client: EtcdClient
    :param child_dir: Key to delete, along with its parents; should be a
        directory.
    :param root_key: Prefix of child_dir to stop at.  Will not be deleted.
    :param timeout: Timeout to use on the etcd delete operation.
    """
    _log.debug("Deleting empty directories from %s down to %s", child_dir,
               root_key)
    path_segments = child_dir.strip("/").split("/")
    root_path_segments = root_key.strip("/").split("/")
    if path_segments[:len(root_path_segments)] != root_path_segments:
        raise ValueError("child_key %r must start with root key %r" %
                         (child_dir, root_key))
    # Walk upwards, dropping one trailing segment per step, stopping just
    # short of the root key itself.
    segments = list(path_segments)
    while len(segments) > len(root_path_segments):
        key_to_delete = "/".join(segments)
        try:
            client.delete(key_to_delete, dir=True, timeout=timeout)
        except etcd.EtcdKeyNotFound:
            _log.debug("Key %s already deleted", key_to_delete)
        except etcd.EtcdDirNotEmpty:
            _log.debug("Directory %s not empty, giving up", key_to_delete)
            break
        except etcd.EtcdException as e:
            _log.warning("Failed to delete %s (%r), skipping.",
                         key_to_delete, e)
            break
        segments.pop()
def __init__(self,
             con_pool_size=1,
             proxy_url=None,
             urllib3_proxy_kwargs=None,
             connect_timeout=5.,
             read_timeout=5.):
    """Set up the urllib3 pool (or proxy) manager used for all requests.

    Args:
        con_pool_size: Maximum number of connections kept in the pool.
        proxy_url: Explicit proxy URL; when unset, falls back to the
            ``HTTPS_PROXY`` / ``https_proxy`` environment variables.
        urllib3_proxy_kwargs: Extra kwargs forwarded to the proxy manager.
        connect_timeout: Seconds allowed for establishing a connection.
        read_timeout: Seconds allowed for reading a response.
    """
    if urllib3_proxy_kwargs is None:
        urllib3_proxy_kwargs = dict()
    self._connect_timeout = connect_timeout
    pool_kwargs = dict(
        maxsize=con_pool_size,
        cert_reqs='CERT_REQUIRED',
        ca_certs=certifi.where(),
        # Keep idle sockets alive so long-lived connections survive.
        socket_options=HTTPConnection.default_socket_options + [
            (socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1),
        ],
        timeout=urllib3.Timeout(
            connect=self._connect_timeout, read=read_timeout, total=None))
    # Proxy resolution order: explicit proxy_url, then the HTTPS_PROXY
    # env var, then https_proxy, otherwise no proxy at all.
    if not proxy_url:
        proxy_url = os.environ.get('HTTPS_PROXY') or os.environ.get('https_proxy')
    if not proxy_url:
        if urllib3.contrib.appengine.is_appengine_sandbox():
            # Inside the App Engine sandbox we must use the URLFetch service.
            mgr = urllib3.contrib.appengine.AppEngineManager()
        else:
            mgr = urllib3.PoolManager(**pool_kwargs)
    else:
        pool_kwargs.update(urllib3_proxy_kwargs)
        if proxy_url.startswith('socks'):
            try:
                from urllib3.contrib.socks import SOCKSProxyManager
            except ImportError:
                raise RuntimeError('PySocks is missing')
            mgr = SOCKSProxyManager(proxy_url, **pool_kwargs)
        else:
            mgr = urllib3.proxy_from_url(proxy_url, **pool_kwargs)
            if mgr.proxy.auth:
                # TODO: what about other auth types?
                auth_hdrs = urllib3.make_headers(proxy_basic_auth=mgr.proxy.auth)
                mgr.proxy_headers.update(auth_hdrs)
    self._con_pool = mgr