python类post()的实例源码

facebook_api.py 文件源码 项目:meetup-facebook-bot 作者: Stark-Mountain 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def send_message_to_facebook(access_token, user_id, message_data):
    """Deliver a message to a user through the Messenger Send API.

    Reference:
        https://developers.facebook.com/docs/messenger-platform/send-api-reference

    :param access_token: token sent as the ``access_token`` query parameter.
    :param user_id: id placed under ``recipient.id`` in the request body.
    :param message_data: value sent as the ``message`` field of the body.
    :return: the decoded JSON body of the response.
    :raises requests.HTTPError: when the response carries an error status.
    """
    # The body is serialized by hand, so the JSON content type is set explicitly.
    body = json.dumps({
        'recipient': {'id': user_id},
        'message': message_data,
    })
    response = requests.post(
        'https://graph.facebook.com/v2.6/me/messages',
        headers={'Content-Type': 'application/json'},
        params={'access_token': access_token},
        data=body)
    response.raise_for_status()
    return response.json()
api.py 文件源码 项目:upstox-python 作者: upstox 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def api_call(self, url, http_method, data):
        """Send an authenticated request using the requested HTTP verb.

        :param url: full URL to call.
        :param http_method: one of the ``PyCurlVerbs`` members POST, DELETE,
                            PUT or GET (compared by identity).
        :param data: object serialized to JSON as the body of POST and PUT
                     requests; not used for DELETE and GET.
        :return: the ``requests`` response, or ``None`` when ``http_method``
                 is none of the four verbs handled here.
        """
        request_headers = {
            "Content-Type": "application/json",
            "x-api-key": self.api_key,
            "authorization": "Bearer " + self.access_token,
        }

        if http_method is PyCurlVerbs.POST:
            return requests.post(url, data=json.dumps(data), headers=request_headers)
        if http_method is PyCurlVerbs.DELETE:
            return requests.delete(url, headers=request_headers)
        if http_method is PyCurlVerbs.PUT:
            return requests.put(url, data=json.dumps(data), headers=request_headers)
        if http_method is PyCurlVerbs.GET:
            return requests.get(url, headers=request_headers)

        # Unknown verb: no request is made.
        return None
okta_auth.py 文件源码 项目:okta-awscli 作者: jmhale 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def primary_auth(self):
        """ Performs primary auth against Okta.

        Posts the username and password to ``/api/v1/authn`` and inspects the
        ``status`` field of the JSON reply:

        * ``MFA_REQUIRED`` - hands the offered factors and the state token to
          ``self.verify_mfa`` and returns its result.
        * ``SUCCESS`` - returns the ``sessionToken`` from the reply.

        In every other case the reply (or its ``errorSummary`` when there is
        no ``status`` and the HTTP status is not 200) is printed and the
        process exits with status 1.

        :return: the session token.
        """

        auth_data = {
            "username": self.username,
            "password": self.password
        }
        resp = requests.post(self.base_url+'/api/v1/authn', json=auth_data)
        resp_json = resp.json()
        if 'status' in resp_json:
            status = resp_json['status']
            if status == 'MFA_REQUIRED':
                factors_list = resp_json['_embedded']['factors']
                state_token = resp_json['stateToken']
                return self.verify_mfa(factors_list, state_token)
            if status == 'SUCCESS':
                return resp_json['sessionToken']
            # Any other status yields no session token; report it instead of
            # failing later on an unbound local variable.
            print(resp_json)
        elif resp.status_code != 200:
            print(resp_json['errorSummary'])
        else:
            print(resp_json)
        exit(1)
numerapi.py 文件源码 项目:numerai 作者: gansanay 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def upload_prediction(self, file_path):
        """Upload a prediction file and register it as a submission.

        Steps, each of which aborts the upload by returning its own status
        code when that code is not 200:

        1. ``self.authorize(file_path)`` supplies the stored filename, the
           signed upload URL and the headers for the final request.
        2. ``self.get_current_competition()`` supplies the dataset and
           competition ids.
        3. The file content is PUT to the signed URL.

        Finally the submission is posted to ``self._submissions_url``.

        :param file_path: path of the prediction file to upload.
        :return: the HTTP status code of the first failing step, otherwise the
                 status code of the final submission request.
        """
        filename, signedRequest, headers, status_code = self.authorize(file_path)
        if status_code != 200:
            return status_code

        dataset_id, comp_id, status_code = self.get_current_competition()
        if status_code != 200:
            return status_code

        with open(file_path, 'rb') as fp:
            prepped = requests.Request('PUT', signedRequest, data=fp.read()).prepare()

        # The prepared request is sent through a bare session, as before; the
        # context manager closes the session's connections afterwards.
        with requests.Session() as s:
            resp = s.send(prepped)
        if resp.status_code != 200:
            return resp.status_code

        r = requests.post(self._submissions_url,
                    data={'competition_id':comp_id, 'dataset_id':dataset_id, 'filename':filename},
                    headers=headers)

        return r.status_code
