python类auth()的实例源码

test1.py 文件源码 项目:cerebros_bot 作者: jslim18 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def getESCXBalance(address): 
   try:
      payload = {
         "method": "get_balances",
         "params": {
            "filters":[{"field": "address", "op": "==", "value": address},
                       {"field": "asset", "op": "==", "value": "ESCX"}],
            "filterop": "and"
            },
          "jsonrpc":"2.0",
          "id":0
         }
      response = requests.post(url, data=json.dumps(payload), headers=headers, auth=auth)
      json_data = response.json()
      #quantity = json_data.quantity 
      return (json_data['result'].pop()['quantity']) / 100000000
   except: 
      return 0;
authentication.py 文件源码 项目:core-python 作者: yidao620c 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def auth():
    # Basic Authentication
    requests.get('https://api.github.com/user', auth=HTTPBasicAuth('user', 'pass'))
    requests.get('https://api.github.com/user', auth=('user', 'pass'))

    # Digest Authentication
    url = 'http://httpbin.org/digest-auth/auth/user/pass'
    requests.get(url, auth=HTTPDigestAuth('user', 'pass'))

    # OAuth2 Authentication????requests-oauthlib
    url = 'https://api.twitter.com/1.1/account/verify_credentials.json'
    auth = OAuth2('YOUR_APP_KEY', 'YOUR_APP_SECRET', 'USER_OAUTH_TOKEN')
    requests.get(url, auth=auth)



    pass
bot.py 文件源码 项目:cerebros_bot 作者: jslim18 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def getESCXBalance(address): 
   try:
      payload = {
         "method": "get_balances",
         "params": {
            "filters":[{"field": "address", "op": "==", "value": address},
                       {"field": "asset", "op": "==", "value": "ESCX"}],
            "filterop": "and"
            },
          "jsonrpc":"2.0",
          "id":0
         }
      response = requests.post(url, data=json.dumps(payload), headers=headers, auth=auth)
      json_data = response.json()
      #quantity = json_data.quantity 
      return (json_data['result'].pop()['quantity']) / 100000000
   except: 
      return 0;
api.py 文件源码 项目:upstox-python 作者: upstox 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def retrieve_access_token(self):
        """ once you have the authorization code, you can call this function to get
            the access_token. The access_token gives you full access to the API and is
            valid throughout the day
        """
        if self.api_key is None:
            raise (TypeError, 'Value api_key cannot be None. Please go to the Developer Console to get this value')
        if self.redirect_uri is None:
            raise (TypeError, 'Value redirect_uri cannot be None. Please go to the Developer Console to get this value')
        if self.api_secret is None:
            raise (TypeError, 'Value api_secret cannot be None. Please go to the Developer Console to get this value')
        if self.code is None:
            raise (TypeError, 'Value code cannot be None. Please visit the login URL to generate a code')

        params = {'code': self.code, 'redirect_uri': self.redirect_uri, 'grant_type': 'authorization_code'}

        url = self.config['host'] + self.config['routes']['accessToken']
        headers = {"Content-Type" : "application/json", "x-api-key" : self.api_key}
        r = requests.post(url, auth=(self.api_key, self.api_secret), data=json.dumps(params), headers=headers)
        body = json.loads(r.text)
        if 'access_token' not in body:
            raise SystemError(body);
        return body['access_token']
restapiplugin.py 文件源码 项目:fauxmo-plugins 作者: n8henrie 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def get_state(self) -> str:
        """Get device state.

        Returns:
            "on", "off", or "unknown"

        """
        if self.state_cmd is None:
            return "unknown"

        resp = requests.request(self.state_method, self.state_cmd,
                                data=self.state_data, json=self.state_json,
                                headers=self.headers, auth=self.auth)

        if self.state_response_off in resp.text:
            return "off"
        elif self.state_response_on in resp.text:
            return "on"
        return "unknown"
ghost2loggerloopback.py 文件源码 项目:stackstorm-ghost2logger 作者: StackStorm-Exchange 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def _ghost2loggergetGUID(self):
        # Let's deal with the JSON API calls here

        HEADERS = {"Content-Type": 'application/json'}

        _URL = self._ghost_url + "/v1/guid"
        try:
            r = requests.get(_URL, headers=HEADERS, verify=False,
                             auth=(self._username, self._password))
            _data = r.json()
            if _data['host'] == "GUID":
                for _item in _data['pattern']:
                    _guid = str(_item)

                return str(_guid)
            else:
                return ""

        except Exception as e:
            self._logger.info('[Ghost2logger]: Cannot get GUID: ' +
                              str(e))
