python类error()的实例源码

etcdlib.py 文件源码 项目:docklet 作者: unias 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def dorequest(url, data = "", method = 'GET'):
    """Send a request to *url* and return its body parsed as JSON.

    url    -- address to open.
    data   -- request body for non-GET methods; it is encoded as ASCII.
    method -- HTTP verb; 'GET' opens the url directly, anything else is
              sent through a Request object carrying *data*.

    A response that arrives with an HTTP error status is not treated as a
    failure: its body is read and decoded like a normal reply.
    """
    if method == 'GET':
        target = url
    else:
        # PUT/DELETE/POST: the body must be bytes, so encode it as ascii.
        target = urllib.request.Request(url, data=data.encode('ascii'), method=method)

    try:
        payload = urllib.request.urlopen(target, timeout=10).read()
    except urllib.error.HTTPError as http_err:
        # urlopen raises on HTTP error codes, but the error still carries a
        # body. It has to be read here: the exception object (and its file
        # object) are released once this handler ends.
        payload = http_err.fp.read()

    # The body is bytes: decode as utf-8, then parse as JSON.
    return json.loads(str(payload, encoding='utf-8'))


# client to use etcd
# not all APIs are implemented below. just implement what we want
sample.py 文件源码 项目:yelp-fusion 作者: Yelp 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def main():
    """Command-line entry point.

    Reads the search term and location from the command line (falling back
    to DEFAULT_TERM / DEFAULT_LOCATION), runs query_api with them, and exits
    with a descriptive message if an HTTPError is raised.
    """
    parser = argparse.ArgumentParser()

    option_table = (
        (('-q', '--term'),
         dict(dest='term', default=DEFAULT_TERM, type=str,
              help='Search term (default: %(default)s)')),
        (('-l', '--location'),
         dict(dest='location', default=DEFAULT_LOCATION, type=str,
              help='Search location (default: %(default)s)')),
    )
    for flags, settings in option_table:
        parser.add_argument(*flags, **settings)

    input_values = parser.parse_args()

    try:
        query_api(input_values.term, input_values.location)
    except HTTPError as error:
        # Passing a string to sys.exit prints it to stderr and exits non-zero.
        message = 'Encountered HTTP error {0} on {1}:\n {2}\nAbort program.'.format(
            error.code,
            error.url,
            error.read(),
        )
        sys.exit(message)
network.py 文件源码 项目:transfer 作者: viur-framework 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def onFinished(self ):
        """Handle completion of the wrapped request.

        Marks the request as finished, emits requestSucceeded or
        requestFailed depending on the request's error state, then emits
        finished and writes two debug log lines.
        """
        self.hasFinished = True
        succeeded = self.request.error()==self.request.NoError
        if succeeded:
            self.requestSucceeded.emit( self )
        else:
            try:
                description = NetworkErrorDescrs[ self.request.error() ]
            except: # no description registered for this error value
                description = None
            if description:
                # Only errors with a known description are shown to the user.
                text = "The request to \"%s\" failed with: %s" % (self.url, description)
                QtGui.QMessageBox.warning( None, "Networkrequest Failed", text )
            self.requestFailed.emit( self, self.request.error() )
        self.finished.emit( self )
        self.logger.debug("Request finished: %s", str(self) )
        remaining = len(NetworkService.currentRequests)
        self.logger.debug("Remaining requests: %s",  remaining )
setup.py 文件源码 项目:macholib 作者: secmobi 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def get_pypi_src_download(package):
        """Look up the source distribution of *package* on PyPI.

        Fetches the package's JSON metadata and returns a tuple
        ``(md5_digest, url)`` for the first 'sdist' entry whose url ends in
        'tar.gz' (md5_digest is None when the entry has no such key).

        Raises RuntimeError when the metadata cannot be fetched, has no
        'urls' entry, or lists no matching source archive.
        """
        url = 'https://pypi.python.org/pypi/%s/json'%(package,)
        try:
            # Opening the URL is inside the try as well, so connection
            # failures are reported the same way as read failures.
            # NOTE(review): urllib.urlopen is kept as in the original; confirm
            # the file's imports provide it on the Python version in use.
            fp = urllib.urlopen(url)
            try:
                data = fp.read()
            finally:
                fp.close()
        except IOError:
            # urllib.error is a module, not an exception class, so it cannot
            # be used in an except clause. Network failures are IOError
            # (URLError derives from it).
            raise RuntimeError("Cannot determine download link for %s"%(package,))

        pkgdata = json.loads(data.decode('utf-8'))
        if 'urls' not in pkgdata:
            raise RuntimeError("Cannot determine download link for %s"%(package,))

        for info in pkgdata['urls']:
            if info['packagetype'] == 'sdist' and info['url'].endswith('tar.gz'):
                return (info.get('md5_digest'), info['url'])

        raise RuntimeError("Cannot determine download link for %s"%(package,))
