def dorequest(url, data = "", method = 'GET'):
    """Issue an HTTP request against an etcd endpoint and decode the JSON body.

    GET goes straight through urlopen; other verbs (PUT/DELETE/POST) are sent
    via a Request object with the payload encoded as ASCII bytes.  etcd can
    return a JSON error document on an HTTP error status, and urlopen raises
    HTTPError for those, so the handler reads the error body and parses it
    exactly like a success response.
    """
    try:
        if method == 'GET':
            raw = urllib.request.urlopen(url, timeout=10).read()
        else:
            req = urllib.request.Request(url, data=data.encode('ascii'), method=method)
            raw = urllib.request.urlopen(req, timeout=10).read()
    except urllib.error.HTTPError as e:
        # Must read inside this handler: once the except block exits, the
        # exception object is dropped and e.fp is closed.
        raw = e.fp.read()
    # Body arrives as bytes; decode UTF-8 before parsing.
    return json.loads(str(raw, encoding='utf-8'))
# client to use etcd
# not all APIs are implemented below. just implement what we want
# Python error() usage — example source snippets (translated scrape header)
def main():
    """Parse command-line options and run the search, aborting on HTTP errors."""
    parser = argparse.ArgumentParser()
    parser.add_argument('-q', '--term', dest='term', default=DEFAULT_TERM,
                        type=str, help='Search term (default: %(default)s)')
    parser.add_argument('-l', '--location', dest='location',
                        default=DEFAULT_LOCATION, type=str,
                        help='Search location (default: %(default)s)')
    args = parser.parse_args()
    try:
        query_api(args.term, args.location)
    except HTTPError as error:
        # Surface the HTTP failure details and stop the program.
        message = 'Encountered HTTP error {0} on {1}:\n {2}\nAbort program.'.format(
            error.code,
            error.url,
            error.read(),
        )
        sys.exit(message)
def onFinished(self):
    """Finalize a finished network request: emit success or failure signals.

    On failure, looks up a human-readable description for the error code and
    shows a warning dialog when one exists.  Always emits `finished` and logs
    the remaining request count.
    """
    self.hasFinished = True
    if self.request.error() == self.request.NoError:
        self.requestSucceeded.emit(self)
    else:
        try:
            errorDescr = NetworkErrorDescrs[self.request.error()]
        except KeyError:
            # BUG FIX: was a bare `except:` which also hid NameError and
            # KeyboardInterrupt; only a missing description is expected here.
            errorDescr = None
        if errorDescr:
            QtGui.QMessageBox.warning(None, "Networkrequest Failed", "The request to \"%s\" failed with: %s" % (self.url, errorDescr))
        self.requestFailed.emit(self, self.request.error())
    self.finished.emit(self)
    self.logger.debug("Request finished: %s", str(self))
    self.logger.debug("Remaining requests: %s", len(NetworkService.currentRequests))
def get_pypi_src_download(package):
    """Return (md5_digest, url) of the sdist .tar.gz for *package* on PyPI.

    Raises RuntimeError when the metadata cannot be fetched or no suitable
    source download is listed.
    """
    url = 'https://pypi.python.org/pypi/%s/json' % (package,)
    try:
        # BUG FIX: was `urllib.urlopen` (Python 2 API) and the urlopen call
        # sat outside the try, so URLError was never caught.
        fp = urllib.request.urlopen(url)
        try:
            data = fp.read()
        finally:
            fp.close()
    except urllib.error.URLError:
        # BUG FIX: was `except urllib.error:` — catching a module raises
        # TypeError at exception time; catch the actual exception class.
        raise RuntimeError("Cannot determine download link for %s" % (package,))
    pkgdata = json.loads(data.decode('utf-8'))
    if 'urls' not in pkgdata:
        raise RuntimeError("Cannot determine download link for %s" % (package,))
    for info in pkgdata['urls']:
        if info['packagetype'] == 'sdist' and info['url'].endswith('tar.gz'):
            return (info.get('md5_digest'), info['url'])
    # BUG FIX: message typo "downlink link" -> "download link"
    raise RuntimeError("Cannot determine download link for %s" % (package,))
