def start_websocket(self, run_in_background=False):
    """Open the streaming websocket and run its event loop.

    Ping settings come from ``self.get_socket_params()`` when that call
    succeeds (keys ``pythonPingInterval`` / ``pythonPingTimeout``);
    otherwise the defaults 60 and 10 are used.

    :param run_in_background: when ``True`` the loop runs in a daemon thread
        stored on ``self.ws_thread`` and this call returns immediately;
        otherwise the call blocks inside ``run_forever``.
    """
    socket_params = {}
    try:
        socket_params = self.get_socket_params()
    except requests.exceptions.HTTPError:
        # Best effort: keep going with the default ping settings.
        print ("Can't Access Socket Params")

    run_kwargs = {
        'ping_interval': socket_params.get('pythonPingInterval', 60),
        'ping_timeout': socket_params.get('pythonPingTimeout', 10),
    }

    url = self.config['socketEndpoint'].format(api_key=self.api_key, access_token=self.access_token)
    self.websocket = websocket.WebSocketApp(
        url,
        # A list of "Name: value" lines; the original passed a set and
        # omitted the space between "Bearer" and the token.
        header=['Authorization: Bearer ' + self.access_token],
        on_data=self._on_data,
        on_error=self._on_error,
        on_close=self._on_close)

    if run_in_background is True:
        # Forward the ping settings here too; the original only applied
        # them to the foreground call.
        self.ws_thread = threading.Thread(target=self.websocket.run_forever,
                                          kwargs=run_kwargs)
        self.ws_thread.daemon = True
        self.ws_thread.start()
    else:
        self.websocket.run_forever(**run_kwargs)
# Example source code using the Python class HTTPError() (python类HTTPError()的实例源码)
def sendRequest(ip, port, route, data=None, protocol="http"):
    """Send a request to ``{protocol}://{ip}:{port}{route}`` and return the response.

    A POST carrying *data* is issued when *data* is not ``None``; otherwise
    a plain GET is issued.

    :raises PipelineServiceError: when ``requests`` raises ``HTTPError``.
    """
    url = "{protocol}://{ip}:{port}{route}".format(protocol=protocol, ip=ip, port=port, route=route)
    try:
        if data is None:
            resp = requests.get(url)
        else:
            resp = requests.post(url, data=data)
    except requests.HTTPError as e:
        # NOTE(review): requests normally raises HTTPError only from
        # raise_for_status(), so this handler may never fire - confirm intent.
        raise PipelineServiceError("{reason}".format(reason=e))
    return resp
def api_call_helper(self, name, http_method, params, data):
    """Call the configured route *name* and return the ``data`` field of its JSON body.

    The URL is ``self.config['host'] + self.config['routes'][name]``; when
    *params* is not ``None`` its items fill the ``{placeholders}`` in it.

    :raises requests.HTTPError: when the HTTP status is not 200, or when the
        body's ``code`` field is rejected by ``is_status_2xx``. The message
        is the response text and the response is attached as ``.response``.
    """
    url = self.config['host'] + self.config['routes'][name]
    if params is not None:
        url = url.format(**params)

    response = self.api_call(url, http_method, data)
    if response.status_code != 200:
        raise requests.HTTPError(response.text, response=response)

    body = json.loads(response.text)
    if not is_status_2xx(body['code']):
        # Transport succeeded but the API reported a failure in the payload.
        raise requests.HTTPError(response.text, response=response)
    return body['data']
def test_load_inexistent_tables_to_workspace(self):
    """
    Workspace endpoint raises HTTPError when mock loading inexistent table
    """
    workspace_id = '78432'
    mapping = {"in.c-table.does_not_exist": "my-table"}
    load_url = ('https://connection.keboola.com/v2/storage/workspaces/'
                '78432/load')
    msg = ('404 Client Error: Not Found for url: '
           'https://connection.keboola.com/v2/storage/workspaces/78432/'
           'load')

    # Register a mocked POST whose body is the error itself, so the call fails.
    mocked_load = responses.Response(
        method='POST',
        url=load_url,
        body=HTTPError(msg)
    )
    responses.add(mocked_load)

    with self.assertRaises(HTTPError) as error_context:
        self.ws.load_tables(workspace_id, mapping)
    raised = error_context.exception
    assert raised.args[0] == msg