ghost2loggerloopback.py 文件源码 项目:stackstorm-ghost2logger 作者: StackStorm-Exchange 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _ghost2loggergpurge(self):
        # Let's deal with the JSON API calls here
        self._logger.info('[Ghost2logger]: Purge Ghost2logger')

        HEADERS = {"Content-Type": 'application/json'}

        _URL = self._ghost_url + "/v1/all"
        try:
            r = requests.delete(_URL, headers=HEADERS, verify=False,
                                auth=(self._username, self._password))

            _data = r.json()
            self._logger.info('[Ghost2logger]: Purged ghost2logger' +
                              str(_data))
        except Exception as e:
            self._logger.info('[Ghost2logger]: Cannot purge Ghost2logger: ' +
                              str(e))
fnetpepAPI.py 文件源码 项目:FileNetPEAPI 作者: wandss 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __getAppSpaces(self):

        """Returns a list with the name of availables appspaces on FileNet,
        to apps variable.
        """        
        try:
            appspaces = requests.get(self.baseurl+'appspacenames',
                                     auth=self.cred)
            appspaces.raise_for_status()
            self.appspaces = appspaces.json()            
            self.apps = appspaces.json().keys()

        except Exception as e:
            print (str(e)+':\n'+str(appspaces.json()['UserMessage']['Text']))
            print (appspaces.json())
            self.apps =  appspaces.json()
fnetpepAPI.py 文件源码 项目:FileNetPEAPI 作者: wandss 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def __getQueues(self):

        """Creates a list with URL adresses from workbaskets. Also creates a
        dictionary with workbasket name as key and it's URL as value.
        """
        for apps in self.roles.keys():
            for roles in self.roles.values():
                if roles:
                    for role in roles:
                        my_role = requests.get(self.baseurl
                                               +'appspaces/'
                                               +apps+'/roles/'
                                               +role,
                                               auth = self.cred)
                        if my_role.ok:                            
                            for uri in my_role.json()['workbaskets'].values():
                                self.queue_urls.append(uri['URI'])                                
                                self.workbaskets[uri['URI'].split(
                                    '/')[-1]] = uri['URI']
fnetpepAPI.py 文件源码 项目:FileNetPEAPI 作者: wandss 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def getQueue(self, work_basket):

        """Returns a Queue for a given Workbasket.
        Usage:
        >>> my_queue = pe.getQueue('workbasket_name')
        >>> my_queue.get('count')->Variable with the total tasks in this Queue.
        """                

        queue = requests.get(self.client.baseurl
                             + self.client.workbaskets.get(work_basket),
                             auth = self.client.cred)
        count = requests.get(queue.url + '/queueelements/count',
                             auth = self.client.cred).json()['count']
        queue = queue.json()
        queue['count'] = count
        return queue
fnetpepAPI.py 文件源码 项目:FileNetPEAPI 作者: wandss 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def getAllTasks(self):

        """Returns all tasks from all Queues.
        Usage:
        >>> tasks = pe.getAllTasks()
        When a Queue has no tasks, a message informing which Queues are empty,
        will be printed.
        """
        tasks = []
        for uri in self.client.queue_urls:
            queue = requests.get(self.client.baseurl + uri,
                                 auth = self.client.cred)
            found_tasks = self.getTasks(queue.json())
            if found_tasks:
                tasks.append(found_tasks)
        return [tsk for task in tasks for tsk in task]
fnetpepAPI.py 文件源码 项目:FileNetPEAPI 作者: wandss 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def lockTask(self, task):

        """Receives a task dictionary, obtainned with getTasks() method,
        and locks the task so other users can't access this task at same time.
        Usage:
        >>> pe.lockTask(task)
        """

        locked = requests.get(self.client.baseurl
                              +task['stepElement'],
                              auth = self.client.cred)
        eTag = locked.headers['ETag']
        locked = requests.put(self.client.baseurl
                              + task['stepElement'],
                              auth = self.client.cred,
                              params={'action':'lock',
                                      'If-Match':eTag}
                              )
