python类put()的实例源码

api.py 文件源码 项目:upstox-python 作者: upstox 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def api_call(self, url, http_method, data):
    """Send an authenticated JSON request to ``url`` and return the response.

    ``http_method`` is compared by identity against the ``PyCurlVerbs``
    members.  POST and PUT send ``data`` serialized as JSON; DELETE and GET
    send no body.  Returns the ``requests`` response, or ``None`` when
    ``http_method`` is none of the four verbs handled here.
    """
    auth_headers = {
        "Content-Type": "application/json",
        "x-api-key": self.api_key,
        "authorization": "Bearer " + self.access_token,
    }

    if http_method is PyCurlVerbs.POST:
        return requests.post(url, data=json.dumps(data), headers=auth_headers)
    if http_method is PyCurlVerbs.DELETE:
        return requests.delete(url, headers=auth_headers)
    if http_method is PyCurlVerbs.PUT:
        return requests.put(url, data=json.dumps(data), headers=auth_headers)
    if http_method is PyCurlVerbs.GET:
        return requests.get(url, headers=auth_headers)
    return None
custom-resource.py 文件源码 项目:aws-waf-security-automation 作者: cerbo 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def send_response(event, context, responseStatus, responseData):
    """Report the outcome of a custom resource request with an HTTP PUT.

    A JSON document built from ``event``, ``context``, ``responseStatus`` and
    ``responseData`` is PUT to ``event['ResponseURL']``.

    Raises:
        requests.exceptions.RequestException: the PUT itself failed; the
            error is printed and re-raised.
        Exception: the endpoint answered with a status other than 200; the
            response text is printed first.
    """
    responseBody = {'Status': responseStatus,
                    'Reason': 'See the details in CloudWatch Log Stream: ' + context.log_stream_name,
                    'PhysicalResourceId': context.log_stream_name,
                    'StackId': event['StackId'],
                    'RequestId': event['RequestId'],
                    'LogicalResourceId': event['LogicalResourceId'],
                    'Data': responseData}

    try:
        req = requests.put(event['ResponseURL'], data=json.dumps(responseBody))
    except requests.exceptions.RequestException as e:
        # When the request itself fails there is no response object, so only
        # the exception can be reported here.
        print(e)
        raise

    if req.status_code != 200:
        print(req.text)
        raise Exception('Recieved non 200 response while sending response to CFN.')
myq.py 文件源码 项目:Home-Assistant 作者: jmart518 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def set_state(self, device_id, state):
    """Set device state.

    PUTs the desired door state for ``device_id`` to the device-set endpoint
    and returns ``True`` when the reply status is 200, ``False`` otherwise.
    """
    body = {
        'attributeName': 'desireddoorstate',
        'myQDeviceId': device_id,
        'AttributeValue': state,
    }
    endpoint_url = 'https://{host_uri}/{device_set_endpoint}'.format(
        host_uri=HOST_URI,
        device_set_endpoint=self.DEVICE_SET_ENDPOINT)
    request_headers = {
        'MyQApplicationId': self.brand[APP_ID],
        'SecurityToken': self.security_token,
        'User-Agent': self.USERAGENT
    }

    response = requests.put(endpoint_url, data=body, headers=request_headers)
    return response.status_code == 200
pipedrive_client.py 文件源码 项目:django_pipedrive 作者: MasAval 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def call_api(self, method, endpoint, payload):
    """Call an API endpoint and return the JSON-decoded response body.

    ``endpoint`` is joined onto ``self.api_base_url``.  ``method`` is matched
    case-insensitively: POST and PUT send ``payload`` as the request body,
    DELETE sends no body, and anything else is issued as a GET with
    ``payload`` as query parameters.  For GET requests ``api_token`` is
    added to the query when ``self.api_key`` is set.
    """
    url = urlparse.urljoin(self.api_base_url, endpoint)

    # The verbs used to be compared with mixed casing ('POST' but 'delete'
    # and 'put'), so e.g. 'post' silently fell through to GET.
    verb = method.upper() if hasattr(method, 'upper') else method

    if verb == 'POST':
        response = requests.post(url, data=payload)
    elif verb == 'DELETE':
        response = requests.delete(url)
    elif verb == 'PUT':
        response = requests.put(url, data=payload)
    else:
        # Work on a copy so the caller's dict is not modified as a side
        # effect of adding the token.
        query = dict(payload or {})
        if self.api_key:
            query['api_token'] = self.api_key
        response = requests.get(url, params=query)

    return json.loads(response.content)