def test_api_call_raises_exception_connection_error(mock_requests,
                                                    cloudflare):
    """Each ``requests`` exception raised by ``get`` surfaces as ``CloudFlareException``."""
    request_errors = (
        requests.exceptions.RequestException,
        requests.exceptions.HTTPError,
        requests.exceptions.ConnectionError,
        requests.exceptions.ProxyError,
        requests.exceptions.SSLError,
        requests.exceptions.Timeout,
        requests.exceptions.ConnectTimeout,
        requests.exceptions.ReadTimeout,
        requests.exceptions.URLRequired,
        requests.exceptions.TooManyRedirects,
        requests.exceptions.MissingSchema,
        requests.exceptions.InvalidSchema,
        requests.exceptions.InvalidURL,
        requests.exceptions.ChunkedEncodingError,
        requests.exceptions.ContentDecodingError,
        requests.exceptions.StreamConsumedError,
        requests.exceptions.RetryError,
    )
    for error_type in request_errors:
        mock_requests.get.side_effect = error_type('Some error')
        with pytest.raises(CloudFlareException):
            cloudflare._api_call('/foo')
def request_jira(client, url, method='GET', **kwargs):
    """Send a JWT-authorized JSON request to ``client.baseUrl`` + *url*.

    The token is built by ``encode_token`` from the method, the URL, the
    app's ``ADDON_KEY`` setting and ``client.sharedSecret``. Extra keyword
    arguments are forwarded to ``requests.request``.

    :return: the response object when the status check passes.
    :raises requests.HTTPError: on a failing status; the message is the
        response body text and the response is attached.
    """
    token = encode_token(
        method, url, app.config.get('ADDON_KEY'),
        client.sharedSecret)
    request_headers = {
        "Authorization": 'JWT %s' % token,
        "Content-Type": "application/json"
    }
    target = client.baseUrl.rstrip('/') + url
    result = requests.request(method, target, headers=request_headers, **kwargs)
    try:
        result.raise_for_status()
    except requests.HTTPError as e:
        # Re-raise with the body text as the message.
        failed = e.response
        raise requests.HTTPError(failed.text, response=failed)
    return result
def _fetch_index_package_info(self, package_name, current_version):
    """
    Query the configured package index for *package_name* and parse the reply.

    :type package_name: str
    :type current_version: version.Version
    :return: ``(False, message)`` when the request raises ``HTTPError`` or
        the response is not OK; otherwise the result of the parser that
        matches ``self.PYPI_API_TYPE``.
    :raises NotImplementedError: for an unsupported ``PYPI_API_TYPE``.
    """
    package_canonical_name = package_name
    if self.PYPI_API_TYPE == 'simple_html':
        package_canonical_name = canonicalize_name(package_name)

    # Only the network call sits inside the try block.
    try:
        response = requests.get(self.PYPI_API_URL.format(package=package_canonical_name), timeout=15)
    except HTTPError as e:  # pragma: nocover
        # ``e.message`` does not exist on Python 3 exceptions; str(e) is portable.
        return False, str(e)

    if not response.ok:  # pragma: nocover
        return False, 'API error: {}'.format(response.reason)

    if self.PYPI_API_TYPE == 'pypi_json':
        return self._parse_pypi_json_package_info(package_name, current_version, response)
    elif self.PYPI_API_TYPE == 'simple_html':
        return self._parse_simple_html_package_info(package_name, current_version, response)
    else:  # pragma: nocover
        raise NotImplementedError('This type of PYPI_API_TYPE type is not supported')
def get_character(self, character_id):
    """Return a display string for a character.

    The string holds the bold-marked name, the character's zKillboard URL
    and the value of ``self.get_corporation`` for its corporation id.
    Any ``HTTPError`` whose status is not 500 is re-raised.
    """
    try:
        info = self.eve.get_character(character_id)
    except requests.HTTPError as ex:
        if ex.response.status_code != 500:
            raise
        # Patch for character being unresolvable and ESI throwing internal
        # errors: temporarily stub the character to not break our behavior.
        info = {'name': 'Unknown character', 'corporation_id': 98356193}

    name = info['name']
    corporation = self.get_corporation(info['corporation_id'])
    return '**%s** (https://zkillboard.com/character/%d/) (%s)' % (
        name, character_id, corporation)