service.py 文件源码 项目:ISB-CGC-pipelines 作者: isb-cgc 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def sendRequest(ip, port, route, data=None, protocol="http"):
        """Send a request to ``{protocol}://{ip}:{port}{route}``.

        A POST is issued when ``data`` is given, otherwise a GET.

        :param ip: host part of the URL.
        :param port: port part of the URL.
        :param route: path appended directly after the port.
        :param data: body for the POST request; ``None`` selects GET.
        :param protocol: URL scheme, ``"http"`` by default.
        :return: the ``requests`` response.
        :raises PipelineServiceError: when ``requests`` fails to complete the
                                      request.
        """
        url = "{protocol}://{ip}:{port}{route}".format(protocol=protocol, ip=ip, port=port, route=route)

        try:
            if data is not None:
                resp = requests.post(url, data=data)
            else:
                resp = requests.get(url)

        # RequestException is the base class of HTTPError and also covers
        # connection errors and timeouts, which are what post()/get() raise.
        except requests.RequestException as e:
            raise PipelineServiceError("{reason}".format(reason=e))

        return resp
virustotal_api.py 文件源码 项目:Cortex-Analyzers 作者: CERT-BDF 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def put_comments(self, resource, comment):
        """ Post a comment on a file or URL.

        The initial idea of VirusTotal Community was that users should be able to make comments on files and URLs,
        the comments may be malware analyses, false positive flags, disinfection instructions, etc.

        Imagine you have some automatic setup that can produce interesting results related to a given sample or URL
        that you submit to VirusTotal for antivirus characterization, you might want to give visibility to your setup
        by automatically reviewing samples and URLs with the output of your automation.

        :param resource: either a md5/sha1/sha256 hash of the file you want to review or the URL itself that you want
                         to comment on.
        :param comment: the actual review, you can tag it using the "#" twitter-like syntax (e.g. #disinfection #zbot)
                        and reference users using the "@" syntax (e.g. @VirusTotalTeam).
        :return: If the comment was successfully posted the response code will be 1, 0 otherwise. When the request
                 itself fails, a dict with a single ``error`` key holding the exception text.
        """
        params = {'apikey': self.api_key, 'resource': resource, 'comment': comment}

        try:
            response = requests.post(self.base + 'comments/put', params=params, proxies=self.proxies)
        except requests.RequestException as e:
            # str(e) works on Python 2 and 3; exceptions have no .message on Python 3.
            return dict(error=str(e))

        return _return_response_and_status_code(response)
test1.py 文件源码 项目:cerebros_bot 作者: jslim18 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def getESCXBalance(address): 
   """Return the ESCX balance recorded for ``address``.

   Sends a JSON-RPC ``get_balances`` request filtered on the address and the
   ESCX asset, using the ``url``, ``headers`` and ``auth`` names defined
   outside this function.

   :param address: address to look up.
   :return: the ``quantity`` of the last entry in the result divided by
            100000000, or 0 when the request or the parsing of its reply
            fails for any reason.
   """
   try:
      payload = {
         "method": "get_balances",
         "params": {
            "filters":[{"field": "address", "op": "==", "value": address},
                       {"field": "asset", "op": "==", "value": "ESCX"}],
            "filterop": "and"
            },
          "jsonrpc":"2.0",
          "id":0
         }
      response = requests.post(url, data=json.dumps(payload), headers=headers, auth=auth)
      json_data = response.json()
      return (json_data['result'].pop()['quantity']) / 100000000
   except Exception:
      # Best effort: any failure is reported as a zero balance, but
      # KeyboardInterrupt and SystemExit are no longer swallowed.
      return 0