api.py 文件源码 项目:lecli 作者: rapid7 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def replace_log(log_id, params):
    """
    Replace the given log with the details provided.

    ``params`` is sent as the JSON body of a PUT to the URL built for
    ``log_id``.  On a 200 reply the response text is pretty-printed to
    stdout.  The process exits with status 1 when
    ``response_utils.response_error`` reports an error or when the request
    raises ``requests.exceptions.RequestException``.
    """
    headers = api_utils.generate_headers('rw')

    try:
        response = requests.put(_url(('logs', log_id))[1], json=params, headers=headers)
        if response_utils.response_error(response):
            sys.stderr.write('Update log failed.\n')
            sys.exit(1)
        elif response.status_code == 200:
            sys.stdout.write('Log: %s updated to:\n' % log_id)
            api_utils.pretty_print_string_as_json(response.text)
    except requests.exceptions.RequestException as error:
        # write() only accepts text; passing the exception object itself
        # raised TypeError and hid the real error.
        sys.stderr.write('%s\n' % error)
        sys.exit(1)
handler.py 文件源码 项目:aws-tailor 作者: alanwill 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def cfn_response(response_status, response_data, reason, physical_resource_id, event, context):
    """Put together the response and PUT it to the pre-signed response URL.

    ``reason`` is sent as the ``Reason`` field; when it is empty or ``None``
    a pointer to the CloudWatch log stream named by
    ``context.log_stream_name`` is sent instead.

    Raises:
        Exception: the endpoint answered with a status other than 200; the
            response text is printed first.
    """
    if not reason:
        reason = 'See the details in CloudWatch Log Stream: ' + context.log_stream_name

    # 'Reason' previously ignored the caller's value and always carried the
    # default log-stream text.
    responseBody = {'Status': response_status,
                    'Reason': reason,
                    'PhysicalResourceId': physical_resource_id,
                    'StackId': event['StackId'],
                    'RequestId': event['RequestId'],
                    'LogicalResourceId': event['LogicalResourceId'],
                    'Data': response_data
                    }
    print('Response Body:', responseBody)
    response = requests.put(event['ResponseURL'], data=json.dumps(responseBody))
    if response.status_code != 200:
        print(response.text)
        raise Exception('Response error received.')
mesos.py 文件源码 项目:mesos-autoscaler 作者: prudhvitella 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def __requests_put(self, uri, data, headers):
    """PUT ``data`` to ``uri`` on the Mesos host and return the decoded JSON.

    Basic-auth credentials are attached only when ``self.mesos_user`` is
    set.  Certificate verification is switched off (``verify=False``).
    """
    options = {'headers': headers, 'verify': False}
    if self.mesos_user:
        options['auth'] = (self.mesos_user, self.mesos_pass)
    reply = requests.put(self.mesos_host + uri, data, **options)
    return reply.json()
marathon.py 文件源码 项目:mesos-autoscaler 作者: prudhvitella 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def __requests_put(self, uri, data, headers):
    """PUT ``data`` to ``uri`` on the Marathon host and return the response.

    Basic-auth credentials are attached only when ``self.marathon_user`` is
    set.  Certificate verification is switched off (``verify=False``).
    """
    options = {'headers': headers, 'verify': False}
    if self.marathon_user:
        options['auth'] = (self.marathon_user, self.marathon_pass)
    return requests.put(self.marathon_host + uri, data, **options)
