python类basestring()的实例源码

data_generator.py 文件源码 项目:feagen 作者: ianlini 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def generate(self, data_keys, dag_output_path=None):
        if isinstance(data_keys, basestring):
            data_keys = (data_keys,)
        involved_dag, generation_order = self.build_involved_dag(data_keys)
        if dag_output_path is not None:
            draw_dag(involved_dag, dag_output_path)

        # generate data
        for node in generation_order:
            node_attr = involved_dag.node[node]
            if node_attr['skipped']:
                continue
            self._generate_one(
                involved_dag, node, node_attr['func_name'],
                node_attr['handler'], node_attr['handler_kwargs'],
                node_attr['__re_args__'], node_attr['mode'])

        return involved_dag
bundling.py 文件源码 项目:feagen 作者: ianlini 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def get_data_keys_from_structure(structure):
    data_keys = []

    def _get_data_keys_from_structure(structure):
        if isinstance(structure, basestring):
            data_keys.append(structure)
        elif isinstance(structure, list):
            data_keys.extend(structure)
        elif isinstance(structure, dict):
            for _, val in six.viewitems(structure):
                _get_data_keys_from_structure(val)
        else:
            raise TypeError("The bundle structure only support "
                            "dict, list and str.")
    _get_data_keys_from_structure(structure)

    return data_keys
tfs.py 文件源码 项目:tfs 作者: geevi 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def sequential(x, net, defaults = {}, name = '', reuse = None, var = {}, layers = {}):
    layers = dict(list(layers.items()) + list(predefined_layers.items()))
    y = x
    logging.info('Building Sequential Network : %s', name)

    with tf.variable_scope(name, reuse = reuse):
        for i in range(len(net)):
            ltype   = net[i][0]
            lcfg    = net[i][1] if len(net[i]) == 2 else {}
            lname   = lcfg.get('name', ltype + str(i))
            ldefs   = defaults.get(ltype, {})
            lcfg    = dict(list(ldefs.items()) + list(lcfg.items()))
            for k, v in list(lcfg.items()):
                if isinstance(v, basestring) and v[0] == '$':
                    # print var, v
                    lcfg[k] = var[v[1:]]
            y  = layers[ltype](y, lname, **lcfg)
            logging.info('\t %s \t %s', lname, y.get_shape().as_list())
        return y
extract.py 文件源码 项目:pymongo-schema 作者: pajachiet 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def extract_database_schema(pymongo_database, collection_names=None):
    """ Extract the database schema, for every collection in collection_names

    :param pymongo_database: pymongo.database.Database
    :param collection_names: str, list of str, default None
    :return database_schema: dict
    """
    if isinstance(collection_names, basestring):
        collection_names = [collection_names]

    database_collections = pymongo_database.collection_names(include_system_collections=False)
    if collection_names is None:
        collection_names = database_collections
    else:
        collection_names = [col for col in collection_names if col in database_collections]

    database_schema = dict()
    for collection in collection_names:
        logger.info('...collection %s', collection)
        pymongo_collection = pymongo_database[collection]
        database_schema[collection] = extract_collection_schema(pymongo_collection)

    return database_schema
metric.py 文件源码 项目:snap-plugin-lib-py 作者: intelsdi-x 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def data(self, value):
        if isinstance(value, bool):
            self._data_type = bool
            self._pb.bool_data = value
        elif isinstance(value, int):
            self._data_type = int
            self._pb.int64_data = value
        elif isinstance(value, float):
            self._data_type = float
            self._pb.float64_data = value
        elif isinstance(value, basestring):
            self._data_type = str
            self._pb.string_data = value
        elif isinstance(value, bytes):
            self._data_type = bytes
            self._pb.bytes_data = value
        else:
            raise TypeError("Unsupported data type '{}'.  (Supported: "
                            "int, long, float, str and bool)".format(value))