pywalib.py 文件源码 项目:PyWallet 作者: AndreMiras 项目源码 文件源码 阅读 52 收藏 0 点赞 0 评论 0
def add_transaction(tx):
        """
        POST transaction to etherscan.io.

        The transaction is RLP-encoded, hex-encoded and submitted through the
        ``eth_sendRawTransaction`` proxy action. ``ETHERSCAN_API_KEY`` is
        appended to the URL when it is set.

        :param tx: transaction object accepted by ``rlp.encode``.
        :return: the transaction hash taken from the ``result`` field of the
                 response.
        """
        tx_hex = rlp.encode(tx).encode("hex")
        # use https://etherscan.io/pushTx to debug
        print("tx_hex:", tx_hex)
        url = 'https://api.etherscan.io/api'
        url += '?module=proxy&action=eth_sendRawTransaction'
        if ETHERSCAN_API_KEY:
            # Previously a discarded expression with an incomplete format
            # specifier, so the key never reached the URL.
            url += '&apikey=%s' % ETHERSCAN_API_KEY
        # TODO: handle 504 timeout, 403 and other errors from etherscan
        response = requests.post(url, data={'hex': tx_hex})
        # response is like:
        # {'jsonrpc': '2.0', 'result': '0x24a8...14ea', 'id': 1}
        # or on error like this:
        # {'jsonrpc': '2.0', 'id': 1, 'error': {
        #   'message': 'Insufficient funds...', 'code': -32010, 'data': None}}
        response_json = response.json()
        print("response_json:", response_json)
        PyWalib.handle_etherscan_tx_error(response_json)
        tx_hash = response_json['result']
        # the response differs from the other responses
        return tx_hash
flora.py 文件源码 项目:flora 作者: Lamden 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def register(name):
    """Register ``name`` with the API under a freshly generated RSA key pair.

    Nothing is generated when ``check_name`` reports an ``'error'`` status;
    a message is printed instead. Otherwise a 512-bit key pair is created,
    pickled to ``<KEY_LOCATION>/.key`` (creating the directory if needed) and
    the public numbers are posted to ``<API_LOCATION>/names``. The outcome is
    printed.

    :param name: the name to register.
    """
    # ask the API first; an 'error' status is reported as "already registered"
    if check_name(name)['status'] == 'error':
        print('{} already registered.'.format(name))
        return

    pub, priv = rsa.newkeys(512)

    if not os.path.exists(KEY_LOCATION):
        os.mkdir(KEY_LOCATION)

    # persist both halves of the key pair
    key_path = '{}/.key'.format(KEY_LOCATION)
    with open(key_path, 'wb') as key_file:
        pickle.dump((pub, priv), key_file, pickle.HIGHEST_PROTOCOL)

    form = {'name' : name, 'n' : pub.n, 'e' : pub.e}
    reply = requests.post('{}/names'.format(API_LOCATION), data = form)
    succeeded = reply.json()['status'] == 'success'
    if succeeded:
        print('Successfully registered new name: {}'.format(name))
    else:
        print('Error registering name: {}'.format(name))
api.py 文件源码 项目:kuryr-kubernetes 作者: openstack 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def _make_request(self, path, cni_envs, expected_status=None):
        """POST ``cni_envs`` as JSON to the CNI daemon and return the response.

        :param path: path appended to the daemon's configured bind address.
        :param cni_envs: object sent as the JSON body.
        :param expected_status: when truthy, the status code the response
                                must carry.
        :return: the ``requests`` response.
        :raises requests.ConnectionError: re-raised after logging when the
                                          daemon cannot be reached.
        :raises k_exc.CNIError: when ``expected_status`` is given and the
                                response status differs from it.
        """
        address = config.CONF.cni_daemon.bind_address
        url = 'http://%s/%s' % (address, path)

        LOG.debug('Making request to CNI Daemon. %(method)s %(path)s\n'
                  '%(body)s',
                  {'method': 'POST', 'path': url, 'body': cni_envs})
        try:
            # 'Connection: close' header is sent with every request.
            resp = requests.post(url, json=cni_envs,
                                 headers={'Connection': 'close'})
        except requests.ConnectionError:
            LOG.exception('Looks like %s cannot be reached. Is kuryr-daemon '
                          'running?', address)
            raise

        LOG.debug('CNI Daemon returned "%(status)d %(reason)s".',
                  {'status': resp.status_code, 'reason': resp.reason})

        if not expected_status or resp.status_code == expected_status:
            return resp

        LOG.error('CNI daemon returned error "%(status)d %(reason)s".',
                  {'status': resp.status_code, 'reason': resp.reason})
        raise k_exc.CNIError('Got invalid status code from CNI daemon.')