def get(self, endpoint, args=None):
    """
    Wrapper for requests.get

    @arg endpoint is the URL to request
    @arg args is a dict which is converted to URL parameters
    @return the decoded JSON body on status 200, or None on status 404
    @raise requests.HTTPError for any other status; the response is attached
    """
    answer = requests.get(endpoint, params=args)
    if answer.status_code == 200:
        return answer.json()

    # Error bodies are not guaranteed to be JSON; never let the logging
    # of a failure raise a different exception.
    try:
        payload = answer.json()
    except ValueError:
        payload = None

    if answer.status_code == 404:
        self.logger.error(
            "endpoint %s or resource not found", endpoint)
        if isinstance(payload, dict) and "error_description" in payload:
            description = payload["error_description"]
        else:
            description = answer.text
        self.logger.error("error description was: %s", description)
        return None

    self.logger.error(
        "Error during get request for endpoint %s", endpoint)
    self.logger.error("Status code was %d", answer.status_code)
    self.logger.error(
        "error message was: %s",
        payload if payload is not None else answer.text)
    raise requests.HTTPError(
        "GET %s failed with status %d" % (endpoint, answer.status_code),
        response=answer)
def ssidSuccessfullySet(self, ssid_str):
    """Try to set the SSID through the admin URL and report whether it stuck.

    Returns ``False`` when the PUT fails its status check or its ``result``
    differs from ``self.SUCCESS_RESPONSE``. Otherwise the SSID is read back
    and compared with *ssid_str*. A failing read-back request raises
    ``requests.HTTPError``.
    """
    auth = getAdminAuth()
    put_reply = requests.put(self.ADMIN_SSID_URL, auth=auth,
                             data=json.dumps({"value": ssid_str}))
    try:
        # This assumes there's not a better method to test whether
        # the SSID was of a valid length...
        put_reply.raise_for_status()
    except requests.HTTPError:
        return False

    accepted = put_reply.json()["result"] == self.SUCCESS_RESPONSE
    if not accepted:
        return False

    # Finally, check whether it was actually set
    get_reply = requests.get(self.ADMIN_SSID_URL, auth=getAdminAuth())
    get_reply.raise_for_status()
    current = get_reply.json()["result"][0]
    return current == ssid_str
def get_version(self):
    """Fetches the current version number of the Graph API being used.

    The version is read from the ``facebook-api-version`` response header,
    stripped of ``v`` and returned as a float.

    Raises GraphAPIError when the request raises ``HTTPError`` or when the
    header is missing or cannot be converted.
    """
    args = {"access_token": self.access_token}
    try:
        response = requests.request("GET",
                                    "https://graph.facebook.com/" +
                                    self.version + "/me",
                                    params=args,
                                    timeout=self.timeout,
                                    proxies=self.proxies)
    except requests.HTTPError as e:
        # requests' HTTPError has no read(); the payload, if any, lives on
        # e.response. Fall back to the error text when there is no JSON body.
        try:
            payload = e.response.json()
        except (AttributeError, ValueError):
            payload = str(e)
        raise GraphAPIError(payload)

    try:
        headers = response.headers
        version = headers["facebook-api-version"].replace("v", "")
        return float(version)
    except Exception:
        raise GraphAPIError("API version number not available")
def test_categories_returns_200_and_decoded_json(self):
    """``categories`` returns decoded JSON for provider-only and by-id queries."""
    provider_name = 'flinkster'
    category_id = '1000'

    def fetch(*args, **kwargs):
        # Turn an HTTP failure into a readable test failure.
        try:
            return self.api.categories(*args, **kwargs)
        except HTTPError as e:
            self.fail('Status Code Was %s - URL: %s!' % (e.response.status_code, e.request.url))

    # Assert querying by network only works as expected
    resp = fetch(provider_name)
    self.assertIsInstance(resp, dict)
    self.assertIn('items', resp)

    # Assert querying by network and category ID works as expected
    resp = fetch(provider_name, by_id=category_id)
    self.assertIsInstance(resp, (list, dict))
    self.assertNotIn('items', resp)
