python类build_opener()的实例源码

login.py 文件源码 项目:weibo 作者: windskyer 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def save_cookie(self, text, cookie_file=CONF.cookie_file):
        """Follow the login redirect found in ``text`` and persist the cookies.

        A fresh ``LWPCookieJar`` is installed as the global ``urllib2`` opener,
        the URL inside ``location.replace('...')`` in ``text`` is fetched, and
        the JSON argument of ``feedBackUrlCallBack(...)`` in that response is
        inspected.

        :param text: login response body; decoded from GBK when ``six.PY3``.
        :param cookie_file: path the cookie jar is written to on success.
        :return: 1 when the feedback JSON has a truthy ``result`` and the
            cookies were saved; 0 otherwise, including on any error while
            following the redirect or parsing the feedback.
        """
        cookie_jar2 = cookielib.LWPCookieJar()
        cookie_support2 = urllib2.HTTPCookieProcessor(cookie_jar2)
        opener2 = urllib2.build_opener(cookie_support2, urllib2.HTTPHandler)
        urllib2.install_opener(opener2)
        if six.PY3:
            text = text.decode('gbk')
        # Raw strings so the regex escapes are not parsed as string escapes.
        redirect_re = re.compile(r"location\.replace\('(.*?)'\)")
        feedback_re = re.compile(r'feedBackUrlCallBack\((.*)\)', re.MULTILINE)
        try:
            # Search login redirection URL
            login_url = redirect_re.search(text).group(1)
            data = urllib2.urlopen(login_url).read()
            if six.PY3:
                # read() returns bytes on Python 3 and a str pattern cannot be
                # matched against bytes.
                # NOTE(review): GBK is assumed to match the decoding of
                # ``text`` above; undecodable bytes are replaced -- confirm.
                data = data.decode('gbk', 'replace')
            # Verify login feedback, check whether result is TRUE
            feedback = feedback_re.search(data).group(1)
            feedback_json = json.loads(feedback)
            if feedback_json['result']:
                cookie_jar2.save(cookie_file,
                                 ignore_discard=True,
                                 ignore_expires=True)
                return 1
            return 0
        except Exception:
            # Best effort: any failure means "not logged in", but unlike a
            # bare ``except`` this lets KeyboardInterrupt/SystemExit through.
            return 0
Lweibo.py 文件源码 项目:weibo 作者: windskyer 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def login(self, username, pwd, cookie_file):
        """Log in, reusing cookies saved in ``cookie_file`` when possible.

        (1) If the cookie file exists and loads, install it as the global
            ``urllib2`` opener and return 1;
        (2) otherwise (file missing, or ``cookielib.LoadError`` while
            loading) return the result of ``self.do_login``.
        """
        # No saved cookies at all: go straight to a real login.
        if not os.path.exists(cookie_file):
            return self.do_login(username, pwd, cookie_file)

        try:
            jar = cookielib.LWPCookieJar(cookie_file)
            jar.load(ignore_discard=True, ignore_expires=True)
        except cookielib.LoadError:
            print('Loading cookies error')
            return self.do_login(username, pwd, cookie_file)

        # Install the loaded cookies for every later urllib2 call.
        processor = urllib2.HTTPCookieProcessor(jar)
        urllib2.install_opener(
            urllib2.build_opener(processor, urllib2.HTTPHandler))
        print('Loading cookies success')
        return 1
connection.py 文件源码 项目:python-mysql-pool 作者: LuciferJack 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def request(self, host, handler, request_body, verbose=0):
        """Send an XML-RPC request and return the parsed response.

        The target URI is built from ``self._scheme``, ``host`` and
        ``handler``. The ``verbose`` argument is not read; logging is driven
        by ``self.verbose``.

        :raises InterfaceError: on ``urllib2.URLError``/``HTTPError`` or
            ``BadStatusLine`` while opening the request.
        """
        uri = '{scheme}://{host}{handler}'.format(
            scheme=self._scheme, host=host, handler=handler)

        if self._passmgr:
            self._passmgr.add_password(
                None, uri, self._username, self._password)
        if self.verbose:
            _LOGGER.debug("FabricTransport: {0}".format(uri))

        opener = urllib2.build_opener(*self._handlers)
        req = urllib2.Request(
            uri,
            request_body,
            headers={
                'Content-Type': 'text/xml',
                'User-Agent': self.user_agent,
            },
        )

        try:
            return self.parse_response(opener.open(req))
        except (urllib2.URLError, urllib2.HTTPError) as exc:
            try:
                # Only a 400 reports its real status code; every other
                # error that has a ``code`` is reported with -1.
                if exc.code == 400:
                    reason, code = 'Permission denied', exc.code
                else:
                    reason, code = exc.reason, -1
                msg = "{reason} ({code})".format(reason=reason, code=code)
            except AttributeError:
                # The exception lacks ``code`` or ``reason``.
                msg = "SSL error" if 'SSL' in str(exc) else str(exc)
            raise InterfaceError("Connection with Fabric failed: " + msg)
        except BadStatusLine:
            raise InterfaceError("Connection with Fabric failed: check SSL")