postcards_random.py 文件源码 项目:postcards 作者: abertschi 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def get_img_and_text(self, plugin_config, cli_args):
        """Pick an image and return it together with an empty text.

        With cli_args.keyword set, images are fetched for that keyword and
        one is chosen at random; otherwise images for a random keyword are
        fetched and the first one is used. cli_args.safe_search toggles safe
        search. Exits with status 1 when no images are found.

        Returns a dict with the image data under 'img' and '' under 'text'.
        """
        safe = bool(cli_args.safe_search)
        self.logger.debug('setting image safe search to {}'.format(safe))

        keyword = cli_args.keyword
        if keyword:
            self.logger.info('using custom keyword {}'.format(keyword))
            candidates = self._fetch_img_urls(keyword, safe_search=safe)
        else:
            candidates = self._get_images_for_random_keyword(safe_search=safe)

        if not candidates:
            self.logger.error('no images found for given keyword')
            exit(1)

        # For a random keyword the first hit is as good as any, because the
        # search key is random anyway; for a custom keyword vary the pick.
        chosen = random.choice(candidates) if keyword else candidates[0]
        img = chosen[2]

        self.logger.info('choosing image {}'.format(img))
        return {
            'img': self._read_from_url(img),
            'text': ''
        }
cli.py 文件源码 项目:puresec-cli 作者: puresec 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def check_version():
    """Warn when the installed PureSec CLI is outdated.

    Asks the version-verification endpoint about puresec_cli.__version__.
    Any problem along the way (URL error, body that is not valid JSON, reply
    that is not a dict or lacks the expected keys) makes the check return
    silently; a warning is printed only when the reply says the installed
    version is not up to date.
    """
    try:
        reply = request.urlopen("http://cli.puresec.io/verify/version/{}".format(puresec_cli.__version__))
    except urllib.error.URLError:
        return

    try:
        payload = json.loads(reply.read().decode())
    except ValueError:
        return

    if not isinstance(payload, dict):
        return

    # Both fields are required to build the warning.
    if 'is_uptodate' not in payload or 'last_version' not in payload:
        return
    is_uptodate = payload['is_uptodate']
    last_version = payload['last_version']

    if is_uptodate:
        return
    eprint("warn: you are using an outdated version of PureSec CLI (installed={}, latest={})".format(puresec_cli.__version__, last_version))
tts_baidu.py 文件源码 项目:tts-stray 作者: tweetyf 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def TTSBaidu(self, tid, txt, lan, spd):
        '''
            Fetch the BAIDU.COM text-to-speech result for txt and store it
            in self.results[tid].

            tid: key under which the fetched content is stored
            txt: the TTS text
            lan: language, 'en' for English or 'zh' for Chinese
            spd: the reading speed

            A URLError or socket timeout is reported on stdout and the
            request is retried until it succeeds.
        '''
        socket.setdefaulttimeout(34.0)
        # Retry in a loop rather than by calling ourselves: a long outage
        # would otherwise add one stack frame per attempt and end in a
        # RecursionError.
        while True:
            try:
                ttsUrl = genTTSUrl(lan ,txt, spd)
                self.results[tid] = getpage(ttsUrl)
                return
            except urllib.error.URLError as e:
                print("error:URLError ",e," we will try again...tid:",tid)
            except socket.timeout:
                print("error: TTSBaidu time out!, we will try again...tid:",tid )
