def __data_parser__(self, data):
try:
if data['mods']['itemlist']['data']['auctions']:
search_results = data['mods']['itemlist']['data']['auctions']
return [{
'intro': result["raw_title"],
'price': float(result["view_price"]),
'delivery': colorful_text(result["view_fee"], Fore.RED)
if float(result["view_fee"]) > 0 else result["view_fee"],
'sales': int(result["view_sales"].split('人')[0]),
'belong': colorful_text("Tmall", Fore.CYAN)
if result.get('shopcard', {}).get('isTmall', False) else "Taobao",
'url': result["detail_url"]
} for result in search_results]
error('Oops, no goods found...')
return []
except KeyError:
error('Oops, a key error happened...')
return []
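For reference, a hedged sketch of the nested payload this parser expects; only the keys read above appear, and every value is made up:

sample_data = {
    'mods': {'itemlist': {'data': {'auctions': [{
        'raw_title': 'example item title',        # listing title shown as 'intro'
        'view_price': '19.90',                    # price string, cast to float above
        'view_fee': '0.00',                       # shipping fee, highlighted in red when > 0
        'view_sales': '120人付款',                  # sales text; the parser keeps the leading count
        'shopcard': {'isTmall': True},            # drives the Tmall/Taobao 'belong' label
        'detail_url': 'https://example.com/item',
    }]}}}
}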
Python error() example source code
def get_base64_saver(loading, url):
def callback(content):
if isinstance(content, urllib.error.HTTPError):
if content.getcode() == 404:
loading[url] = 404
return
elif isinstance(content, urllib.error.URLError):
if (content.reason.errno == 11001 and
content.reason.strerror == 'getaddrinfo failed'):
loading[url] = 404
return
return sublime.error_message('An unexpected error has occurred: ' +
str(content))
loading[url] = to_base64(content=content)
return callback
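A minimal usage sketch of the callback contract assumed above: the callback receives either the downloaded bytes or a urllib.error exception instance (the synchronous fetch below is an illustration, not the plugin's real download path):

import urllib.error
import urllib.request

loading = {}
url = 'https://example.com/image.png'   # placeholder URL
callback = get_base64_saver(loading, url)
try:
    body = urllib.request.urlopen(url).read()
except (urllib.error.HTTPError, urllib.error.URLError) as exc:
    callback(exc)    # 404s and DNS failures are recorded in `loading`
else:
    callback(body)   # success: loading[url] becomes the base64-encoded content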
def getTaskStatus(taskId):
"""Retrieve status of one or more long-running tasks.
Args:
taskId: ID of the task or a list of multiple IDs.
Returns:
List containing one object for each queried task, in the same order as
the input array, each object containing the following values:
id (string) ID of the task.
state (string) State of the task, one of READY, RUNNING, COMPLETED,
FAILED, CANCELLED; or UNKNOWN if the task with the specified ID
doesn't exist.
error_message (string) For a FAILED task, a description of the error.
"""
if isinstance(taskId, six.string_types):
taskId = [taskId]
args = {'q': ','.join(taskId)}
return send_('/taskstatus', args, 'GET')
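A hedged usage sketch; the task ID is a placeholder and the result fields follow the docstring above:

statuses = getTaskStatus('HYPOTHETICAL_TASK_ID')
for status in statuses:
    if status['state'] == 'FAILED':
        print('Task %s failed: %s' % (status['id'], status['error_message']))
    else:
        print('Task %s is %s' % (status['id'], status['state']))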
def __init__(self, http_response, response_body=None):
"""Sets the HTTP information in the error.
Args:
http_response: The response from the server, contains error information.
response_body: string (optional) specified if the response has already
been read from the http_response object.
"""
body = response_body or http_response.read()
self.status = http_response.status
self.reason = http_response.reason
self.body = body
self.headers = atom.http_core.get_headers(http_response)
self.error_msg = 'Invalid response %s.' % self.status
try:
json_from_body = simplejson.loads(body)
if isinstance(json_from_body, dict):
self.error_msg = json_from_body.get('error', self.error_msg)
except (ValueError, JSONDecodeError):
pass
def _download_file(self, uri, file_path, **kwargs):
"""Downloads a file to disk from the specified URI.
Note: to download a file in memory, use the GetContent() method.
Args:
uri: str The full URL to download the file from.
file_path: str The full path to save the file to.
kwargs: Other parameters to pass to self.get_content().
Raises:
gdata.client.RequestError: on error response from server.
"""
f = open(file_path, 'wb')
try:
f.write(self._get_content(uri, **kwargs))
except gdata.client.RequestError as e:
f.close()
raise e
f.flush()
f.close()
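An equivalent sketch of the same download written with a with block, so the file handle is closed even when self._get_content() raises; nothing here goes beyond what the snippet above already uses:

def _download_file(self, uri, file_path, **kwargs):
    # Sketch: `with` replaces the explicit close()/flush() bookkeeping above.
    with open(file_path, 'wb') as f:
        f.write(self._get_content(uri, **kwargs))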
def UpgradeToSessionToken(self, token=None):
"""Upgrades a single use AuthSub token to a session token.
Args:
token: A gdata.auth.AuthSubToken or gdata.auth.SecureAuthSubToken
(optional) which is good for a single use but can be upgraded
to a session token. If no token is passed in, the token
is looked up in the token_store for the current scope.
Raises:
NonAuthSubToken if the user's auth token is not an AuthSub token
TokenUpgradeFailed if the server responded to the request with an
error.
"""
if token is None:
scopes = lookup_scopes(self.service)
if scopes:
token = self.token_store.find_token(scopes[0])
else:
token = self.token_store.find_token(atom.token_store.SCOPE_ALL)
if not isinstance(token, gdata.auth.AuthSubToken):
raise NonAuthSubToken
self.SetAuthSubToken(self.upgrade_to_session_token(token))
def test_short_content_raises_ContentTooShortError(self):
self.fakehttp(b'''HTTP/1.1 200 OK
Date: Wed, 02 Jan 2008 03:03:54 GMT
Server: Apache/1.3.33 (Debian GNU/Linux) mod_ssl/2.8.22 OpenSSL/0.9.7e
Connection: close
Content-Length: 100
Content-Type: text/html; charset=iso-8859-1

FF
''')
def _reporthook(par1, par2, par3):
pass
with self.assertRaises(urllib.error.ContentTooShortError):
try:
urllib.request.urlretrieve('http://example.com/',
reporthook=_reporthook)
finally:
self.unfakehttp()
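The same condition can be caught in application code; a minimal sketch with a placeholder URL and filename:

import urllib.error
import urllib.request

try:
    urllib.request.urlretrieve('http://example.com/big-file', 'big-file.dat')
except urllib.error.ContentTooShortError:
    # Raised when fewer bytes arrive than the Content-Length header promised.
    print('download was truncated')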
@contextlib.contextmanager
def change_cwd(path, quiet=False):
"""Return a context manager that changes the current working directory.
Arguments:
path: the directory to use as the temporary current working directory.
quiet: if False (the default), the context manager raises an exception
on error. Otherwise, it issues only a warning and keeps the current
working directory the same.
"""
saved_dir = os.getcwd()
try:
os.chdir(path)
except OSError:
if not quiet:
raise
warnings.warn('tests may fail, unable to change CWD to: ' + path,
RuntimeWarning, stacklevel=3)
try:
yield os.getcwd()
finally:
os.chdir(saved_dir)
@contextlib.contextmanager
def temp_cwd(name='tempcwd', quiet=False):
"""
Context manager that temporarily creates and changes the CWD.
The function temporarily changes the current working directory
after creating a temporary directory in the current directory with
name *name*. If *name* is None, the temporary directory is
created using tempfile.mkdtemp.
If *quiet* is False (default) and it is not possible to
create or change the CWD, an error is raised. If *quiet* is True,
only a warning is raised and the original CWD is used.
"""
with temp_dir(path=name, quiet=quiet) as temp_path:
with change_cwd(temp_path, quiet=quiet) as cwd_dir:
yield cwd_dir
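A short usage sketch of the two context managers above, assuming the temp_dir helper they rely on is available; the directory and file names are placeholders:

import os

with temp_cwd('scratch') as cwd:
    # Everything in this block runs with `cwd` as the working directory.
    with open('output.txt', 'w') as f:
        f.write('written relative to the temporary CWD\n')
print('restored to', os.getcwd())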
def handle(self, fn_name, action, *args, **kwds):
self.parent.calls.append((self, fn_name, args, kwds))
if action is None:
return None
elif action == "return self":
return self
elif action == "return response":
res = MockResponse(200, "OK", {}, "")
return res
elif action == "return request":
return Request("http://blah/")
elif action.startswith("error"):
code = action[action.rfind(" ")+1:]
try:
code = int(code)
except ValueError:
pass
res = MockResponse(200, "OK", {}, "")
return self.parent.error("http", args[0], res, code, "", {})
elif action == "raise":
raise urllib.error.URLError("blah")
assert False
def test_badly_named_methods(self):
# test work-around for three methods that accidentally follow the
# naming conventions for handler methods
# (*_open() / *_request() / *_response())
# These used to call the accidentally-named methods, causing a
# TypeError in real code; here, returning self from these mock
# methods would either cause no exception, or AttributeError.
from urllib.error import URLError
o = OpenerDirector()
meth_spec = [
[("do_open", "return self"), ("proxy_open", "return self")],
[("redirect_request", "return self")],
]
add_ordered_mock_handlers(o, meth_spec)
o.add_handler(urllib.request.UnknownHandler())
for scheme in "do", "proxy", "redirect":
self.assertRaises(URLError, o.open, scheme+"://example.com/")
def test_errors(self):
h = urllib.request.HTTPErrorProcessor()
o = h.parent = MockOpener()
url = "http://example.com/"
req = Request(url)
# all 2xx are passed through
r = MockResponse(200, "OK", {}, "", url)
newr = h.http_response(req, r)
self.assertIs(r, newr)
self.assertFalse(hasattr(o, "proto")) # o.error not called
r = MockResponse(202, "Accepted", {}, "", url)
newr = h.http_response(req, r)
self.assertIs(r, newr)
self.assertFalse(hasattr(o, "proto")) # o.error not called
r = MockResponse(206, "Partial content", {}, "", url)
newr = h.http_response(req, r)
self.assertIs(r, newr)
self.assertFalse(hasattr(o, "proto")) # o.error not called
# anything else calls o.error (and MockOpener returns None, here)
r = MockResponse(502, "Bad gateway", {}, "", url)
self.assertIsNone(h.http_response(req, r))
self.assertEqual(o.proto, "http") # o.error called
self.assertEqual(o.args, (req, r, 502, "Bad gateway", {}))
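Outside the test suite, this is the machinery that turns non-2xx responses into exceptions: HTTPErrorProcessor hands them to the opener's error() chain, which normally ends in urllib.error.HTTPError. A minimal sketch with a placeholder URL:

import urllib.error
import urllib.request

try:
    urllib.request.urlopen('http://example.com/missing-page')
except urllib.error.HTTPError as e:
    # e.code and e.reason mirror the status line the error processor rejected.
    print('HTTP error: %d %s' % (e.code, e.reason))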
def test_invalid_redirect(self):
from_url = "http://example.com/a.html"
valid_schemes = ['http','https','ftp']
invalid_schemes = ['file','imap','ldap']
schemeless_url = "example.com/b.html"
h = urllib.request.HTTPRedirectHandler()
o = h.parent = MockOpener()
req = Request(from_url)
req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT
for scheme in invalid_schemes:
invalid_url = scheme + '://' + schemeless_url
self.assertRaises(urllib.error.HTTPError, h.http_error_302,
req, MockFile(), 302, "Security Loophole",
MockHeaders({"location": invalid_url}))
for scheme in valid_schemes:
valid_url = scheme + '://' + schemeless_url
h.http_error_302(req, MockFile(), 302, "That's fine",
MockHeaders({"location": valid_url}))
self.assertEqual(o.req.get_full_url(), valid_url)
def _download(self):
try:
try:
import urllib.request
from urllib.error import URLError, HTTPError
with urllib.request.urlopen(self.url) as response, \
open(self.outputfile_origin, 'wb') as outfile:
shutil.copyfileobj(response, outfile)
except (AttributeError, ImportError):
import urllib
urllib.urlretrieve(self.url, self.outputfile_origin)
except Exception as e:
# URLError, HTTPError and IOError are all Exception subclasses, and the
# urllib.error names are never bound on the Python 2 fallback path above
logger.debug("Unable to retrieve %s: %s", self.url, e)
def _buy(self, amount, price):
"""Create a buy limit order"""
params = {"amount": amount, "price": price}
response = self._send_request(self.buy_url, params)
if "error" in response:
raise TradeException(response["error"])
def _sell(self, amount, price):
"""Create a sell limit order"""
params = {"amount": amount, "price": price}
response = self._send_request(self.sell_url, params)
if "error" in response:
raise TradeException(response["error"])
def __fetch_epg(channel: Channel, epg_url: EPGURL):
try:
html = utils.get_response(epg_url.url, epg_url.data)
start_end_data = channel.epg_parser.parse_schedule_page(html)
epg_utils.normalize_times(start_end_data, channel.epg_data.get_normalization())
return get_response(start_end_data, channel.channel_id)
except urllib.error.URLError:
return ''
def kegg_rest_request(query):
url = 'http://rest.kegg.jp/%s' % (query)
print(url)
data = None
try:
data = urllib.request.urlopen(url).read()
except urllib.error.HTTPError as e:
print("HTTP error: %d" % e.code)
except urllib.error.URLError as e:
# e.reason may be a string or an OSError, so print it directly
print("Network error: %s" % e.reason)
return data
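A usage sketch against one of KEGG's documented REST operations ('list/pathway/hsa' lists human pathways); the output handling is an assumption:

data = kegg_rest_request('list/pathway/hsa')   # tab-separated pathway listing
if data is not None:
    for line in data.decode().splitlines()[:5]:
        print(line)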
def check_errors(self):
errors = False
for c in self.constraints:
if not c.required_version:
errors = True
reasons = c.why()
if len(reasons) == 1:
Logs.error('%s but no matching package could be found in this repository' % reasons[0])
else:
Logs.error('Conflicts on package %r:' % c.pkgname)
for r in reasons:
Logs.error(' %s' % r)
if errors:
self.fatal('The package requirements cannot be satisfied!')
def load_constraints(self, pkgname, pkgver, requires=REQUIRES):
try:
return self.cache_constraints[(pkgname, pkgver)]
except KeyError:
#Logs.error("no key %r" % (pkgname, pkgver))
text = Utils.readf(os.path.join(get_distnet_cache(), pkgname, pkgver, requires))
ret = parse_constraints(text)
self.cache_constraints[(pkgname, pkgver)] = ret
return ret