virustotal_api.py 文件源码 项目:Cortex-Analyzers 作者: CERT-BDF 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def scan_file(self, this_file):
        """ Submit a file to be scanned by VirusTotal

        ``this_file`` may be a path to an existing file (opened here and closed again once the upload is done),
        a ``StringIO.StringIO`` object (its content is read and uploaded) or any other value, which is handed to
        ``requests`` unchanged.

        :param this_file: File to be scanned (32MB file size limit)
        :return: JSON response that contains scan_id and permalink. When preparing or sending the request fails,
                 a dict with a single ``error`` key holding the exception text.
        """
        params = {'apikey': self.api_key}
        opened = None  # file handle owned by this call, if any
        try:
            try:
                if isinstance(this_file, str) and os.path.isfile(this_file):
                    opened = open(this_file, 'rb')
                    files = {'file': (this_file, opened)}
                elif isinstance(this_file, StringIO.StringIO):
                    files = {'file': this_file.read()}
                else:
                    files = {'file': this_file}
            except TypeError as e:
                return dict(error=str(e))

            try:
                response = requests.post(self.base + 'file/scan', files=files, params=params, proxies=self.proxies)
            except requests.RequestException as e:
                # str(e) works on Python 2 and 3; exceptions have no .message on Python 3.
                return dict(error=str(e))
        finally:
            # Release the handle on every exit path.
            if opened is not None:
                opened.close()

        return _return_response_and_status_code(response)
virustotal_api.py 文件源码 项目:Cortex-Analyzers 作者: CERT-BDF 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def rescan_file(self, this_hash):
        """ Rescan a previously submitted filed or schedule an scan to be performed in the future.

        :param this_hash: a md5/sha1/sha256 hash. You can also specify a CSV list made up of a combination of any of
                          the three allowed hashes (up to 25 items), this allows you to perform a batch request with
                          one single call. Note that the file must already be present in our file store.
        :return: JSON response that contains scan_id and permalink. When the request itself fails, a dict with a
                 single ``error`` key holding the exception text.
        """
        params = {'apikey': self.api_key, 'resource': this_hash}

        try:
            response = requests.post(self.base + 'file/rescan', params=params, proxies=self.proxies)
        except requests.RequestException as e:
            # str(e) works on Python 2 and 3; exceptions have no .message on Python 3.
            return dict(error=str(e))

        return _return_response_and_status_code(response)
virustotal_api.py 文件源码 项目:Cortex-Analyzers 作者: CERT-BDF 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def scan_url(self, this_url):
        """ Submit a URL to be scanned by VirusTotal.

        :param this_url: The URL that should be scanned. This parameter accepts a list of URLs (up to 4 with the
                         standard request rate) so as to perform a batch scanning request with one single call. The
                         URLs must be separated by a new line character.
        :return: JSON response that contains scan_id and permalink. When the request itself fails, a dict with a
                 single ``error`` key holding the exception text.
        """
        params = {'apikey': self.api_key, 'url': this_url}

        try:
            response = requests.post(self.base + 'url/scan', params=params, proxies=self.proxies)
        except requests.RequestException as e:
            # str(e) works on Python 2 and 3; exceptions have no .message on Python 3.
            return dict(error=str(e))

        return _return_response_and_status_code(response)
virustotal_api.py 文件源码 项目:Cortex-Analyzers 作者: CERT-BDF 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def scan_file(self, this_file, notify_url=None, notify_changes_only=None):
        """ Submit a file to be scanned by VirusTotal.

        Allows you to send a file for scanning with VirusTotal. Before performing your submissions we encourage you to
        retrieve the latest report on the files, if it is recent enough you might want to save time and bandwidth by
        making use of it. File size limit is 32MB, in order to submit files up to 200MB in size you must request a
        special upload URL.

        :param this_file: The file to be uploaded.
        :param notify_url: A URL to which a POST notification should be sent when the scan finishes.
        :param notify_changes_only: Used in conjunction with notify_url. Indicates if POST notifications should be
                                    sent only if the scan results differ from the previous analysis.
        :return: JSON response that contains scan_id and permalink. When the request itself fails, a dict with a
                 single ``error`` key holding the exception text.
        """
        params = {'apikey': self.api_key}
        # Forward the optional notification settings; they used to be
        # accepted and then silently dropped.
        if notify_url is not None:
            params['notify_url'] = notify_url
        if notify_changes_only is not None:
            params['notify_changes_only'] = notify_changes_only

        try:
            # The context manager closes the file once the upload is done.
            with open(this_file, 'rb') as handle:
                files = {'file': (this_file, handle)}
                response = requests.post(self.base + 'file/scan', files=files, params=params, proxies=self.proxies)
        except requests.RequestException as e:
            # str(e) works on Python 2 and 3; exceptions have no .message on Python 3.
            return dict(error=str(e))

        return _return_response_and_status_code(response)
