Python http() usage examples from open-source projects

chou.py (project: Spider, author: Ctrlsman)
def login(self,response):
        cookie_jar = CookieJar()
        cookie_jar.extract_cookies(response, response.request)
        # _cookies nests cookies as domain -> path -> name; flatten them into a plain dict
        for k, v in cookie_jar._cookies.items():
            for i, j in v.items():
                for m, n in j.items():
                    self.cookie_dict[m] = n.value
        req = Request(
            url='http://dig.chouti.com/login',
            method='POST',
            headers={'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8'},
            body='phone=13331167937&password=zds819918&oneMonth=1',
            cookies=self.cookie_dict,
            callback=self.check_login
        )
        yield req
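
The check_login callback referenced above is not part of this snippet. A minimal sketch, assuming chouti.com answers the login POST with a JSON body whose code field marks success (the "9999" value is an assumption; adjust it to the real payload):

def check_login(self, response):
        # hypothetical sketch: "9999" is assumed to be the site's success code
        if '"code":"9999"' in response.body:
            self.logger.info('login succeeded')
        else:
            self.logger.error('login failed: %s' % response.body)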
postSpider.py (project: JianShu-Donate, author: whatbeg)
def parse(self, response):
        selector = Selector(response)
        articles = selector.xpath('//ul[@class="article-list thumbnails"]/li')

        for article in articles:
            item = Jianshu2Item()
            url = article.xpath('div/h4/a/@href').extract()
            likeNum = article.xpath('div/div/span[2]/text()').extract()
            posturl = 'http://www.jianshu.com'+url[0]

            if len(likeNum) == 0:
                item['likeNum'] = 0
            else:
                item['likeNum'] = int(likeNum[0].split(' ')[-1])

            request = Request(posturl,callback=self.parse_donate)
            request.meta['item'] = item
            yield request

        next_link = selector.xpath('//*[@id="list-container"]/div[@class="load-more"]/button/@data-url').extract()[0]
        if next_link:
            next_link = self.url + str(next_link)
            yield Request(next_link,callback=self.parse)
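
The parse_donate callback receives the half-filled item through request.meta. A hypothetical sketch (the donateNum field name and the XPath are illustrative, not taken from the original source):

def parse_donate(self, response):
        # pick the item back up from the request that carried it
        item = response.meta['item']
        # illustrative selector; the real donate-page markup may differ
        item['donateNum'] = response.xpath('//span[@class="number"]/text()').extract()
        yield item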
uuSpider.py (project: pythonStudy, author: jeikerxiao)
def post_login(self, response):
        self.logger.info('---- login start ----')
        # the login form hides a formhash field that must be extracted and posted back
        formhash = response.xpath('//input[@name="formhash"]/@value').extract()[0]
        self.logger.info('formhash: ' + formhash)
        # FormRequest.from_response is the Scrapy helper for filling in and submitting a form via POST
        # after the POST completes, the response goes to the after_login callback
        return [scrapy.FormRequest.from_response(response,
                                          formdata={
                                              'formhash': formhash,
                                              'referer': 'http://www.mayattt.com/index.php',
                                              'loginfield': 'username',
                                              'username': 'mayajeiker',
                                              'password': 'friendship',
                                              'questionid': '0',
                                              'cookietime': '12592000',
                                          },
                                          callback=self.after_login
                                          )]

    # callback that runs after a successful login
uuSpider.py (project: pythonStudy, author: jeikerxiao)
def parse(self, response):
        # one <tr> per thread in the listing table
        items = response.xpath('//form[@name="moderate"]/*/div[@class="spaceborder"]/table/tr')
        for item in items:
            url_str = 'http://www.mayattt.com/'+item.xpath('./td[@class="f_title"]/a/@href').extract()[0]
            title_str = ''
            date_str = ''
            try:
                title_str = item.xpath('./td[@class="f_title"]/a/text()').extract()[0]
                date_str = item.xpath('./td[@class="f_last"]/span/a/text()').extract()[0]
            except IndexError:
                self.logger.error('get list page failure!')
            yield Request(url_str, headers=self.headers, callback=self.parseImage, meta={'title': title_str,
                                                                                             'date': date_str})

    # parse the image page: extract the URLs and store them in the item
pictureSpider_demo.py (project: PythonCrawler-Scrapy-Mysql-File-Template, author: lawlite19)
def parse(self, response):
        se = Selector(response)  # build a Selector (older Scrapy called this HtmlXPathSelector)
        if re.match(r"http://desk.zol.com.cn/fengjing/\d+x\d+/\d+.html", response.url):  # only handle wallpaper list pages
            src = se.xpath("//ul[@class='pic-list2  clearfix']/li")  # every <li> under the thumbnail list

            for i in range(1, len(src) + 1):  # XPath positions are 1-based
                imgURLs = se.xpath("//ul[@class='pic-list2  clearfix']/li[%d]/a/img/@src" % i).extract()  # thumbnail URL
                titles = se.xpath("//ul[@class='pic-list2  clearfix']/li[%d]/a/img/@title" % i).extract()

                if imgURLs:
                    realUrl = imgURLs[0].replace("t_s208x130c5", "t_s2560x1600c5")  # swap the thumbnail size token for the full-resolution one
                    file_name = u"%s.jpg" % titles[0]  # name the file after the image title

                    path = os.path.join(r"D:\pics", file_name)  # save under the local D:\pics directory

                    type = sys.getfilesystemencoding()
                    print file_name.encode(type)

                    item = WebcrawlerScrapyItem()  # fill an item with the scraped fields
                    item['name'] = file_name
                    item['url'] = realUrl
                    print item["name"], item["url"]

                    yield item  # hand the item to the pipelines

                    urllib.urlretrieve(realUrl, path)  # download synchronously; an ImagesPipeline would be more idiomatic

            all_urls = se.xpath("//a/@href").extract()  # every link on the page
            for url in all_urls:
                if url.startswith("/fengjing/1920x1080/"):  # only follow 1920x1080 wallpaper listings
                    yield Request("http://desk.zol.com.cn" + url, callback=self.parse)
reference_news_spider.py (project: Spider_cust_news, author: sensen58588)
def parse_detail(self, response):
        content = response.css('#work span::text').extract()
        reg = "^(http|https|ftp)://.*(.com|.cn|.html|.htm|.asp|.jsp)"
        url = response.url
        reg_url_name = ".*?(\d+)"
        get_url = re.match(reg_url_name, url)
        if get_url:
            self.get_name = get_url.group(1)
        reference_url_list = []
        for each_line in content:
            get_reference_url = re.match(reg, each_line)
            if get_reference_url:
                reference_url_list.append(get_reference_url.group(0))
        self.count = 0
        if reference_url_list:
            for each_url in reference_url_list:
                yield Request(url=each_url, dont_filter=True, callback=self.parse_reference)
                self.count += 1
        else:
            pass
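
The parse_reference callback is not included in this snippet. A minimal hypothetical sketch that just records which article each reference belongs to:

def parse_reference(self, response):
        # hypothetical: self.get_name was captured from the article URL in parse_detail
        self.logger.info('reference %s fetched for article %s' % (response.url, self.get_name))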
__init__.py (project: osp-scraper, author: opensyllabus)
def extract_links(self, response):
        """Generate (url, source_anchor) tuples extracted from the page"""

        for link in response.css('a'):
            # extract the href & urljoin it to the current response
            url = response.urljoin(link.xpath('@href').extract_first())

            # Only follow http(s) URLs (i.e., no `javascript:` or `mailto:`).
            if url.startswith('http'):
                # merge text content of all child nodes of the link
                anchor = " ".join(s.strip() for s in link.css('*::text').extract() if s.strip())

                yield (url, anchor)

        for frame in (response.css("frame") + response.css("iframe")):
            relative_url = frame.css("::attr(src)").extract_first()
            url = response.urljoin(relative_url)

            if url.startswith("http"):
                anchor = frame.css("::attr(name)").extract_first()

                yield (url, anchor)
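
A typical caller drains the generator and schedules one request per link. A minimal usage sketch (the parse method name is an assumption):

def parse(self, response):
        # follow every (url, anchor) pair produced by extract_links
        for url, anchor in self.extract_links(response):
            yield scrapy.Request(url, callback=self.parse,
                                 meta={'source_anchor': anchor})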
163_spider.py (project: mongodb_project, author: Lovecanon)
def post_get_playlist(self, response):
        collection = self.db.playlist
        result = json.loads(response.body, encoding='utf-8')['result']

        # inserted = collection.update({'id': result['id']}, result, upsert=True)  # upsert=True means insert-or-update
        # logger.info('Update or Insert to playlist database[%s]' % (str(inserted),))
        if result['id'] not in self.playlist_id_buffer:
            collection.insert(result)

        for song in result['tracks']:
            artists = []
            for detail in song['artists']:
                artists.append(detail['name'])
            comment_url = 'http://music.163.com/weapi/v1/resource/comments/%s/?csrf_token=' % (song['commentThreadId'],)
            # FormRequest form-encodes the POST body; the raw-Request equivalent is
            # Request(url, method='POST', body=json.dumps(data))
            yield FormRequest(comment_url, formdata=self.post_data, callback=self.parse,
                              meta={'m_id': song['id'], 'm_name': song['name'], 'artists': artists})
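
As the comment above notes, FormRequest form-encodes post_data, while a raw Request sends whatever body it is given. An illustrative drop-in for the yield above, valid only if the endpoint accepted a JSON body:

yield Request(comment_url, method='POST',
              body=json.dumps(self.post_data),
              headers={'Content-Type': 'application/json'},
              callback=self.parse,
              meta={'m_id': song['id'], 'm_name': song['name'], 'artists': artists})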
mogujie.py (project: first-crawler, author: Xinghaoz)
def parse_list(self, response):
        url = response.meta['splash']['args']['url']
        pattern = re.compile(r'http://www.mogujie.com/book/\w+/\d+/')

        if (pattern.match(url)):
            page = int(pattern.split(url)[1])
            url = pattern.findall(url)[0]
            page += 1
            url = url + str(page)
        else:
            url = url + '/2'

        print '+++++++++++++++++++++++++ Next url:', url
        req = SplashRequest(url = url, callback = self.parse_list)
        yield req

        pattern_detail = re.compile(r'http://shop.mogujie.com/detail/.{7}')
        for item_url in pattern_detail.findall(response.body):
            req = Request(url = item_url, callback = self.parse_item)
            yield req
mogujie_mac.py (project: first-crawler, author: Xinghaoz)
def parse(self, response):
        #print '=========================', response.url
        pattern_list = re.compile(r'http://www.mogujie.com/book/\w+/\d+')
        #print '+++++++++++++++++++++++++', pattern_list.findall(response.body)

        '''
        for item_list in pattern_list.findall(response.body):
            req = Request(url = item_list, callback = self.parse_list)
            yield req
        '''

        '''
        req = Request(url = 'http://www.mogujie.com/book/clothing/50249/', callback = self.parse_list, meta={
                'splash': {
                    'endpoint': 'render.html'
                },
                #'dont_send_headers': True,
        })
        '''

        for item_list in pattern_list.findall(response.body):
            #req = SplashRequest(url = 'http://www.mogujie.com/book/clothing/50249/', callback = self.parse_list)
            req = SplashRequest(url = item_list, callback = self.parse_list)
            yield req
followall.py (project: Scrapy-BenchCLI, author: Parth-Vader)
def __init__(self, **kw):
        super(FollowAllSpider, self).__init__(**kw)
        url = 'http://localhost/books.toscrape.com/index.html'
        if not url.startswith('http://') and not url.startswith('https://'):
            url = 'http://%s/' % url
        self.url = url
        self.allowed_domains = [re.sub(r'^www\.', '', urlparse(url).hostname)]
        self.link_extractor = LinkExtractor()
        self.cookies_seen = set()
        self.previtem = 0
        self.items = 0
        self.timesec = datetime.datetime.utcnow()
Tieba_search.py (project: Crawlers, author: mi-minus)
def start_requests(self):

        #####################################################################################
        # topic_dict = {'1':[u'??', u'??'], '2':[u'??',u'??']}
        topic_dict = {'1':[u'??'], '2':[u'??'], '3':[u'????'], '4':[u'??']}

        index = 0
        for id, kws_list in topic_dict.iteritems():
            for kw in kws_list:
                print kw
                wd_code = urllib.quote(kw.encode('gbk'))
                search_url = 'http://tieba.baidu.com/f/search/res?isnew=1&kw=&qw='+wd_code+'&un=&rn=10&pn=0&sd=&ed=&sm=1&only_thread=1'
                                # http://tieba.baidu.com/f/search/res?isnew=1&kw=&qw=%B1%B1%BE%A9&un=&rn=10&pn=0&sd=&ed=&sm=1&only_thread=1
                                # http://tieba.baidu.com/f/search/res?isnew=1&kw=&qw=%B1%B1%BE%A9&un=&rn=10&pn=0&sd=&ed=&sm=1
                # print search_url
                self.Flag_List.append(True)
                self.Maxpage_List.append(self.MAX_PAGE_NUM)
                print search_url
                yield scrapy.Request(search_url,meta={'topic_id': id,'index':index, 'kw':kw},)
                index += 1

        #####################################################################################
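
The search endpoint expects the query GBK-encoded, which is what quote(kw.encode('gbk')) produces; the %B1%B1%BE%A9 in the sample URLs above is exactly u'北京' after that round trip (Python 2):

>>> import urllib
>>> urllib.quote(u'北京'.encode('gbk'))
'%B1%B1%BE%A9'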
imgspider.py (project: ScrapyImage, author: donnki)
def parse(self, response):
        # print response.request.headers
        # print u'~~~~', ("pp3288" in response.body)
        # print u'~~~~', unicode(response.body, "utf8").encode("utf8")
        # for each album found on the page, request its URL and hand the response to parse_album
        for box in response.xpath(self.config["xpathAlbumList"]):
            url = box.xpath(self.config["xpathAlbumURL"]).extract()[0]
            title = box.xpath(self.config["xpathAlbumTitle"]).extract()[0]
            if not self.config.has_key("specificAlbums") or url in self.config["specificAlbums"]:

                if not url.startswith("http") and self.config.has_key("baseAddress"):
                    url = self.config["baseAddress"] + url
                # print title, url
                request = scrapy.Request(url, headers=self.headers, callback=self.parse_album, cookies={'title': title})
                yield request
                # break

        # TODO: follow the album-list pagination (parse_album_list)
        pass

    # parse a single album page
chsi.py (project: gaokao, author: EasyData)
def get_url(self, level, key):

        base_url = 'http://gaokao.chsi.com.cn/zyk/zybk/'

        if level == 0:
            page = 'ccCategory.action'
        elif level == 1:
            page = 'mlCategory.action'
        elif level == 2:
            page = 'xkCategory.action'
        elif level == 3:
            page = 'specialityesByCategory.action'
        else:
            raise Exception('invalid level')

        return '{}{}?key={}'.format(base_url, page, key)
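
The helper just splices the level-specific page name and the key into a fixed base URL; for example (the key value is illustrative):

>>> self.get_url(3, '0101')
'http://gaokao.chsi.com.cn/zyk/zybk/specialityesByCategory.action?key=0101'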
qichacha_spider.py (project: qichacha, author: guapier)
def start_requests(self):
        # with open(getattr(self, "file", "company.csv"), "rU") as f:
        #     reader = csv.reader(f)
        #     for line in reader:
        #         request = Request('http://www.qichacha.com/search?key='+line[0].decode('gbk').encode('utf-8'),headers=self.headers)
        #         #request.meta['fields'] = line
        #         yield request
        with open(("company.csv"), "rU") as f:
            reader = csv.reader(f)
            for line in reader:
                request = Request('http://www.qichacha.com/search?key='+line[0],headers=self.headers)
                #request.meta['fields'] = line
                yield request

    # def start_requests(self):
    #     yield Request('http://www.qichacha.com/search?key=%E5%89%8D%E6%B5%B7%E4%BA%BA%E5%AF%BF%E4%BF%9D%E9%99%A9%E8%82%A1%E4%BB%BD%E6%9C%89%E9%99%90%E5%85%AC%E5%8F%B8',headers=self.headers)
??????.py (project: User-Python-Write-a-web-crawler, author: xiexiaoshinick)
def parse(self, response):
        item=AutopjtItem()
        # extract the fields with XPath
        item["name"] = response.xpath("//a[@class='pic']/@title").extract()
        item["price"] = response.xpath("//span[@class='price_n']/text()").extract()
        item["link"] = response.xpath("//a[@class='pic']/@href").extract()
        item["comnum"] = response.xpath("//a[@name='P_pl']/text()").extract()
        # yield the item
        yield item
        # the category runs to 75 pages
        for i in range(1, 76):
            # build the URL of each listing page
            url = "http://category.dangdang.com/pg" + str(i) + "-cid4002203.html"
            # yield a Request for the page and let parse handle it recursively
            yield Request(url, callback=self.parse)

test_middleware.py (project: badoo_scrapy_splash_redis, author: Supe2015)
def test_nosplash():
    mw = _get_mw()
    cookie_mw = _get_cookie_mw()
    req = scrapy.Request("http://example.com")
    old_meta = copy.deepcopy(req.meta)

    assert cookie_mw.process_request(req, None) is None
    assert mw.process_request(req, None) is None
    assert old_meta == req.meta

    # response is not changed
    response = Response("http://example.com", request=req)
    response2 = mw.process_response(req, response, None)
    response3 = cookie_mw.process_response(req, response, None)
    assert response2 is response
    assert response3 is response
    assert response3.url == "http://example.com"
test_middleware.py (project: badoo_scrapy_splash_redis, author: Supe2015)
def test_magic_response2():
    # check 'body' handling and another 'headers' format
    mw = _get_mw()
    req = SplashRequest('http://example.com/', magic_response=True,
                        headers={'foo': 'bar'}, dont_send_headers=True)
    req = mw.process_request(req, None)
    assert 'headers' not in req.meta['splash']['args']

    resp_data = {
        'body': base64.b64encode(b"binary data").decode('ascii'),
        'headers': {'Content-Type': 'text/plain'},
    }
    resp = TextResponse("http://mysplash.example.com/execute",
                        headers={b'Content-Type': b'application/json'},
                        body=json.dumps(resp_data).encode('utf8'))
    resp2 = mw.process_response(req, resp, None)
    assert resp2.data == resp_data
    assert resp2.body == b'binary data'
    assert resp2.headers == {b'Content-Type': [b'text/plain']}
    assert resp2.status == 200
    assert resp2.url == "http://example.com/"
test_middleware.py (project: badoo_scrapy_splash_redis, author: Supe2015)
def test_magic_response_http_error():
    mw = _get_mw()
    req = SplashRequest('http://example.com/foo')
    req = mw.process_request(req, None)

    resp_data = {
        "info": {
            "error": "http404",
            "message": "Lua error: [string \"function main(splash)\r...\"]:3: http404",
            "line_number": 3,
            "type": "LUA_ERROR",
            "source": "[string \"function main(splash)\r...\"]"
        },
        "description": "Error happened while executing Lua script",
        "error": 400,
        "type": "ScriptError"
    }
    resp = TextResponse("http://mysplash.example.com/execute",
                        headers={b'Content-Type': b'application/json'},
                        body=json.dumps(resp_data).encode('utf8'))
    resp = mw.process_response(req, resp, None)
    assert resp.data == resp_data
    assert resp.status == 404
    assert resp.url == "http://example.com/foo"
test_middleware.py (project: badoo_scrapy_splash_redis, author: Supe2015)
def test_slot_policy_per_domain():
    mw = _get_mw()
    meta = {'splash': {
        'slot_policy': scrapy_splash.SlotPolicy.PER_DOMAIN
    }}

    req1 = scrapy.Request("http://example.com/path?key=value", meta=meta)
    req1 = mw.process_request(req1, None)

    req2 = scrapy.Request("http://example.com/path2", meta=meta)
    req2 = mw.process_request(req2, None)

    req3 = scrapy.Request("http://fooexample.com/path?key=value", meta=meta)
    req3 = mw.process_request(req3, None)

    assert req1.meta.get('download_slot')
    assert req3.meta.get('download_slot')

    assert req1.meta['download_slot'] == req2.meta['download_slot']
    assert req1.meta['download_slot'] != req3.meta['download_slot']
test_middleware.py (project: badoo_scrapy_splash_redis, author: Supe2015)
def test_adjust_timeout():
    mw = _get_mw()
    req1 = scrapy.Request("http://example.com", meta = {
        'splash': {'args': {'timeout': 60, 'html': 1}},

        # download_timeout is always present,
        # it is set by DownloadTimeoutMiddleware
        'download_timeout': 30,
    })
    req1 = mw.process_request(req1, None)
    assert req1.meta['download_timeout'] > 60

    req2 = scrapy.Request("http://example.com", meta = {
        'splash': {'args': {'html': 1}},
        'download_timeout': 30,
    })
    req2 = mw.process_request(req2, None)
    assert req2.meta['download_timeout'] == 30