python类delete()的实例源码

test_RankerProxy.py 文件源码 项目:retrieve-and-rank-tuning 作者: rchaks 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _try_deleting_ranker(self, ranker_id):
        """Delete the ranker ``ranker_id`` if the service still knows about it.

        The ranker is first looked up with a GET. Only a 200 answer triggers
        the DELETE; any other status is logged as "no cleanup required".

        :param ranker_id: id of the test ranker to clean up
        :raises requests.exceptions.HTTPError: via ``raise_for_status`` when
            the DELETE request is answered with an error status
        """
        config = load_config()
        url, user_id, password = get_rnr_credentials(config)

        # NOTE(review): ``path`` is presumably ``os.path``; joining URL parts
        # with it only yields a URL where the path separator is "/" -- confirm.
        ranker_url = path.join(url, 'v1/rankers', ranker_id)
        credentials = (user_id, password)
        headers = {'x-global-transaction-id': 'Rishavs app',
                   'Content-type': 'application/json'}

        response = requests.get(ranker_url, auth=credentials, headers=headers)
        try:
            response_text = json.dumps(response.json(), indent=4, sort_keys=True)
        except ValueError:
            # An error response is not guaranteed to carry a JSON body; fall
            # back to the raw text so the outcome can still be logged instead
            # of crashing the cleanup.
            response_text = response.text

        if response.status_code == 200:
            self.logger.info('Found a test ranker that needs cleanup: %s' % response_text)
            response = requests.delete(ranker_url, auth=credentials, headers=headers)
            response.raise_for_status()
            self.logger.info("Successfully deleted test ranker: %s" % ranker_id)
        else:
            self.logger.info('No cleanup required for ranker id: %s (got response: %s)' % (ranker_id, response_text))
app.py 文件源码 项目:gremlin 作者: amalgam8 项目源码 文件源码 阅读 51 收藏 0 点赞 0 评论 0
def delete_recipe(recipe_id):
    """Clear the fault injection rules tagged with ``recipe_id``.

    Sends a DELETE to ``<a8_controller_url>/v1/rules?tag=<recipe_id>``, adding
    a bearer ``Authorization`` header when ``a8_controller_token`` is set.

    :param recipe_id: tag of the rules to delete
    :return: an empty string when the controller answers 200 or 204
    Any other status is passed to ``abort``; a failure to send the request is
    written to stderr and passed to ``abort`` as a 500.
    """
    # clear fault injection rules
    url = '{0}/v1/rules?tag={1}'.format(a8_controller_url, recipe_id)
    headers = {}
    if a8_controller_token != "":
        # Send the same token whose emptiness was just checked. The previous
        # code concatenated a different name, ``token``.
        # NOTE(review): ``token`` is not defined anywhere in the visible code --
        # confirm it was not an intentional module-level name.
        headers['Authorization'] = "Bearer " + a8_controller_token

    try:
        r = requests.delete(url, headers=headers)
    except Exception as e:
        sys.stderr.write("Could not DELETE {0}".format(url))
        sys.stderr.write("\n")
        sys.stderr.write(str(e))
        sys.stderr.write("\n")
        # presumably ``abort`` raises, so ``r`` is never read unbound below
        abort(500, "Could not DELETE {0}".format(url))

    if r.status_code != 200 and r.status_code != 204:
        abort(r.status_code)

    return ""
failuregenerator.py 文件源码 项目:gremlin 作者: amalgam8 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def clear_rules_from_all_proxies(self):
        """
            Clear fault injection rules from all known service proxies.

            Issues one DELETE per id in ``self._rule_ids`` against
            ``self.a8_controller_url`` and raises (via ``raise_for_status``)
            on an error status. A connection error is reported on stdout and
            terminates the process with exit code 3.
        """
        # Single-argument ``print(...)`` and ``except ... as`` behave the same
        # on Python 2.6+ and also parse on Python 3.
        if self.debug:
            print('Clearing rules')
        try:
            headers = {"Content-Type": "application/json"}
            if self.a8_controller_token != "":
                headers['Authorization'] = "Bearer " + self.a8_controller_token
            for rule_id in self._rule_ids:
                resp = requests.delete(self.a8_controller_url + "?id=" + rule_id,
                                       headers=headers)
                resp.raise_for_status()
        except requests.exceptions.ConnectionError as e:
            print("FAILURE: Could not communicate with control plane %s" % self.a8_controller_url)
            print(e)
            sys.exit(3)

    #TODO: Create a plugin model here, to support gremlinproxy and nginx