virustotal_api.py 文件源码 项目:Cortex-Analyzers 作者: CERT-BDF 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def cancel_rescan_file(self, resource):
        """ Delete a previously scheduled scan.

        Deletes a scheduled file rescan task. The file rescan api allows you to schedule periodic scans of a file,
        this API call tells VirusTotal to stop rescanning a file that you have previously enqueued for recurrent
        scanning.

        :param resource: The md5/sha1/sha256 hash of the file whose dynamic behavioural report you want to retrieve.
        :return: JSON acknowledgement. In the event that the scheduled scan deletion fails for whatever reason, the
        response code will be -1. When the request itself fails, a dict with a single ``error`` key holding the
        exception text.
        """
        params = {'apikey': self.api_key, 'resource': resource}

        try:
            response = requests.post(self.base + 'rescan/delete', params=params, proxies=self.proxies)
        except requests.RequestException as e:
            # str(e) works on Python 2 and 3; exceptions have no .message on Python 3.
            return dict(error=str(e))

        return _return_response_and_status_code(response)
virustotal_api.py 文件源码 项目:Cortex-Analyzers 作者: CERT-BDF 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def put_comments(self, resource, comment):
        """ Post a comment on a file or URL.

        Allows you to place comments on URLs and files, these comments will be publicly visible in VirusTotal
        Community, under the corresponding tab in the reports for each particular item.

        Comments can range from URLs and locations where a given file was found in the wild to full reverse
        engineering reports on a given malware specimen, anything that may help other analysts in extending their
        knowledge about a particular file or URL.

        :param resource: Either an md5/sha1/sha256 hash of the file you want to review or the URL itself that you want
        to comment on.
        :param comment: The actual review, you can tag it using the "#" twitter-like syntax (e.g. #disinfection #zbot)
        and reference users using the "@" syntax (e.g. @VirusTotalTeam).
        :return: JSON response. When the request itself fails, a dict with a single ``error`` key holding the
        exception text.
        """
        params = {'apikey': self.api_key, 'resource': resource, 'comment': comment}

        try:
            response = requests.post(self.base + 'comments/put', params=params, proxies=self.proxies)
        except requests.RequestException as e:
            # str(e) works on Python 2 and 3; exceptions have no .message on Python 3.
            return dict(error=str(e))

        return _return_response_and_status_code(response)
youtube.py 文件源码 项目:Zoom2Youtube 作者: Welltory 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def get_auth_code(self):
        """ Exchange the stored refresh token for an access token.

        Posts a ``refresh_token`` grant, built from this object's refresh
        token, client id and client secret, to Google's OAuth2 token
        endpoint.

        :return: the ``access_token`` field of the JSON reply, or ``None``
                 when the reply has no such field.
        """
        form = {
            'refresh_token': self.refresh_token,
            'client_id': self.client_id,
            'client_secret': self.client_secret,
            'grant_type': 'refresh_token',
        }
        request_headers = {
            'Content-Type': 'application/x-www-form-urlencoded',
            'Accept': 'application/json'
        }

        reply = requests.post('https://accounts.google.com/o/oauth2/token',
                              data=form, headers=request_headers)
        token_info = reply.json()
        return token_info.get('access_token')
bot.py 文件源码 项目:cerebros_bot 作者: jslim18 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def getESCXBalance(address): 
   """Look up the ESCX balance held by ``address``.

   Posts a JSON-RPC ``get_balances`` call whose filters select the given
   address and the ESCX asset. The endpoint ``url``, the ``headers`` and the
   ``auth`` credentials are names defined outside this function.

   :param address: address whose balance is requested.
   :return: ``quantity`` of the last returned entry divided by 100000000;
            0 when the request fails or the reply cannot be interpreted.
   """
   try:
      payload = {
         "method": "get_balances",
         "params": {
            "filters":[{"field": "address", "op": "==", "value": address},
                       {"field": "asset", "op": "==", "value": "ESCX"}],
            "filterop": "and"
            },
          "jsonrpc":"2.0",
          "id":0
         }
      response = requests.post(url, data=json.dumps(payload), headers=headers, auth=auth)
      json_data = response.json()
      return (json_data['result'].pop()['quantity']) / 100000000
   except Exception:
      # Deliberate best effort, narrowed from a bare except so that
      # KeyboardInterrupt and SystemExit still propagate.
      return 0