get_images.py 文件源码 项目:average-pixels 作者: liviu- 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def save_images(term, count):
    """Download thumbnails for a search term and return the saved paths.

    Searches for *count* images matching *term*, makes sure SAVE_DIR exists,
    and downloads each result's thumbnail into it. Results whose encoding
    format is 'unknown' are skipped, as are downloads that fail with an
    HTTPError.

    Returns the list of file names that were downloaded successfully.
    """
    results = search_images(term, count, get_api_key())
    saved = []

    if not os.path.exists(SAVE_DIR):
        os.makedirs(SAVE_DIR)

    for index, image in enumerate(results):
        extension = image['encodingFormat']
        if extension == 'unknown':
            continue
        # File name: the term with whitespace runs replaced by '_', followed
        # by the result's index.
        target = "{path}/{filename}.{ext}".format(
            path=SAVE_DIR,
            filename="_".join(term.split()) + str(index),
            ext=extension)
        try:
            download_image(image['thumbnailUrl'], target)
        except urllib.error.HTTPError:
            # Best effort: a thumbnail that cannot be fetched is left out.
            continue
        saved.append(target)

    return saved
core.py 文件源码 项目:vulssimulator_ds 作者: kn0630 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def log(self, message='', err=None, level='info'):
        """
        Write a message through self.logger.

        message -- text to log.
        err     -- optional exception; when truthy the entry is logged at
                   'error' level and the exception is appended to the text.
        level   -- logger method name (case-insensitive); unknown names fall
                   back to 'info'.

        If the logging call itself raises, the failure is reported via
        self.logger.critical.
        """
        known_levels = ('critical', 'debug', 'error', 'fatal', 'info', 'warning')
        if level.lower() not in known_levels:
            level = 'info'

        if err:
            # An exception always forces the 'error' level.
            level = 'error'
            message += ' Threw exception:\n\t{}'.format(err)

        try:
            getattr(self.logger, level.lower())(message)
        except Exception as log_err:
            failure = "Could not write to log. Threw exception:\n\t{}".format(log_err)
            self.logger.critical(failure)
elFinder.py 文件源码 项目:jumpscale_portal 作者: jumpscale7 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def __mkfile(self):
        """Create new file

        Reads 'name' and 'current' from the request, resolves the current
        directory and creates an empty file with that name inside it. On
        success the response gets a 'select' entry and the directory
        content; otherwise self._response['error'] is set to a message.
        """
        name = current = None
        curDir = newFile = None
        if 'name' in self._request and 'current' in self._request:
            name = self._request['name']
            current = self._request['current']
            curDir = self.__findDir(current, None)
            # Build the path only when both parts are usable. Joining a
            # falsy directory or name (e.g. None) would raise before the
            # 'Invalid parameters' check below gets a chance to report it.
            if curDir and name:
                newFile = os.path.join(curDir, name)

        if not curDir or not name:
            self._response['error'] = 'Invalid parameters'
        elif not self.__isAllowed(curDir, 'write'):
            self._response['error'] = 'Access denied'
        elif not self.__checkName(name):
            self._response['error'] = 'Invalid name'
        elif os.path.exists(newFile):
            self._response['error'] = 'File or folder with the same name already exists'
        else:
            try:
                # Opening for writing and closing right away creates the file.
                open(newFile, 'w').close()
                self._response['select'] = [self.__hash(newFile)]
                self.__content(curDir, False)
            except Exception:
                # Any failure is reported to the client, but interpreter
                # exits such as KeyboardInterrupt are no longer swallowed.
                self._response['error'] = 'Unable to create file'
elFinder.py 文件源码 项目:jumpscale_portal 作者: jumpscale7 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def __rm(self):
        """Delete files and directories

        Reads 'current' and 'targets[]' from the request. Returns False
        after setting 'Invalid parameters' when either is missing or falsy;
        otherwise removes every target that can be found in the current
        directory and refreshes the directory content.
        """
        request = self._request
        targets = directory = None
        if 'current' in request and 'targets[]' in request:
            targets = request['targets[]']
            directory = self.__findDir(request['current'], None)

        if not (targets and directory):
            self._response['error'] = 'Invalid parameters'
            return False

        # A single target may arrive as a bare value instead of a list.
        if not isinstance(targets, list):
            targets = [targets]

        for entry in targets:
            path = self.__find(entry, directory)
            if path:
                self.__remove(path)
        # TODO if errorData not empty return error
        self.__content(directory, True)