def get_img_and_text(self, plugin_config, cli_args):
    """Pick an image URL (custom keyword or random search) and download it.

    Returns a dict with the raw image bytes under 'img' and an empty 'text'.
    Exits the process when no image matches the keyword.
    """
    safe = bool(cli_args.safe_search)
    self.logger.debug('setting image safe search to {}'.format(safe))
    if cli_args.keyword:
        self.logger.info('using custom keyword {}'.format(cli_args.keyword))
        candidates = self._fetch_img_urls(cli_args.keyword, safe_search=safe)
    else:
        candidates = self._get_images_for_random_keyword(safe_search=safe)
    if not candidates:
        self.logger.error('no images found for given keyword')
        exit(1)
    if cli_args.keyword:
        chosen = random.choice(candidates)[2]
    else:
        # search key is random anyway, so the first hit is as good as any
        chosen = candidates[0][2]
    self.logger.info('choosing image {}'.format(chosen))
    return {
        'img': self._read_from_url(chosen),
        'text': ''
    }
def check_version():
    """Warn on stderr when a newer PureSec CLI release exists.

    Every failure mode (network error, bad JSON, unexpected payload shape)
    is swallowed silently — the version check is best-effort by design.
    """
    try:
        raw = request.urlopen("http://cli.puresec.io/verify/version/{}".format(puresec_cli.__version__))
    except urllib.error.URLError:
        return
    try:
        payload = json.loads(raw.read().decode())
    except ValueError:
        return
    if not isinstance(payload, dict):
        return
    try:
        is_uptodate, last_version = payload['is_uptodate'], payload['last_version']
    except KeyError:
        return
    if not is_uptodate:
        eprint("warn: you are using an outdated version of PureSec CLI (installed={}, latest={})".format(puresec_cli.__version__, last_version))
def TTSBaidu(self, tid, txt, lan, spd, retries=5):
    '''
    Fetch BAIDU.COM TTS audio for one text chunk and store it in self.results.

    tid: task id used as the key into self.results
    txt: the text to synthesize
    lan: language, 'en' for English or 'zh' for Chinese
    spd: the reading speed
    retries: maximum retry attempts on network errors (new parameter with a
             default, so existing callers are unaffected)
    '''
    socket.setdefaulttimeout(34.0)
    try:
        ttsUrl = genTTSUrl(lan, txt, spd)
        self.results[tid] = getpage(ttsUrl)
    except urllib.error.URLError as e:
        print("error:URLError ", e, " we will try again...tid:", tid)
        # BUG FIX: the original recursed unconditionally on every failure,
        # so a persistent outage blew the recursion limit.  Bound the retries.
        if retries > 0:
            self.TTSBaidu(tid, txt, lan, spd, retries - 1)
    except socket.timeout:
        print("error: TTSBaidu time out!, we will try again...tid:", tid)
        if retries > 0:
            self.TTSBaidu(tid, txt, lan, spd, retries - 1)
def save_images(term, count):
    """Search for images of *term*, download the thumbnails, and return the
    list of file names written under SAVE_DIR.

    Images whose encoding format is unknown are skipped, as are downloads
    that fail with an HTTP error.
    """
    api_key = get_api_key()
    images = search_images(term, count, api_key)
    filenames = []
    if not os.path.exists(SAVE_DIR):
        os.makedirs(SAVE_DIR)
    for i, img in enumerate(images):
        if img['encodingFormat'] == 'unknown':
            continue
        # BUG FIX: the format string ignored the `filename` keyword argument
        # and produced the same literal name for every image, so each
        # download overwrote the previous one.
        name = "{path}/{filename}.{ext}".format(
            path=SAVE_DIR,
            filename="_".join(term.split()) + str(i),
            ext=img['encodingFormat'])
        try:
            download_image(img['thumbnailUrl'], name)
            filenames.append(name)
        except urllib.error.HTTPError:
            pass
    return filenames