fnetpepAPI.py 文件源码 项目:FileNetPEAPI 作者: wandss 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def endTask(self, task, comment=None):        
        """Receives a task and finishes it, finishing the workflow itself or
        moving to the next step in the task. Is also possible to create a
        comment before ending the task.
        Usage:
        >>> pe.endTask(task) #or
        >>> pe.endTask(task, u'Completed the task!')

        """
        params = {'action':'dispatch'}
        step = self.getStep(task)
        if step.get('systemProperties').get('responses')\
           and not step.get('systemProperties').get('selectedResponse'):
            return "This task needs to be updated. Check the updateTask method."
        else:
            params['selectedResponse'] = 1           

        if comment:
            task = self.saveAndUnlockTask(task, comment)

        lock = self.lockTask(task)
        params['If-Match'] = task['ETag']
        dispatched = requests.put(self.client.baseurl + task['stepElement'],
                                  auth = self.client.cred,
                                  params=params)
fnetpepAPI.py 文件源码 项目:FileNetPEAPI 作者: wandss 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def getUser(self, search_string):
        """Receives a string and looks for it in directory service. If the
        string search isn't found, the message "User not Found" will be
        returned, otherwise a list with all matching cases, limited to 50
        results will be returned.
        Usage:
        >>> users = pe.getUser('user_name')
        """

        users = []
        user = requests.get(self.client.baseurl+'users',
                            auth=self.client.cred,
                            params={'searchPattern':search_string,
                                    'searchType':4, 'limit':50})
        if user.json().get('users'):            
            for usr in user.json()['users']:
                users.append(usr['displayName'])
        else:
            return "User not Found"
        return users
fnetpepAPI.py 文件源码 项目:FileNetPEAPI 作者: wandss 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def getGroup(self, search_string):
        """Receives a string and looks for it in directory service. If the
        string search isn't found, the message "Group not Found" will be
        returned, otherwise a list with all matching cases, limited to 50
        results will be returned.
        Usage:
        >>> users = pe.getGroup('group_name')
        """        

        groups = []
        group = requests.get(self.client.baseurl+'groups',
                            auth=self.client.cred,
                            params={'searchPattern':search_string,
                                    'searchType':4, 'limit':3000})
        if group.json().get('groups'):
            for grp in group.json()['groups']:
                groups.append(grp['displayName'])
        else:
            return "Group not Found"
        return groups
apiServer.py 文件源码 项目:AutoTriageBot 作者: salesforce 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def sendMessage() -> str:
    """ Send a message (internal or external) to the HackerOne report identified by the given ID"""
    data = request.get_json(force=True)
    message = data['message']
    internal = data['internal']
    id = data['id']

    if config.DEBUG:
        print("/v1/sendMessage: id=%s, internal=%s" % (id, internal))
    if config.DEBUGVERBOSE:
        print("message=%s" % message)

    h1Data = {'data': {'type': 'activity-comment',
                       'attributes': {'message': message,
                                      'internal': internal}}}
    headers = {'Content-Type': 'application/json'}
    resp = requests.post('https://api.hackerone.com/v1/reports/%s/activities' % id,
                         headers=headers,
                         data=json.dumps(h1Data).encode('utf-8'),
                         auth=(config.apiName, secrets.apiToken))
    return json.dumps(resp.json())
apiServer.py 文件源码 项目:AutoTriageBot 作者: salesforce 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def changeStatus() -> str:
    """ Change the status of the report at the given ID to the given status """
    data = request.get_json(force=True)
    status = data['status']
    message = data['message']
    id = data['id']

    if config.DEBUG:
        print("/v1/changeStatus: id=%s, status=%s" % (id, status))
    if config.DEBUGVERBOSE:
        print("message=%s" % message)

    h1Data = {'data': {'type': 'state-change',
                       'attributes': {'message': message,
                                      'state': status}}}
    headers = {'Content-Type': 'application/json'}
    resp = requests.post('https://api.hackerone.com/v1/reports/%s/state_changes' % id,
                         headers=headers,
                         data=json.dumps(h1Data).encode('utf-8'),
                         auth=(config.apiName, secrets.apiToken))
    return json.dumps(resp.json())