DBScript.py 文件源码 项目:aws-database-migration-tools 作者: awslabs 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def sendResponse(event, context, responseStatus, responseData):
    """Report the outcome of a custom resource request with an HTTP PUT.

    A JSON document built from ``event``, ``context``, ``responseStatus`` and
    ``responseData`` is printed and then PUT to ``event['ResponseURL']``.

    Raises:
        requests.exceptions.RequestException: the PUT itself failed; the
            error is printed and re-raised.
        Exception: the endpoint answered with a status other than 200; the
            response text is printed first.
    """
    responseBody = {'Status': responseStatus,
                    'Reason': 'See the details in CloudWatch Log Stream: ' + context.log_stream_name,
                    'PhysicalResourceId': context.log_stream_name,
                    'StackId': event['StackId'],
                    'RequestId': event['RequestId'],
                    'LogicalResourceId': event['LogicalResourceId'],
                    'Data': responseData}
    # The label used to end in a literal 'n' (a lost backslash) instead of a
    # newline.  Single-argument parenthesised prints behave the same on
    # Python 2 and Python 3.
    print('RESPONSE BODY:\n' + json.dumps(responseBody))
    try:
        req = requests.put(event['ResponseURL'], data=json.dumps(responseBody))
        if req.status_code != 200:
            print(req.text)
            raise Exception('Recieved non 200 response while sending response to CFN.')
    except requests.exceptions.RequestException as e:
        print(e)
        raise
generate_fake_data.py 文件源码 项目:craton 作者: openstack 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def create_cloud(self, cloud, data=None):
    """Create a cloud named ``cloud`` and optionally store its variables.

    The cloud is POSTed to ``<self.url>/clouds`` and the decoded reply is
    kept on ``self.cloud``.  When ``data`` is truthy it is PUT to the cloud's
    ``variables`` URL; a non-200 reply to that PUT is only printed.

    Raises:
        Exception: the POST was not answered with 201 (message is the
            response text).
    """
    cloud_url = self.url + "/clouds"
    body = {"name": cloud}
    print("Creating cloud entry for %s with data %s" % (body, data))

    created = requests.post(cloud_url, headers=self.headers,
                            data=json.dumps(body), verify=False)
    if created.status_code != 201:
        raise Exception(created.text)

    self.cloud = created.json()
    if not data:
        return

    variables_url = self.url + "/clouds/%s/variables" % self.cloud["id"]
    updated = requests.put(variables_url, headers=self.headers,
                           data=json.dumps(data), verify=False)
    if updated.status_code != 200:
        print(updated.text)
generate_fake_data.py 文件源码 项目:craton 作者: openstack 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def create_region(self, region, data=None):
    """Create a region named ``region`` and optionally store its variables.

    The region is POSTed to ``<self.url>/regions`` with the id taken from
    ``self.cloud``, and the decoded reply is kept on ``self.region``.  When
    ``data`` is truthy it is PUT to the region's ``variables`` URL; a
    non-200 reply to that PUT is only printed.

    Raises:
        Exception: the POST was not answered with 201 (message is the
            response text).
    """
    region_url = self.url + "/regions"
    body = {"name": region, "cloud_id": self.cloud.get("id")}
    print("Creating region entry for %s with data %s" % (body, data))

    created = requests.post(region_url, headers=self.headers,
                            data=json.dumps(body), verify=False)
    if created.status_code != 201:
        raise Exception(created.text)

    self.region = created.json()
    if not data:
        return

    variables_url = self.url + "/regions/%s/variables" % self.region["id"]
    updated = requests.put(variables_url, headers=self.headers,
                           data=json.dumps(data), verify=False)
    if updated.status_code != 200:
        print(updated.text)
generate_fake_data.py 文件源码 项目:craton 作者: openstack 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def create_cell(self, cell, data=None):
    """Create a cell named ``cell`` and optionally store its variables.

    The cell is POSTed to ``<self.url>/cells`` with the ids taken from
    ``self.region`` and ``self.cloud``, and the decoded reply is kept on
    ``self.cell``.  When ``data`` is truthy it is PUT to the cell's
    ``variables`` URL; a non-200 reply to that PUT is only printed.

    Raises:
        Exception: the POST was not answered with 201 (message is the
            response text).
    """
    cells_url = self.url + "/cells"
    body = {"region_id": self.region.get("id"),
            "cloud_id": self.cloud.get("id"), "name": cell}
    print("Creating cell entry %s with data %s" % (body, data))

    created = requests.post(cells_url, headers=self.headers,
                            data=json.dumps(body), verify=False)
    if created.status_code != 201:
        raise Exception(created.text)

    cell_record = created.json()
    self.cell = cell_record
    if not data:
        return

    variables_url = self.url + "/cells/%s/variables" % cell_record["id"]
    updated = requests.put(variables_url, headers=self.headers,
                           data=json.dumps(data), verify=False)
    if updated.status_code != 200:
        print(updated.text)