def log(self, message='', err=None, level='info'):
    """
    Write *message* to this object's logger.

    Unrecognized levels fall back to 'info'.  When *err* is given, the level
    is forced to 'error' and the exception text is appended to the message.
    If the logging call itself blows up, the failure is reported through
    logger.critical instead of propagating.
    """
    known_levels = (
        'critical',
        'debug',
        'error',
        'fatal',
        'info',
        'warning',
    )
    if level.lower() not in known_levels:
        level = 'info'
    if err:
        level = 'error'
        message += ' Threw exception:\n\t{}'.format(err)
    try:
        emit = getattr(self.logger, level.lower())
        emit(message)
    except Exception as log_err:
        self.logger.critical(
            "Could not write to log. Threw exception:\n\t{}".format(log_err))
def __mkfile(self):
    """Create a new empty file in the requested directory.

    Reads 'name' and 'current' from the request, validates permissions and
    the file name, and reports the outcome through self._response.
    """
    name = current = None
    curDir = newFile = None
    if 'name' in self._request and 'current' in self._request:
        name = self._request['name']
        current = self._request['current']
        curDir = self.__findDir(current, None)
    if not curDir or not name:
        self._response['error'] = 'Invalid parameters'
    elif not self.__isAllowed(curDir, 'write'):
        self._response['error'] = 'Access denied'
    elif not self.__checkName(name):
        self._response['error'] = 'Invalid name'
    else:
        # BUG FIX: the join used to run before the curDir/name validation,
        # so a missing directory (curDir is None) raised TypeError instead
        # of producing the 'Invalid parameters' response.
        newFile = os.path.join(curDir, name)
        if os.path.exists(newFile):
            self._response['error'] = 'File or folder with the same name already exists'
        else:
            try:
                open(newFile, 'w').close()
                self._response['select'] = [self.__hash(newFile)]
                self.__content(curDir, False)
            except OSError:
                # narrowed from a bare except: file creation fails with OSError
                self._response['error'] = 'Unable to create file'
def __rm(self):
    """Delete the files and directories listed in the request targets."""
    current = rmList = None
    curDir = rmFile = None
    if 'current' in self._request and 'targets[]' in self._request:
        current = self._request['current']
        rmList = self._request['targets[]']
        curDir = self.__findDir(current, None)
    if not rmList or not curDir:
        self._response['error'] = 'Invalid parameters'
        return False
    # a single target arrives as a scalar; normalize to a list
    if not isinstance(rmList, list):
        rmList = [rmList]
    for target in rmList:
        rmFile = self.__find(target, curDir)
        if not rmFile:
            continue
        self.__remove(rmFile)
    # TODO if errorData not empty return error
    self.__content(curDir, True)
def __duplicate(self):
    """Create a copy of the target file/directory under a unique name."""
    if 'current' in self._request and 'target' in self._request:
        curDir = self.__findDir(self._request['current'], None)
        target = self.__find(self._request['target'], curDir)
        if not curDir or not target:
            self._response['error'] = 'Invalid parameters'
            return
        if not self.__isAllowed(target, 'read') or not self.__isAllowed(curDir, 'write'):
            self._response['error'] = 'Access denied'
            # BUG FIX: the original set the error but fell through and
            # performed the copy anyway, bypassing the permission check.
            return
        newName = self.__uniqueName(target)
        if not self.__copy(target, newName):
            self._response['error'] = 'Unable to create file copy'
            return
        self.__content(curDir, True)
    return
def __edit(self):
    """Save posted content into the target file and report its new info."""
    if 'current' in self._request and 'target' in self._request and 'content' in self._request:
        curDir = self.__findDir(self._request['current'], None)
        curFile = self.__find(self._request['target'], curDir)
        if curFile and curDir:
            if self.__isAllowed(curFile, 'write'):
                try:
                    # BUG FIX: the handle used to be opened/closed manually,
                    # leaking it when write() raised; the context manager
                    # guarantees closure.
                    with open(curFile, 'w+') as f:
                        f.write(self._request['content'])
                    self._response['target'] = self.__info(curFile)
                except Exception:
                    # narrowed from a bare except (which also caught
                    # KeyboardInterrupt/SystemExit)
                    self._response['error'] = 'Unable to write to file'
            else:
                self._response['error'] = 'Access denied'
            return
    self._response['error'] = 'Invalid parameters'
    return
