python类get()的实例源码

gym_http_server.py 文件源码 项目:gym-http-api 作者: openai 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def get_optional_param(json, param, default):
    if json is None:
        logger.info("Request is not a valid json")
        raise InvalidUsage("Request is not a valid json")
    value = json.get(param, None)
    if (value is None) or (value=='') or (value==[]):
        logger.info("An optional request parameter '{}' had value {} and was replaced with default value {}".format(param, value, default))
        value = default
    return value
gym_http_server.py 文件源码 项目:gym-http-api 作者: openai 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def shutdown():
    """ Request a server shutdown - currently used by the integration tests to repeatedly create and destroy fresh copies of the server running in a separate thread"""
    f = request.environ.get('werkzeug.server.shutdown')
    f()
    return 'Server shutting down'
github.py 文件源码 项目:gitsome 作者: donnemartin 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def gitignore_template(self, language):
        """Return the template for language.

        :returns: str
        """
        url = self._build_url('gitignore', 'templates', language)
        json = self._json(self._get(url), 200)
        if not json:
            return ''
        return json.get('source', '')
github.py 文件源码 项目:gitsome 作者: donnemartin 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def admin_stats(self, option):
        """This is a simple way to get statistics about your system.

        :param str option: (required), accepted values: ('all', 'repos',
            'hooks', 'pages', 'orgs', 'users', 'pulls', 'issues',
            'milestones', 'gists', 'comments')
        :returns: dict
        """
        stats = {}
        if option.lower() in ('all', 'repos', 'hooks', 'pages', 'orgs',
                              'users', 'pulls', 'issues', 'milestones',
                              'gists', 'comments'):
            url = self._build_url('enterprise', 'stats', option.lower())
            stats = self._json(self._get(url), 200)
        return stats
utils.py 文件源码 项目:tulen 作者: detorto 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def rl_dispatch():
    set_event()
    while True:
        rated_queue.get()
        set_event();
utils.py 文件源码 项目:tulen 作者: detorto 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def read_minfo(method):
    json = load_json("./files/minfo.json")
    if json:
        return str(json.get(method,"no method info"))
    else: 
        return "no info at all"
utils.py 文件源码 项目:tulen 作者: detorto 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def update_minfo(method, type, incr):
    with json_lock:
        json = load_json("./files/minfo.json")
        if not json:
           json = {"method":{"type":incr}}
        else:
           mmap = json.get(method, {})
           mmap[type] = mmap.get(type,0)+incr
           json[method] = mmap
        open("./files/minfo.json","w").write(pretty_dump(json))
conda_api.py 文件源码 项目:anaconda-project 作者: Anaconda-Platform 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def environ_get_prefix(environ):
    for name in _all_prefix_variables:
        if name in environ:
            return environ.get(name)
    return None
conda_api.py 文件源码 项目:anaconda-project 作者: Anaconda-Platform 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def current_platform():
    m = platform.machine()
    if m in _non_x86_linux_machines:
        return 'linux-%s' % m
    else:
        _platform_map = {'linux2': 'linux', 'linux': 'linux', 'darwin': 'osx', 'win32': 'win', }
        p = _platform_map.get(sys.platform, 'unknown')
        return '%s-%d' % (p, (8 * tuple.__itemsize__))
client.py 文件源码 项目:PyPardot4 作者: mneedham91 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def _check_response(response):
        """
        Checks the HTTP response to see if it contains JSON. If it does, checks the JSON for error codes and messages.
        Raises PardotAPIError if an error was found. If no error was found, returns the JSON. If JSON was not found,
        returns the response status code.
        """
        if response.headers.get('content-type') == 'application/json':
            json = response.json()
            error = json.get('err')
            if error:
                raise PardotAPIError(json_response=json)
            return json
        else:
            return response.status_code
client.py 文件源码 项目:PyPardot4 作者: mneedham91 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def authenticate(self):
        """
         Authenticates the user and sets the API key if successful. Returns True if authentication is successful,
         False if authentication fails.
        """
        try:
            auth = self.post('login', params={'email': self.email, 'password': self.password})
            self.api_key = auth.get('api_key')
            if self.api_key is not None:
                return True
            return False
        except PardotAPIError:
            return False