def get_time_totals_from_pond(timeallocation, start, end, too, recur=0):
    """
    Sum the pond query result for a time allocation over ``start``..``end``.

    Pond queries are too slow with large proposals and time out, so when a
    query raises ``HTTPError`` the range is split in four via ``split_time``
    and each part is queried recursively. ``RecursionError`` is raised once
    ``recur`` exceeds 3.
    """
    if recur > 3:
        raise RecursionError('Pond is timing out, too much recursion.')

    proposal_id = timeallocation.proposal.id
    total = 0
    try:
        total += query_pond(
            proposal_id,
            start,
            end,
            timeallocation.telescope_class,
            timeallocation.instrument_name,
            too,
        )
    except requests.HTTPError:
        logger.warning('We got a pond inception. Splitting further.')
        deeper = recur + 1
        for part_start, part_end in split_time(start, end, 4):
            total += get_time_totals_from_pond(
                timeallocation, part_start, part_end, too, recur=deeper)
    return total
def login(self, username, password, require_ownership=False):
    """Post the credentials to ``self.login_url`` and return the first item of the JSON reply.

    :param require_ownership: sent as the integer query parameter
        ``require_game_ownership``.
    :raises OwnershipError: on an HTTP error whose JSON ``message`` is
        ``'Insufficient membership'``.
    :raises AuthError: on an HTTP error carrying any other JSON ``message``.
    :raises requests.HTTPError: on an HTTP error without a usable message.
    """
    resp = self.session.post(
        self.login_url,
        params=dict(require_game_ownership=int(require_ownership)),
        data=dict(username=username, password=password)
    )

    # Decode first so the error branch below can inspect the payload.
    try:
        payload = resp.json()
    except ValueError:
        # The body is not JSON; a bare ``except`` here would also have
        # swallowed KeyboardInterrupt and SystemExit.
        payload = None

    try:
        resp.raise_for_status()
    except requests.HTTPError:
        if isinstance(payload, dict) and 'message' in payload:
            if payload['message'] == 'Insufficient membership':
                raise OwnershipError(payload['message'])
            raise AuthError(payload['message'])
        raise
    return payload[0]
def bucket_get(self, bucket_id):
    """Return the bucket object.

    See `API buckets: GET /buckets
    <https://storj.github.io/bridge/#!/buckets/get_buckets_id>`_

    Args:
        bucket_id (str): bucket unique identifier.

    Returns:
        (:py:class:`model.Bucket`): bucket, or ``None`` when the request
        fails with a not-found status.

    Raises:
        BridgeError: on any other HTTP error (the error is logged first).
    """
    self.logger.info('bucket_get(%s)', bucket_id)
    try:
        attributes = self._request(
            method='GET',
            path='/buckets/%s' % bucket_id)
        return model.Bucket(**attributes)
    except requests.HTTPError as e:
        if e.response.status_code != requests.codes.not_found:
            self.logger.error('bucket_get() error=%s', e)
            raise BridgeError()
        return None
def test_bucket_get_not_found(self):
    """Test Client.bucket_get() when bucket does not exist."""
    # Make the mocked request fail with an HTTPError carrying a 404 response.
    not_found = requests.HTTPError()
    not_found.response = mock.Mock(status_code=404)
    self.mock_request.side_effect = not_found

    bucket = self.client.bucket_get('inexistent')

    self.mock_request.assert_called_once_with(
        method='GET',
        path='/buckets/inexistent')
    assert bucket is None
def update_json():
    """Fetch the station JSON, store its first entry in the global ``data`` and call ``update_departures``.

    On ``requests.HTTPError`` or ``requests.ConnectionError`` an error line
    is printed instead.
    """
    global data
    try:
        # response = requests.get('http://sickpi:3000/station/9100013') # Spittelmarkt
        response = requests.get('http://sickpi:3000/station/9160523') # Gotlindestr.
        json_data = response.json()
        # Serialise and re-parse the first entry, which yields a fresh copy of it.
        data = json.loads(json.dumps(json_data[0], indent=4))
        print('\nUPDATE JSON')
        update_departures()
    except (requests.HTTPError, requests.ConnectionError):
        print('\nERROR UPDATE JSON')
        # NOTE(review): the source lost its indentation; quit_all() is assumed
        # to belong to this error branch - confirm against the original file.
        quit_all()