AutoTriageUtils.py 文件源码 项目:AutoTriageBot 作者: salesforce 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def postComment(id: str, vti: VulnTestInfo, internal=False, addStopMessage=False) -> Mapping:
    """ Post a comment to the report with the given ID using the information in the given VulnTestInfo
          - Set internal=True in order to post an internal comment
          - Set addStopMessage=True in order to add the stop message """
    if config.DEBUG:
        print("Posting comment: internal=%s, reproduced=%s, id=%s" % (str(internal), str(vti.reproduced), id))

    if addStopMessage:
        message = vti.message + '\n\n' + constants.disableMessage
    else:
        message = vti.message

    postMessage("Posting Message: \n\n%s" % message)  # TODO: Delete this

    resp = requests.post('http://api:8080/v1/sendMessage',
                         json={'message': message, 'internal': internal, 'id': id},
                         auth=HTTPBasicAuth('AutoTriageBot', secrets.apiBoxToken))

    if config.triageOnReproduce and vti.reproduced:
        changeStatus(id, 'triaged')

    return resp.json()
client.py 文件源码 项目:python-moira-client 作者: moira-alert 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def __init__(self, api_url, auth_custom=None, auth_user=None, auth_pass=None, login=None):
        """

        :param api_url: str Moira API URL
        :param auth_custom: dict auth custom headers
        :param auth_user: str auth user
        :param auth_pass: str auth password
        :param login: str auth login
        """
        if not api_url.endswith('/'):
            self.api_url = api_url + '/'
        else:
            self.api_url = api_url

        self.auth = None
        self.auth_custom = {'X-Webauth-User': login}

        if auth_user and auth_pass:
            self.auth = HTTPBasicAuth(auth_user, auth_pass)

        if auth_custom:
            self.auth_custom.update(auth_custom)
client.py 文件源码 项目:python-moira-client 作者: moira-alert 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def get(self, path='', **kwargs):
        """

        :param path: str api path
        :param kwargs: additional parameters for request
        :return: dict response

        :raises: HTTPError
        :raises: InvalidJSONError
        """
        r = requests.get(self._path_join(path), headers=self.auth_custom, auth=self.auth, **kwargs)
        r.raise_for_status()
        try:
            return r.json()
        except ValueError:
            raise InvalidJSONError(r.content)
client.py 文件源码 项目:python-moira-client 作者: moira-alert 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def delete(self, path='', **kwargs):
        """

        :param path: str api path
        :param kwargs: additional parameters for request
        :return: dict response

        :raises: HTTPError
        :raises: InvalidJSONError
        """
        r = requests.delete(self._path_join(path), headers=self.auth_custom, auth=self.auth, **kwargs)
        r.raise_for_status()
        try:
            return r.json()
        except ValueError:
            raise InvalidJSONError(r.content)
client.py 文件源码 项目:python-moira-client 作者: moira-alert 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def put(self, path='', **kwargs):
        """

        :param path: str api path
        :param kwargs: additional parameters for request
        :return: dict response

        :raises: HTTPError
        :raises: InvalidJSONError
        """
        r = requests.put(self._path_join(path), headers=self.auth_custom, auth=self.auth, **kwargs)
        r.raise_for_status()
        try:
            return r.json()
        except ValueError:
            raise InvalidJSONError(r.content)
utorrent.py 文件源码 项目:pandachaika 作者: pandabuilder 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def add_torrent(self, torrent_data: Union[str, bytes], download_dir: str=None) -> bool:

        self.total_size = 0
        self.expected_torrent_name = ''
        lf = NamedTemporaryFile()
        lf.write(torrent_data)

        params = {'action': 'add-file', 'token': self.token}
        files = {'torrent_file': open(lf.name, 'rb')}
        try:
            response = requests.post(
                self.UTORRENT_URL,
                auth=self.auth,
                params=params,
                files=files,
                timeout=25).json()
            lf.close()
            if 'error' in response:
                return False
            else:
                return True
        except RequestException:
            lf.close()
            return False
utorrent.py 文件源码 项目:pandachaika 作者: pandabuilder 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def add_url(self, url: str, download_dir: str=None) -> bool:

        self.total_size = 0
        self.expected_torrent_name = ''

        params = {'action': 'add-url', 'token': self.token, 's': url}
        try:
            response = requests.get(
                self.UTORRENT_URL,
                auth=self.auth,
                # cookies=self.cookies,
                params=params,
                timeout=25).json()
            if 'error' in response:
                return False
            else:
                return True
        except RequestException:
            return False