wit.py 文件源码 项目:Liljimbo-Chatbot 作者: chrisjim316 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def __run_actions(self, session_id, current_request, message, context, i,
                      verbose):
        if i <= 0:
            raise WitError('Max steps reached, stopping.')
        json = self.converse(session_id, message, context, verbose=verbose)
        if 'type' not in json:
            raise WitError('Couldn\'t find type in Wit response')
        if current_request != self._sessions[session_id]:
            return context

        self.logger.debug('Context: %s', context)
        self.logger.debug('Response type: %s', json['type'])

        # backwards-cpmpatibility with API version 20160516
        if json['type'] == 'merge':
            json['type'] = 'action'
            json['action'] = 'merge'

        if json['type'] == 'error':
            raise WitError('Oops, I don\'t know what to do.')

        if json['type'] == 'stop':
            return context

        request = {
            'session_id': session_id,
            'context': dict(context),
            'text': message,
            'entities': json.get('entities'),
        }
        if json['type'] == 'msg':
            self.throw_if_action_missing('send')
            response = {
                'text': json.get('msg').encode('utf8'),
                'quickreplies': json.get('quickreplies'),
            }
            self.actions['send'](request, response)
        elif json['type'] == 'action':
            action = json['action']
            self.throw_if_action_missing(action)
            context = self.actions[action](request)
            if context is None:
                self.logger.warn('missing context - did you forget to return it?')
                context = {}
        else:
            raise WitError('unknown type: ' + json['type'])
        if current_request != self._sessions[session_id]:
            return context
        return self.__run_actions(session_id, current_request, None, context,
                                  i - 1, verbose)
commands.py 文件源码 项目:dockercloud-cli 作者: docker 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def service_ps(quiet, status, stack):
    try:
        headers = ["NAME", "UUID", "STATUS", "#CONTAINERS", "IMAGE", "DEPLOYED", "PUBLIC DNS", "STACK"]

        stack_resource_uri = None
        if stack:
            s = dockercloud.Utils.fetch_remote_stack(stack, raise_exceptions=False)
            if isinstance(s, dockercloud.NonUniqueIdentifier):
                raise dockercloud.NonUniqueIdentifier(
                    "Identifier %s matches more than one stack, please use UUID instead" % stack)
            if isinstance(s, dockercloud.ObjectNotFound):
                raise dockercloud.ObjectNotFound("Identifier '%s' does not match any stack" % stack)
            stack_resource_uri = s.resource_uri
        service_list = dockercloud.Service.list(state=status, stack=stack_resource_uri)

        data_list = []
        long_uuid_list = []
        has_unsynchronized_service = False
        stacks = {}
        for stack in dockercloud.Stack.list():
            stacks[stack.resource_uri] = stack.name
        for service in service_list:
            service_state = utils.add_unicode_symbol_to_state(service.state)
            if not service.synchronized and service.state != "Redeploying":
                service_state += "(*)"
                has_unsynchronized_service = True
            data_list.append([service.name, service.uuid[:8],
                              service_state,
                              service.current_num_containers,
                              service.image_name,
                              utils.get_humanize_local_datetime_from_utc_datetime_string(service.deployed_datetime),
                              service.public_dns,
                              stacks.get(service.stack)])
            long_uuid_list.append(service.uuid)
        if len(data_list) == 0:
            data_list.append(["", "", "", "", "", ""])

        if quiet:
            for uuid in long_uuid_list:
                print(uuid)
        else:
            utils.tabulate_result(data_list, headers)
            if has_unsynchronized_service:
                print(
                    "\n(*) Please note that this service needs to be redeployed to "
                    "have its configuration changes applied")
    except Exception as e:
        print(e, file=sys.stderr)
        sys.exit(EXCEPTION_EXIT_CODE)
commands.py 文件源码 项目:dockercloud-cli 作者: docker 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def service_create(image, name, cpu_shares, memory, privileged, target_num_containers, run_command, entrypoint,
                   expose, publish, envvars, envfiles, tag, linked_to_service, autorestart, autodestroy, autoredeploy,
                   roles, sequential, volume, volumes_from, deployment_strategy, sync, net, pid):
    has_exception = False
    try:
        ports = utils.parse_published_ports(publish)

        # Add exposed_port to ports, excluding whose inner_port that has been defined in published ports
        exposed_ports = utils.parse_exposed_ports(expose)
        for exposed_port in exposed_ports:
            existed = False
            for port in ports:
                if exposed_port.get('inner_port', '') == port.get('inner_port', ''):
                    existed = True
                    break
            if not existed:
                ports.append(exposed_port)

        envvars = utils.parse_envvars(envvars, envfiles)
        links_service = utils.parse_links(linked_to_service, 'to_service')

        tags = []
        if tag:
            if isinstance(tag, list):
                for t in tag:
                    tags.append({"name": t})
            else:
                tags.append({"name": tag})

        bindings = utils.parse_volume(volume)
        bindings.extend(utils.parse_volumes_from(volumes_from))

        service = dockercloud.Service.create(image=image, name=name, cpu_shares=cpu_shares,
                                             memory=memory, privileged=privileged,
                                             target_num_containers=target_num_containers, run_command=run_command,
                                             entrypoint=entrypoint, container_ports=ports, container_envvars=envvars,
                                             linked_to_service=links_service,
                                             autorestart=autorestart, autodestroy=autodestroy,
                                             autoredeploy=autoredeploy,
                                             roles=roles, sequential_deployment=sequential, tags=tags,
                                             bindings=bindings,
                                             deployment_strategy=deployment_strategy, net=net, pid=pid)
        result = service.save()
        if not utils.sync_action(service, sync):
            has_exception = True
        if result:
            print(service.uuid)
    except Exception as e:
        print(e, file=sys.stderr)
        has_exception = True
    if has_exception:
        sys.exit(EXCEPTION_EXIT_CODE)