def removeNoise(s):
    """Strip wiki-style markup spans — [[...]], {{...}} and [...] — from *s*.

    Patterns are applied in order with non-greedy matches, so inner/double
    brackets are consumed before the generic single-bracket pattern runs.
    """
    import re
    cleaned = s
    for pattern in (r"\[\[(.*?)\]\]", r"{{(.*?)}}", r"\[(.*?)\]"):
        cleaned = re.sub(pattern, "", cleaned)
    return cleaned
#==============================================================================#
#==============================================================================#
# Get the html for a comic from the explainxkcd website
# Extract the transcript from the text
# Check if the transcript is marked as incomplete
# if yes, mark it as so locally (for later updates)
# Returns a dictionary (result)
# If an error occurred the status is non-zero and the error is passed in the
# error field of the returned dictionary
def get_xkcd(number=0):
    """Fetch xkcd comic metadata; number 0 (the default) means the latest.

    Returns {'status': int, 'error': str, 'comic': parsed JSON or ""}.
    status codes: 0 success, -1 HTTP error, -2 other I/O error, -3 anything else.
    """
    # BUG FIX: was `number is 0` — identity comparison on an int relies on
    # CPython's small-int cache and is a SyntaxWarning on 3.8+; use ==.
    if number == 0:
        url = 'https://xkcd.com/info.0.json'
    else:
        url = 'https://xkcd.com/{}/info.0.json'.format(number)
    response = {'status': 0, 'error': '', 'comic': ""}
    try:
        online_comic = urlopen(url).read()
        response['comic'] = json.loads(online_comic.decode('utf-8'))
    except urllib.error.HTTPError:
        response['status'] = -1
    except IOError:
        response['status'] = -2
    except Exception:
        # narrowed from a bare except so Ctrl-C still interrupts
        response['status'] = -3
    return response
#==============================================================================##==============================================================================#
def download_page(url, referer, maxretries, timeout, pause):
    """Fetch *url* with a Referer header and a browser User-agent, retrying
    on network errors up to *maxretries* times with *pause* seconds between
    attempts.

    Returns (html_text, http_code) on success or (None, code) on failure.
    """
    tries = 0
    htmlpage = None
    # BUG FIX: `code` was only assigned inside the loop body, so with
    # maxretries <= 0 the final return raised NameError.
    code = 404
    while tries < maxretries and htmlpage is None:
        try:
            code = 404
            req = request.Request(url)
            req.add_header('Referer', referer)
            req.add_header('User-agent',
                           'Mozilla/5.0 (X11; Linux i686) AppleWebKit/534.30 (KHTML, like Gecko) Ubuntu/11.04 Chromium/12.0.742.91 Chrome/12.0.742.91 Safari/534.30')
            with closing(request.urlopen(req, timeout=timeout)) as f:
                code = f.getcode()
                htmlpage = f.read()
            sleep(pause)
        except (urlerror.URLError, socket.timeout, socket.error):
            tries += 1
    if htmlpage:
        return htmlpage.decode('utf-8'), code
    else:
        return None, code