base.py 文件源码 项目:habrahabr-api-python 作者: dotzero 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _request(self, url, method='GET', data=None):
        """Perform an API request and return the parsed response.

        :param url: path appended to ``self._auth.endpoint``.
        :param method: HTTP method; for ``GET`` and ``DELETE`` the encoded
            ``data`` is moved into the query string, otherwise it is sent as
            the request body with ``POST_CONTENT_TYPE``.
        :param data: optional mapping passed to ``urlencode``.
        :return: result of ``self._parse_response`` on the response body.
        :raises ApiHandlerError: on ``HTTPError`` (with the parsed error
            body attached) or when a ``ValueError`` is raised while sending
            or parsing.
        """
        url = self._auth.endpoint + url
        # Copy so the per-request Content-Type set below does not leak into
        # the shared auth headers and from there into later requests.
        headers = dict(self._auth.headers)

        if data is not None:
            data = urlencode(data)
            if method in ('GET', 'DELETE'):
                url = url + '?' + data
                data = None
            else:
                headers['Content-Type'] = POST_CONTENT_TYPE
                if sys.version_info > (3,):  # python3
                    data = data.encode('utf-8')

        log.debug(method + ' ' + url)
        log.debug(data)

        try:
            opener = build_opener(HTTPHandler)
            request = Request(url, data=data, headers=headers)
            # Force the verb; urllib would otherwise pick GET/POST from data.
            request.get_method = lambda: method
            response = opener.open(request).read()
            data = self._parse_response(response)
        except HTTPError as e:
            log.error(e)
            data = self._parse_response(e.read())
            raise ApiHandlerError('Invalid server response', data)
        except ValueError as e:
            log.error(e)
            raise ApiHandlerError('Invalid server response')

        return data
weibo.py 文件源码 项目:Spider 作者: vincenth520 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def build_opener():
    """Install a global urllib opener with a cookie jar and Weibo headers.

    The opener keeps cookies in a new in-memory ``CookieJar`` and sends the
    User-Agent/Referer/Origin/Host headers below. Returns ``None``.
    """
    jar = http.cookiejar.CookieJar()
    opener = request.build_opener(request.HTTPCookieProcessor(jar))
    opener.addheaders = [
        ("User-Agent", "Mozilla/5.0 (iPhone; CPU iPhone OS 9_1 like Mac OS X) AppleWebKit/601.1.46 (KHTML, like Gecko) Version/9.0 Mobile/13B143 Safari/601.1"),
        ("Referer", "https://passport.weibo.cn"),
        ("Origin", "https://passport.weibo.cn"),
        ("Host", "passport.weibo.cn"),
    ]
    request.install_opener(opener)


#??
v2ex.py 文件源码 项目:Spider 作者: vincenth520 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def build_opener():
    """Install a global urllib opener with a cookie jar and V2EX headers.

    The opener keeps cookies in a new in-memory ``CookieJar`` and sends the
    User-Agent/Referer/Origin/Host headers below. Returns ``None``.
    """
    jar = http.cookiejar.CookieJar()
    opener = request.build_opener(request.HTTPCookieProcessor(jar))
    opener.addheaders = [
        ("User-Agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:49.0) Gecko/20100101 Firefox/49.0"),
        ("Referer", "http://cn.v2ex.com/signin"),
        ("Origin", "http://cn.v2ex.com"),
        ("Host", "cn.v2ex.com"),
    ]
    request.install_opener(opener)