config_policy.py 文件源码 项目:snap-plugin-lib-py 作者: intelsdi-x 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def _check_key(key):
    errors = []
    if isinstance(key, tuple):
        for i in key:
            if not isinstance(i, basestring):
                errors.append("Expected: string, Received: {}:{}"
                              .format(type(i), i))
    elif isinstance(key, basestring):
        return (key,), key
    elif key is None:
        return "", ""
    else:
        raise TypeError("Expected: tuple of strings, Received: {}"
                        .format(type(key)))
    if len(errors) > 0:
        raise TypeError(errors)
    return key, ".".join(key)
ccc.py 文件源码 项目:py-cloud-compute-cannon 作者: Autodesk 项目源码 文件源码 阅读 405 收藏 0 点赞 0 评论 0
def submit(self, job):
        # TODO: inherit docstring
        self._check_job(job)

        # Wraps the main command to copy inputs into working dir and copy outputs out
        cmdstring = ['export CCC_WORKDIR=`pwd`']
        if job.inputs:
            cmdstring = ['cp -rf /inputs/* .'] + cmdstring
        if isinstance(job.command, basestring):
            cmdstring.append(job.command)
        else:
            cmdstring.append(' '.join(job.command))

        cmdstring.append('cd $CCC_WORKDIR && cp -r * /outputs 2>/dev/null')

        returnval = self.proxy.submitjob(image=job.image,
                                         command=['sh', '-c', ' && '.join(cmdstring)],
                                         inputs=job.inputs,
                                         cpus=job.numcpus,  # how is this the "minimum"?
                                         maxDuration=1000*job.runtime,
                                         workingDir='/workingdir')

        job.jobid = returnval['jobId']
        job._result_json = None
sensors.py 文件源码 项目:incubator-airflow-old 作者: apache 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def __init__(
            self,
            partition_names,
            metastore_conn_id='metastore_default',
            poke_interval=60 * 3,
            *args,
            **kwargs):
        super(NamedHivePartitionSensor, self).__init__(
            poke_interval=poke_interval, *args, **kwargs)

        if isinstance(partition_names, basestring):
            raise TypeError('partition_names must be an array of strings')

        self.metastore_conn_id = metastore_conn_id
        self.partition_names = partition_names
        self.next_poke_idx = 0
views.py 文件源码 项目:statik 作者: thanethomson 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def configure(self):
        # process the parsed view variables
        if 'path' not in self.vars:
            raise MissingParameterError("Missing variable \"path\" in view: %s" % self.name)
        self.path = self.vars['path']

        # if it's a complex view
        if isinstance(self.path, dict):
            self.configure_complex_view(self.path)
        elif isinstance(self.path, basestring):
            self.configure_simple_view(self.path)
        else:
            raise ValueError("Unrecognised structure for \"path\" configuration in view: %s" % self.name)

        # if we don't have a template yet
        if self.template is None:
            # try to load it
            if 'template' not in self.vars:
                raise MissingParameterError("Missing variable \"template\" in view: %s" % self.name)
            self.template = self.template_engine.load_template(self.vars['template'])

        self.configure_context()
plugin.py 文件源码 项目:pytest-logger 作者: aurzenligl 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _loggers_from_logcfg(logcfg, logopt):
    def to_stdout(loggers, opt):
        def one(loggers, one):
            if isinstance(one, basestring):
                return one, next(row for row in loggers if one in row[0])[1]
            else:
                return one
        return [one(loggers, x) for x in opt]

    def to_file(loggers):
        return [(name, row[2]) for row in loggers for name in row[0]]

    return Loggers(
        stdout=to_stdout(logcfg._loggers, logopt),
        file_=to_file(logcfg._loggers)
    )
test_futurize.py 文件源码 项目:packaging 作者: blockstack 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_basestring(self):
        """
        The 2to3 basestring fixer breaks working Py2 code that uses basestring.
        This tests whether something sensible is done instead.
        """
        before = """
        assert isinstance('hello', basestring)
        assert isinstance(u'hello', basestring)
        assert isinstance(b'hello', basestring)
        """
        after = """
        from past.builtins import basestring
        assert isinstance('hello', basestring)
        assert isinstance(u'hello', basestring)
        assert isinstance(b'hello', basestring)
        """
        self.convert_check(before, after)