slm.py 文件源码 项目:son-mano-framework 作者: sonata-nfv 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def roll_back_instantiation(self, serv_id):
        """
        Roll back the instantiation workflow of service ``serv_id`` after an
        error occurred.

        A removal request carrying the instance uuid is published on
        ``t.IA_REMOVE``, then the SSMs and FSMs of the service are terminated
        without waiting for a response. Deleting the records is not
        implemented yet (see the TODO at the end).
        """

        # Ask for the stack to be removed
        correlation = str(uuid.uuid4())
        removal_request = json.dumps({'instance_uuid': serv_id})
        self.manoconn.notify(
            t.IA_REMOVE,
            removal_request,
            reply_to=t.IA_REMOVE,
            correlation_id=correlation,
        )

        # Terminate the SSMs first, then the FSMs
        self.terminate_ssms(serv_id, require_resp=False)
        self.terminate_fsms(serv_id, require_resp=False)

        LOG.info("Instantiation aborted, cleanup completed")

        # TODO: Delete the records
test_scex.py 文件源码 项目:son-mano-framework 作者: sonata-nfv 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def tearDown(self):
        """Release everything the test set up.

        Terminates the scaling executive process (when one exists), stops both
        broker connections, drops the threading helpers and finally issues
        DELETE requests for the ``specific-management`` user and the
        ``fsm-1234`` vhost on ``self.man_host``.
        """
        # Stop the scaling executive
        if self.plex_proc is not None:
            self.plex_proc.terminate()
        del self.plex_proc

        # Close the connections with the broker; a failure is only logged
        try:
            self.manoconn.stop_connection()
            self.sm_connection.stop_connection()
        except Exception:
            LOG.exception("Stop connection exception.")

        # Drop the threading helpers
        del self.wait_for_event1, self.wait_for_event2

        # Both URLs are built before the first request is sent
        cleanup_urls = [
            template.format(self.man_host)
            for template in ("{0}/api/users/specific-management",
                             "{0}/api/vhosts/fsm-1234")
        ]
        for cleanup_url in cleanup_urls:
            requests.delete(url=cleanup_url, headers=self.headers, auth=('guest', 'guest'))

    #Method that terminates the timer that waits for an event
SigSci.py 文件源码 项目:SigSciApiPy 作者: signalsciences 项目源码 文件源码 阅读 66 收藏 0 点赞 0 评论 0
def delete_configuration(self, EP):
        """Delete every configuration listed in ``self.file`` from endpoint ``EP``.

        ``self.file`` is read as JSON; for each entry of its ``data`` list a
        DELETE is sent to ``<endpoint url>/<entry id>``. The responses are not
        inspected. Any exception is printed together with the URL being used
        and the interpreter is then terminated through ``quit()``.

        :param EP: endpoint suffix appended to the corp/site URL
        """
        # Bind ``url`` up front: the handler below prints it, and it would be
        # unbound if building the URL itself is what failed.
        url = None
        try:
            url = self.base_url + self.CORPS_EP + self.corp + self.SITES_EP + self.site + EP

            with open(self.file) as data_file:
                data = json.load(data_file)

            for config in data['data']:
                requests.delete(url + "/" + config['id'], cookies=self.authn.cookies, headers=self.get_headers())

            print("Delete complete!")

        except Exception as e:
            print('Error: %s ' % str(e))
            print('Query: %s ' % url)
            quit()
transfer_knowledgebase.py 文件源码 项目:zendesk-utils 作者: trailbehind 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def delete_all_zendesk_articles(self, category):
    '''
        Delete every section of the Zendesk help-center category ``category``.

        The sections of the category are listed with a GET, then each one is
        removed with a DELETE. The process exits as soon as the listing does
        not answer 200 or a deletion does not answer 204.

        Background (from the original author): each UserVoice knowledge base
        is put into its own Zendesk category, which is emptied before being
        filled with UV articles.
    '''

    # Single-argument ``print(...)`` prints the same text on Python 2 and
    # also parses on Python 3.
    print("**DELETING ALL SECTIONS/ARTICLES in destination category {}".format(category))
    url = '{}/api/v2/help_center/categories/{}/sections.json'.format(self.zendesk_url, category)
    response = requests.get(url, headers=self.headers, auth=self.credentials)
    if response.status_code != 200:
        print('FAILED to delete articles with error {}'.format(response.status_code))
        exit()
    sections = response.json()['sections']
    section_ids = [section['id'] for section in sections]

    for section_id in section_ids:
        url = "{}/api/v2/help_center/sections/{}.json".format(self.zendesk_url, section_id)
        response = requests.delete(url, headers=self.headers, auth=self.credentials)
        if response.status_code != 204:
            print('FAILED to delete sections for category {} with error {}'.format(category, response.status_code))
            exit()