kcdu.py 文件源码 项目:kcdu 作者: learhy 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def create_cd(col_name, type, display_name):
    """Create a custom dimension through the Kentik v5 API.

    The request body is rendered from a small JSON template and posted with
    the ``headers`` defined outside this function. Any status other than 201
    is reported on stdout and terminates the program via ``exit()``.

    :param col_name: value for the ``name`` field.
    :param type: value for the ``type`` field (the parameter name shadows
                 the builtin and is kept for callers using keywords).
    :param display_name: value for the ``display_name`` field.
    :return: the ``id`` of the created custom dimension.
    """
    url = 'https://api.kentik.com/api/v5/customdimension'
    json_template = '''
    {
        "name": "{{ column }}",
        "type": "{{ data_type }}",
        "display_name": "{{ pretty_name }}"
    }
    '''
    t = Template(json_template)
    data = json.loads(t.render(column = col_name, data_type = type, pretty_name = display_name))    
    response = requests.post(url, headers=headers, data=data)
    if response.status_code != 201:
        print("Unable to create custom dimension column. Exiting.")
        # format() must be applied to the string, not to print()'s return
        # value, which is None on Python 3.
        print("Status code: {}".format(response.status_code))
        print("Error message: {}".format(response.json()['error']))
        exit()
    else:
        dimension_id = response.json()['customDimension']['id']
        print("Custom dimension \"{}\" created as id: {}".format(display_name, dimension_id))
        return dimension_id
dockletrequest.py 文件源码 项目:docklet 作者: unias 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def post_to_all(self, url = '/', data={}):
        """POST to every master listed in ``masterips``.

        With the default ``url`` of ``'/'`` each master's ``/isalive/``
        endpoint is posted to, and the masters whose request raised no
        exception are returned as a list.

        For any other ``url`` the session token is added to a copy of
        ``data``, the request is sent to each master, and a dict mapping
        master to decoded JSON reply is returned. Masters whose request or
        JSON decoding raised are logged at debug level and left out.

        :param url: path appended to each master's base address.
        :param data: form data to send; the shared default dict is never
                     modified because it is copied before the token is added.
        :return: list of masters (``url == '/'``) or dict of replies.
        """
        if url == '/':
            alive = []
            for masterip in masterips:
                try:
                    requests.post("http://"+getip(masterip)+":"+master_port+"/isalive/",data=data)
                except Exception as e:
                    logger.debug(e)
                else:
                    alive.append(masterip)
            return alive

        data = dict(data)
        data['token'] = session['token']
        logger.info("Docklet Request: user = %s data = %s, url = %s"%(session['username'], data, url))

        result = {}
        for masterip in masterips:
            try:
                reply = requests.post("http://"+getip(masterip)+":"+master_port+url,data=data).json()
            except Exception as e:
                logger.debug(e)
            else:
                result[masterip] = reply
                logger.debug("get result from " + getip(masterip))

        return result
disney_api.py 文件源码 项目:wait4disney 作者: gtt116 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def get_token():
    """Request an access token from the Shanghai Disney Resort auth service.

    Posts a fixed, form-encoded assertion grant together with a fixed set of
    request headers.

    :return: the ``access_token`` field of the JSON reply.
    :raises requests.HTTPError: when the reply carries an error status.
    """
    token_url = 'https://authorization.shanghaidisneyresort.com/curoauth/v1/token'
    form_body = 'assertion_type=public&client_id=DPRD-SHDR.MOBILE.IOS-PROD&client_secret=&grant_type=assertion'
    request_headers = {
        'Host': 'authorization.shanghaidisneyresort.com',
        'Content-Type': 'application/x-www-form-urlencoded; charset=utf-8',
        'X-NewRelic-ID': 'Uw4BWVZSGwICU1VRAgkH',
        'X-Conversation-Id': 'shdrA5320488-E03F-4795-A818-286C658EEBB6',
        'Accept': 'application/json',
        'User-Agent': 'SHDR/4.1.1 (iPhone; iOS 9.3.5; Scale/2.00)',
        'Accept-Language': 'zh-Hans-CN;q=1',
    }

    resp = requests.post(token_url, headers=request_headers, data=form_body)
    resp.raise_for_status()
    return resp.json()['access_token']
myq.py 文件源码 项目:Home-Assistant 作者: jmart518 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def login(self):
        """Log in to the MyQ service.

        Posts the username and password as JSON to the login endpoint and
        stores the ``SecurityToken`` from the reply on
        ``self.security_token``.

        :return: ``True`` once the token has been stored.
        """
        credentials = {
            'username': self.username,
            'password': self.password
        }
        login_url = 'https://{host_uri}/{login_endpoint}'.format(
            host_uri=HOST_URI,
            login_endpoint=self.LOGIN_ENDPOINT)
        request_headers = {
            'MyQApplicationId': self.brand[APP_ID],
            'User-Agent': self.USERAGENT
        }

        reply = requests.post(login_url, json=credentials, headers=request_headers)

        self.security_token = reply.json()['SecurityToken']
        self._logger.debug('Logged in to MyQ API')
        return True