dactyl_build.py 文件源码 项目:dactyl 作者: ripple 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def upload_es_json(es_json, es_index, es_base, id, doc_type='article'):
    """Upload one document to the ElasticSearch index with an HTTP PUT.

    The target URL is ``<es_base>/<es_index lowercased>/<doc_type>/<id>``.
    A reply status of 400 or above is handed to ``recoverable_error``
    together with ``config.bypass_errors``.
    """
    # ES index names must be lowercase
    index_name = es_index.lower()
    target = "{es_base}/{index}/{doc_type}/{id}".format(
        es_base=es_base,
        index=index_name,
        doc_type=doc_type,
        id=id,
    )
    logger.info("Uploading to ES: PUT %s" % target)

    reply = requests.put(target,
                         headers={"Content-Type": "application/json"},
                         data=es_json)
    if reply.status_code < 400:
        return
    recoverable_error("ES upload failed with error: '%s'" % reply.text,
                      config.bypass_errors)
backend.py 文件源码 项目:berlin-devfest-2016-backend 作者: giansegato 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def predict(path):
    global n
    global model

    parentNode = os.path.split(path)[0]
    j = requests.get(FIREBASE_URL + parentNode + FIREBASE_ENDING_URL).json()
    sample = np.asarray([j[x] for x in j.keys() if "x_" in x]).reshape(1, -1)

    if sample.shape[1] == n:
        j['y_predicted'] = model.predict_proba(sample)[0][1]
        j['action'] = model.predict(sample)[0]
        r = requests.put(FIREBASE_URL + parentNode + FIREBASE_ENDING_URL, data = json.dumps(j))

        print "Features: ", sample
        print "Prediction: ", model.predict_proba(sample) 
    else:
        print "Couldn't make prediction, wrong dimension: ", sample.shape[1], " vs ", n

# 2nd: SSEClient that prints stuff
build.py 文件源码 项目:deeppavlov 作者: deepmipt 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def upload_model_to_nexus(project):
    """
    Use 'pyb -P model_name="<model_name>" upload_model_to_nexus' to upload archived model to Nexus repository
    of the lab. If model_name == 'deeppavlov_docs', then documentation from build/docs will be archived and uploaded.
    archive_model task will be executed before.
    """
    import datetime
    import requests

    # The archive is looked up relative to the build directory.
    os.chdir('build')
    model_name = project.get_property('model_name')
    date_stamp = datetime.date.today().strftime("%y%m%d")
    file_name = model_name + '_' + date_stamp + '.tar.gz'

    if model_name == 'deeppavlov_docs':
        section = 'docs/'
    else:
        section = 'models/'
    target = ('http://share.ipavlov.mipt.ru:8080/repository/' + section
              + model_name + '/' + file_name)

    # NOTE(review): the credentials are hard-coded and the upload response
    # is never inspected, so a failed upload goes unnoticed.
    with open(file_name, 'rb') as artifact:
        requests.put(target,
                     headers={'Content-Type': 'application/binary'},
                     data=artifact,
                     auth=('jenkins', 'jenkins123'))
meraki.py 文件源码 项目:provisioning-lib 作者: meraki 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def renameorg(apikey, orgid, neworgname, suppressprint=False):
    """Rename organization ``orgid`` to ``neworgname`` with an HTTP PUT.

    ``__hasorgaccess`` is called first (defined elsewhere; presumably it
    validates access to the organization -- confirm).  Returns whatever
    ``__returnhandler`` produces from the response status and text.
    """
    __hasorgaccess(apikey, orgid)

    endpoint = '{0}/organizations/{1}'.format(str(base_url), str(orgid))
    request_headers = {
        'x-cisco-meraki-api-key': str(apikey),
        'Content-Type': 'application/json'
    }
    body = json.dumps({'name': str(neworgname)})
    reply = requests.put(endpoint, data=body, headers=request_headers)

    # Let the shared return handler interpret the Dashboard response.
    return __returnhandler(reply.status_code, reply.text,
                           'Organization Rename', suppressprint)