test_futurize.py 文件源码 项目:packaging 作者: blockstack 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_basestring_issue_156(self):
        before = """
        x = str(3)
        allowed_types = basestring, int
        assert isinstance('', allowed_types)
        assert isinstance(u'', allowed_types)
        assert isinstance(u'foo', basestring)
        """
        after = """
        from builtins import str
        from past.builtins import basestring
        x = str(3)
        allowed_types = basestring, int
        assert isinstance('', allowed_types)
        assert isinstance(u'', allowed_types)
        assert isinstance(u'foo', basestring)
        """
        self.convert_check(before, after)
test_futurize.py 文件源码 项目:packaging 作者: blockstack 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_basestring(self):
        """
        In conservative mode, futurize would not modify "basestring"
        but merely import it from ``past``, and the following code would still
        run on both Py2 and Py3.
        """
        before = """
        assert isinstance('hello', basestring)
        assert isinstance(u'hello', basestring)
        assert isinstance(b'hello', basestring)
        """
        after = """
        from past.builtins import basestring
        assert isinstance('hello', basestring)
        assert isinstance(u'hello', basestring)
        assert isinstance(b'hello', basestring)
        """
        self.convert_check(before, after, conservative=True)
restsession.py 文件源码 项目:ciscosparkapi 作者: CiscoDevNet 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def abs_url(self, url):
        """Given a relative or absolute URL; return an absolute URL.

        Args:
            url(basestring): A relative or absolute URL.

        Returns:
            str: An absolute URL.

        """
        parsed_url = urllib.parse.urlparse(url)
        if not parsed_url.scheme and not parsed_url.netloc:
            # url is a relative URL; combine with base_url
            return urllib.parse.urljoin(str(self.base_url), str(url))
        else:
            # url is already an absolute URL; return as is
            return url
restsession.py 文件源码 项目:ciscosparkapi 作者: CiscoDevNet 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def get(self, url, params=None, **kwargs):
        """Sends a GET request.

        Args:
            url(basestring): The URL of the API endpoint.
            params(dict): The parameters for the HTTP GET request.
            **kwargs:
                erc(int): The expected (success) response code for the request.
                others: Passed on to the requests package.

        Raises:
            SparkApiError: If anything other than the expected response code is
                returned by the Cisco Spark API endpoint.

        """
        assert isinstance(url, basestring)
        assert params is None or isinstance(params, dict)

        # Expected response code
        erc = kwargs.pop('erc', EXPECTED_RESPONSE_CODE['GET'])

        response = self.request('GET', url, erc, params=params, **kwargs)
        return extract_and_parse_json(response)
restsession.py 文件源码 项目:ciscosparkapi 作者: CiscoDevNet 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def post(self, url, json=None, data=None, **kwargs):
        """Sends a POST request.

        Args:
            url(basestring): The URL of the API endpoint.
            json: Data to be sent in JSON format in tbe body of the request.
            data: Data to be sent in the body of the request.
            **kwargs:
                erc(int): The expected (success) response code for the request.
                others: Passed on to the requests package.

        Raises:
            SparkApiError: If anything other than the expected response code is
                returned by the Cisco Spark API endpoint.

        """
        assert isinstance(url, basestring)

        # Expected response code
        erc = kwargs.pop('erc', EXPECTED_RESPONSE_CODE['POST'])

        response = self.request('POST', url, erc, json=json, data=data,
                                **kwargs)
        return extract_and_parse_json(response)
restsession.py 文件源码 项目:ciscosparkapi 作者: CiscoDevNet 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def put(self, url, json=None, data=None, **kwargs):
        """Sends a PUT request.

        Args:
            url(basestring): The URL of the API endpoint.
            json: Data to be sent in JSON format in tbe body of the request.
            data: Data to be sent in the body of the request.
            **kwargs:
                erc(int): The expected (success) response code for the request.
                others: Passed on to the requests package.

        Raises:
            SparkApiError: If anything other than the expected response code is
                returned by the Cisco Spark API endpoint.

        """
        assert isinstance(url, basestring)

        # Expected response code
        erc = kwargs.pop('erc', EXPECTED_RESPONSE_CODE['PUT'])

        response = self.request('PUT', url, erc, json=json, data=data,
                                **kwargs)
        return extract_and_parse_json(response)