elFinder.py 文件源码 项目:jumpscale_portal 作者: jumpscale7 项目源码 文件源码 阅读 41 收藏 0 点赞 0 评论 0
def __duplicate(self):
        """Create copy of files/directories

        Reads 'current' and 'target' from the request, copies the target
        under a unique new name and refreshes the directory content. On any
        failure self._response['error'] is set and nothing further is done.
        """
        if 'current' not in self._request or 'target' not in self._request:
            # Without these keys there is no directory to list afterwards;
            # report the problem instead of using an unbound curDir.
            self._response['error'] = 'Invalid parameters'
            return

        curDir = self.__findDir(self._request['current'], None)
        target = self.__find(self._request['target'], curDir)
        if not curDir or not target:
            self._response['error'] = 'Invalid parameters'
            return
        if not self.__isAllowed(target, 'read') or not self.__isAllowed(curDir, 'write'):
            self._response['error'] = 'Access denied'
            # Stop here: a denied request must not fall through to the copy.
            return
        newName = self.__uniqueName(target)
        if not self.__copy(target, newName):
            self._response['error'] = 'Unable to create file copy'
            return

        self.__content(curDir, True)
        return
elFinder.py 文件源码 项目:jumpscale_portal 作者: jumpscale7 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def __edit(self):
        """Save content in file

        Reads 'current', 'target' and 'content' from the request and writes
        the content to the resolved target file. On success the response's
        'target' entry is set to the file's info; otherwise
        self._response['error'] is set to a message.
        """
        if 'current' in self._request and 'target' in self._request and 'content' in self._request:
            curDir = self.__findDir(self._request['current'], None)
            curFile = self.__find(self._request['target'], curDir)
            if curFile and curDir:
                if not self.__isAllowed(curFile, 'write'):
                    self._response['error'] = 'Access denied'
                    return
                try:
                    # The with-block closes the file even if the write fails.
                    with open(curFile, 'w+') as f:
                        f.write(self._request['content'])
                    self._response['target'] = self.__info(curFile)
                except Exception:
                    self._response['error'] = 'Unable to write to file'
                return
            # An unresolved directory or file falls through to the generic
            # error below instead of returning with no error set at all.

        self._response['error'] = 'Invalid parameters'
        return
xkcd_helpers.py 文件源码 项目:randi 作者: nhatzHK 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def removeNoise (s):
    """Return *s* with bracketed annotations removed.

    Strips, in this order, every ``[[...]]``, ``{{...}}`` and ``[...]``
    span (non-greedy, within a single line) and returns the remaining text.
    """
    import re
    # Raw strings: "\[" in a normal string literal is an invalid escape
    # sequence, which Python warns about.
    pattern_list = (r"\[\[(.*?)\]\]", r"{{(.*?)}}", r"\[(.*?)\]")
    clean = s

    for pattern in pattern_list:
        clean = re.sub(pattern, "", clean)

    return clean

#==============================================================================#
#==============================================================================#

# Get the html for a comic from the explainxkcd website
# Extract the transcript from the text
# Check if the transcript is marked as incomplete
#   if yes, mark it as such locally (for later updates)
# Returns a dictionary (result)
# If an error occurred, the status is not 0 and the error is passed in the
# error field of the returned dictionary
xkcd_helpers.py 文件源码 项目:randi 作者: nhatzHK 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def get_xkcd(number = 0):
    """Fetch the JSON description of an xkcd comic.

    number -- comic number; 0 (the default) requests the current comic.

    Returns a dict with keys 'status', 'error' and 'comic'. 'comic' holds
    the decoded JSON on success. 'status' is 0 on success, -1 for an HTTP
    error response, -2 for any other I/O failure and -3 for anything else
    (for example a body that is not valid JSON).
    """
    # Compare by value: "is" tests object identity and only happens to work
    # for small integers.
    if number == 0:
        url = 'https://xkcd.com/info.0.json'
    else:
        url = 'https://xkcd.com/{}/info.0.json'.format(number)

    response = {'status': 0, 'error': '', 'comic': ""}

    try:
        # The with-block closes the connection once the body has been read.
        with urlopen(url) as page:
            online_comic = page.read()
        response['comic'] = json.loads(online_comic.decode('utf-8'))
    except urllib.error.HTTPError:
        # Must come before IOError: HTTPError is a subclass of it.
        response['status'] = -1
    except IOError:
        response['status'] = -2
    except Exception:
        response['status'] = -3

    return response