def similar_fragments(self, fragment_id, cutoff, limit=1000):
    """Find fragments similar to the query fragment.

    Args:
        fragment_id (str): Query fragment identifier
        cutoff (float): Cutoff, similarity scores below cutoff are discarded.
        limit (int): Maximum number of hits. Default is 1000.

    Returns:
        list[dict]: Query fragment identifier, hit fragment identifier and similarity score

    Raises:
        requests.HTTPError: When the response fails its status check,
            e.g. when fragment_id could not be found
    """
    path = '/fragments/{fragment_id}/similar'.format(fragment_id=fragment_id)
    query = {'cutoff': cutoff, 'limit': limit}
    response = requests.get(self.base_url + path, params=query)
    response.raise_for_status()
    return response.json()
def _fetch_fragments(self, idtype, ids):
    """Fetch fragments by identifier and convert their molblocks.

    Requests ``/fragments?{idtype}={comma-joined ids}``. A 404 reply is
    treated as a partial result: its body provides the ``fragments`` found
    and the ``absent_identifiers``. Other HTTP errors are re-raised.

    Returns:
        tuple: ``(fragments, absent_identifiers)``; every non-``None``
        ``mol`` entry has been passed through ``MolFromMolBlock``.
    """
    query = '/fragments?{idtype}={ids}'.format(idtype=idtype, ids=','.join(ids))
    url = self.base_url + query
    absent_identifiers = []
    try:
        response = requests.get(url)
        response.raise_for_status()
        fragments = response.json()
    except HTTPError as e:
        if e.response.status_code != 404:
            raise e
        partial = e.response.json()
        fragments = partial['fragments']
        absent_identifiers = partial['absent_identifiers']

    # Convert molblock string to RDKit Mol object
    for fragment in fragments:
        molblock = fragment['mol']
        if molblock is not None:
            fragment['mol'] = MolFromMolBlock(molblock)
    return fragments, absent_identifiers
def pharmacophores(self, fragment_ids):
    """Fetch the ``.phar`` text for each fragment identifier, in order.

    A 404 for an identifier records ``None`` in its place; other HTTP
    errors are re-raised at once.

    Returns:
        list: the response text per fragment, when none were missing.

    Raises:
        IncompletePharmacophores: when at least one identifier returned
            404; it is given the missing identifiers and the partial list.
    """
    collected = []
    missing = []
    for fragment_id in fragment_ids:
        url = self.base_url + '/fragments/{0}.phar'.format(fragment_id)
        try:
            response = requests.get(url)
            response.raise_for_status()
        except HTTPError as e:
            if e.response.status_code != 404:
                raise e
            collected.append(None)
            missing.append(fragment_id)
        else:
            collected.append(response.text)

    if missing:
        raise IncompletePharmacophores(missing, collected)
    return collected
def handle_trigger_event(self, sender, **data):
    """Send a Pushover notification for a trigger event.

    Reads ``source``, ``prediction`` and ``timestamp`` from *data*.
    Failures are logged and never raised to the caller.
    """
    # Pushover doesn't support images, so just send the event.
    notification_data = {
        "user": self.pushover_user,
        "token": self.pushover_token,
        "message": "Camera %s, event: %s" % (data['source'], data['prediction']),
        "timestamp": calendar.timegm(data['timestamp'].timetuple())
    }
    # Optionally, set the device.
    if self.pushover_device:
        notification_data['device'] = self.pushover_device

    try:
        r = requests.post("https://api.pushover.net/1/messages.json", data=notification_data)
    except requests.ConnectionError as e:
        # The original message had no %s placeholder, so logging itself
        # failed while formatting the record.
        logger.error("Connection Error: %s", e)
    except requests.HTTPError as e:
        logger.error("HTTP Error: %s", e)
    else:
        if r.status_code != 200:
            logger.error("Failed to send notification, (%d): %s", r.status_code, r.text)
