Example source code for Python's urlopen()
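The snippets below, collected from a range of open-source projects, all center on Python's urlopen(). As a minimal sketch of the basic pattern they share (not taken from any project below; the URL is a placeholder):

try:
    from urllib.request import urlopen  # Python 3
except ImportError:
    from urllib2 import urlopen  # Python 2

response = urlopen('https://example.com/')  # placeholder URL
body = response.read().decode('utf-8')  # read() returns bytes
response.close()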

setup.py (project: macholib, author: secmobi)
import json
try:
    from urllib.request import urlopen  # Python 3
    from urllib.error import URLError
except ImportError:
    from urllib2 import urlopen, URLError  # Python 2

def get_pypi_src_download(package):
        url = 'https://pypi.python.org/pypi/%s/json' % (package,)
        try:
            fp = urlopen(url)
            try:
                data = fp.read()
            finally:
                fp.close()
        except URLError:
            raise RuntimeError("Cannot determine download link for %s" % (package,))

        pkgdata = json.loads(data.decode('utf-8'))
        if 'urls' not in pkgdata:
            raise RuntimeError("Cannot determine download link for %s" % (package,))

        for info in pkgdata['urls']:
            if info['packagetype'] == 'sdist' and info['url'].endswith('tar.gz'):
                return (info.get('md5_digest'), info['url'])

        raise RuntimeError("Cannot determine download link for %s" % (package,))
tbtools.py (project: zanph, author: zanph)
def paste(self):
        """Create a paste and return the paste id."""
        data = json.dumps({
            'description': 'Werkzeug Internal Server Error',
            'public': False,
            'files': {
                'traceback.txt': {
                    'content': self.plaintext
                }
            }
        }).encode('utf-8')
        try:
            from urllib2 import urlopen
        except ImportError:
            from urllib.request import urlopen
        rv = urlopen('https://api.github.com/gists', data=data)
        resp = json.loads(rv.read().decode('utf-8'))
        rv.close()
        return {
            'url': resp['html_url'],
            'id': resp['id']
        }
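
Note: passing a data argument switches urlopen() from GET to POST, so the gist above is created by a single POST of UTF-8 encoded JSON to the GitHub API.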
reachability-monitor.py (project: securedrop-reachability-monitor, author: freedomofpress)
def read_directory(self, directory_url):
        """Parses the SecureDrop directory into a dictionary of instance
        details."""
        # CloudFlare will block us if we don't set user-agent
        dir_req = Request(directory_url)
        dir_req.add_header("User-Agent", 
                           "Mozilla/5.0 (Windows NT 6.1; rv:45.0) "
                           "Gecko/20100101 Firefox/45.0")
        directory = urlopen(dir_req).read().decode()

        instances = []
        for line in directory.splitlines()[1:-1]:
            fields = line.split("\t")
            instances.append(dict(organization=fields[0],
                                  landing_page=fields[1],
                                  ths_address=fields[2]))

        return instances
upload.py (project: instagram_private_api, author: ping)
def test_post_video(self):
        # Reposting https://streamable.com/deltx
        video_info_res = urlopen('https://api.streamable.com/videos/deltx')
        video_info = json.loads(video_info_res.read().decode('utf8'))
        mp4_info = video_info['files']['mp4']

        video_url = ('https:' if mp4_info['url'].startswith('//') else '') + mp4_info['url']
        video_size = (mp4_info['width'], mp4_info['height'])
        thumbnail_url = ('https:' if video_info['thumbnail_url'].startswith('//') else '') + video_info['thumbnail_url']
        duration = mp4_info['duration']

        video_res = urlopen(video_url)
        video_data = video_res.read()
        thumb_res = urlopen(thumbnail_url)
        thumb_data = thumb_res.read()
        results = self.api.post_video(video_data, video_size, duration, thumb_data, caption='<3')
        self.assertEqual(results.get('status'), 'ok')
        self.assertIsNotNone(results.get('media'))
upload.py (project: instagram_private_api, author: ping)
def test_post_video_story(self):
        # Reposting https://streamable.com/08ico
        video_info_res = urlopen('https://api.streamable.com/videos/08ico')
        video_info = json.loads(video_info_res.read().decode('utf8'))
        mp4_info = video_info['files']['mp4']

        video_url = ('https:' if mp4_info['url'].startswith('//') else '') + mp4_info['url']
        video_size = (mp4_info['width'], mp4_info['height'])
        thumbnail_url = ('https:' if video_info['thumbnail_url'].startswith('//') else '') + video_info['thumbnail_url']
        duration = mp4_info['duration']

        video_res = urlopen(video_url)
        video_data = video_res.read()
        thumb_res = urlopen(thumbnail_url)
        thumb_data = thumb_res.read()
        results = self.api.post_video_story(video_data, video_size, duration, thumb_data)
        self.assertEqual(results.get('status'), 'ok')
        self.assertIsNotNone(results.get('media'))
checkpoint.py (project: instagram_private_api, author: ping)
def respond_to_checkpoint(self, response_code):
        headers = {
            'User-Agent': self.USER_AGENT,
            'Origin': 'https://i.instagram.com',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'en-US',
            'Accept-Encoding': 'gzip',
            'Referer': self.endpoint,
            'Cookie': self.cookie,
        }

        req = Request(self.endpoint, headers=headers)
        data = {'csrfmiddlewaretoken': self.csrftoken, 'response_code': response_code}
        res = urlopen(req, data=urlencode(data).encode('ascii'), timeout=self.timeout)

        if res.info().get('Content-Encoding') == 'gzip':
            buf = BytesIO(res.read())
            content = gzip.GzipFile(fileobj=buf).read().decode('utf-8')
        else:
            content = res.read().decode('utf-8')

        return res.code, content
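
The gzip branch above is a common pattern when a request advertises Accept-Encoding: gzip, since urlopen() does not decompress responses itself. The same idea as a self-contained sketch (httpbin.org is assumed here purely as a test endpoint):

import gzip
from io import BytesIO
from urllib.request import Request, urlopen

req = Request('https://httpbin.org/gzip', headers={'Accept-Encoding': 'gzip'})
res = urlopen(req)
if res.info().get('Content-Encoding') == 'gzip':
    content = gzip.GzipFile(fileobj=BytesIO(res.read())).read().decode('utf-8')
else:
    content = res.read().decode('utf-8')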
RpiState.py (project: Rpi-envMonitor, author: conanwhf)
def public_ip(self):
        ip_regex = re.compile(r"(([0-9]{1,3}\.){3}[0-9]{1,3})")
        # List of host which return the public IP address:
        hosts = """http://www.lawrencegoetz.com/programs/ipinfo/
            http://mwburden.com/cgi-bin/getipaddr
            http://checkip.eurodyndns.org/
            http://checkip.dyndns.org/
            http://checkrealip.com/
            http://adresseip.com
            http://www.ipchicken.com/
            http://checkmyip.com/
            http://www.naumann-net.org/""".split("\n")
        for i in hosts:
            host = i.strip()
            try:
                response = request.urlopen(host).read()
                result = ip_regex.findall(response.decode('utf-8'))
                if result:
                    return result[0][0]
            except Exception:
                pass  # this host failed; try the next one
        return "UNKNOWN"
woxikon_de_lookup.py (project: thesaurus_query.vim, author: Ron89)
def _woxikon_de_url_handler(target):
    '''
    Query woxikon for synonyms
    '''
    time_out_choice = float(get_variable(
        'tq_online_backends_timeout', _timeout_period_default))
    try:
        response = urlopen(fixurl(u'http://synonyms.woxikon.com/de/{0}'.format(target)).decode('ASCII'), timeout=time_out_choice)
        web_content = StringIO(unescape(decode_utf_8(response.read())))
        response.close()
    except HTTPError:
        return 1
    except URLError as err:
        if isinstance(err.reason, socket.timeout):  # timeout error?
            return 1
        return -1   # other error
    except socket.timeout:  # timeout error failed to be captured by URLError
        return 1
    return web_content
jeck_ru_lookup.py (project: thesaurus_query.vim, author: Ron89)
def _jeck_ru_url_handler(target):
    '''
    Query jeck.ru for synonyms
    '''
    time_out_choice = float(get_variable(
        'tq_online_backends_timeout', _timeout_period_default))
    try:
        response = urlopen(fixurl(u'http://jeck.ru/tools/SynonymsDictionary/{0}'.format(target)).decode('ASCII'), timeout=time_out_choice)
        web_content = StringIO(decode_utf_8(response.read()))
        response.close()
    except HTTPError:
        return 1
    except URLError as err:
        if isinstance(err.reason, socket.timeout):  # timeout error?
            return 1
        return -1   # any other error
    except socket.timeout:  # if timeout error not captured by URLError
        return 1
    return web_content
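
Both thesaurus handlers above share the same timeout-handling shape. Condensed into a standalone sketch (the function and names are illustrative, not part of the plugin):

import socket
from urllib.request import urlopen
from urllib.error import HTTPError, URLError

def fetch(url, timeout=2.0):
    try:
        return urlopen(url, timeout=timeout).read()
    except HTTPError:
        return None  # the server answered with an error status
    except URLError as err:
        if isinstance(err.reason, socket.timeout):
            return None  # the connection or read timed out
        raise  # some other network failure
    except socket.timeout:  # a timeout that URLError did not wrap
        return None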
getlinks_4.py (project: python, author: goodstuden)
def getLinks(pageUrl):
    global pages
    html=urlopen("http://en.wikipedia.org"+pageUrl)
    bs=BeautifulSoup(html,"html.parser")
    try:
        print(bs.h1.get_text())
        print(bs.find(id="mw-content-text").findAll("p")[0])
        print(bs.find(id="ca-edit").find("span").find("a").attrs['href'])
    except AttributeError:
        print("????????")
    for link in bs.findAll("a",href=re.compile("^(/wiki/)")):
        if 'href' in link.attrs:
            if link.attrs['href'] not in pages:
                newpage=link.attrs["href"]
                print("---------\n"+newpage)
                pages.add(newpage)
                getLinks(newpage)
magicmirrorplatform.py (project: AlexaPi, author: alexa-pi)
def mm_heartbeat(self):
        # Check if stop or set next timer
        if self.shutdown:
            return
        threading.Timer(self.hb_timer, self.mm_heartbeat).start()

        address = ("http://" + self.mm_host + ":" + self.mm_port + "/alexapi?action=AVSHB")

        logger.debug("Sending MM Heatbeat")

        try:
            response = urlopen(address).read()
        except URLError as err:
            logger.error("URLError: %s", err.reason)
            return

        logger.debug("Response: " + response)
views.py (project: BioQueue, author: liyao001)
def query_usage(request):
    """
    This function should only be called when the user is using IE8 or IE9
    :param request: 
    :return: 
    """
    try:
        from urllib2 import urlopen
    except ImportError:
        from urllib.request import urlopen

    api_bus = get_config('program', 'api', 1)+'/Kb/findSoftwareUsage?software='+request.POST['software']
    try:
        res_data = urlopen(api_bus)
        res = res_data.read()
        return HttpResponse(res)
    except Exception:
        return error(api_bus)
flightdata.py (project: AboveTustin, author: kevinabrandon)
def refresh(self):
        try:
            #open the data url
            self.req = urlopen(self.data_url)

            #read data from the url
            self.raw_data = self.req.read()

            #load in the json
            self.json_data = json.loads(self.raw_data.decode())

            #get time from json
            self.time = datetime.fromtimestamp(self.parser.time(self.json_data))

            #load all the aircraft
            self.aircraft = self.parser.aircraft_data(self.json_data, self.time)

        except Exception:
            print("exception in FlightData.refresh():")
            traceback.print_exc()
tbtools.py (project: Sci-Finder, author: snverse)
def paste(self):
        """Create a paste and return the paste id."""
        data = json.dumps({
            'description': 'Werkzeug Internal Server Error',
            'public': False,
            'files': {
                'traceback.txt': {
                    'content': self.plaintext
                }
            }
        }).encode('utf-8')
        try:
            from urllib2 import urlopen
        except ImportError:
            from urllib.request import urlopen
        rv = urlopen('https://api.github.com/gists', data=data)
        resp = json.loads(rv.read().decode('utf-8'))
        rv.close()
        return {
            'url': resp['html_url'],
            'id': resp['id']
        }
iati_downloader.py (project: geocoder-ie, author: devgateway)
def download(dest_path, url):
    try:
        file_name = url.split('/')[-1]
        path = os.path.realpath(os.path.join(dest_path, unquote_plus(file_name)))
        if not os.path.exists(path):
            f = urlopen(url)
            headers = f.headers['content-type'].split('/')
            if 'html' in headers:
                # an HTML response gets a generated name to avoid collisions
                file_name = '{}.html'.format(uuid.uuid1())
                path = os.path.realpath(os.path.join(dest_path, file_name))
            with open(path, 'wb') as local_file:
                local_file.write(f.read())

        if os.path.exists(path):
            return path
        else:
            logger.info("Unable to find the downloaded file!")
            return None
    except Exception as error:
        logger.error('download error %s', error)
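
A hypothetical call of download() above (both paths are placeholders):

local_path = download('/tmp/downloads', 'https://example.com/data/activities.xml')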
geonames.py (project: geocoder-ie, author: devgateway)
def query(location, cty_codes, query_method, fuzzy):
    results = []
    try:
        base_url = get_geonames_base_url()
        username = get_geonames_user_name()
        query_string = base_url + 'username={user}&{query_method}={name}&' \
                                  'style=FULL&orderby={order}&startRow=0&maxRows=5&fuzzy={fuzzy}' \
            .format(user=username, query_method=query_method, name=quote(location), order='relevance', fuzzy=fuzzy)
        if cty_codes and len(cty_codes) > 0:
            query_string = query_string + '&' + '&'.join([('country={}'.format(c)) for c in cty_codes])

        json_decode = json.JSONDecoder()  # used to parse json response
        response = urlopen(query_string)
        response_string = response.read().decode('utf-8')
        parsed_response = json_decode.decode(response_string)
        if parsed_response.get('geonames') and len(parsed_response.get('geonames')) > 0:
            for item in parsed_response['geonames']:
                results.append(parse(item))

    except URLError as e:
        logger.info("GeoNames query failed: %s", e)

    return results
urllib-learn1.py (project: learn-python, author: xushubo)
def fetch_xml(url):
    with request.urlopen(url) as f:
        print('Status:', f.status, f.reason)
        for k, v in f.getheaders():
            print('%s: %s' % (k, v))
        html = f.read().decode('utf-8')
    pattern_one = re.compile(r'<yweather:location.*?city="(.*?)".*?country="(.*?)".*?region="(.*?)".*?/>', re.S)
    pattern_two = re.compile(r'<yweather:forecast.*?date="(.*?)".*?day="(.*?)".*?high="(.*?)".*?low="(.*?)".*?text="(.*?)".*?/>', re.S)
    location_info = re.findall(pattern_one, html)
    items = re.findall(pattern_two, html)
    weather = {}
    weather['city'] = location_info[0][0]
    weather['country'] = location_info[0][1]
    weather['region'] = location_info[0][2]
    for item in items:
        weather[item[1]] = {}
        weather[item[1]]['date'] = item[0]
        weather[item[1]]['high'] = item[2]
        weather[item[1]]['low'] = item[3]
        weather[item[1]]['text'] = item[4]
    return weather
download_images.py (project: cleverhans, author: tensorflow)
def download_image(image_id, url, x1, y1, x2, y2, output_dir):
    """Downloads one image, crops it, resizes it and saves it locally."""
    output_filename = os.path.join(output_dir, image_id + '.png')
    if os.path.exists(output_filename):
        # Don't download image if it's already there
        return True
    try:
        # Download image
        url_file = urlopen(url)
        if url_file.getcode() != 200:
            return False
        image_buffer = url_file.read()
        # Crop, resize and save image
        image = Image.open(BytesIO(image_buffer)).convert('RGB')
        w = image.size[0]
        h = image.size[1]
        image = image.crop((int(x1 * w), int(y1 * h), int(x2 * w),
                            int(y2 * h)))
        image = image.resize((299, 299), resample=Image.ANTIALIAS)
        image.save(output_filename)
    except IOError:
        return False
    return True
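
A hypothetical call of download_image() above, cropping the central half of the frame (the id, URL and directory are made up):

ok = download_image('img_0001', 'https://example.com/photo.jpg',
                    0.25, 0.25, 0.75, 0.75, '/tmp/images')
print('saved' if ok else 'failed')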
GetFollow_Oracle_2.py (project: danmu-bilibili, author: saberxxy)
def getSoup(start, stop):

    try:
        for number in range(start, stop+1):
            # http://space.bilibili.com/15989779/#!/
            url = 'http://space.bilibili.com/' + str(number) + '/#!/'
            response = request.urlopen(url)
            # print(response.getcode())
            html_cont = response.read()
            soup = BeautifulSoup(html_cont, 'lxml', from_encoding='utf-8')
            username = soup.find("h1").get_text().strip()[:-6]  # user name, with the trailing suffix stripped
            uid = number  # use the loop number as the uid
            get_gz_uid = GetFollowUid(number)
            gzsuid, gznumber = get_gz_uid.get_uids()  # ids and count of followed users

            saveData(uid, username, gznumber, gzsuid)  # persist the record
    except Exception:
        print("get page error")
        return getSoup(number + 1, stop)  # skip the failing uid and continue


GetFans_Oracle_2.py (project: danmu-bilibili, author: saberxxy)
def getSoup(start, stop):
    try:
        for number in range(start, stop + 1):
            # http://space.bilibili.com/15989779/#!/
            url = 'http://space.bilibili.com/'+str(number)+'/#!/'
            response = request.urlopen(url)
            # print(response.getcode())
            html_cont = response.read()
            soup = BeautifulSoup(html_cont, 'lxml', from_encoding='utf-8')
            username = soup.find("h1").get_text().strip()[:-6]  # user name, with the trailing suffix stripped
            uid = number  # use the loop number as the uid
            get_fans_uid = GetFansUid(number)
            fansuid, fansnumber = get_fans_uid.get_uids()  # ids and count of fans

            saveData(uid, username, fansnumber, fansuid)  # persist the record
    except Exception:
        print("get page error")
        return getSoup(number + 1, stop)  # skip the failing uid and continue

