Example source code for Python's urljoin()
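
Before diving into the project snippets, here is a minimal sketch of how urllib.parse.urljoin resolves a reference against a base URL. The URLs are made-up examples and are not taken from any of the projects below:

try:
    from urllib.parse import urljoin  # Python 3
except ImportError:
    from urlparse import urljoin      # Python 2

# A relative path is appended to the base's directory part.
print(urljoin('https://example.com/api/', 'users/login'))
# -> https://example.com/api/users/login

# Without a trailing slash, the base's last path segment is replaced.
print(urljoin('https://example.com/api', 'users/login'))
# -> https://example.com/users/login

# A reference starting with '/' discards the base path entirely.
print(urljoin('https://example.com/api/', '/status'))
# -> https://example.com/status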

bjshare.py (project: BJ_Sickrage, author: gabrielbdsantos)
def login(self):
        cookie_dict = requests.utils.dict_from_cookiejar(self.session.cookies)
        if cookie_dict.get('session'):
            return True

        login_params = {
            'submit': 'Login',
            'username': self.username,
            'password': self.password,
            'keeplogged': 0,
        }

        if not self.get_url(self.urls['login'], post_data=login_params, returns='text'):
            logger.log(u"Unable to connect to provider", logger.WARNING)
            return False

        response = self.get_url(urljoin(self.urls['base_url'],'index.php'), returns='text')

        if re.search('<title>Login :: BJ-Share</title>', response):
            logger.log(u"Invalid username or password. Check your settings", logger.WARNING)
            return False

        return True
base.py (project: python-twitch-client, author: tsifrer)
def _request_get(self, path, params=None):
        url = urljoin(BASE_URL, path)

        headers = self._get_request_headers()

        response = requests.get(url, params=params, headers=headers)
        if response.status_code >= 500:

            backoff = self._initial_backoff
            for _ in range(self._max_retries):
                time.sleep(backoff)
                backoff_response = requests.get(url, params=params, headers=headers)
                if backoff_response.status_code < 500:
                    response = backoff_response
                    break
                backoff *= 2

        response.raise_for_status()
        return response.json()
upload.py (project: mail.ru-uploader, author: instefa)
def cloud_auth(session, login=LOGIN, password=PASSWORD):
    try:
        r = session.post('https://auth.mail.ru/cgi-bin/auth?lang=ru_RU&from=authpopup',
                         data = {'Login': login, 'Password': password, 'page': urljoin(CLOUD_URL, '?from=promo'),
                                 'new_auth_form': 1, 'Domain': get_email_domain(login)}, verify = VERIFY_SSL)
    except Exception as e:
        if LOGGER:
            LOGGER.error('Cloud auth HTTP request error: {}'.format(e))
        return None

    if r.status_code == requests.codes.ok:
        if LOGIN_CHECK_STRING in r.text:
            return True
        elif LOGGER:
            LOGGER.error('Cloud authorization request error. Check your credentials settings in {}. \
Do not forget to accept cloud LA by entering it in browser. \
HTTP code: {}, msg: {}'.format(CONFIG_FILE, r.status_code, r.text))
    elif LOGGER:
        LOGGER.error('Cloud authorization request error. Check your connection. \
HTTP code: {}, msg: {}'.format(r.status_code, r.text))
    return None
upload.py (project: mail.ru-uploader, author: instefa)
def get_csrf(session):
    try:
        r = session.get(urljoin(CLOUD_URL, 'tokens/csrf'), verify = VERIFY_SSL)
    except Exception as e:
        if LOGGER:
            LOGGER.error('Get csrf HTTP request error: {}'.format(e))
        return None

    if r.status_code == requests.codes.ok:
        r_json = r.json()
        token = r_json['body']['token']
        assert len(token) == 32, 'invalid CSRF token <{}> length'.format(token)
        return token
    elif LOGGER:
        LOGGER.error('CSRF token request error. Check your connection and credentials settings in {}. \
HTTP code: {}, msg: {}'.format(CONFIG_FILE, r.status_code, r.text))
    return None
upload.py (project: mail.ru-uploader, author: instefa)
def get_upload_domain(session, csrf=''):
    """ return current cloud's upload domain url
    it seems that csrf isn't necessary in session,
    but forcing assert anyway to avoid possible future damage
    """
    assert csrf is not None, 'no CSRF' 
    url = urljoin(CLOUD_URL, 'dispatcher?token=' + csrf)

    try:
        r = session.get(url, verify = VERIFY_SSL)
    except Exception as e:
        if LOGGER:
            LOGGER.error('Get upload domain HTTP request error: {}'.format(e))
        return None

    if r.status_code == requests.codes.ok:
        r_json = r.json()
        return r_json['body']['upload'][0]['url']
    elif LOGGER:
        LOGGER.error('Upload domain request error. Check your connection. \
HTTP code: {}, msg: {}'.format(r.status_code, r.text))
    return None
upnp.py (project: upnpclient, author: flyte)
def _read_actions(self):
        action_url = urljoin(self._url_base, self._control_url)

        for action_node in self._findall('actionList/action'):
            name = action_node.findtext('name', namespaces=action_node.nsmap)
            argsdef_in = []
            argsdef_out = []
            for arg_node in action_node.findall(
                    'argumentList/argument', namespaces=action_node.nsmap):
                findtext = partial(arg_node.findtext, namespaces=arg_node.nsmap)
                arg_name = findtext('name')
                arg_statevar = self.statevars[findtext('relatedStateVariable')]
                if findtext('direction').lower() == 'in':
                    argsdef_in.append((arg_name, arg_statevar))
                else:
                    argsdef_out.append((arg_name, arg_statevar))
            action = Action(action_url, self.service_type, name, argsdef_in, argsdef_out)
            self.action_map[name] = action
            self.actions.append(action)
scrape.py (project: brobeat-OLD, author: blacktop)
def get_log_types():
    url = "https://www.bro.org/sphinx/script-reference/"
    resp = requests.get(url=url + "log-files.html")
    soup = BeautifulSoup(resp.content, "html.parser")
    bro_logs = dict(logs=[])
    for table in soup.find_all("table", {"class": "docutils"}):
        for row in table.find('tbody').find_all('tr'):
            log = {}
            cols = row.find_all('td')
            cols = [ele.text.strip() for ele in cols]
            tds = [ele for ele in cols if ele]
            log['file'] = tds[0]
            log['log_type'] = os.path.splitext(log['file'])[0]
            log['description'] = tds[1]
            log['fields'] = []
            link = row.find('a', href=True)
            # do not add a URL for notice_alarm.log
            if link is not None and 'notice_alarm' not in log['log_type']:
                log['url'] = urljoin(url, link['href'])
                logger.info('adding log type: {}'.format(log['log_type']))
            bro_logs['logs'].append(log)
    return bro_logs
_ibm_http_client.py (project: ProjectQ, author: ProjectQ-Framework)
def _authenticate(email=None, password=None):
    """
    :param email:
    :param password:
    :return:
    """
    if email is None:
        try:
            input_fun = raw_input
        except NameError:
            input_fun = input
        email = input_fun('IBM QE user (e-mail) > ')
    if password is None:
        password = getpass.getpass(prompt='IBM QE password > ')

    r = requests.post(urljoin(_api_url, 'users/login'),
                      data={"email": email, "password": password})
    r.raise_for_status()

    json_data = r.json()
    user_id = json_data['userId']
    access_token = json_data['id']

    return user_id, access_token
_ibm_http_client_test.py (project: ProjectQ, author: ProjectQ-Framework)
def test_send_real_device_offline(monkeypatch):
    def mocked_requests_get(*args, **kwargs):
        class MockResponse:
            def __init__(self, json_data, status_code):
                self.json_data = json_data
                self.status_code = status_code

            def json(self):
                return self.json_data

        # Accessing status of device. Report it as offline ("state": False).
        status_url = 'Backends/ibmqx2/queue/status'
        if args[0] == urljoin(_api_url_status, status_url):
            return MockResponse({"state": False}, 200)
    monkeypatch.setattr("requests.get", mocked_requests_get)
    shots = 1
    json_qasm = "my_json_qasm"
    name = 'projectq_test'
    with pytest.raises(_ibm_http_client.DeviceOfflineError):
        _ibm_http_client.send(json_qasm,
                              device="ibmqx2",
                              user=None, password=None,
                              shots=shots, verbose=True)
adapter.py (project: apimas, author: grnet)
def construct_collection(self, instance, spec, loc, context):
        """
        Constructor for `.collection` predicate.

        This constructor aims to aggregate the cerberus validation schemas
        for every single field defined by the collection.
        """
        instance = super(self.__class__, self).construct_collection(
            instance, spec, loc, context)
        self.init_adapter_conf(instance)
        schema = {field_name: schema.get(self.ADAPTER_CONF, {})
                  for field_name, schema in doc.doc_get(
                      instance, ('*',)).iteritems()}
        collection = context.get('parent_name')
        endpoint = urljoin(
            self.root_url, TRAILING_SLASH.join([loc[0], collection]))
        endpoint += TRAILING_SLASH
        instance[self.ADAPTER_CONF] = schema
        client = ApimasClient(endpoint, schema)
        self.clients[loc[0] + '/' + collection] = client
        return instance
base.py (project: python-twitch-client, author: tsifrer)
def _request_post(self, path, data=None, params=None):
        url = urljoin(BASE_URL, path)

        headers = self._get_request_headers()

        response = requests.post(url, json=data, params=params, headers=headers)
        response.raise_for_status()
        if response.status_code == 200:
            return response.json()
base.py (project: python-twitch-client, author: tsifrer)
def _request_put(self, path, data=None, params=None):
        url = urljoin(BASE_URL, path)

        headers = self._get_request_headers()

        response = requests.put(url, json=data, params=params, headers=headers)
        response.raise_for_status()
        if response.status_code == 200:
            return response.json()
base.py (project: python-twitch-client, author: tsifrer)
def _request_delete(self, path, params=None):
        url = urljoin(BASE_URL, path)

        headers = self._get_request_headers()

        response = requests.delete(url, params=params, headers=headers)
        response.raise_for_status()
        if response.status_code == 200:
            return response.json()
gitapi.py (project: GitHub-Repo-Recommender, author: frankyjuang)
def req(url, hdr):
    try:
        res = requests.get(urljoin(BASE_URL, url), headers=hdr, timeout=10.0)
    except requests.Timeout:
        raise RequestTimeoutError(url)
    except requests.ConnectionError:
        raise RequestTimeoutError(url)
    if res.status_code != 200:
        raise StatusCodeError(url, res.status_code)
    return res
upload.py (project: mail.ru-uploader, author: instefa)
def get_cloud_space(session, csrf='', login=LOGIN):
    """ returns available free space in bytes """
    assert csrf is not None, 'no CSRF'

    timestamp = str(int(time.mktime(datetime.datetime.now().timetuple())* 1000))
    quoted_login = quote_plus(login)
    command = ('user/space?api=' + str(API_VER) + '&email=' + quoted_login +
               '&x-email=' + quoted_login + '&token=' + csrf + '&_=' + timestamp)
    url = urljoin(CLOUD_URL, command)

    try:
        r = session.get(url, verify = VERIFY_SSL)
    except Exception as e:
        if LOGGER:
            LOGGER.error('Get cloud space HTTP request error: {}'.format(e))
        return 0

    if r.status_code == requests.codes.ok:
        r_json = r.json()
        total_bytes = r_json['body']['total'] * 1024 * 1024
        used_bytes = r_json['body']['used'] * 1024 * 1024
        return total_bytes - used_bytes
    elif LOGGER:
        LOGGER.error('Cloud free space request error. Check your connection. \
HTTP code: {}, msg: {}'.format(r.status_code, r.text))
    return 0
upload.py (project: mail.ru-uploader, author: instefa)
def post_file(session, domain='', file='', login=LOGIN):
    """ posts file to the cloud's upload server
    param: file - string filename with path
    """
    assert domain is not None, 'no domain'
    assert file is not None, 'no file'

    filetype = guess_type(file)[0]
    if not filetype:
        filetype = DEFAULT_FILETYPE
        if LOGGER:
            LOGGER.warning('File {} type is unknown, using default: {}'.format(file, DEFAULT_FILETYPE))

    filename = os.path.basename(file)
    quoted_login = quote_plus(login)
    timestamp = str(int(time.mktime(datetime.datetime.now().timetuple()))) + TIME_AMEND
    url = urljoin(domain, '?cloud_domain=' + str(CLOUD_DOMAIN_ORD) + '&x-email=' + quoted_login + '&fileapi' + timestamp)
    m = MultipartEncoder(fields={'file': (quote_plus(filename), open(file, 'rb'), filetype)})

    try:
        r = session.post(url, data=m, headers={'Content-Type': m.content_type}, verify = VERIFY_SSL)
    except Exception as e:
        if LOGGER:
            LOGGER.error('Post file HTTP request error: {}'.format(e))
        return (None, None)

    if r.status_code == requests.codes.ok:
        if len(r.content):
            hash = r.content[:40].decode()
            size = int(r.content[41:-2])
            return (hash, size)
        elif LOGGER:
            LOGGER.error('File {} post error, no hash and size received'.format(file))
    elif LOGGER:
        LOGGER.error('File {} post error, http code: {}, msg: {}'.format(file, r.status_code, r.text))
    return (None, None)
conftest.py (project: ruffruffs, author: di)
def prepare_url(value):
    # Issue #1483: Make sure the URL always has a trailing slash
    httpbin_url = value.url.rstrip('/') + '/'

    def inner(*suffix):
        return urljoin(httpbin_url, '/'.join(suffix))

    return inner
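
The fixture above keeps a trailing slash on the base URL so that joined suffixes are appended instead of replacing the last path segment. A quick illustration with a made-up httpbin address (not part of the original test suite):

from urllib.parse import urljoin

httpbin_url = 'http://127.0.0.1:5000/'  # hypothetical httpbin base URL
print(urljoin(httpbin_url, '/'.join(('status', '200'))))
# -> http://127.0.0.1:5000/status/200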
test_redfish.py (project: valence, author: openstack)
def test_get_rfs_url(self):
        CONF.podm.url = "https://127.0.0.1:8443"
        expected = urljoin(CONF.podm.url, "redfish/v1/Systems/1")

        # test without service_ext
        result = redfish.get_rfs_url("/Systems/1/")
        self.assertEqual(expected, result)

        result = redfish.get_rfs_url("/Systems/1")
        self.assertEqual(expected, result)

        result = redfish.get_rfs_url("Systems/1/")
        self.assertEqual(expected, result)

        result = redfish.get_rfs_url("Systems/1")
        self.assertEqual(expected, result)

        # test with service_ext
        result = redfish.get_rfs_url("/redfish/v1/Systems/1/")
        self.assertEqual(expected, result)

        result = redfish.get_rfs_url("/redfish/v1/Systems/1")
        self.assertEqual(expected, result)

        result = redfish.get_rfs_url("redfish/v1/Systems/1/")
        self.assertEqual(expected, result)

        result = redfish.get_rfs_url("redfish/v1/Systems/1")
        self.assertEqual(expected, result)
test_redfish.py (project: valence, author: openstack)
def test_get_rfs_url_with_tailing_slash(self):
        CONF.podm.url = "https://127.0.0.1:8443/"
        expected = urljoin(CONF.podm.url, "redfish/v1/Systems/1")

        # test without service_ext
        result = redfish.get_rfs_url("/Systems/1/")
        self.assertEqual(expected, result)

        result = redfish.get_rfs_url("/Systems/1")
        self.assertEqual(expected, result)

        result = redfish.get_rfs_url("Systems/1/")
        self.assertEqual(expected, result)

        result = redfish.get_rfs_url("Systems/1")
        self.assertEqual(expected, result)

        # test with service_ext
        result = redfish.get_rfs_url("/redfish/v1/Systems/1/")
        self.assertEqual(expected, result)

        result = redfish.get_rfs_url("/redfish/v1/Systems/1")
        self.assertEqual(expected, result)

        result = redfish.get_rfs_url("redfish/v1/Systems/1/")
        self.assertEqual(expected, result)

        result = redfish.get_rfs_url("redfish/v1/Systems/1")
        self.assertEqual(expected, result)
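
For context on why every case in the two tests above expects the same URL: when the base has no path component, urljoin resolves a relative reference identically whether or not the base ends with a slash. A minimal check using the same illustrative addresses (not part of the valence test suite):

from urllib.parse import urljoin

# Neither base carries a path segment that could be replaced, so the
# relative reference resolves to the same absolute URL in both cases.
assert urljoin("https://127.0.0.1:8443", "redfish/v1/Systems/1") == \
    "https://127.0.0.1:8443/redfish/v1/Systems/1"
assert urljoin("https://127.0.0.1:8443/", "redfish/v1/Systems/1") == \
    "https://127.0.0.1:8443/redfish/v1/Systems/1"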
upnp.py (project: upnpclient, author: flyte)
def __init__(self, url_base, service_type, service_id, control_url, scpd_url, event_sub_url):
        self._url_base = url_base
        self.service_type = service_type
        self.service_id = service_id
        self._control_url = control_url
        self.scpd_url = scpd_url
        self._event_sub_url = event_sub_url

        self.actions = []
        self.action_map = {}
        self.statevars = {}
        self._log = _getLogger('Service')

        self._log.debug('%s url_base: %s', self.service_id, self._url_base)
        self._log.debug('%s SCPDURL: %s', self.service_id, self.scpd_url)
        self._log.debug('%s controlURL: %s', self.service_id, self._control_url)
        self._log.debug('%s eventSubURL: %s', self.service_id, self._event_sub_url)

        url = urljoin(self._url_base, self.scpd_url)
        self._log.info('Reading %s', url)
        resp = requests.get(url, timeout=HTTP_TIMEOUT)
        resp.raise_for_status()
        self.scpd_xml = etree.fromstring(resp.content)
        self._find = partial(self.scpd_xml.find, namespaces=self.scpd_xml.nsmap)
        self._findtext = partial(self.scpd_xml.findtext, namespaces=self.scpd_xml.nsmap)
        self._findall = partial(self.scpd_xml.findall, namespaces=self.scpd_xml.nsmap)

        self._read_state_vars()
        self._read_actions()
core.py (project: pymarketcap, author: mondeja)
def stats(self):
        """ Get global cryptocurrencies statistics.

        Returns:
            dict: Global markets statistics
        """
        url = urljoin(self.urls["api"], 'global/')
        response = get(url).json(parse_int=self.parse_int,
                                 parse_float=self.parse_float)
        return response


    #######    WEB PARSER METHODS    #######
core.py (project: pymarketcap, author: mondeja)
def _get_ranks(self, query, temp):
        """Internal function for get gainers and losers

        Args:
            query: Query to obtain ranks, gainers or losers
            temp: Temporal period obtaining gainers or losers,
                1h, 24h or 7d
        """
        url = urljoin(self.urls["web"], 'gainers-losers/')
        html = self._html(url)

        call = str(query) + '-' + str(temp)

        response = []
        html_rank = html.find('div', {'id': call}).find_all('tr')

        for curr in html_rank[1:]:
            _childs, childs = (curr.contents, [])
            for c in _childs:
                if c != '\n':
                    childs.append(c)
            for n, g in enumerate(childs):
                if n == 1:
                    name = str(g.a.getText())
                elif n == 2:
                    symbol = str(g.string)
                elif n == 3:
                    _volume_24h = sub(r'\$|,', '', g.a.getText())
                    volume_24h = self.parse_int(_volume_24h)
                elif n == 4:
                    _price = sub(r'\$|,', '', g.a.getText())
                    price = self.parse_float(_price)
                elif n == 5:
                    percent = self.parse_float(sub(r'%', '', g.string))
            currency = {'symbol': symbol, 'name': name,
                        '24h_volume_usd': volume_24h,
                        'price_usd': price, 'percent_change': percent}
            response.append(currency)
        return response
core.py (project: pymarketcap, author: mondeja)
def global_cap(self, bitcoin=True, start=None, end=None):
        """Get global market capitalization graphs, including
        or excluding Bitcoin

        Args:
            bitcoin (bool, optional): Indicates if Bitcoin will
                be includedin global market capitalization graph.
                As default True
            start (optional, datetime): Time to start retrieving
                graphs data. If not provided get As default None
            end (optional, datetime): Time to end retrieving
                graphs data.

        Returns (dict):
            List of lists with timestamp and values
        """
        base_url = self.urls["graphs_api"]
        if bitcoin:
            endpoint = "global/marketcap-total/"
        else:
            endpoint = "global/marketcap-altcoin/"
        url = urljoin(base_url, endpoint)

        if start and end:
            url += self._add_start_end(url, start, end)

        return get(url).json()
conftest.py (project: filegardener, author: smorin)
def prepare_url(value):
    # Issue #1483: Make sure the URL always has a trailing slash
    httpbin_url = value.url.rstrip('/') + '/'

    def inner(*suffix):
        return urljoin(httpbin_url, '/'.join(suffix))

    return inner
scrape.py (project: brobeat-OLD, author: blacktop)
def build_url(current_url, next_url):
    if is_url(next_url):
        return next_url
    else:
        return urljoin(current_url, next_url)
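
Worth noting: urljoin already returns the second argument unchanged when it carries its own scheme, so absolute URLs pass through either branch unharmed. A small illustration with example URLs (is_url is the project's own helper and is not reproduced here):

from urllib.parse import urljoin

base = 'https://www.bro.org/sphinx/script-reference/'
print(urljoin(base, 'log-files.html'))
# -> https://www.bro.org/sphinx/script-reference/log-files.html
print(urljoin(base, 'https://example.com/other.html'))
# -> https://example.com/other.html (the absolute reference wins)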
_ibm_http_client.py (project: ProjectQ, author: ProjectQ-Framework)
def _run(qasm, device, user_id, access_token, shots):
    suffix = 'codes/execute'

    r = requests.post(urljoin(_api_url, suffix),
                      data=qasm,
                      params={"access_token": access_token,
                              "deviceRunType": device,
                              "fromCache": "false",
                              "shots": shots},
                      headers={"Content-Type": "application/json"})
    r.raise_for_status()

    r_json = r.json()
    execution_id = r_json["id"]
    return execution_id
_ibm_http_client.py (project: ProjectQ, author: ProjectQ-Framework)
def _get_result(execution_id, access_token, num_retries=300, interval=1):
    suffix = 'Executions/{execution_id}'.format(execution_id=execution_id)

    for _ in range(num_retries):
        r = requests.get(urljoin(_api_url, suffix),
                         params={"access_token": access_token})
        r.raise_for_status()

        r_json = r.json()
        status = r_json["status"]["id"]
        if status == "DONE":
            return r_json["result"]
        time.sleep(interval)
extensions.py (project: apimas, author: grnet)
def __call__(self, value):
        if value is None:
            return value
        return urljoin(self.ref_endpoint, value).rstrip('/') + '/'
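
The rstrip('/') + '/' idiom above normalizes the joined URL to exactly one trailing slash. A small illustration with a hypothetical endpoint (ref_endpoint and value are made up for this sketch):

from urllib.parse import urljoin

ref_endpoint = 'https://api.example.org/books/'  # hypothetical referenced endpoint
value = '42'
print(urljoin(ref_endpoint, value).rstrip('/') + '/')
# -> https://api.example.org/books/42/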
clients.py (project: apimas, author: grnet)
def format_endpoint(self, resource_id):
        """
        This method concatenates the resource's endpoint with a specified
        identifier.

        Example: endpoint/<pk>/
        """
        if isinstance(resource_id, unicode):
            resource_id = resource_id.encode("utf-8")
        return urljoin(self.endpoint, quote(
            str(resource_id))) + TRAILING_SLASH