# Create a new organization
# https://dashboard.meraki.com/api_docs#create-a-new-organization
meraki.py 文件源码 项目:provisioning-lib 作者: meraki 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def updatecontact(apikey, networkid, contactid, name, suppressprint=False):
    """Set the name of phone contact ``contactid`` in network ``networkid``.

    Returns whatever ``__returnhandler`` produces from the response status
    and text.
    """
    endpoint = '{0}/networks/{1}/phoneContacts/{2}'.format(
        str(base_url), str(networkid), str(contactid))
    request_headers = {
        'x-cisco-meraki-api-key': str(apikey),
        'Content-Type': 'application/json'
    }
    body = json.dumps({'name': name})

    reply = requests.put(endpoint, data=body, headers=request_headers)
    return __returnhandler(reply.status_code, reply.text,
                           'Phone Contact', suppressprint)


# Delete a phone contact. Google Directory contacts cannot be removed.
# https://dashboard.meraki.com/api_docs#delete-a-phone-contact
merakiapi.py 文件源码 项目:provisioning-lib 作者: meraki 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def updatevlan(apikey, networkid, vlanid, vlanname=None, mxip=None, subnetip=None, suppressprint=False):
    """Update VLAN ``vlanid`` of network ``networkid`` with an HTTP PUT.

    Only the arguments that are not ``None`` are sent: ``vlanname`` as
    ``name``, ``mxip`` as ``applianceIp`` and ``subnetip`` as ``subnet``,
    each converted to a string.  Returns whatever ``__returnhandler``
    produces from the response status and text.
    """
    endpoint = '{0}/networks/{1}/vlans/{2}'.format(
        str(base_url), str(networkid), str(vlanid))
    request_headers = {
        'x-cisco-meraki-api-key': str(apikey),
        'Content-Type': 'application/json'
    }

    optional_fields = (
        ('name', vlanname),
        ('applianceIp', mxip),
        ('subnet', subnetip),
    )
    body = {}
    for field, value in optional_fields:
        if value is not None:
            body[field] = str(value)

    reply = requests.put(endpoint, data=json.dumps(body), headers=request_headers)

    # Let the shared return handler interpret the Dashboard response.
    return __returnhandler(reply.status_code, reply.text, 'VLAN', suppressprint)
merakiapi.py 文件源码 项目:provisioning-lib 作者: meraki 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def renameorg(apikey, orgid, neworgname, suppressprint=False):
    """Rename organization ``orgid`` to ``neworgname`` with an HTTP PUT.

    ``__hasorgaccess`` is called first (defined elsewhere; presumably it
    validates access to the organization -- confirm).  Returns whatever
    ``__returnhandler`` produces from the response status and text.
    """
    __hasorgaccess(apikey, orgid)

    target = '{0}/organizations/{1}'.format(str(base_url), str(orgid))
    api_headers = {
        'x-cisco-meraki-api-key': str(apikey),
        'Content-Type': 'application/json'
    }
    new_name = {'name': str(neworgname)}
    reply = requests.put(target, data=json.dumps(new_name), headers=api_headers)

    # Let the shared return handler interpret the Dashboard response.
    outcome = __returnhandler(reply.status_code, reply.text,
                              'Organization Rename', suppressprint)
    return outcome
fnetpepAPI.py 文件源码 项目:FileNetPEAPI 作者: wandss 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def lockTask(self, task):
    """Lock a task so that other users cannot access it at the same time.

    ``task`` is a task dictionary as obtained from the getTasks() method.
    The step element is fetched first to read its current ``ETag`` header,
    which is then sent back together with ``action=lock`` in a PUT.
    Usage:
    >>> pe.lockTask(task)
    """
    step_url = self.client.baseurl + task['stepElement']
    credentials = self.client.cred

    current = requests.get(step_url, auth=credentials)
    # NOTE(review): 'If-Match' travels as a query parameter here, not as an
    # HTTP header -- confirm this is what the service expects.
    lock_params = {'action': 'lock',
                   'If-Match': current.headers['ETag']}
    requests.put(step_url, auth=credentials, params=lock_params)