commands.py 文件源码 项目:dockercloud-cli 作者: docker 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def service_run(image, name, cpu_shares, memory, privileged, target_num_containers, run_command, entrypoint,
                expose, publish, envvars, envfiles, tag, linked_to_service, autorestart, autodestroy, autoredeploy,
                roles, sequential, volume, volumes_from, deployment_strategy, sync, net, pid):
    has_exception = False
    try:
        ports = utils.parse_published_ports(publish)

        # Add exposed_port to ports, excluding whose inner_port that has been defined in published ports
        exposed_ports = utils.parse_exposed_ports(expose)
        for exposed_port in exposed_ports:
            existed = False
            for port in ports:
                if exposed_port.get('inner_port', '') == port.get('inner_port', ''):
                    existed = True
                    break
            if not existed:
                ports.append(exposed_port)

        envvars = utils.parse_envvars(envvars, envfiles)
        links_service = utils.parse_links(linked_to_service, 'to_service')

        tags = []
        if tag:
            if isinstance(tag, list):
                for t in tag:
                    tags.append({"name": t})
            else:
                tags.append({"name": tag})

        bindings = utils.parse_volume(volume)
        bindings.extend(utils.parse_volumes_from(volumes_from))

        service = dockercloud.Service.create(image=image, name=name, cpu_shares=cpu_shares,
                                             memory=memory, privileged=privileged,
                                             target_num_containers=target_num_containers, run_command=run_command,
                                             entrypoint=entrypoint, container_ports=ports, container_envvars=envvars,
                                             linked_to_service=links_service,
                                             autorestart=autorestart, autodestroy=autodestroy,
                                             autoredeploy=autoredeploy,
                                             roles=roles, sequential_deployment=sequential, tags=tags,
                                             bindings=bindings,
                                             deployment_strategy=deployment_strategy, net=net, pid=pid)
        service.save()
        result = service.start()
        if not utils.sync_action(service, sync):
            has_exception = True
        if result:
            print(service.uuid)
    except Exception as e:
        print(e, file=sys.stderr)
        has_exception = True
    if has_exception:
        sys.exit(EXCEPTION_EXIT_CODE)
commands.py 文件源码 项目:dockercloud-cli 作者: docker 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def tag_ls(identifiers, quiet):
    has_exception = False

    headers = ["IDENTIFIER", "TYPE", "TAGS"]
    data_list = []
    tags_list = []
    for identifier in identifiers:
        try:
            obj = dockercloud.Utils.fetch_remote_service(identifier, raise_exceptions=False)
            if isinstance(obj, dockercloud.ObjectNotFound):
                obj = dockercloud.Utils.fetch_remote_nodecluster(identifier, raise_exceptions=False)
                if isinstance(obj, dockercloud.ObjectNotFound):
                    obj = dockercloud.Utils.fetch_remote_node(identifier, raise_exceptions=False)
                    if isinstance(obj, dockercloud.ObjectNotFound):
                        raise dockercloud.ObjectNotFound(
                            "Identifier '%s' does not match any service, node or nodecluster" % identifier)
                    else:
                        obj_type = 'Node'
                else:
                    obj_type = 'NodeCluster'
            else:
                obj_type = 'Service'

            tagnames = []
            for tags in dockercloud.Tag.fetch(obj).list():
                tagname = tags.get('name', '')
                if tagname:
                    tagnames.append(tagname)

            data_list.append([identifier, obj_type, ' '.join(tagnames)])
            tags_list.append(' '.join(tagnames))
        except Exception as e:
            if isinstance(e, dockercloud.ObjectNotFound):
                data_list.append([identifier, 'None', ''])
            else:
                data_list.append([identifier, '', ''])
            tags_list.append('')
            print(e, file=sys.stderr)
            has_exception = True
    if quiet:
        for tags in tags_list:
            print(tags)
    else:
        utils.tabulate_result(data_list, headers)
    if has_exception:
        sys.exit(EXCEPTION_EXIT_CODE)