common.py 文件源码 项目:tintri-python-sdk 作者: Tintri 项目源码 文件源码 阅读 41 收藏 0 点赞 0 评论 0
def _send_raw_http_request(self, method, url, data=None):
        """Send one HTTP request with ``requests`` and return its response.

        :param method: one of GET, POST, PUT, PATCH, DELETE
        :param url: target URL
        :param data: request body, only sent for POST, PUT and PATCH
        :return: the ``requests`` response, also stored in ``self._httpresp``
        :raises TintriError: when ``method`` is not one of the five above

        Every request is sent with ``verify=False``, i.e. without TLS
        certificate verification.
        """
        self.__logger.debug('%s %s' % (method, url))
        carries_body = method in ['POST', 'PUT', 'PATCH']
        if carries_body:
            self.__logger.log(TINTRI_LOG_LEVEL_DATA, 'Data: %s' % data)

        request_headers = {'content-type': 'application/json'}
        if self.__session_id:
            request_headers['cookie'] = 'JSESSIONID=%s' % self.__session_id

        if method not in ['GET', 'POST', 'PUT', 'PATCH', 'DELETE']:
            raise TintriError(None, message='Invalid HTTP method: ' + method) # This should never happen

        if method == 'GET':
            reply = requests.get(url, headers=request_headers, verify=False)
        elif method == 'POST':
            reply = requests.post(url, data, headers=request_headers, verify=False)
        elif method == 'PUT':
            reply = requests.put(url, data, headers=request_headers, verify=False)
        elif method == 'PATCH':
            reply = requests.patch(url, data, headers=request_headers, verify=False)
        else:
            reply = requests.delete(url, headers=request_headers, verify=False)

        # self._httpresp is for debugging only, not thread-safe
        self._httpresp = reply
        return reply
phpIPAM.py 文件源码 项目:phpIPAM 作者: michaelluich 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def sections_delete(self, section_id,):
        """Delete the section ``section_id`` through the phpIPAM API.

        The raw response text is printed and then parsed as JSON.

        :param section_id: id of the section to delete
        :return: ``(status_code, message)`` when the HTTP status is not 200
            (also stored in ``self.error`` / ``self.error_message``);
            otherwise the ``code`` field of the reply, which is additionally
            stored in ``self.error`` when the reply's ``success`` is falsy
        """
        headers = {'token': self.token}
        p = requests.delete(self.base + "sections/%s/" % (section_id), headers=headers)
        # Single-argument ``print(...)`` is valid on both Python 2 and 3.
        print(p.text)
        # NOTE(review): a non-JSON body makes json.loads raise before the
        # status code is looked at -- confirm whether that is acceptable.
        result = json.loads(p.text)

        if p.status_code != 200:
            logging.error("phpipam.sections_delete: Failure %s", p.status_code)
            logging.error(result)
            self.error = p.status_code
            self.error_message = result['message']
            return self.error, self.error_message

        if not result['success']:
            logging.error("phpipam.sections_delete: FAILURE: %s", result['code'])
            self.error = result['code']
            return self.error

        logging.info("phpipam.sections_delete: success %s", result['success'])
        return result['code']
phpIPAM.py 文件源码 项目:phpIPAM 作者: michaelluich 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def subnet_delete(self, subnet_id, ):
        """Delete the subnet ``subnet_id`` through the phpIPAM API.

        The raw response text is printed and then parsed as JSON.

        :param subnet_id: id of the subnet to delete
        :return: ``(status_code, message)`` when the HTTP status is not 200
            (also stored in ``self.error`` / ``self.error_message``);
            otherwise the ``code`` field of the reply, which is additionally
            stored in ``self.error`` when the reply's ``success`` is falsy
        """
        headers = {'token': self.token}
        p = requests.delete(self.base + "subnets/%s/" % (subnet_id), headers=headers)
        # Single-argument ``print(...)`` is valid on both Python 2 and 3.
        print(p.text)
        # NOTE(review): a non-JSON body makes json.loads raise before the
        # status code is looked at -- confirm whether that is acceptable.
        result = json.loads(p.text)

        if p.status_code != 200:
            logging.error("phpipam.subnets_delete: Failure %s", p.status_code)
            logging.error(result)
            self.error = p.status_code
            self.error_message = result['message']
            return self.error, self.error_message

        if not result['success']:
            logging.error("phpipam.subnets_delete: FAILURE: %s", result['code'])
            self.error = result['code']
            return self.error

        logging.info("phpipam.subnets_delete: success %s", result['success'])
        return result['code']