restsession.py 文件源码 项目:ciscosparkapi 作者: CiscoDevNet 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def delete(self, url, **kwargs):
        """Sends a DELETE request.

        Args:
            url(basestring): The URL of the API endpoint.
            **kwargs:
                erc(int): The expected (success) response code for the request.
                others: Passed on to the requests package.

        Raises:
            SparkApiError: If anything other than the expected response code is
                returned by the Cisco Spark API endpoint.

        """
        assert isinstance(url, basestring)

        # Expected response code
        erc = kwargs.pop('erc', EXPECTED_RESPONSE_CODE['DELETE'])

        self.request('DELETE', url, erc, **kwargs)
people.py 文件源码 项目:ciscosparkapi 作者: CiscoDevNet 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def get(self, personId):
        """Get a person's details, by ID.

        Args:
            personId(basestring): The ID of the person to be retrieved.

        Returns:
            Person: A Person object with the details of the requested person.

        Raises:
            TypeError: If the parameter types are incorrect.
            SparkApiError: If the Cisco Spark cloud returns an error.

        """
        check_type(personId, basestring, may_be_none=False)

        # API request
        json_data = self._session.get('people/' + personId)

        # Return a Person object created from the response JSON data
        return Person(json_data)
people.py 文件源码 项目:ciscosparkapi 作者: CiscoDevNet 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def delete(self, personId):
        """Remove a person from the system.

        Only an admin can remove a person.

        Args:
            personId(basestring): The ID of the person to be deleted.

        Raises:
            AssertionError: If the parameter types are incorrect.
            SparkApiError: If the Cisco Spark cloud returns an error.

        """
        check_type(personId, basestring, may_be_none=False)

        # API request
        self._session.delete('people/' + personId)
access_tokens.py 文件源码 项目:ciscosparkapi 作者: CiscoDevNet 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def __init__(self, base_url, timeout=None):
        """Initialize an AccessTokensAPI object with the provided RestSession.

        Args:
            base_url(basestring): The base URL the API endpoints.
            timeout(int): Timeout in seconds for the API requests.

        Raises:
            TypeError: If the parameter types are incorrect.

        """
        check_type(base_url, basestring, may_be_none=False)
        check_type(timeout, int)

        super(AccessTokensAPI, self).__init__()

        self._base_url = str(validate_base_url(base_url))
        self._timeout = timeout
        self._endpoint_url = urllib.parse.urljoin(self.base_url, API_ENDPOINT)
        self._request_kwargs = {"timeout": timeout}
memberships.py 文件源码 项目:ciscosparkapi 作者: CiscoDevNet 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def get(self, membershipId):
        """Get details for a membership, by ID.

        Args:
            membershipId(basestring): The membership ID.

        Returns:
            Membership: A Membership object with the details of the requested
                membership.

        Raises:
            TypeError: If the parameter types are incorrect.
            SparkApiError: If the Cisco Spark cloud returns an error.

        """
        check_type(membershipId, basestring, may_be_none=False)

        # API request
        json_data = self._session.get('memberships/' + membershipId)

        # Return a Membership object created from the response JSON data
        return Membership(json_data)
organizations.py 文件源码 项目:ciscosparkapi 作者: CiscoDevNet 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def get(self, orgId):
        """Get the details of an Organization, by ID.

        Args:
            orgId(basestring): The ID of the Organization to be retrieved.

        Returns:
            Organization: An Organization object with the details of the
                requested organization.

        Raises:
            TypeError: If the parameter types are incorrect.
            SparkApiError: If the Cisco Spark cloud returns an error.

        """
        check_type(orgId, basestring, may_be_none=False)

        # API request
        json_data = self._session.get('organizations/' + orgId)

        # Return a Organization object created from the returned JSON object
        return Organization(json_data)
roles.py 文件源码 项目:ciscosparkapi 作者: CiscoDevNet 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def get(self, roleId):
        """Get the details of a Role, by ID.

        Args:
            roleId(basestring): The ID of the Role to be retrieved.

        Returns:
            Role: A Role object with the details of the requested Role.

        Raises:
            TypeError: If the parameter types are incorrect.
            SparkApiError: If the Cisco Spark cloud returns an error.

        """
        check_type(roleId, basestring, may_be_none=False)

        # API request
        json_data = self._session.get('roles/' + roleId)

        # Return a Role object created from the returned JSON object
        return Role(json_data)