#==============================================================================##==============================================================================#
amazon_crawler.py 文件源码 项目:amadown2py 作者: aesuli 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def download_page(url, referer, maxretries, timeout, pause):
    """Download *url*, retrying on network errors.

    url        -- address to fetch.
    referer    -- value sent in the Referer header.
    maxretries -- maximum number of failed attempts before giving up.
    timeout    -- timeout passed to urlopen for each attempt.
    pause      -- seconds to sleep after a successful read.

    Returns ``(text, code)`` where text is the body decoded as utf-8 and
    code the value reported by the response, or ``(None, code)`` when no
    non-empty page was obtained. code is 404 when no attempt was made or
    the last attempt failed before a response was received.
    """
    tries = 0
    htmlpage = None
    # Defined up front so the return below also works when the loop body
    # never runs (maxretries <= 0).
    code = 404
    while tries < maxretries and htmlpage is None:
        try:
            # Reset for each attempt so a stale code from an earlier partial
            # attempt is not reported.
            code = 404
            req = request.Request(url)
            req.add_header('Referer', referer)
            req.add_header('User-agent',
                           'Mozilla/5.0 (X11; Linux i686) AppleWebKit/534.30 (KHTML, like Gecko) Ubuntu/11.04 Chromium/12.0.742.91 Chrome/12.0.742.91 Safari/534.30')
            with closing(request.urlopen(req, timeout=timeout)) as f:
                code = f.getcode()
                htmlpage = f.read()
                sleep(pause)
        except (urlerror.URLError, socket.timeout, socket.error):
            tries += 1
    if htmlpage:
        return htmlpage.decode('utf-8'), code
    else:
        return None, code
test_urllib.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_short_content_raises_ContentTooShortError(self):
        # urlretrieve must raise ContentTooShortError when the response
        # announces more data than it delivers. The canned response below
        # declares Content-Length: 100 but carries only a very short body.
        self.fakehttp(b'''HTTP/1.1 200 OK
Date: Wed, 02 Jan 2008 03:03:54 GMT
Server: Apache/1.3.33 (Debian GNU/Linux) mod_ssl/2.8.22 OpenSSL/0.9.7e
Connection: close
Content-Length: 100
Content-Type: text/html; charset=iso-8859-1

FF
''')

        # No-op progress callback passed to urlretrieve as reporthook.
        def _reporthook(par1, par2, par3):
            pass

        with self.assertRaises(urllib.error.ContentTooShortError):
            try:
                urllib.request.urlretrieve('http://example.com/',
                                           reporthook=_reporthook)
            finally:
                # Runs whether or not the error was raised; presumably
                # restores what fakehttp() replaced.
                self.unfakehttp()
test_urllib2.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 41 收藏 0 点赞 0 评论 0
def handle(self, fn_name, action, *args, **kwds):
        # Record the call on the parent, then act out the scripted `action`:
        #   None              -> return None
        #   "return self"     -> return this handler
        #   "return response" -> return a fresh 200 MockResponse
        #   "return request"  -> return a new Request
        #   "error <code>"    -> delegate to parent.error() with that code
        #   "raise"           -> raise URLError
        # Any other action is a mistake in the test setup.
        self.parent.calls.append((self, fn_name, args, kwds))
        if action is None:
            return None
        if action == "return self":
            return self
        if action == "return response":
            return MockResponse(200, "OK", {}, "")
        if action == "return request":
            return Request("http://blah/")
        if action.startswith("error"):
            # The code is whatever follows the last space; it is passed as an
            # int when it parses as one and as the raw string otherwise.
            suffix = action[action.rfind(" ")+1:]
            try:
                code = int(suffix)
            except ValueError:
                code = suffix
            response = MockResponse(200, "OK", {}, "")
            return self.parent.error("http", args[0], response, code, "", {})
        if action == "raise":
            raise urllib.error.URLError("blah")
        assert False
test_urllib2.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_badly_named_methods(self):
        # Work-around test for three methods whose names accidentally follow
        # the handler-method naming conventions
        # (*_open() / *_request() / *_response()).

        # These accidentally-named methods used to be called, causing a
        # TypeError in real code; with the mocks here, returning self from
        # them would cause either no exception or an AttributeError.

        from urllib.error import URLError

        opener = OpenerDirector()
        first_spec = [("do_open", "return self"), ("proxy_open", "return self")]
        second_spec = [("redirect_request", "return self")]
        add_ordered_mock_handlers(opener, [first_spec, second_spec])
        opener.add_handler(urllib.request.UnknownHandler())
        # Each of these schemes must be rejected with URLError.
        for scheme in ("do", "proxy", "redirect"):
            with self.assertRaises(URLError):
                opener.open(scheme+"://example.com/")