def test_short_content_raises_ContentTooShortError(self):
# Serve a canned HTTP response whose Content-Length header (100) exceeds the
# actual body, then verify that urlretrieve raises ContentTooShortError.
# NOTE(review): indentation in this scraped copy is flattened; structure is
# inferred — confirm against the original test module.
self.fakehttp(b'''HTTP/1.1 200 OK
Date: Wed, 02 Jan 2008 03:03:54 GMT
Server: Apache/1.3.33 (Debian GNU/Linux) mod_ssl/2.8.22 OpenSSL/0.9.7e
Connection: close
Content-Length: 100
Content-Type: text/html; charset=iso-8859-1
FF
''')
# no-op reporthook: urlretrieve exercises the callback path as well
def _reporthook(par1, par2, par3):
pass
with self.assertRaises(urllib.error.ContentTooShortError):
try:
urllib.request.urlretrieve('http://example.com/',
reporthook=_reporthook)
finally:
# always restore the real HTTP machinery, even when the assert fires
self.unfakehttp()
def handle(self, fn_name, action, *args, **kwds):
    """Mock handler dispatch: record the call, then behave per *action*.

    Supported actions: None, "return self", "return response",
    "return request", "error <code>" (routes through parent.error) and
    "raise" (raises URLError).  Any other action is a test bug.
    """
    self.parent.calls.append((self, fn_name, args, kwds))
    if action is None:
        return None
    if action == "return self":
        return self
    if action == "return response":
        return MockResponse(200, "OK", {}, "")
    if action == "return request":
        return Request("http://blah/")
    if action.startswith("error"):
        code = action[action.rfind(" ") + 1:]
        try:
            code = int(code)
        except ValueError:
            # non-numeric codes are passed through as strings
            pass
        resp = MockResponse(200, "OK", {}, "")
        return self.parent.error("http", args[0], resp, code, "", {})
    if action == "raise":
        raise urllib.error.URLError("blah")
    assert False
def test_badly_named_methods(self):
    """Regression test for mock methods that accidentally follow the
    handler naming conventions (*_open() / *_request() / *_response()).

    These used to be invoked by OpenerDirector, causing a TypeError in real
    code; with the work-around in place, returning self from them must not
    count as a handler result, so open() fails with URLError instead.
    """
    from urllib.error import URLError
    director = OpenerDirector()
    spec = [
        [("do_open", "return self"), ("proxy_open", "return self")],
        [("redirect_request", "return self")],
    ]
    handlers = add_ordered_mock_handlers(director, spec)
    director.add_handler(urllib.request.UnknownHandler())
    for scheme in ("do", "proxy", "redirect"):
        self.assertRaises(URLError, director.open, scheme + "://example.com/")
def test_raise(self):
    """A handler that raises URLError stops request processing: the second
    http_open handler must never run, leaving exactly one recorded call."""
    director = OpenerDirector()
    spec = [
        [("http_open", "raise")],
        [("http_open", "return self")],
    ]
    handlers = add_ordered_mock_handlers(director, spec)
    req = Request("http://example.com/")
    self.assertRaises(urllib.error.URLError, director.open, req)
    self.assertEqual(director.calls, [(handlers[0], "http_open", (req,), {})])
## def test_error(self):
## # XXX this doesn't actually seem to be used in standard library,
## # but should really be tested anyway...
def test_errors(self):
    """HTTPErrorProcessor passes every 2xx response through untouched and
    routes any other status to parent.error()."""
    processor = urllib.request.HTTPErrorProcessor()
    opener = processor.parent = MockOpener()
    url = "http://example.com/"
    req = Request(url)
    # all 2xx are passed through without touching o.error
    for status, reason in ((200, "OK"), (202, "Accepted"), (206, "Partial content")):
        resp = MockResponse(status, reason, {}, "", url)
        self.assertIs(resp, processor.http_response(req, resp))
        self.assertFalse(hasattr(opener, "proto"))  # o.error not called
    # anything else calls o.error (and MockOpener returns None, here)
    resp = MockResponse(502, "Bad gateway", {}, "", url)
    self.assertIsNone(processor.http_response(req, resp))
    self.assertEqual(opener.proto, "http")  # o.error called
    self.assertEqual(opener.args, (req, resp, 502, "Bad gateway", {}))
def test_invalid_redirect(self):
    """http_error_302 must reject redirects to unsafe schemes (file/imap/
    ldap) with HTTPError, while http/https/ftp targets are followed."""
    from_url = "http://example.com/a.html"
    schemeless_url = "example.com/b.html"
    handler = urllib.request.HTTPRedirectHandler()
    opener = handler.parent = MockOpener()
    req = Request(from_url)
    req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT
    for scheme in ('file', 'imap', 'ldap'):
        target = scheme + '://' + schemeless_url
        self.assertRaises(urllib.error.HTTPError, handler.http_error_302,
                          req, MockFile(), 302, "Security Loophole",
                          MockHeaders({"location": target}))
    for scheme in ('http', 'https', 'ftp'):
        target = scheme + '://' + schemeless_url
        handler.http_error_302(req, MockFile(), 302, "That's fine",
                               MockHeaders({"location": target}))
        self.assertEqual(opener.req.get_full_url(), target)