weixin.py 文件源码 项目:Spider 作者: vincenth520 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def build_opener():
    """Install a global urllib opener with a cookie jar and WeChat headers.

    The opener keeps cookies in a new in-memory ``CookieJar`` and sends the
    User-Agent/Referer/Origin/Host headers below. Returns ``None``.
    """
    jar = http.cookiejar.CookieJar()
    opener = request.build_opener(request.HTTPCookieProcessor(jar))
    opener.addheaders = [
        ("User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.71 Safari/537.36"),
        ("Referer", "https://wx.qq.com/"),
        ("Origin", "https://wx.qq.com/"),
        ("Host", "wx.qq.com"),
    ]
    request.install_opener(opener)

#??uuid
zhihu.py 文件源码 项目:Spider 作者: vincenth520 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def build_opener():
    """Install a global urllib opener with a cookie jar and Zhihu headers.

    The opener keeps cookies in a new in-memory ``CookieJar`` and sends the
    User-Agent/Referer/Origin/Host headers below. Returns ``None``.
    """
    jar = http.cookiejar.CookieJar()
    opener = request.build_opener(request.HTTPCookieProcessor(jar))
    opener.addheaders = [
        ("User-Agent", "Mozilla/5.0 (iPhone; CPU iPhone OS 9_1 like Mac OS X) AppleWebKit/601.1.46 (KHTML, like Gecko) Version/9.0 Mobile/13B143 Safari/601.1"),
        ("Referer", "https://www.zhihu.com/"),
        ("Origin", "https://www.zhihu.com/"),
        ("Host", "www.zhihu.com"),
    ]
    request.install_opener(opener)
getsploit.py 文件源码 项目:getsploit 作者: vulnersCom 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def getUrllibOpener():
    """Return an HTTPS opener preconfigured with the getsploit headers.

    When ``pythonVersion`` is greater than 3.0 the HTTPS handler is given an
    SSL context with hostname checking and certificate verification turned
    off; otherwise a default ``HTTPSHandler`` is used.
    """
    if pythonVersion > 3.0:
        # NOTE(review): TLS verification is disabled on this branch, which
        # exposes requests to man-in-the-middle attacks -- confirm intended.
        ctx = ssl.create_default_context()
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE
        https_handler = urllib2.HTTPSHandler(context=ctx)
    else:
        https_handler = urllib2.HTTPSHandler()

    opener = urllib2.build_opener(https_handler)
    opener.addheaders = [
        ('Content-Type', 'application/json'),
        ('User-Agent', 'vulners-getsploit-v%s' % __version__),
    ]
    return opener
pipstrap.py 文件源码 项目:TCP-IP 作者: JackZ0 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def hashed_download(url, temp, digest):
    """Download ``url`` to ``temp``, make sure it has the SHA-256 ``digest``,
    and return its path.

    The file is named after the last path segment of ``url``. Raises
    ``HashError`` when the SHA-256 of the downloaded bytes differs from
    ``digest``; the downloaded file is left in place in that case.
    """
    # Based on pip 1.4.1's URLOpener but with cert verification removed. Python
    # >=2.7.9 verifies HTTPS certs itself, and, in any case, the cert
    # authenticity has only privacy (not arbitrary code execution)
    # implications, since we're checking hashes.
    def opener():
        director = build_opener(HTTPSHandler())
        # Strip out HTTPHandler to prevent MITM spoof. Iterate over a copy:
        # removing from the list being iterated can skip entries.
        # NOTE(review): this only edits the ``handlers`` list; confirm it is
        # enough to stop the opener from serving plain-HTTP URLs.
        for handler in list(director.handlers):
            if isinstance(handler, HTTPHandler):
                director.handlers.remove(handler)
        return director

    def read_chunks(response, chunk_size):
        while True:
            chunk = response.read(chunk_size)
            if not chunk:
                break
            yield chunk

    response = opener().open(url)
    try:
        path = join(temp, urlparse(url).path.split('/')[-1])
        actual_hash = sha256()
        with open(path, 'wb') as out_file:
            for chunk in read_chunks(response, 4096):
                out_file.write(chunk)
                actual_hash.update(chunk)
    finally:
        # Release the connection even if writing to disk fails.
        response.close()

    actual_digest = actual_hash.hexdigest()
    if actual_digest != digest:
        raise HashError(url, path, actual_digest, digest)
    return path