webhooks.py 文件源码 项目:ciscosparkapi 作者: CiscoDevNet 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def get(self, webhookId):
        """Get the details of a webhook, by ID.

        Args:
            webhookId(basestring): The ID of the webhook to be retrieved.

        Returns:
            Webhook: A Webhook object with the details of the requested
                webhook.

        Raises:
            TypeError: If the parameter types are incorrect.
            SparkApiError: If the Cisco Spark cloud returns an error.

        """
        check_type(webhookId, basestring, may_be_none=False)

        # API request
        json_data = self._session.get('webhooks/' + webhookId)

        # Return a Webhook object created from the response JSON data
        return Webhook(json_data)
rooms.py 文件源码 项目:ciscosparkapi 作者: CiscoDevNet 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def get(self, roomId):
        """Get the details of a room, by ID.

        Args:
            roomId(basestring): The ID of the room to be retrieved.

        Returns:
            Room: A Room object with the details of the requested room.

        Raises:
            TypeError: If the parameter types are incorrect.
            SparkApiError: If the Cisco Spark cloud returns an error.

        """
        check_type(roomId, basestring, may_be_none=False)

        # API request
        json_data = self._session.get('rooms/' + roomId)

        # Return a Room object created from the response JSON data
        return Room(json_data)
licenses.py 文件源码 项目:ciscosparkapi 作者: CiscoDevNet 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get(self, licenseId):
        """Get the details of a License, by ID.

        Args:
            licenseId(basestring): The ID of the License to be retrieved.

        Returns:
            License: A License object with the details of the requested
                License.

        Raises:
            TypeError: If the parameter types are incorrect.
            SparkApiError: If the Cisco Spark cloud returns an error.

        """
        check_type(licenseId, basestring, may_be_none=False)

        # API request
        json_data = self._session.get('licenses/' + licenseId)

        # Return a License object created from the returned JSON object
        return License(json_data)
teams.py 文件源码 项目:ciscosparkapi 作者: CiscoDevNet 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get(self, teamId):
        """Get the details of a team, by ID.

        Args:
            teamId(basestring): The ID of the team to be retrieved.

        Returns:
            Team: A Team object with the details of the requested team.

        Raises:
            TypeError: If the parameter types are incorrect.
            SparkApiError: If the Cisco Spark cloud returns an error.

        """
        check_type(teamId, basestring, may_be_none=False)

        # API request
        json_data = self._session.get('teams/' + teamId)

        # Return a Team object created from the response JSON data
        return Team(json_data)
messages.py 文件源码 项目:ciscosparkapi 作者: CiscoDevNet 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get(self, messageId):
        """Get the details of a message, by ID.

        Args:
            messageId(basestring): The ID of the message to be retrieved.

        Returns:
            Message: A Message object with the details of the requested
                message.

        Raises:
            TypeError: If the parameter types are incorrect.
            SparkApiError: If the Cisco Spark cloud returns an error.

        """
        check_type(messageId, basestring, may_be_none=False)

        # API request
        json_data = self._session.get('messages/' + messageId)

        # Return a Message object created from the response JSON data
        return Message(json_data)
team_memberships.py 文件源码 项目:ciscosparkapi 作者: CiscoDevNet 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def get(self, membershipId):
        """Get details for a team membership, by ID.

        Args:
            membershipId(basestring): The team membership ID.

        Returns:
            TeamMembership: A TeamMembership object with the details of the
                requested team membership.

        Raises:
            TypeError: If the parameter types are incorrect.
            SparkApiError: If the Cisco Spark cloud returns an error.

        """
        check_type(membershipId, basestring, may_be_none=False)

        # API request
        json_data = self._session.get('team/memberships/' + membershipId)

        # Return a TeamMembership object created from the response JSON data
        return TeamMembership(json_data)


问题


面经


文章

微信
公众号

扫码关注公众号