def raise_for_status(self, allow_redirects=True):
    """Raises stored :class:`HTTPError` or :class:`URLError`, if one occurred.

    Status 304 never raises. A stored ``self.error`` is raised next (with
    the stored traceback re-attached when there is one). After that, 3xx
    raises only when *allow_redirects* is false, and 4xx/5xx always raise.
    The raised ``HTTPError`` has this object as its ``response``.
    """
    status = self.status_code
    if status == 304:
        return

    if self.error:
        if self.traceback:
            six.reraise(Exception, Exception(self.error), Traceback.from_string(self.traceback).as_traceback())
        failure = HTTPError(self.error)
    elif 300 <= status < 400 and not allow_redirects:
        failure = HTTPError('%s Redirection' % (status))
    elif 400 <= status < 500:
        failure = HTTPError('%s Client Error' % (status))
    elif 500 <= status < 600:
        failure = HTTPError('%s Server Error' % (status))
    else:
        return

    failure.response = self
    raise failure
def _delete(self, model_instance):
"""
Deletes a serialized SecretModel string from Custodia.
:param model_instance: SecretModel instance to delete.
:type model_instance: commissaire.model.SecretModel
:raises StorageLookupError: if data lookup fails (404 Not Found)
:raises requests.HTTPError: if the request fails (other than 404)
"""
url = self._build_key_url(model_instance)
try:
response = self.session.request(
'DELETE', url, timeout=self.CUSTODIA_TIMEOUT)
response.raise_for_status()
except requests.HTTPError as error:
# XXX bool(response) defers to response.ok, which is a misfeature.
# Have to explicitly test "if response is None" to know if the
# object is there.
have_response = response is not None
if have_response and error.response.status_code == 404:
raise StorageLookupError(str(error), model_instance)
else:
raise error
def connect(self):
    """Configure the ``self.gg`` client and resolve the current user name.

    Sets the URL, token, default-private flag and session options on
    ``self.gg``, then stores ``self.user`` in ``self.username``.

    A 401 ``HTTPError`` raised by that lookup becomes ``ConnectionError``;
    the message depends on whether a private key is configured. Any other
    ``HTTPError`` is re-raised.
    """
    client = self.gg
    client.setup(self.url_ro)
    client.set_token(self._privatekey)
    client.set_default_private(self.default_create_private)
    verify = self.session_certificate or not self.session_insecure
    client.setup_session(verify, self.session_proxy)

    try:
        self.username = self.user  # Call to self.gg.authenticated_user()
    except HTTPError as err:
        unauthorized = err.response is not None and err.response.status_code == 401
        if not unauthorized:
            raise err
        if self._privatekey:
            raise ConnectionError('Could not connect to GoGS. '
                                  'Check your configuration and try again.') from err
        raise ConnectionError('Could not connect to GoGS. '
                              'Please configure .gitconfig '
                              'with your gogs private key.') from err
def _comment_exists(self):
    """
    Report whether this comment can still be fetched.

    Comment metadata cannot be requested directly, so existence is not
    always obvious. This probes by loading the reply data and treats an
    ``HTTPError`` as "does not exist".
    """
    try:
        # Getting the reply data from a comment should raise an error if
        # it doesn't exist. Comment's get_reply_data is used because
        # ProgramCommentReply's get_reply_data is itself a workaround.
        replies = Comment(self.id, self.get_program()).get_reply_data()
        for _ in replies:
            # Consume everything so any request error surfaces here.
            pass
    except requests.HTTPError:
        return False
    else:
        return True
def get_metadata(self):
    """Returns a dictionary with information about this ``ProgramCommentReply``.

    Raises ``_CommentDoesntExistError`` when no entry in the parent's reply
    data has this reply's id as its ``"key"``.
    """
    # There is no known way to get comment reply metadata directly, so the
    # parent's thread is scanned for the entry whose key is this reply's id.
    thread = self.get_parent().get_reply_data()
    matching = (entry for entry in thread if entry["key"] == self.id)
    for entry in matching:
        return entry
    raise _CommentDoesntExistError(self)
    # TODO: settle on the right error to raise here (an error is raised
    # now instead of returning None). We really can get here. Open
    # questions: would self.get_parent().get_reply_data() raise an
    # HTTPError if self.comment_key was nonsense? If so, we are probably
    # only here when comment_key is a ProgramComment key rather than a
    # ProgramCommentReply key, and something more specific (TypeError
    # maybe?) would fit better. There are many edge cases worth looking
    # into, and much of this note should stay to explain the chosen error.