export_clus.py 文件源码 项目:internet-content-detection 作者: liubo0621 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def main():
    '''
    @summary:
    ---------
    @param :
    ---------
    @result:
    '''

    clues_json = get_clues()
    clues_count = len(clues_json['data'])

    clues_json = tools.dumps_json(clues_json)
    print(clues_json)
    # save_clues_to_file(clues_json)


    keys = 'pattek.com.cn'
    prpcrypt = Prpcrypt(keys)
    encrypt_text = prpcrypt.encrypt(clues_json)

    data = {'info':encrypt_text}

    # ?????
    url = 'http://192.168.60.38:8002/datasync_al/interface/cluesConfSync?'
    json = tools.get_json_by_requests(url, data = data)
    # ??????
    result = record_sync_status(clues_count, json.get("status"), json.get('message'), json.get('data'), 0)
    print(result)
    log.debug('''
        ------ ??????? -----
        %s
        ?????? %d
        '''%(json, result))

    # ?????
    url = 'http://124.205.229.232:8005/gdyq/datasync_al/interface/cluesConfSync'
    json = tools.get_json_by_requests(url, data = data)
    # ??????
    result = record_sync_status(clues_count, json.get("status"), json.get('message'), json.get('data'), 1)
    log.debug('''
        ------ ??????? -----
        %s
        ?????? %d
        '''%(json, result))
github.py 文件源码 项目:gitsome 作者: donnemartin 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def feeds(self):
        """List GitHub's timeline resources in Atom format.

        :returns: dictionary parsed to include URITemplates
        """
        def replace_href(feed_dict):
            if not feed_dict:
                return feed_dict
            ret_dict = {}
            # Let's pluck out what we're most interested in, the href value
            href = feed_dict.pop('href', None)
            # Then we update the return dictionary with the rest of the values
            ret_dict.update(feed_dict)
            if href is not None:
                # So long as there is something to template, let's template it
                ret_dict['href'] = URITemplate(href)
            return ret_dict

        url = self._build_url('feeds')
        json = self._json(self._get(url), 200, include_cache_info=False)
        if json is None:  # If something went wrong, get out early
            return None

        # We have a response body to parse
        feeds = {}

        # Let's pop out the old links so we don't have to skip them below
        old_links = json.pop('_links', {})
        _links = {}
        # If _links is in the response JSON, iterate over that and recreate it
        # so that any templates contained inside can be turned into
        # URITemplates
        for key, value in old_links.items():
            if isinstance(value, list):
                # If it's an array/list of links, let's replace that with a
                # new list of links
                _links[key] = [replace_href(d) for d in value]
            else:
                # Otherwise, just use the new value
                _links[key] = replace_href(value)

        # Start building up our return dictionary
        feeds['_links'] = _links

        for key, value in json.items():
            # This should roughly be the same logic as above.
            if isinstance(value, list):
                feeds[key] = [URITemplate(v) for v in value]
            else:
                feeds[key] = URITemplate(value)

        return feeds
conda_api.py 文件源码 项目:anaconda-project 作者: Anaconda-Platform 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def parse_spec(spec):
    """Parse a package name and version spec as conda would.

    Returns:
       ``ParsedSpec`` or None on failure
    """
    if not is_string(spec):
        raise TypeError("Expected a string not %r" % spec)

    m = _spec_pat.match(spec)
    if m is None:
        return None
    name = m.group('name').lower()
    pip_constraint = m.group('pc')
    if pip_constraint is not None:
        pip_constraint = pip_constraint.replace(' ', '')

    conda_constraint = m.group('cc')

    exact_version = None
    exact_build_string = None
    if conda_constraint is not None:
        m = _conda_constraint_pat.match(conda_constraint)
        assert m is not None
        exact_version = m.group('version')
        for special in ('|', '*', ','):
            if special in exact_version:
                exact_version = None
                break
        if exact_version is not None:
            exact_build_string = m.group('build')
            if exact_build_string is not None:
                assert exact_build_string[0] == '='
                exact_build_string = exact_build_string[1:]

    return ParsedSpec(name=name,
                      conda_constraint=conda_constraint,
                      pip_constraint=pip_constraint,
                      exact_version=exact_version,
                      exact_build_string=exact_build_string)

# these are in order of preference. On pre-4.1.4 Windows,
# CONDA_PREFIX and CONDA_ENV_PATH aren't set, so we get to
# CONDA_DEFAULT_ENV.


问题


面经


文章

微信
公众号

扫码关注公众号