test_urllib2.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def test_raise(self):
        # raising URLError stops processing of request
        o = OpenerDirector()
        # Two specs are registered for http_open: the first is scripted to
        # raise, the second to return itself.
        meth_spec = [
            [("http_open", "raise")],
            [("http_open", "return self")],
            ]
        handlers = add_ordered_mock_handlers(o, meth_spec)

        req = Request("http://example.com/")
        self.assertRaises(urllib.error.URLError, o.open, req)
        # Exactly one call was recorded, on the first handler: the second
        # http_open was never reached.
        self.assertEqual(o.calls, [(handlers[0], "http_open", (req,), {})])

##     def test_error(self):
##         # XXX this doesn't actually seem to be used in standard library,
##         #  but should really be tested anyway...
test_urllib2.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def test_errors(self):
        # HTTPErrorProcessor.http_response must hand 200, 202 and 206
        # responses back unchanged and route a 502 response to the parent
        # opener's error().
        h = urllib.request.HTTPErrorProcessor()
        o = h.parent = MockOpener()

        url = "http://example.com/"
        req = Request(url)
        # all 2xx are passed through
        r = MockResponse(200, "OK", {}, "", url)
        newr = h.http_response(req, r)
        self.assertIs(r, newr)
        self.assertFalse(hasattr(o, "proto"))  # o.error not called
        r = MockResponse(202, "Accepted", {}, "", url)
        newr = h.http_response(req, r)
        self.assertIs(r, newr)
        self.assertFalse(hasattr(o, "proto"))  # o.error not called
        r = MockResponse(206, "Partial content", {}, "", url)
        newr = h.http_response(req, r)
        self.assertIs(r, newr)
        self.assertFalse(hasattr(o, "proto"))  # o.error not called
        # anything else calls o.error (and MockOpener returns None, here)
        r = MockResponse(502, "Bad gateway", {}, "", url)
        self.assertIsNone(h.http_response(req, r))
        self.assertEqual(o.proto, "http")  # o.error called
        # The opener received the request, the response, and its code,
        # message and headers.
        self.assertEqual(o.args, (req, r, 502, "Bad gateway", {}))
test_urllib2.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_invalid_redirect(self):
        # A 302 redirect to a file/imap/ldap URL must be refused with
        # HTTPError, while redirects to http/https/ftp URLs are followed.
        from_url = "http://example.com/a.html"
        valid_schemes = ['http','https','ftp']
        invalid_schemes = ['file','imap','ldap']
        schemeless_url = "example.com/b.html"
        h = urllib.request.HTTPRedirectHandler()
        o = h.parent = MockOpener()
        req = Request(from_url)
        req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT

        for scheme in invalid_schemes:
            invalid_url = scheme + '://' + schemeless_url
            self.assertRaises(urllib.error.HTTPError, h.http_error_302,
                    req, MockFile(), 302, "Security Loophole",
                    MockHeaders({"location": invalid_url}))

        for scheme in valid_schemes:
            valid_url = scheme + '://' + schemeless_url
            h.http_error_302(req, MockFile(), 302, "That's fine",
                MockHeaders({"location": valid_url}))
            # The request handed to the opener targets the redirect location.
            self.assertEqual(o.req.get_full_url(), valid_url)
views.py 文件源码 项目:ecs 作者: ecs-org 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def batch_sign(request):
    """Render the signing page for the first task of the sign session.

    Redirects to the dashboard when the session has no tasks. Otherwise the
    session's 'data_func' builds the data for the first task, the data is
    stored, and the batch template is rendered with the signing URL and the
    id of the stored data. Users whose e-mail starts with 'signing_fail'
    get a forced sign_error instead.
    """
    session = request.sign_session
    tasks = session['tasks']
    if not tasks:
        return redirect('ecs.dashboard.views.view_dashboard')

    task = _get_tasks(request.user).get(pk=tasks[0])
    data = session['data_func'](request, task)
    data['sign_session_id'] = session.id
    sign_data = _store_sign_data(data)

    # Hook for exercising the failure path deliberately.
    if request.user.email.startswith('signing_fail'):
        return sign_error(request, pdf_id=sign_data.id, error='forced failure', cause='requested force_fail, so we failed')

    context = {
        'sign_url': get_pdfas_url(request, sign_data),
        'pdf_id': sign_data.id,
    }
    return render(request, 'signature/batch.html', context)