def batch_sign(request):
    """Render the signing page for the next task in the user's sign session.

    With no pending tasks, redirects to the dashboard.  Users whose email
    starts with 'signing_fail' are routed into the error path (a test hook
    for forced failures).
    """
    tasks = request.sign_session['tasks']
    if not tasks:
        return redirect('ecs.dashboard.views.view_dashboard')
    task = _get_tasks(request.user).get(pk=tasks[0])
    data = request.sign_session['data_func'](request, task)
    data['sign_session_id'] = request.sign_session.id
    sign_data = _store_sign_data(data)
    if request.user.email.startswith('signing_fail'):
        return sign_error(request, pdf_id=sign_data.id,
                          error='forced failure',
                          cause='requested force_fail, so we failed')
    context = {
        'sign_url': get_pdfas_url(request, sign_data),
        'pdf_id': sign_data.id,
    }
    return render(request, 'signature/batch.html', context)
def get_pdfas_url(request, sign_data):
    """Build the PDF-AS 'Sign' URL carrying all signing parameters."""
    params = {
        'connector': 'onlinebku',
        'invoke-app-url': request.build_absolute_uri(reverse('ecs.signature.views.sign_receive', kwargs={'pdf_id': sign_data.id})),
        'invoke-app-url-target': '_top',
        'invoke-app-error-url': request.build_absolute_uri(reverse('ecs.signature.views.sign_error', kwargs={'pdf_id': sign_data.id})),
        'locale': 'DE',
        'num-bytes': str(len(sign_data['pdf_data'])),
        'sig_type': 'SIGNATURBLOCK_DE',
        'pdf-url': request.build_absolute_uri(reverse('ecs.signature.views.sign_send', kwargs={'pdf_id': sign_data.id})),
        # 'intOnly' = the signature is verified but without a certificate
        # check (translated from the original German comment)
        'verify-level': 'intOnly',
        'filename': sign_data['document_filename'],
        #'preview': 'false',
        #'mode': 'binary',
        #'inline': 'false',
        #'pdf-id': sign_data.id,
    }
    query = urllib.parse.urlencode({key: value.encode('utf-8') for key, value in params.items()})
    return '{0}Sign?{1}'.format(settings.PDFAS_SERVICE, query)
def stream_live_check(stream):
    """Query the Twitch kraken API and return a human-readable status line
    for *stream* (online with title/game/viewers, offline, or error text)."""
    url = "https://api.twitch.tv/kraken/streams/{}".format(stream.lower())
    try:
        contents = json.loads(urllib.request.urlopen(url).read().decode("utf-8"))
        # BUG FIX: was `contents["stream"] == None`; use identity comparison
        # for None (also removed the unused `status` local).
        if contents["stream"] is None:
            bot_message = "{} is offline.".format(stream)
        else:
            name = contents["stream"]["channel"]["name"]
            title = contents["stream"]["channel"]["status"]
            game = contents["stream"]["channel"]["game"]
            viewers = contents["stream"]["viewers"]
            bot_message = "{0} is online.\n{0}'s title is: {1} \n{0} is playing {2} \nThere are {3} viewers \n".format(name,title,game,viewers)
    except urllib.error.URLError as e:
        if e.reason == "Not found" or e.reason == "Unprocessable Entity":
            bot_message = "That stream doesn't exist."
        else:
            # message text (including its original spelling) kept verbatim
            bot_message = "There was an error proccessing your request."
    return bot_message