fnetpepAPI.py 文件源码 项目:FileNetPEAPI 作者: wandss 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def endTask(self, task, comment=None):
    """Dispatch a task, optionally saving a comment on it first.

    When the step offers responses and none has been selected yet, nothing
    is dispatched and an explanatory message string is returned instead.
    Otherwise the task is (optionally) saved with ``comment``, locked, and
    dispatched with a PUT; in that case nothing is returned.
    Usage:
    >>> pe.endTask(task) #or
    >>> pe.endTask(task, u'Completed the task!')

    """
    step = self.getStep(task)
    system_props = step.get('systemProperties')
    if system_props.get('responses') and not system_props.get('selectedResponse'):
        return "This task needs to be updated. Check the updateTask method."

    if comment:
        task = self.saveAndUnlockTask(task, comment)

    self.lockTask(task)
    query = {
        'action': 'dispatch',
        'selectedResponse': 1,
        'If-Match': task['ETag'],
    }
    requests.put(self.client.baseurl + task['stepElement'],
                 auth=self.client.cred,
                 params=query)
shared.py 文件源码 项目:warranty_check 作者: device42 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def upload_lifecycle(self, data):
    """PUT ``data`` to the lifecycle-event endpoint and return the decoded JSON.

    The request carries a Basic ``Authorization`` header built from
    ``self.username`` and ``self.password`` and is sent with certificate
    verification switched off.  When ``DEBUG`` is truthy the payload, the
    status code and the response text are printed.
    """
    endpoint = self.url + '/api/1.0/lifecycle_event/'
    credentials = base64.b64encode(self.username + ':' + self.password)
    request_headers = {
        'Authorization': 'Basic ' + credentials,
        'Content-Type': 'application/x-www-form-urlencoded'
    }

    reply = requests.put(endpoint, data=data, headers=request_headers, verify=False)

    if DEBUG:
        print('\t[+] Posting data: %s' % str(data))
        print('\t[*] Status code: %d' % reply.status_code)
        print('\t[*] Response: %s' % str(reply.text))
    return reply.json()
s3.py 文件源码 项目:sqlalchemy-media 作者: pylover 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _upload_file(self, url: str, data: str, content_type: str,
                 rrs: bool = False):
    """PUT ``data`` to ``url`` with AWS4-signed authentication.

    The storage class header is ``REDUCED_REDUNDANCY`` when ``rrs`` is
    truthy and ``STANDARD`` otherwise; ``Content-Type`` is sent only when
    ``content_type`` is truthy.

    Raises:
        S3Error: the reply status is outside the 2xx range (message is the
            response text).
    """
    ensure_aws4auth()

    signer = AWS4Auth(self.access_key, self.secret_key, self.region, 's3')
    request_headers = {
        'Cache-Control': 'max-age=' + str(self.max_age),
        'x-amz-acl': self.acl,
        'x-amz-storage-class': 'REDUCED_REDUNDANCY' if rrs else 'STANDARD',
    }
    if content_type:
        request_headers['Content-Type'] = content_type

    reply = requests.put(url, auth=signer, data=data, headers=request_headers)
    if reply.status_code < 200 or reply.status_code >= 300:
        raise S3Error(reply.text)
deploycustomer.py 文件源码 项目:automation-scripts 作者: meraki 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def setdevicedata(p_apikey, p_shardhost, p_nwid, p_device, p_field, p_value, p_movemarker):
    """Set one field of a device record with an HTTP PUT.

    ``p_field`` is set to ``p_value`` on device ``p_device`` of network
    ``p_nwid``.  ``p_movemarker`` is a boolean and is sent as the string
    "true" or "false" under ``moveMapMarker``.  The call first sleeps for
    ``API_EXEC_DELAY`` seconds.

    Returns 'ok' when the reply status equals ``requests.codes.ok`` and
    'null' otherwise.  If sending the request raises, an error is printed
    and the process exits with status 2.
    """
    movevalue = "true" if p_movemarker else "false"

    time.sleep(API_EXEC_DELAY)
    try:
        r = requests.put('https://%s/api/v0/networks/%s/devices/%s' % (p_shardhost, p_nwid, p_device), data=json.dumps({p_field: p_value, 'moveMapMarker': movevalue}), headers={'X-Cisco-Meraki-API-Key': p_apikey, 'Content-Type': 'application/json'})
    except Exception:
        # Not a bare except: Ctrl-C and SystemExit must not be reported as
        # a connection failure.
        printusertext('ERROR 16: Unable to contact Meraki cloud')
        sys.exit(2)

    if r.status_code != requests.codes.ok:
        return ('null')

    return('ok')