phpIPAM.py 文件源码 项目:phpIPAM 作者: michaelluich 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def vlan_delete(self, vlan_id, ):
        """Delete the VLAN ``vlan_id`` through the phpIPAM API.

        The raw response text is printed and then parsed as JSON.

        :param vlan_id: id of the VLAN to delete
        :return: ``(status_code, message)`` when the HTTP status is not 200
            (also stored in ``self.error`` / ``self.error_message``);
            otherwise the ``code`` field of the reply, which is additionally
            stored in ``self.error`` when the reply's ``success`` is falsy
        """
        headers = {'token': self.token}
        p = requests.delete(self.base + "vlans/%s/" % (vlan_id), headers=headers)
        # Single-argument ``print(...)`` is valid on both Python 2 and 3.
        print(p.text)
        # NOTE(review): a non-JSON body makes json.loads raise before the
        # status code is looked at -- confirm whether that is acceptable.
        result = json.loads(p.text)

        if p.status_code != 200:
            logging.error("phpipam.vlans_delete: Failure %s", p.status_code)
            logging.error(result)
            self.error = p.status_code
            self.error_message = result['message']
            return self.error, self.error_message

        if not result['success']:
            logging.error("phpipam.vlans_delete: FAILURE: %s", result['code'])
            self.error = result['code']
            return self.error

        logging.info("phpipam.vlans_delete: success %s", result['success'])
        return result['code']
fbbotw.py 文件源码 项目:fbbotw 作者: JoabMendes 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def delete_domain_whitelist():
    """ Remove the previously configured domain whitelist.

    Sends a DELETE to the messenger profile URL with a JSON body naming the
    ``whitelisted_domains`` field.

    :usage:

        >>> response = fbbotw.delete_domain_whitelist()
    :return: `Response object <http://docs.python-requests.org/en/master/api/#requests.Response>`_
    :facebook docs: `/domain-whitelisting#delete <https://developers.facebook.com/docs/messenger-platform/messenger-profile/domain-whitelisting#delete>`_
    """
    endpoint = MESSENGER_PROFILE_URL.format(access_token=PAGE_ACCESS_TOKEN)
    body = json.dumps({'fields': ["whitelisted_domains"]})
    return requests.delete(endpoint, headers=HEADER, data=body)
pdns.py 文件源码 项目:desec-stack 作者: desec-io 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def _pdns_delete(url):
    """Send a DELETE for ``url`` to nslord and then to nsmaster.

    nslord is the main authoritative source of the DNS data. To avoid waiting
    for the zone to expire on the slave ("nsmaster"), a second DELETE is
    issued there right away.

    :param url: API path appended to each server's base URL
    :return: tuple ``(nslord_response, nsmaster_response)``
    :raises PdnsException: when a server answers outside the 2xx range, unless
        the answer is a 422 containing 'Could not find domain'
    """
    def _delete_on(api_base, headers):
        # One DELETE against a single server, with the shared acceptance rule
        response = requests.delete(api_base + url, headers=headers)
        if 200 <= response.status_code < 300:
            return response
        # Deletion technically does not fail if the zone didn't exist in the first place
        if response.status_code == 422 and 'Could not find domain' in response.text:
            return response
        raise PdnsException(response)

    lord_response = _delete_on(settings.NSLORD_PDNS_API, headers_nslord)
    # Only reached when the nslord deletion was accepted
    master_response = _delete_on(settings.NSMASTER_PDNS_API, headers_nsmaster)
    return (lord_response, master_response)
ds_api_logging.py 文件源码 项目:esignature-recipes-python 作者: docusign 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def logs_download():
    """Download (and then delete) the logs from DocuSign

    Per the original author, the logs are stored in
    files/log/<account_id>/log_<timestamp>_<orig_name>, where <orig_name> is
    the name the file had on the platform. Eg:
        00_OK_GetAccountSharedAccess.txt
        01_OK_GetFolderList.txt
        02_OK_ExecuteLoggedApiBusinessLogic.txt
        03_Created_RequestRecipientToken.txt
        04_OK_GetEnvelope.txt
        05_OK_GetEnvelope.txt
        06_OK_SendEnvelope.txt

    The module-level ``auth`` is refreshed first. When authentication reports
    an error, only {err, err_code} is returned; otherwise the result of
    ``logging_do_download()`` is returned unchanged.

    Returns {err: False or a problem string, entries: log_entries, err_code }
        returns an array of just the new log_entries
    """
    global auth
    auth = ds_authentication.get_auth()
    if not auth["err"]:
        return logging_do_download() # returns {err, new_entries}
    return {"err": auth["err"], "err_code": auth["err_code"]}