def test_short_content_raises_ContentTooShortError(self):
# Duplicate of the earlier short-content test in this scraped aggregate:
# a canned response declares Content-Length: 100 but ships a shorter body,
# so urlretrieve must raise ContentTooShortError.
# NOTE(review): indentation in this copy is flattened; confirm structure
# against the original test module.
self.fakehttp(b'''HTTP/1.1 200 OK
Date: Wed, 02 Jan 2008 03:03:54 GMT
Server: Apache/1.3.33 (Debian GNU/Linux) mod_ssl/2.8.22 OpenSSL/0.9.7e
Connection: close
Content-Length: 100
Content-Type: text/html; charset=iso-8859-1
FF
''')
# no-op reporthook so the callback path is exercised too
def _reporthook(par1, par2, par3):
pass
with self.assertRaises(urllib.error.ContentTooShortError):
try:
urllib.request.urlretrieve('http://example.com/',
reporthook=_reporthook)
finally:
# always restore the real HTTP machinery
self.unfakehttp()
def handle(self, fn_name, action, *args, **kwds):
    """Record the mock call, then dispatch on *action*.

    Actions: None | "return self" | "return response" | "return request" |
    "error <code>" (forwarded to parent.error) | "raise" (URLError).
    Anything else trips the assert — it would be a broken test spec.
    """
    self.parent.calls.append((self, fn_name, args, kwds))
    if action is None:
        return None
    if action == "return self":
        return self
    if action == "return response":
        return MockResponse(200, "OK", {}, "")
    if action == "return request":
        return Request("http://blah/")
    if action.startswith("error"):
        raw_code = action[action.rfind(" ") + 1:]
        try:
            raw_code = int(raw_code)
        except ValueError:
            pass  # non-numeric codes stay as strings
        resp = MockResponse(200, "OK", {}, "")
        return self.parent.error("http", args[0], resp, raw_code, "", {})
    if action == "raise":
        raise urllib.error.URLError("blah")
    assert False
def test_badly_named_methods(self):
    """Mock methods accidentally named like handler hooks (*_open /
    *_request / *_response) must not be treated as handlers.

    Historically OpenerDirector called them, raising TypeError in real
    code; with the work-around, open() must fail with URLError instead.
    """
    from urllib.error import URLError
    director = OpenerDirector()
    spec = [
        [("do_open", "return self"), ("proxy_open", "return self")],
        [("redirect_request", "return self")],
    ]
    add_ordered_mock_handlers(director, spec)
    director.add_handler(urllib.request.UnknownHandler())
    for scheme in ("do", "proxy", "redirect"):
        self.assertRaises(URLError, director.open, scheme + "://example.com/")
def test_errors(self):
    """2xx responses pass through HTTPErrorProcessor unchanged; any other
    status is handed to parent.error()."""
    processor = urllib.request.HTTPErrorProcessor()
    opener = processor.parent = MockOpener()
    url = "http://example.com/"
    req = Request(url)
    for status, reason in ((200, "OK"), (202, "Accepted"), (206, "Partial content")):
        resp = MockResponse(status, reason, {}, "", url)
        self.assertIs(resp, processor.http_response(req, resp))
        self.assertFalse(hasattr(opener, "proto"))  # o.error not called
    # a 5xx goes through o.error (MockOpener returns None here)
    resp = MockResponse(502, "Bad gateway", {}, "", url)
    self.assertIsNone(processor.http_response(req, resp))
    self.assertEqual(opener.proto, "http")  # o.error called
    self.assertEqual(opener.args, (req, resp, 502, "Bad gateway", {}))
def test_invalid_redirect(self):
    """Redirects to file/imap/ldap targets must raise HTTPError; redirects
    to http/https/ftp targets must be followed."""
    from_url = "http://example.com/a.html"
    schemeless_url = "example.com/b.html"
    handler = urllib.request.HTTPRedirectHandler()
    opener = handler.parent = MockOpener()
    req = Request(from_url)
    req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT
    for scheme in ('file', 'imap', 'ldap'):
        target = scheme + '://' + schemeless_url
        self.assertRaises(urllib.error.HTTPError, handler.http_error_302,
                          req, MockFile(), 302, "Security Loophole",
                          MockHeaders({"location": target}))
    for scheme in ('http', 'https', 'ftp'):
        target = scheme + '://' + schemeless_url
        handler.http_error_302(req, MockFile(), 302, "That's fine",
                               MockHeaders({"location": target}))
        self.assertEqual(opener.req.get_full_url(), target)