Container.py 文件源码 项目:devsecops-example-helloworld 作者: boozallen 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def saucelabsSetup(self):
    """Locate this test's remote job and rename it to the build id.

    The account's full job list is fetched and logged, the job id is looked
    up with ``saucelabsFindJobId`` using ``self._remoteTestID()``, and
    ``self.saucelabsJobURL`` and ``self._resultReference`` are set from it.
    Finally the job's ``name`` is set to ``self._buildID()`` with a PUT.
    """
    self.log("Setup for saucelabs")
    credentials = (self._remoteUsername, os.getenv("REMOTE_ACCESSKEY"))
    accountURL = "{0}/{1}".format(self.SAUCELABS_API_BASE, self._remoteUsername)

    listing = requests.get("{0}/jobs".format(accountURL),
                           auth=credentials,
                           headers={'content-type': 'application/json'},
                           params={'full': 'true'})
    jobs = listing.json()
    self.log("response={0}".format(json.dumps(jobs)))

    remoteJobid = self.saucelabsFindJobId(jobs, self._remoteTestID())
    self.saucelabsJobURL = '{0}/jobs/{1}'.format(accountURL, remoteJobid)
    self._resultReference = "{0}/{1}".format(self.SAUCELABS_RESULTS_BASE, remoteJobid)

    requests.put(self.saucelabsJobURL,
                 auth=credentials,
                 headers={'content-type': 'application/json'},
                 data=json.dumps({'name': self._buildID()}))
dss_object_ops.py 文件源码 项目:jcsclient 作者: jiocloudservices 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def execute(self):
    """Upload ``self.local_file_name`` with a signed PUT and return the response.

    The Authorization, Date, Content-Length and Content-Type entries of
    ``self.http_headers`` are filled in before the file is streamed to
    ``self.dss_url + self.dss_op_path``.
    """
    # get signature
    auth = DSSAuth(self.http_method, self.access_key, self.secret_key, self.dss_op_path, content_type = 'application/octet-stream')
    signature = auth.get_signature()
    self.http_headers['Authorization'] = signature
    self.http_headers['Date'] = formatdate(usegmt=True)
    statinfo = os.stat(self.local_file_name)
    # NOTE(review): this header value is an int; some versions of requests
    # reject non-string header values -- confirm against the pinned version.
    self.http_headers['Content-Length'] = statinfo.st_size
    self.http_headers['Content-Type'] = 'application/octet-stream'

    # construct request
    request_url = self.dss_url + self.dss_op_path
    # make request; the context manager closes the file once the upload has
    # completed (the handle used to be left open).
    with open(self.local_file_name, 'rb') as data:
        resp = requests.put(request_url, headers = self.http_headers, data=data, verify = self.is_secure_request)
    return resp
dss_object_ops.py 文件源码 项目:jcsclient 作者: jiocloudservices 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def execute(self):
    """Upload ``self.local_file_name`` as one part with a signed PUT.

    The signature is computed over ``self.dss_query_str_for_signature``.
    The Authorization, Date, Content-Length and Content-Type entries of
    ``self.http_headers`` are filled in, ``self.dss_query_str`` (when not
    ``None``) is appended to the URL, and the response is returned.
    """
    # get signature
    auth = DSSAuth(self.http_method, self.access_key, self.secret_key, self.dss_op_path, query_str = self.dss_query_str_for_signature, content_type = 'application/octet-stream')
    signature = auth.get_signature()
    self.http_headers['Authorization'] = signature
    self.http_headers['Date'] = formatdate(usegmt=True)
    statinfo = os.stat(self.local_file_name)
    # NOTE(review): this header value is an int; some versions of requests
    # reject non-string header values -- confirm against the pinned version.
    self.http_headers['Content-Length'] = statinfo.st_size
    self.http_headers['Content-Type'] = 'application/octet-stream'

    # construct request
    request_url = self.dss_url + self.dss_op_path
    if self.dss_query_str is not None:
        request_url += '?' + self.dss_query_str
    # make request; the context manager closes the file once the upload has
    # completed (the handle used to be left open).
    with open(self.local_file_name, 'rb') as data:
        resp = requests.put(request_url, headers = self.http_headers, data=data, verify = self.is_secure_request)

    return resp