cfnresponse.py 文件源码 项目:aws-cfn-plex 作者: lordmuffin 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def send(event, context, response_status, reason=None, response_data=None, physical_resource_id=None):
    """PUT a custom-resource response document to ``event['ResponseURL']``.

    :param event: mapping providing ``ResponseURL``, ``StackId``,
        ``RequestId`` and ``LogicalResourceId``.
    :param context: object providing ``log_stream_name``, used for the
        default reason and the default physical resource id.
    :param response_status: value sent as ``Status``.
    :param reason: optional ``Reason``; defaults to a pointer to the log
        stream.
    :param response_data: optional ``Data`` payload; defaults to ``{}``.
    :param physical_resource_id: optional ``PhysicalResourceId``; defaults
        to ``context.log_stream_name``.
    :return: ``True`` when the request succeeded, ``False`` on ``HTTPError``.
    """
    response_data = response_data or {}
    response_body = json.dumps(
        {
            'Status': response_status,
            'Reason': reason or "See the details in CloudWatch Log Stream: " + context.log_stream_name,
            'PhysicalResourceId': physical_resource_id or context.log_stream_name,
            'StackId': event['StackId'],
            'RequestId': event['RequestId'],
            'LogicalResourceId': event['LogicalResourceId'],
            'Data': response_data
        }
    )
    # urllib on Python 3 only accepts a bytes body; encoding first also makes
    # Content-Length count bytes. On Python 2 ``str`` is already bytes.
    if not isinstance(response_body, bytes):
        response_body = response_body.encode('utf-8')

    opener = build_opener(HTTPHandler)
    request = Request(event['ResponseURL'], data=response_body)
    # Explicitly empty so urllib does not add its default form content type.
    request.add_header('Content-Type', '')
    request.add_header('Content-Length', str(len(response_body)))
    request.get_method = lambda: 'PUT'
    try:
        response = opener.open(request)
        print("Status code: {}".format(response.getcode()))
        print("Status message: {}".format(response.msg))
        return True
    except HTTPError as exc:
        print("Failed executing HTTP request: {}".format(exc.code))
        return False