def request(self, path, method='GET', **kwargs):
    """Send a request to ``ENDPOINT_URL + path`` and return the decoded result.

    The body is decoded as JSON when the ``content-type`` header mentions
    ``json``; otherwise the raw text is returned. Extra keyword arguments
    are forwarded to ``requests.request``.

    :raises VoucherifyError: on a connection error, on an ``HTTPError``
        that carries no response, or when the decoded result is a dict
        with a truthy ``error`` entry.
    """
    url = ENDPOINT_URL + path
    try:
        response = requests.request(
            method=method,
            url=url,
            headers=self.headers,
            timeout=self.timeout,
            **kwargs
        )
    except requests.HTTPError as e:
        # requests' HTTPError has no read(); continue with the attached
        # response so the payload checks below still apply.
        if e.response is None:
            raise VoucherifyError(e)
        response = e.response
    except requests.ConnectionError as e:
        raise VoucherifyError(e)

    content_type = response.headers.get('content-type')
    if content_type and 'json' in content_type:
        result = response.json()
    else:
        result = response.text

    if isinstance(result, dict) and result.get('error'):
        raise VoucherifyError(result)
    return result
def get_from_chain(url_adder):
    """GET ``https://api.chain.com/v2/bitcoin/<url_adder>`` and return the parsed JSON body.

    A 429 reply is retried after a one-second sleep, with no retry limit.
    Any other HTTP error prints the URL and is re-raised.
    """
    url = 'https://api.chain.com/v2/bitcoin/%s' % (url_adder)
    while True:
        try:
            r = requests.get(url, auth=(CHAIN_API_KEY, CHAIN_API_SECRET))
            r.raise_for_status()
        except requests.HTTPError as e:
            if r.status_code != 429:
                print("Request was to %s" % (url))
                raise e
            # Too many requests: back off briefly, then try again.
            time.sleep(1)
        else:
            return json.loads(r.text)
def goto_course_list(self):
    """Go to the course picker.

    Waits up to 30 (``WebDriverWait`` timeout) for the root container,
    then clicks the brand logo and waits for the page load.

    Raises HTTPError when the current URL does not contain ``'tutor'``.
    Errors from the wait or the click propagate unchanged; the original
    ``except Exception as ex: raise ex`` wrapper was a no-op.
    """
    long_wait = WebDriverWait(self.driver, 30)
    long_wait.until(
        expect.presence_of_element_located(
            (By.ID, 'ox-react-root-container')
        )
    )
    if 'tutor' not in self.current_url():
        raise HTTPError('Not currently on an OpenStax Tutor webpage:' +
                        '%s' % self.current_url())
    self.find(By.CSS_SELECTOR, '.ui-brand-logo').click()
    self.page.wait_for_page_load()
def push(self):
    """
    Push data to API.

    Posts ``self.post_data`` (JSON-encoded under the ``info`` field) to
    ``self.api``. The push counts as successful when the JSON reply has a
    non-empty ``vul_pdf`` entry. Network and decoding errors are logged.

    :return: push success or not
    """
    try:
        target = self.api
        form = {"info": json.dumps(self.post_data, ensure_ascii=False)}
        reply = requests.post(url=target, data=form)
        result = reply.json()
        if result.get("vul_pdf", "") == "":
            logger.warning('[PUSH API] Push result error: {0}'.format(reply.text))
            return False
        logger.info('[PUSH API] Push success!')
        return True
    except (requests.ConnectionError, requests.HTTPError) as error:
        logger.critical('[PUSH API] Network error: {0}'.format(str(error)))
        return False
    except ValueError as error:
        logger.critical('[PUSH API] Response error: {0}'.format(str(error)))
        return False