client.py 文件源码 项目:pact-broker-client 作者: Babylonpartners 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def push_pact(self, *, pact_file, provider, consumer, consumer_version):
        request_url = urls.PUSH_PACT_URL.format(
            broker_url=self.broker_url,
            provider=provider,
            consumer=consumer,
            consumer_version=consumer_version
        )

        with open(pact_file) as data_file:
            pact_json = json.load(data_file)

        response = requests.put(
            request_url,
            auth=self._auth,
            json=pact_json
        )
        response.raise_for_status()
        return response, (
            f'Pact between {consumer} and {provider} pushed.'
        )
client.py 文件源码 项目:pact-broker-client 作者: Babylonpartners 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def tag_consumer(self, *, provider, consumer, consumer_version, tag):
        request_url = urls.TAG_CONSUMER_URL.format(
            broker_url=self.broker_url,
            consumer=consumer,
            consumer_version=consumer_version,
            tag=tag
        )
        response = requests.put(
            request_url,
            headers={'Content-Type': 'application/json'},
            auth=self._auth
        )
        response.raise_for_status()
        return response, (
            f'{consumer} version {consumer_version} tagged as {tag}'
        )
connection.py 文件源码 项目:tfs 作者: devopshq 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __init__(self, base_url, project, user, password, verify=False, timeout=None, auth_type=None):
        if not base_url.endswith('/'):
            base_url += '/'

        collection, project = self.get_collection_and_project(project)
        # Remove part after / in project-name, like DefaultCollection/MyProject => DefaultCollection
        # API responce only in Project, without subproject
        self._url = base_url + '%s/_apis/' % collection
        if project:
            self._url_prj = base_url + '%s/%s/_apis/' % (collection, project)
        else:
            self._url_prj = self._url

        self.http_session = requests.Session()
        auth = auth_type(user, password)
        self.http_session.auth = auth

        self.timeout = timeout
        self._verify = verify
        if not self._verify:
            from requests.packages.urllib3.exceptions import InsecureRequestWarning
            requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
resolver.py 文件源码 项目:kuberdock-platform 作者: cloudlinux 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def _compose_args(self, method, data=None):
        args = {}
        if data is None:
            data = {}
        query = 'params' if method == 'get' else 'data'
        args[query] = self._structure.get('common-params', {})
        args[query].update(data)
        args['headers'] = self._structure.get('common-headers', {})
        auth = self._structure.get('auth')
        if auth == 'params':
            args[query].update({'username': self.billing_username,
                                'password': self.billing_password})
        elif auth == 'headers':
            args['auth'] = HTTPBasicAuth(
                self.billing_username, self.billing_password)
        if self._structure.get('verify') == False:
            args['verify'] = False
        return args
netconf.py 文件源码 项目:pathman-sr 作者: CiscoDevNet 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def get_url(url):
    '''request url'''
    headers = {'Content-type': 'application/json'}
    auth = (controller['any_user'], controller['any_pass'])
    logging.info(url)
    try:
        response = requests.get(url, headers=headers, auth=auth, verify=False)
        logging.info("Url GET Status: %s" % response.status_code)

        if response.status_code in [200]:
            return True, response.json()
        else:
            return False, str(response.text)
    except requests.exceptions.ConnectionError, e:
        logging.error('Connection Error: %s' % e.message)
        return False, str(e.message)
netconf.py 文件源码 项目:pathman-sr 作者: CiscoDevNet 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def post_xml(url, data):
    '''request post'''
    headers = {'Content-type': 'application/xml'}
    auth = (controller['any_user'], controller['any_pass'])
    try:
        response =  requests.post(url, data=data, auth=auth, headers=headers, verify=False)
        logging.info("Url POST Status: %s" % response.status_code)
        if response.status_code in [200, 204]:
            if len(response.text) > 0:
                return True, response.json()
            else:
                return True, {}
        else:
            return False, str(response.text)
    except requests.exceptions.ConnectionError, e:
        logging.error('Connection Error: %s' % e.message)
        return False, str(e.message)


问题


面经


文章

微信
公众号

扫码关注公众号