smr_engine.py 文件源码 项目:son-mano-framework 作者: sonata-nfv 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def create_vh(self, sm_type, uuid):
    """Create the virtual host ``<sm_type>-<uuid>`` unless it already exists.

    The existing virtual hosts are listed first.  When none carries the
    target name, the host is created and its permissions for the
    ``specific-management`` user are set, and the two response status codes
    are returned as a tuple.  When the host already exists ``(0, 0)`` is
    returned and nothing is sent.
    """
    vh_name = '{0}-{1}'.format(sm_type, uuid)
    api = '/api/vhosts/'
    credentials = ('guest', 'guest')

    vhosts = requests.get(url='{0}{1}'.format(self.host, api),
                          auth=credentials).json()
    if any(vhost['name'] == vh_name for vhost in vhosts):
        return 0, 0

    response = requests.put(url='{0}{1}{2}'.format(self.host, api, vh_name),
                            headers=self.headers,
                            auth=credentials)
    permission = requests.put(
        url='{0}/api/permissions/{1}/specific-management'.format(self.host, vh_name),
        headers=self.headers,
        data='{"configure":".*","write":".*","read":".*"}',
        auth=credentials)
    return response.status_code, permission.status_code
transfer_knowledgebase.py 文件源码 项目:zendesk-utils 作者: trailbehind 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def delete_all_zendesk_articles(self, category):
    '''
        Delete every section of the given help-center category.

        The category's sections are listed first and then deleted one by one.
        The process exits when the listing is not answered with 200 or when a
        delete is not answered with 204.
    '''
    print("**DELETING ALL SECTIONS/ARTICLES in destination category {}".format(category))

    listing_url = '{}/api/v2/help_center/categories/{}/sections.json'.format(self.zendesk_url, category)
    listing = requests.get(listing_url, headers=self.headers, auth=self.credentials)
    if listing.status_code != 200:
        print('FAILED to delete articles with error {}'.format(listing.status_code))
        exit()

    # Collect every id before the first delete is sent.
    section_ids = [section['id'] for section in listing.json()['sections']]

    for section_id in section_ids:
        section_url = "{}/api/v2/help_center/sections/{}.json".format(self.zendesk_url, section_id)
        deleted = requests.delete(section_url, headers=self.headers, auth=self.credentials)
        if deleted.status_code != 204:
            print('FAILED to delete sections for category {} with error {}'.format(category, deleted.status_code))
            exit()
registration.py 文件源码 项目:Dallinger 作者: Dallinger 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def _upload_assets_to_OSF(dlgr_id, osf_id, provider="osfstorage"):
    """Upload experimental assets to the OSF.

    The file ``snapshots/<dlgr_id>-code.zip`` is PUT to the ``provider``
    storage of resource ``osf_id``, authorized with the ``osf_access_token``
    config value.

    Raises:
        requests.exceptions.HTTPError: the upload was answered with an error
            status (via ``raise_for_status``).
    """
    root = "https://files.osf.io/v1"
    snapshot_filename = "{}-code.zip".format(dlgr_id)
    snapshot_path = os.path.join("snapshots", snapshot_filename)
    # The context manager closes the snapshot once the upload has completed
    # (the handle used to be left open).
    with open(snapshot_path, 'rb') as snapshot:
        r = requests.put(
            "{}/resources/{}/providers/{}/".format(
                root,
                osf_id,
                provider,
            ),
            params={
                "kind": "file",
                "name": snapshot_filename,
            },
            headers={
                "Authorization": "Bearer {}".format(
                    config.get("osf_access_token")),
                "Content-Type": "text/plain",
            },
            data=snapshot,
        )
    r.raise_for_status()


问题


面经


文章

微信
公众号

扫码关注公众号