placebo_tests.py 文件源码 项目:placebo 作者: huseyinyilmaz 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_last_request(self, mock):
        """The mock records the URL, body and headers of the last request."""
        response = requests.post(GetMock.url,
                                 headers={'custom-header': 'huseyin'},
                                 data={'name': 'huseyin'})
        self.assertEqual(response.status_code, 200)

        # The recorded request must carry the values that were just sent.
        recorded = mock.last_request
        self.assertEqual(recorded.url, GetMock.url)

        # In python 3 the httpretty backend returns a binary string for the
        # body, so it is decoded back to unicode before comparing.
        raw_body = recorded.body
        body = raw_body.decode('utf-8') if isinstance(raw_body, six.binary_type) else raw_body
        self.assertEqual(body, 'name=huseyin')
        self.assertEqual(recorded.headers.get('custom-header'),
                         'huseyin')

        # The class-level last_request is the same object as the instance's.
        self.assertIs(GetMock.last_request, mock.last_request)
test_biodownloader.py 文件源码 项目:BioDownloader 作者: biomadeira 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_fetch_from_url_or_retry_post_json(self):
        """A POST through the mocked fetcher yields the canned JSON document."""
        identifier = "1csb, 2pah"
        base_url = c.http_pdbe
        endpoint_url = "api/pdb/entry/summary/"

        # Replace the fetcher with a mock that returns a canned response.
        canned = response_mocker(kwargs={}, base_url=base_url,
                                 endpoint_url=endpoint_url,
                                 content_type='application/octet-stream',
                                 post=True, data=identifier)
        self.fetch_from_url_or_retry = MagicMock(return_value=canned)

        url = base_url + endpoint_url + identifier
        reply = self.fetch_from_url_or_retry(url, json=True, post=True, data=identifier,
                                             header={'application/octet-stream'},
                                             retry_in=None, wait=0,
                                             n_retries=10, stream=False)
        expected = json.loads('{"data": "some json formatted output"}')
        self.assertEqual(reply.json(), expected)
pipedrive_client.py 文件源码 项目:django_pipedrive 作者: MasAval 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def call_api(self, method, endpoint, payload):
        """Call a Pipedrive endpoint and return the decoded JSON reply.

        :param method: HTTP verb, matched case-insensitively. ``POST``,
                       ``DELETE`` and ``PUT`` are sent as such; any other
                       value (including ``None``) results in a GET.
        :param endpoint: path joined onto ``self.api_base_url``.
        :param payload: form data for POST and PUT, query parameters for GET;
                        not used for DELETE. The caller's dict is not
                        modified.
        :return: the response body parsed as JSON.
        """
        url = urlparse.urljoin(self.api_base_url, endpoint)

        # Normalize the verb: the branches used to mix 'POST' with lowercase
        # 'delete'/'put', so e.g. 'post' or 'DELETE' silently became a GET.
        verb = (method or '').upper()

        if verb == 'POST':
            response = requests.post(url, data=payload)
        elif verb == 'DELETE':
            response = requests.delete(url)
        elif verb == 'PUT':
            response = requests.put(url, data=payload)
        else:
            query = payload
            if self.api_key:
                # Add the token to a copy so the caller's payload stays
                # untouched (and a missing payload is tolerated).
                query = dict(payload) if payload else {}
                query['api_token'] = self.api_key
            response = requests.get(url, params=query)

        content = json.loads(response.content)

        return content
okta_auth.py 文件源码 项目:okta-awscli 作者: jmhale 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def verify_single_factor(self, factor_id, state_token):
        """ Verifies a single MFA factor.

        Prompts for the MFA token on stdin and posts it, together with the
        state token, to the factor's ``verify`` endpoint.

        :param factor_id: id of the factor to verify, inserted into the URL.
        :param state_token: state token sent as ``stateToken``.
        :return: the ``sessionToken`` from the reply when its status is
                 ``SUCCESS``.

        In every other case the reply (or its ``errorSummary`` when there is
        no ``status`` and the HTTP status is not 200) is printed and the
        process exits with status 1.
        """
        factor_answer = raw_input('Enter MFA token: ')
        req_data = {
            "stateToken": state_token,
            "answer": factor_answer
        }
        post_url = "%s/api/v1/authn/factors/%s/verify" % (self.base_url, factor_id)
        resp = requests.post(post_url, json=req_data)
        resp_json = resp.json()
        if 'status' in resp_json:
            if resp_json['status'] == "SUCCESS":
                return resp_json['sessionToken']
            # A non-SUCCESS status used to fall through and return None
            # silently; report it like the other failure paths.
            print(resp_json)
        elif resp.status_code != 200:
            print(resp_json['errorSummary'])
        else:
            print(resp_json)
        exit(1)