webwxapi.py 文件源码 项目:WxRobot 作者: sharpdeep 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def __init__(self):
        """Initialise client state and install a cookie-aware global opener.

        Every session field starts empty, one handler list is created per
        entry of ``self.message_types`` plus ``'location'`` and ``'all'``,
        and a urllib opener with a new ``CookieJar`` and browser-like
        headers is installed globally.
        """
        self.DEBUG = False
        self.appid = 'wx782c26e4c19acffb'

        # Session identifiers, all empty until set elsewhere.
        self.uuid = self.base_uri = self.redirect_uri = ''
        self.uin = self.sid = self.skey = self.pass_ticket = ''
        # 'e' followed by up to 15 digits of a random float's repr.
        self.deviceId = 'e' + repr(random.random())[2:17]
        self.BaseRequest = {}
        self.synckey = ''
        self.SyncKey = []

        # Each list must be its own object, so no chained assignment here.
        self.User = []
        self.MemberList = []
        self.ContactList = []
        self.GroupList = []
        self.autoReplyMode = False
        self.syncHost = ''

        self._handlers = {kind: [] for kind in self.message_types}
        for extra in ('location', 'all'):
            self._handlers[extra] = []

        self._filters = {}

        cookie_handler = request.HTTPCookieProcessor(CookieJar())
        opener = request.build_opener(cookie_handler)
        opener.addheaders = [
            ('User-agent', 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/48.0.2564.109 Safari/537.36'),
            ('Referer', 'https://wx2.qq.com/'),
        ]
        request.install_opener(opener)
transmission.py 文件源码 项目:pandachaika 作者: pandabuilder 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def __init__(self, secure: bool=True) -> None:
        """Initialise the handler with a default opener and no credentials.

        :param secure: stored on ``self.secure``.
        """
        transmissionrpc.HTTPHandler.__init__(self)
        self.secure = secure
        # No credentials yet.
        self.auth: DataDict = None
        self.http_opener = build_opener()
transmission.py 文件源码 项目:pandachaika 作者: pandabuilder 项目源码 文件源码 阅读 66 收藏 0 点赞 0 评论 0
def set_authentication(self, uri: str, login: str, password: str) -> None:
        """Rebuild the HTTPS opener and store a Basic-auth header.

        With ``self.secure`` truthy the default SSL context is used;
        otherwise a TLS 1.2 context with certificate and hostname checks
        disabled. ``uri`` is not read by this method.

        :param login: user name placed before the colon in the credentials.
        :param password: password placed after the colon.
        """
        if not self.secure:
            context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
            context.verify_mode = ssl.CERT_NONE
            context.check_hostname = False
        else:
            context = ssl.create_default_context()
        self.http_opener = build_opener(HTTPSHandler(context=context))

        credentials = (login + ":" + password).encode()
        token = b64encode(credentials).decode('utf-8')
        self.auth = {'Authorization': 'Basic %s' % token}
Config.py 文件源码 项目:VIA4CVE 作者: cve-search 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def getFile(cls, getfile, unpack=True):
    """Fetch ``getfile`` and optionally unpack it based on its Content-Type.

    When ``cls.getProxy()`` is truthy, a proxy-aware opener is installed
    globally before fetching.

    :param getfile: URL to fetch with ``urlopen``.
    :param unpack: when true, gzip/bzip2/zip payloads are wrapped in a
        file-like object (for zip: the first archive member); other content
        is returned as raw bytes.
    :return: ``(data, response)``; ``(None, None)`` when the fetch failed
        and ``cls.exitWhenNoSource()`` is false.
    """
    if cls.getProxy():
      proxy = req.ProxyHandler({'http': cls.getProxy(), 'https': cls.getProxy()})
      auth = req.HTTPBasicAuthHandler()
      opener = req.build_opener(proxy, auth, req.HTTPHandler)
      req.install_opener(opener)
    try:
      response = req.urlopen(getfile)
    except Exception:
      msg = "[!] Could not fetch file %s"%getfile
      if cls.exitWhenNoSource(): sys.exit(msg)
      print(msg)
      # There is no response to read; previously execution fell through to
      # ``response.read()`` and crashed on the unbound name.
      return (None, None)
    data = response.read()
    # TODO: if data == text/plain; charset=utf-8, read and decode
    if unpack:
      # A missing header would otherwise make the ``in`` checks raise.
      content_type = response.info().get('Content-Type') or ''
      if   'gzip' in content_type:
        data = gzip.GzipFile(fileobj = BytesIO(data))
      elif 'bzip2' in content_type:
        data = BytesIO(bz2.decompress(data))
      elif 'zip' in content_type:
        fzip = zipfile.ZipFile(BytesIO(data), 'r')
        if len(fzip.namelist())>0:
          data=BytesIO(fzip.read(fzip.namelist()[0]))
      # In case the webserver is being generic
      elif 'application/octet-stream' in content_type:
        if data[:4] == b'PK\x03\x04': # Zip
          fzip = zipfile.ZipFile(BytesIO(data), 'r')
          if len(fzip.namelist())>0:
            data=BytesIO(fzip.read(fzip.namelist()[0]))
    return (data, response)
connection.py 文件源码 项目:importacsv 作者: rasertux 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def request(self, host, handler, request_body, verbose=0):
        """Send an XML-RPC request and return the parsed response.

        The target URI is built from ``self._scheme``, ``host`` and
        ``handler``. The ``verbose`` argument is not read; logging is driven
        by ``self.verbose``.

        :raises InterfaceError: on ``urllib2.URLError``/``HTTPError`` or
            ``BadStatusLine`` while opening the request.
        """
        uri = '{scheme}://{host}{handler}'.format(
            scheme=self._scheme, host=host, handler=handler)

        if self._passmgr:
            self._passmgr.add_password(
                None, uri, self._username, self._password)
        if self.verbose:
            _LOGGER.debug("FabricTransport: {0}".format(uri))

        opener = urllib2.build_opener(*self._handlers)
        req = urllib2.Request(
            uri,
            request_body,
            headers={
                'Content-Type': 'text/xml',
                'User-Agent': self.user_agent,
            },
        )

        try:
            return self.parse_response(opener.open(req))
        except (urllib2.URLError, urllib2.HTTPError) as exc:
            try:
                # Only a 400 reports its real status code; every other
                # error that has a ``code`` is reported with -1.
                if exc.code == 400:
                    reason, code = 'Permission denied', exc.code
                else:
                    reason, code = exc.reason, -1
                msg = "{reason} ({code})".format(reason=reason, code=code)
            except AttributeError:
                # The exception lacks ``code`` or ``reason``.
                msg = "SSL error" if 'SSL' in str(exc) else str(exc)
            raise InterfaceError("Connection with Fabric failed: " + msg)
        except BadStatusLine:
            raise InterfaceError("Connection with Fabric failed: check SSL")
test_httplib.py 文件源码 项目:dd-trace-py 作者: DataDog 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_urllib_request_opener(self):
        """
        When making a request via urllib.request.OpenerDirector
           we return the original response
           we capture a span for the request
        """
        director = build_opener()
        with override_global_tracer(self.tracer):
            response = director.open(URL_200)

        # The original response is handed back untouched.
        self.assertEqual(self.to_str(response.read()), '')
        self.assertEqual(response.getcode(), 200)

        # Exactly one span describes the request.
        captured = self.tracer.writer.pop()
        self.assertEqual(len(captured), 1)
        span = captured[0]
        self.assertEqual(span.span_type, 'http')
        self.assertIsNone(span.service)
        self.assertEqual(span.name, self.SPAN_NAME)
        self.assertEqual(span.error, 0)
        expected_tags = (
            ('http.method', 'GET'),
            ('http.status_code', '200'),
            ('http.url', URL_200),
        )
        for tag, expected in expected_tags:
            self.assertEqual(span.get_tag(tag), expected)


# Additional Python2 test cases for urllib
transport.py 文件源码 项目:true_review_web2py 作者: lucadealfaro 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def __init__(self, timeout=None, proxy=None, cacert=None, sessions=False):
        """Configure the urllib2 transport.

        :param timeout: stored on ``self._timeout``; rejected when given and
            ``self.supports_feature('timeout')`` is false.
        :param proxy: must be falsy.
        :param cacert: must be falsy.
        :param sessions: when truthy, requests go through an opener with a
            new ``CookieJar``; otherwise through ``urllib2.urlopen``.
        :raises RuntimeError: for an unsupported timeout, proxy or cacert.
        """
        timeout_requested = timeout is not None
        if timeout_requested and not self.supports_feature('timeout'):
            raise RuntimeError('timeout is not supported with urllib2 transport')
        if proxy:
            raise RuntimeError('proxy is not supported with urllib2 transport')
        if cacert:
            raise RuntimeError('cacert is not support with urllib2 transport')

        if sessions:
            cookie_handler = urllib2.HTTPCookieProcessor(CookieJar())
            self.request_opener = urllib2.build_opener(cookie_handler).open
        else:
            self.request_opener = urllib2.urlopen

        self._timeout = timeout
spider_main.py 文件源码 项目:SmallReptileTraining 作者: yanbober 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def create_cookie_opener(self):
        '''
        Build a urllib opener that keeps cookies in a new in-memory jar.
        :return: the opener created by ``request.build_opener``
        '''
        jar_handler = request.HTTPCookieProcessor(cookiejar.CookieJar())
        return request.build_opener(jar_handler)
transport.py 文件源码 项目:spc 作者: whbrewer 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def __init__(self, timeout=None, proxy=None, cacert=None, sessions=False):
        """Configure the urllib2 transport.

        :param timeout: stored on ``self._timeout``; rejected when given and
            ``self.supports_feature('timeout')`` is false.
        :param proxy: must be falsy.
        :param cacert: must be falsy.
        :param sessions: when truthy, requests go through an opener with a
            new ``CookieJar``; otherwise through ``urllib2.urlopen``.
        :raises RuntimeError: for an unsupported timeout, proxy or cacert.
        """
        timeout_requested = timeout is not None
        if timeout_requested and not self.supports_feature('timeout'):
            raise RuntimeError('timeout is not supported with urllib2 transport')
        if proxy:
            raise RuntimeError('proxy is not supported with urllib2 transport')
        if cacert:
            raise RuntimeError('cacert is not support with urllib2 transport')

        if sessions:
            cookie_handler = urllib2.HTTPCookieProcessor(CookieJar())
            self.request_opener = urllib2.build_opener(cookie_handler).open
        else:
            self.request_opener = urllib2.urlopen

        self._timeout = timeout


问题


面经


文章

微信
公众号

扫码关注公众号