utils.py 文件源码 项目:ecs 作者: ecs-org 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def get_pdfas_url(request, sign_data):
    """Build the PDF-AS 'Sign' URL for *sign_data*.

    The query string carries absolute callback URLs (receive, error, send)
    for this sign_data id, the size of the PDF data, the file name, and
    fixed connector, locale and signature settings. The result is
    settings.PDFAS_SERVICE followed by 'Sign?' and the encoded query.
    """
    def absolute(view_name):
        # Absolute URL of a signature view for this sign_data id.
        return request.build_absolute_uri(reverse(view_name, kwargs={'pdf_id': sign_data.id}))

    values = {
        'connector': 'onlinebku',
        'invoke-app-url': absolute('ecs.signature.views.sign_receive'),
        'invoke-app-url-target': '_top',
        'invoke-app-error-url': absolute('ecs.signature.views.sign_error'),
        'locale': 'DE',
        'num-bytes': str(len(sign_data['pdf_data'])),
        'sig_type': 'SIGNATURBLOCK_DE',
        'pdf-url': absolute('ecs.signature.views.sign_send'),

        'verify-level': 'intOnly', # The signature is verified, but without checking the certificate.
        'filename': sign_data['document_filename'],

        #'preview': 'false',
        #'mode': 'binary',
        #'inline': 'false',
        #'pdf-id': sign_data.id,
    }
    encoded = {}
    for key, value in values.items():
        encoded[key] = value.encode('utf-8')
    query = urllib.parse.urlencode(encoded)
    return '{0}Sign?{1}'.format(settings.PDFAS_SERVICE, query)
general_commands.py 文件源码 项目:Basic-discord-bot 作者: TheWizoid 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def stream_live_check(stream):
    """Return a chat message describing whether a Twitch stream is live.

    stream -- channel name; it is lower-cased for the API request.

    Returns an "offline" message when the API reports no stream, an
    "online" message with name, title, game and viewer count when it does,
    "That stream doesn't exist." for a 404/422 reply, and a generic error
    message for any other URL error.
    """
    url = "https://api.twitch.tv/kraken/streams/{}".format(stream.lower())

    try:
        # The with-block closes the connection once the body has been read.
        with urllib.request.urlopen(url) as reply:
            contents = json.loads(reply.read().decode("utf-8"))
    except urllib.error.URLError as e:
        # Decide on the numeric status rather than the reason phrase alone:
        # the phrase's wording and capitalisation are up to the server. The
        # original phrase comparison is kept as a fallback. A plain URLError
        # has no code attribute.
        status_code = getattr(e, "code", None)
        if status_code in (404, 422) or e.reason == "Not found" or e.reason == "Unprocessable Entity":
            return "That stream doesn't exist."
        return "There was an error proccessing your request."

    if contents["stream"] is None:
        return "{} is offline.".format(stream)

    #print(contents)
    channel = contents["stream"]["channel"]
    name = channel["name"]
    title = channel["status"]
    game = channel["game"]
    viewers = contents["stream"]["viewers"]
    return "{0} is online.\n{0}'s title is: {1} \n{0} is playing {2} \nThere are {3} viewers \n".format(name,title,game,viewers)
test_urllib.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def test_short_content_raises_ContentTooShortError(self):
        # urlretrieve must raise ContentTooShortError when the response
        # announces more data than it delivers. The canned response below
        # declares Content-Length: 100 but carries only a very short body.
        self.fakehttp(b'''HTTP/1.1 200 OK
Date: Wed, 02 Jan 2008 03:03:54 GMT
Server: Apache/1.3.33 (Debian GNU/Linux) mod_ssl/2.8.22 OpenSSL/0.9.7e
Connection: close
Content-Length: 100
Content-Type: text/html; charset=iso-8859-1

FF
''')

        # No-op progress callback passed to urlretrieve as reporthook.
        def _reporthook(par1, par2, par3):
            pass

        with self.assertRaises(urllib.error.ContentTooShortError):
            try:
                urllib.request.urlretrieve('http://example.com/',
                                           reporthook=_reporthook)
            finally:
                # Runs whether or not the error was raised; presumably
                # restores what fakehttp() replaced.
                self.unfakehttp()