sensor.py 文件源码 项目:thermostat-sensor 作者: ankurp 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def read_and_report(force_alert=False):
  """Read the DHT22 sensor once and post the reading to the server.

  The humidity and temperature are sent as JSON to
  ``http://<SERVER_DOMAIN>/readings`` together with the ``mac`` value as
  ``sensor_id``. Request failures are printed, not raised.

  :param force_alert: when truthy, ``force_alert: True`` is added to the
                      reading.
  """
  humidity, temperature = dht.read_retry(dht.DHT22, TEMP_SENSOR_PIN)
  reading = {
    "humidity": humidity,
    "temperature": temperature,
    "sensor_id": mac
  }
  if force_alert:
    reading['force_alert'] = True
  data = {"reading": reading}

  print("Sending reading: {}".format(data))
  endpoint = "http://{DOMAIN}/readings".format(DOMAIN=SERVER_DOMAIN)
  try:
    requests.post(endpoint, data=json.dumps(data), headers=headers)
  except requests.exceptions.RequestException as e:
    print("Error sending reading: {}".format(e))
  else:
    print("Reading Sent")
bitx.py 文件源码 项目:bitrader 作者: jr-minnaar 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def api_request(self, call, params, kind='auth', http_call='get'):
        """
        Perform a single API request and return its decoded JSON body.

        :param call: the API call to make; turned into a URL by ``construct_url``
        :param params: a dict sent as query parameters (get) or form data (post)
        :param kind: 'auth' sends ``self.auth`` credentials; any other value sends none
        :param http_call: 'get' or 'post'
        :return: the JSON response
        :raises ValueError: if ``http_call`` is neither 'get' nor 'post'
        :raises BitXAPIError: if the status code is not 200 or the body contains 'error'
        """
        url = self.construct_url(call)
        if kind == 'auth':
            credentials = self.auth
        else:
            credentials = None

        if http_call == 'get':
            response = requests.get(url, params, headers=self.headers, auth=credentials)
        elif http_call == 'post':
            response = requests.post(url, data = params, headers=self.headers, auth=credentials)
        else:
            raise ValueError('Invalid http_call parameter')

        try:
            result = response.json()
        except ValueError:
            # a body that is not JSON is treated as an error reply
            result = {'error': 'No JSON content returned'}

        if response.status_code == 200 and 'error' not in result:
            return result
        raise BitXAPIError(response)
bitx.py 文件源码 项目:bitrader 作者: jr-minnaar 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def create_limit_order(self, order_type, volume, price):
        """
        Place a new limit order through the 'postorder' call.

        :param order_type: 'buy' places a BID; any other value places an ASK
        :param volume: the volume, in BTC (sent as a string)
        :param price: the ZAR price per bitcoin (sent as a string)
        :return: whatever ``api_request`` returns for the order
        """
        side = 'BID' if order_type == 'buy' else 'ASK'
        order = dict(
            pair=self.pair,
            type=side,
            volume=str(volume),
            price=str(price),
        )
        return self.api_request('postorder', params=order, http_call='post')
utils.py 文件源码 项目:Plamber 作者: OlegKlimenko 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def validate_captcha(captcha_post):
    """
    Validates the Google re-captcha data.

    Posts the site secret and the user's captcha response to Google's
    ``siteverify`` endpoint and reads the ``success`` flag of the JSON reply.

    :param str captcha_post: The re-captcha post data
    :return bool: True when the reply reports success, False otherwise
                  (including a reply without a ``success`` field).
    """
    data = {
        'secret': settings.GOOGLE_RECAPTCHA_SECRET_KEY,
        'response': captcha_post
    }

    # TLS certificate verification is left at its default (enabled): the
    # request carries the site secret, so the peer must be authenticated.
    response = requests.post('https://www.google.com/recaptcha/api/siteverify', data=data).json()

    return bool(response.get('success'))


问题


面经


文章

微信
公众号

扫码关注公众号