client.py 文件源码 项目:python-moira-client 作者: moira-alert 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def delete(self, path='', **kwargs):
        """
        Send a DELETE request to the given API path and decode the JSON reply.

        :param path: str api path
        :param kwargs: additional parameters for request
        :return: dict response

        :raises: HTTPError
        :raises: InvalidJSONError
        """
        response = requests.delete(
            self._path_join(path),
            headers=self.auth_custom,
            auth=self.auth,
            **kwargs
        )
        response.raise_for_status()
        try:
            decoded = response.json()
        except ValueError:
            # The body could not be decoded as JSON
            raise InvalidJSONError(response.content)
        return decoded
webdriver.py 文件源码 项目:ATX 作者: NetEaseGame 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def httpdo(method, url, data=None):
    """
    Do HTTP Request

    :param method: 'GET', 'POST' or 'DELETE' (anything else raises KeyError)
    :param url: target URL
    :param data: request body; a dict is serialized to JSON first
    :return: the JSON reply passed through ``convert``
    :raises WebDriverError: when the converted reply's ``status`` is not 0
    """
    if isinstance(data, dict):
        data = json.dumps(data)
    # Each print below takes a single string so that the output is the same
    # on Python 2 and the code also parses on Python 3.
    if DEBUG:
        print("Shell: curl -X {method} -d '{data}' '{url}'".format(method=method, data=data or '', url=url))

    fn = dict(GET=requests.get, POST=requests.post, DELETE=requests.delete)[method]
    response = fn(url, data=data)
    retjson = response.json()
    if DEBUG:
        print('Return: ' + json.dumps(retjson, indent=4))
    r = convert(retjson)
    if r.status != 0:
        raise WebDriverError(r.status, r.value)
    return r
logclient.py 文件源码 项目:aliyun-log-python-sdk 作者: aliyun 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def delete_logstore(self, project_name, logstore_name):
        """ Delete a logstore.
        An unsuccessful operation will cause a LogException.

        :type project_name: string
        :param project_name: the Project name

        :type logstore_name: string
        :param logstore_name: the logstore name

        :return: DeleteLogStoreResponse

        :raise: LogException
        """
        resource = "/logstores/" + logstore_name
        # No body, no query parameters and no extra headers are sent
        body, response_headers = self._send("DELETE", project_name, None, resource, {}, {})
        return DeleteLogStoreResponse(response_headers, body)
logclient.py 文件源码 项目:aliyun-log-python-sdk 作者: aliyun 项目源码 文件源码 阅读 120 收藏 0 点赞 0 评论 0
def delete_shard(self, project_name, logstore_name, shardId):
        """ Delete a read-only shard.
        An unsuccessful operation will cause a LogException.

        :type project_name: string
        :param project_name: the Project name

        :type logstore_name: string
        :param logstore_name: the logstore name

        :type shardId: int
        :param shardId: the read only shard id

        :return: DeleteShardResponse

        :raise: LogException
        """
        resource = "/logstores/" + logstore_name + "/shards/" + str(shardId)
        # No body, no query parameters and no extra headers are sent
        body, response_headers = self._send("DELETE", project_name, None, resource, {}, {})
        return DeleteShardResponse(response_headers, body)
logclient.py 文件源码 项目:aliyun-log-python-sdk 作者: aliyun 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def delete_index(self, project_name, logstore_name):
        """ Delete the index of a logstore.
        An unsuccessful operation will cause a LogException.

        :type project_name: string
        :param project_name: the Project name

        :type logstore_name: string
        :param logstore_name: the logstore name

        :return: DeleteIndexResponse

        :raise: LogException
        """
        resource = "/logstores/" + logstore_name + "/index"
        # No body, no query parameters and no extra headers are sent
        body, response_headers = self._send("DELETE", project_name, None, resource, {}, {})
        return DeleteIndexResponse(response_headers, body)
logclient.py 文件源码 项目:aliyun-log-python-sdk 作者: aliyun 项目源码 文件源码 阅读 86 收藏 0 点赞 0 评论 0
def delete_logtail_config(self, project_name, config_name):
        """ Delete a logtail config in a project.
        An unsuccessful operation will cause a LogException.

        :type project_name: string
        :param project_name: the Project name

        :type config_name: string
        :param config_name: the logtail config name

        :return: DeleteLogtailConfigResponse

        :raise: LogException
        """
        resource = "/configs/" + config_name
        # No body, no query parameters and no extra headers are sent
        body, response_headers = self._send("DELETE", project_name, None, resource, {}, {})
        return DeleteLogtailConfigResponse(response_headers, body)


问题


面经


文章

微信
公众号

扫码关注公众号