test_urllib2.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def handle(self, fn_name, action, *args, **kwds):
        # Record the call on the parent, then act out the scripted `action`:
        #   None              -> return None
        #   "return self"     -> return this handler
        #   "return response" -> return a fresh 200 MockResponse
        #   "return request"  -> return a new Request
        #   "error <code>"    -> delegate to parent.error() with that code
        #   "raise"           -> raise URLError
        # Any other action is a mistake in the test setup.
        self.parent.calls.append((self, fn_name, args, kwds))
        if action is None:
            return None
        if action == "return self":
            return self
        if action == "return response":
            return MockResponse(200, "OK", {}, "")
        if action == "return request":
            return Request("http://blah/")
        if action.startswith("error"):
            # The code is whatever follows the last space; it is passed as an
            # int when it parses as one and as the raw string otherwise.
            suffix = action[action.rfind(" ")+1:]
            try:
                code = int(suffix)
            except ValueError:
                code = suffix
            response = MockResponse(200, "OK", {}, "")
            return self.parent.error("http", args[0], response, code, "", {})
        if action == "raise":
            raise urllib.error.URLError("blah")
        assert False
test_urllib2.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_badly_named_methods(self):
        # Work-around test for three methods whose names accidentally follow
        # the handler-method naming conventions
        # (*_open() / *_request() / *_response()).

        # These accidentally-named methods used to be called, causing a
        # TypeError in real code; with the mocks here, returning self from
        # them would cause either no exception or an AttributeError.

        from urllib.error import URLError

        opener = OpenerDirector()
        first_spec = [("do_open", "return self"), ("proxy_open", "return self")]
        second_spec = [("redirect_request", "return self")]
        add_ordered_mock_handlers(opener, [first_spec, second_spec])
        opener.add_handler(urllib.request.UnknownHandler())
        # Each of these schemes must be rejected with URLError.
        for scheme in ("do", "proxy", "redirect"):
            with self.assertRaises(URLError):
                opener.open(scheme+"://example.com/")
test_urllib2.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def test_errors(self):
        # HTTPErrorProcessor.http_response must hand 200, 202 and 206
        # responses back unchanged and route a 502 response to the parent
        # opener's error().
        h = urllib.request.HTTPErrorProcessor()
        o = h.parent = MockOpener()

        url = "http://example.com/"
        req = Request(url)
        # all 2xx are passed through
        r = MockResponse(200, "OK", {}, "", url)
        newr = h.http_response(req, r)
        self.assertIs(r, newr)
        self.assertFalse(hasattr(o, "proto"))  # o.error not called
        r = MockResponse(202, "Accepted", {}, "", url)
        newr = h.http_response(req, r)
        self.assertIs(r, newr)
        self.assertFalse(hasattr(o, "proto"))  # o.error not called
        r = MockResponse(206, "Partial content", {}, "", url)
        newr = h.http_response(req, r)
        self.assertIs(r, newr)
        self.assertFalse(hasattr(o, "proto"))  # o.error not called
        # anything else calls o.error (and MockOpener returns None, here)
        r = MockResponse(502, "Bad gateway", {}, "", url)
        self.assertIsNone(h.http_response(req, r))
        self.assertEqual(o.proto, "http")  # o.error called
        # The opener received the request, the response, and its code,
        # message and headers.
        self.assertEqual(o.args, (req, r, 502, "Bad gateway", {}))
test_urllib2.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def test_invalid_redirect(self):
        # A 302 redirect to a file/imap/ldap URL must be refused with
        # HTTPError, while redirects to http/https/ftp URLs are followed.
        from_url = "http://example.com/a.html"
        valid_schemes = ['http','https','ftp']
        invalid_schemes = ['file','imap','ldap']
        schemeless_url = "example.com/b.html"
        h = urllib.request.HTTPRedirectHandler()
        o = h.parent = MockOpener()
        req = Request(from_url)
        req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT

        for scheme in invalid_schemes:
            invalid_url = scheme + '://' + schemeless_url
            self.assertRaises(urllib.error.HTTPError, h.http_error_302,
                    req, MockFile(), 302, "Security Loophole",
                    MockHeaders({"location": invalid_url}))

        for scheme in valid_schemes:
            valid_url = scheme + '://' + schemeless_url
            h.http_error_302(req, MockFile(), 302, "That's fine",
                MockHeaders({"location": valid_url}))
            # The request handed to the opener targets the redirect location.
            self.assertEqual(o.req.get_full_url(), valid_url)


问题


面经


文章

微信
公众号

扫码关注公众号