python类Mapping()的实例源码

multiple.py 文件源码 项目:Hanabi-AI 作者: MeGotsThis 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __init__(self,
                 username: str,
                 password: str,
                 botModule: str,
                 botconfig: Mapping,
                 numPlayers: int,
                 variant: Variant,
                 spectators: bool,
                 gameName: str,
                 *args,
                 **kwargs) -> None:
        """Record the login, bot and table settings for this client.

        The bot class is taken from the ``Bot`` attribute of the ``bot``
        submodule of the package named by ``botModule``.  Any extra
        positional/keyword arguments are forwarded unchanged to the parent
        initializer.
        """
        super().__init__(*args, **kwargs)

        # Credentials.
        self.username: str = username
        self.password: str = password

        # Bot implementation: import "<botModule>.bot" and keep its Bot class.
        botPackage = importlib.import_module('.'.join((botModule, 'bot')))
        self.botCls: Type[Bot] = botPackage.Bot  # type: ignore
        self.botconfig: Mapping = botconfig

        # Table settings requested by the caller.
        self.gameName: str = gameName
        self.numPlayers: int = numPlayers
        self.variant: Variant = variant
        self.spectators: bool = spectators

        # Runtime state; `conn` is only declared here, not assigned.
        self.conn: socketIO_client.SocketIO
        self.tablePlayers: List[str] = []
        self.readyToStart: bool = False
        self.game: Optional[Game] = None
kernel.py 文件源码 项目:backend.ai-client-py 作者: lablup 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _get_or_create(cls, lang: str,
                       client_token: Optional[str]=None,
                       mounts: Optional[Iterable[str]]=None,
                       envs: Optional[Mapping[str, str]]=None,
                       max_mem: int=0, exec_timeout: int=0) -> str:
        """Yield a kernel-creation request and wrap the reply in ``cls``.

        This is a generator: it yields one ``Request`` for
        ``POST /kernel/create`` and expects the response object to be sent
        back into it.  When no ``client_token`` is supplied a random hex
        UUID is used; a supplied token must be longer than 8 characters.

        ``max_mem`` and ``exec_timeout`` are accepted but are not included
        in the request payload.

        Returns (as the generator's return value) an instance of ``cls``
        built from the response's ``kernelId``, with ``created`` set from
        the response (defaulting to True).
        """
        if not client_token:
            client_token = uuid.uuid4().hex
        else:
            assert len(client_token) > 8

        payload = {
            'lang': lang,
            'clientSessionToken': client_token,
            'config': {
                'mounts': mounts,
                'envs': envs,
            },
            # No 'limits' section (maxMem / execTimeout) is sent.
        }
        resp = yield Request('POST', '/kernel/create', payload)

        body = resp.json()
        kernel = cls(body['kernelId'])  # type: ignore
        kernel.created = body.get('created', True)  # True is for legacy
        return kernel
sources.py 文件源码 项目:datapipelines-python 作者: meraki-analytics 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def dispatch(method: Callable[[Any, Type[T], Mapping[str, Any], PipelineContext], Any]) -> Callable[[Any, Type[T], Mapping[str, Any], PipelineContext], Any]:
        """Wrap ``method`` so the implementation is chosen by the requested type.

        The returned wrapper looks up the handler registered for ``type`` via
        ``functools.singledispatch`` and calls it with ``(self, query,
        context=context)``.  A ``TypeError`` raised by that call is replaced
        by ``DataSource.unsupported(type)``.

        The wrapper exposes ``register(type)`` for adding handlers and
        ``_provides``, the set of types registered through it.
        """
        registry = singledispatch(method)
        provided_types = set()

        def wrapper(self: Any, type: Type[T], query: Mapping[str, Any], context: PipelineContext = None) -> Any:
            handler = registry.dispatch(type)
            try:
                result = handler(self, query, context=context)
            except TypeError:
                raise DataSource.unsupported(type)
            return result

        def register(type: Type[T]) -> Callable[[Any, Type[T], Mapping[str, Any], PipelineContext], Any]:
            # Remember the type, then hand back singledispatch's own decorator.
            provided_types.add(type)
            return registry.register(type)

        wrapper.register = register
        wrapper._provides = provided_types
        update_wrapper(wrapper, method)
        return wrapper
openRedirect.py 文件源码 项目:AutoTriageBot 作者: salesforce 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def testGETOpenRedirect(url: str, cookies: Mapping[str, str]) -> Optional[str]:
    """ Load the URL via GET in the Firefox driver with the given cookies and wait config.timeout seconds.
        Returns the driver's current URL if it differs from the requested one, otherwise None. A
        TimeoutException or URLError also yields None. The driver is reset before returning. """
    driver = SeleniumDrivers.getFirefoxDriver()
    driver.setCookies(url, cookies)

    try:
        driver.get(url)
        time.sleep(config.timeout)

        redirected = driver.current_url != url
        # Only read the landing URL when a redirect actually happened
        destination = driver.current_url if redirected else None
        driver.reset()
        return destination
    except (TimeoutException, URLError):
        driver.reset()
        return None
openRedirect.py 文件源码 项目:AutoTriageBot 作者: salesforce 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def testPOSTOpenRedirect(url: str, cookies: Mapping[str, str], data: Mapping[str, str]) -> Optional[str]:
    """ Submit the data to the URL via POST in the Firefox driver with the given cookies and wait
        config.timeout seconds. Returns the driver's current URL if it differs from the requested one,
        otherwise None. A TimeoutException or URLError also yields None. The driver is reset before
        returning. """
    driver = SeleniumDrivers.getFirefoxDriver()
    driver.setCookies(url, cookies)

    try:
        driver.post(url, data)
        time.sleep(config.timeout)

        redirected = driver.current_url != url
        # Only read the landing URL when a redirect actually happened
        destination = driver.current_url if redirected else None
        driver.reset()
        return destination
    except (TimeoutException, URLError):
        driver.reset()
        return None
xss.py 文件源码 项目:AutoTriageBot 作者: salesforce 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def testGETXSSDriver(url: str, cookies: Mapping[str, str], driver: webdriver) -> Optional[str]:
    """ Load the URL via GET in the given driver with the given cookies and wait up to config.timeout for an
        alert to be present. Returns the alert's text, or None on TimeoutException or URLError. The driver
        is reset before returning. """
    driver.setCookies(url, cookies)

    try:
        driver.get(url)

        waiter = WebDriverWait(driver, config.timeout)
        waiter.until(expected_conditions.alert_is_present())
        # Note that despite the name switch_to_alert also handles prompt:
        #   - http://selenium-python.readthedocs.io/navigating.html#popup-dialogs
        alertText = driver.switch_to_alert().text
        driver.reset()
        return alertText
    except (TimeoutException, URLError):
        driver.reset()
        return None
xss.py 文件源码 项目:AutoTriageBot 作者: salesforce 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def testPOSTXSSDriver(url: str, cookies: Mapping[str, str], data: Mapping[str, str], driver: webdriver) -> \
        Optional[str]:
    """ Submit the data to the URL via POST in the given driver with the given cookies and wait up to
        config.timeout for an alert to be present. Returns the alert's text, or None on TimeoutException
        or URLError. The driver is reset before returning. """
    driver.setCookies(url, cookies)

    try:
        driver.post(url, data)

        waiter = WebDriverWait(driver, config.timeout)
        waiter.until(expected_conditions.alert_is_present())
        # Note that despite the name switch_to_alert also handles prompt:
        #   - http://selenium-python.readthedocs.io/navigating.html#popup-dialogs
        alertText = driver.switch_to_alert().text
        driver.reset()
        return alertText
    except (TimeoutException, URLError):
        driver.reset()
        return None
AutoTriageUtils.py 文件源码 项目:AutoTriageBot 作者: salesforce 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def postComment(id: str, vti: VulnTestInfo, internal=False, addStopMessage=False) -> Mapping:
    """ Send vti.message as a comment on the report with the given ID and return the decoded JSON reply
          - internal is passed through in the request payload
          - addStopMessage=True appends constants.disableMessage after a blank line
          - when config.triageOnReproduce is set and vti.reproduced is truthy, the report status is
            changed to 'triaged' after posting """
    if config.DEBUG:
        print("Posting comment: internal=%s, reproduced=%s, id=%s" % (str(internal), str(vti.reproduced), id))

    message = vti.message
    if addStopMessage:
        message = message + '\n\n' + constants.disableMessage

    postMessage("Posting Message: \n\n%s" % message)  # TODO: Delete this

    payload = {'message': message, 'internal': internal, 'id': id}
    credentials = HTTPBasicAuth('AutoTriageBot', secrets.apiBoxToken)
    resp = requests.post('http://api:8080/v1/sendMessage', json=payload, auth=credentials)

    shouldTriage = config.triageOnReproduce and vti.reproduced
    if shouldTriage:
        changeStatus(id, 'triaged')

    return resp.json()
payout.py 文件源码 项目:AutoTriageBot 作者: salesforce 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def suggestPayoutGivenType(db: Mapping[str, BountyInfo], domains: List[str]) -> BountyInfo:
    """ Returns a BountyInfo whose average and std are the means of those fields over the given domains
        that are present in the DB. Falls back to db['average'] when no domains are given or none of
        them are found. """
    if not len(domains):
        return db['average']

    averageTotal = 0.0
    stdTotal = 0.0  # Mean of the per-domain stds, not a true pooled std
    matched = 0
    for domain in domains:
        try:
            entry = db[domain]
            averageTotal += entry.average
            stdTotal += entry.std
            matched += 1
        except KeyError:
            # Domain unknown to the DB: leave it out of the mean
            continue

    try:
        return BountyInfo(average=averageTotal / matched, std=stdTotal / matched)
    except ZeroDivisionError:
        return db['average']
ReportWrapper.py 文件源码 项目:AutoTriageBot 作者: salesforce 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def extractJson(message: str) -> Optional[Mapping]:
    """ Returns the first json blob found in the string if any are found, otherwise None.

        Three passes are tried in order, each returning on its first successful parse:
          1. brace-delimited text wrapped in backticks, parsed with json.loads
          2. the span from the first '{' to the last '}', parsed with json.loads
          3. the same span, parsed with fuzzyJsonParse """
    # First pass that relies on it being in a code block.
    # Raw strings keep the regex escapes (\s, \S, \`) from being read as invalid string escapes.
    for fenced in re.findall(r'\`\s*?{[\s\S]*?}\s*?\`', message):
        candidate = fenced[1:-1].strip()  # drop the surrounding backticks
        try:
            return json.loads(candidate)
        except ValueError:
            pass

    # The later passes share the same greedy first-'{'-to-last-'}' candidates, so find them once
    blobs = re.findall(r'{[\s\S]*}', message)

    # Second pass doesn't require the code block, but it still uses the json parser
    for blob in blobs:
        try:
            return json.loads(blob)
        except ValueError:
            pass

    # Third pass uses fuzzyJsonParse, which is more accepting of malformed JSON than json.loads but may
    # parse certain inputs incorrectly, so it is only tried last
    for blob in blobs:
        try:
            return fuzzyJsonParse(blob)
        except (SyntaxError, ValueError):
            pass
    return None
tensorflow.py 文件源码 项目:jack 作者: uclmr 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def __call__(self, batch: Mapping[TensorPort, np.ndarray],
                 goal_ports: List[TensorPort] = None) -> Mapping[TensorPort, np.ndarray]:
        """Run one batch through the session and collect the requested ports.

        Args:
            batch: mapping from ports to values
            goal_ports: ports to return; when empty or None, ``self.output_ports`` is used

        Returns:
            A mapping from goal ports to values. Goal ports that the session
            did not produce but that are present in ``batch`` are copied over
            from ``batch``.
        """
        goal_ports = goal_ports or self.output_ports
        feed_dict = self.convert_to_feed_dict(batch)

        # Only ports this module can produce are fetched from the session.
        fetches = {}
        for port in goal_ports:
            if port in self.output_ports or port in self.training_output_ports:
                fetches[port] = self.tensors[port]
        results = self.tf_session.run(fetches, feed_dict)

        # Pass through requested ports that were supplied as inputs.
        for port in goal_ports:
            if port in results or port not in batch:
                continue
            results[port] = batch[port]

        return results
tensorflow.py 文件源码 项目:jack 作者: uclmr 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def create_training_output(self, shared_resources: SharedResources,
                               training_input_tensors: Mapping[TensorPort, tf.Tensor]) \
            -> Mapping[TensorPort, tf.Tensor]:
        """Build the training-only outputs of the module (abstract hook).

        Implementations define how the tensors that are needed only during
        training are produced from the tensors for `training_input_ports`
        (which may include tensors for ports listed in `output_ports`).
        This part of the graph is meant to be created during training only.

        Args:
            shared_resources: resources shared between modules, e.g. hyper-parameters or vocabularies.
            training_input_tensors: mapping from training input tensorports to tensors.

        Returns:
            A mapping from the defined training output ports to their tensors.

        Raises:
            NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError()
torch.py 文件源码 项目:jack 作者: uclmr 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def __call__(self, batch: Mapping[TensorPort, np.ndarray],
                 goal_ports: List[TensorPort] = None) -> Mapping[TensorPort, np.ndarray]:
        """Run one batch through the prediction module and collect the requested ports.

        Args:
            batch: mapping from ports to values
            goal_ports: ports to return; when empty or None, ``self.output_ports`` is used

        Returns:
            A mapping from goal ports to numpy values. Goal ports that the
            module did not produce but that are present in ``batch`` are
            copied over from ``batch``.
        """
        goal_ports = goal_ports or self.output_ports

        # One torch variable per input port, in input_ports order; the GPU
        # flag is on whenever torch reports at least one CUDA device.
        inputs = []
        for port in self.input_ports:
            inputs.append(
                port.create_torch_variable(batch.get(port), gpu=torch.cuda.device_count() > 0))

        outputs = self.prediction_module.forward(*inputs)

        # Outputs are paired positionally with output_ports.
        ret = {}
        for port, tensor in zip(self.output_ports, outputs):
            if port in goal_ports:
                ret[port] = port.torch_to_numpy(tensor)

        # Pass through requested ports that were supplied as inputs.
        for port in goal_ports:
            if port in ret or port not in batch:
                continue
            ret[port] = batch[port]
        return ret
base.py 文件源码 项目:settei 作者: spoqa 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __get__(self, obj, cls: typing.Optional[type]=None):
        """Descriptor access: build and return the configured object.

        Accessed on the class (``obj`` is None) this returns the descriptor
        itself.  Otherwise the raw value is fetched with ``get_raw_value``,
        which returns a ``(default, expression)`` pair.  When ``default`` is
        falsy the expression must be a mapping containing a ``"class"`` key.
        The expression is then evaluated and type-checked in either case.

        NOTE(review): ``default`` presumably flags that no explicit value
        was configured and the fallback is in use -- confirm against
        ``get_raw_value``.

        :raises ConfigTypeError: if a non-default expression is not a mapping
        :raises ConfigValueError: if a non-default mapping lacks ``"class"``
        """
        if obj is None:
            return self
        default, expression = self.get_raw_value(obj)
        if not default:
            if not isinstance(expression, collections.abc.Mapping):
                raise ConfigTypeError(
                    '{0!r} field must be a mapping, not {1}'.format(
                        self.key, typing._type_repr(type(expression))
                    )
                )
            elif 'class' not in expression:
                raise ConfigValueError(
                    '{0!r} field lacks "class" field'.format(self.key)
                )
        # Evaluate outside the validation branch: previously `value` was only
        # bound when `default` was falsy, so the default path raised
        # UnboundLocalError at the return statement.
        value = self.evaluate(expression)
        self.typecheck(value)
        return value
base.py 文件源码 项目:settei 作者: spoqa 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def evaluate(self, expression) -> object:
        """Turn a configuration expression into an object.

        Anything that is not a mapping, or is a mapping without a
        ``"class"`` key, is returned unchanged.  Otherwise the ``"class"``
        value is resolved with ``self.import_`` and called with the ``"*"``
        entry as positional arguments and every other entry as keyword
        arguments.  When ``self.recurse`` is set, the arguments are
        themselves evaluated first.

        :raises ConfigValueError: if the ``"*"`` entry is a string or is
                                  not a sequence
        """
        if not isinstance(expression, collections.abc.Mapping):
            return expression
        try:
            class_path = expression['class']
        except KeyError:
            return expression

        factory = self.import_(class_path)

        positional = expression.get('*', ())
        is_list_like = (
            not isinstance(positional, str) and
            isinstance(positional, collections.abc.Sequence)
        )
        if not is_list_like:
            raise ConfigValueError(
                '"*" field must be a list, not ' + repr(positional)
            )

        # Everything except the two reserved keys becomes a keyword argument.
        keywords = {}
        for name, raw in expression.items():
            if name not in ('class', '*'):
                keywords[name] = raw

        if self.recurse:
            # map() stays lazy, so positionals are evaluated at call time.
            positional = map(self.evaluate, positional)
            keywords = {name: self.evaluate(raw)
                        for name, raw in keywords.items()}
        return factory(*positional, **keywords)
celery.py 文件源码 项目:settei 作者: spoqa 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def worker_config(self) -> typing.Mapping[str, object]:
        r"""(:class:`typing.Mapping`\ [:class:`str`, :class:`object`])
        The configuration mapping for worker that will go to :attr:`Celery.conf
        <celery.Celery.conf>`.

        Keys of the ``worker`` configuration section are upper-cased; a
        section that is not a mapping is ignored.  ``BROKER_URL``,
        ``CELERY_RESULT_BACKEND`` and ``CELERYBEAT_SCHEDULE`` are then set
        from :attr:`worker_broker_url`, :attr:`worker_result_backend` and
        :attr:`worker_schedule`, overriding same-named keys from the section.

        """
        # The docstring is a raw string so that the reST ``\ `` escape is not
        # treated as an (invalid) Python string escape.
        raw_config = self.config.get('worker', {})
        if isinstance(raw_config, collections.abc.Mapping):
            celery_config = {k.upper(): v for k, v in raw_config.items()}
        else:
            celery_config = {}
        celery_config.update(
            BROKER_URL=self.worker_broker_url,
            CELERY_RESULT_BACKEND=self.worker_result_backend,
            CELERYBEAT_SCHEDULE=self.worker_schedule
        )
        return celery_config
test.py 文件源码 项目:filters 作者: eflglobal 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def sorted_dict(value):
    # type: (Mapping) -> Any
    """
    Recursively rebuilds mappings as OrderedDicts with sorted keys, so
    that output does not depend on the original key order.

    Non-string sequences are converted to lists with each item processed
    the same way; any other value is returned unchanged.
    """
    if isinstance(value, Mapping):
        ordered = OrderedDict()
        for key in sorted(iterkeys(value)):
            ordered[key] = sorted_dict(value[key])
        return ordered

    if isinstance(value, Sequence) and not isinstance(value, string_types):
        return [sorted_dict(item) for item in value]

    return value
context.py 文件源码 项目:sketchbook 作者: futursolo 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __init__(
        self, *, cache_sketches: bool=True,
        source_encoding: str="utf-8",
        custom_escape_fns: Mapping[str, Callable[[Any], str]]={}) -> None:
        """Set up encoding, escape functions and statement classes.

        The escape functions are a copy of the built-in ones updated with
        ``custom_escape_fns`` and stored behind a read-only mapping proxy.
        The statement classes are the built-in ones plus a locally defined
        output statement whose filter names are the keys of
        ``self.escape_fns``.

        NOTE(review): ``self.escape_fns`` is presumably an accessor for
        ``self._escape_fns`` defined elsewhere on the class -- confirm.
        """
        self._cache_sketches = cache_sketches
        self._source_encoding = source_encoding

        # Copy first so the built-in table is never modified.
        merged_fns = escaping.builtin_escape_fns.copy()
        if custom_escape_fns:
            merged_fns.update(custom_escape_fns)
        self._escape_fns = types.MappingProxyType(merged_fns)

        stmt_classes = list(statements.builtin_stmt_classes)
        self._stmt_classes = stmt_classes

        class OutputStmt(statements.BaseOutput):
            _filter_fn_names = list(self.escape_fns.keys())

        stmt_classes.append(OutputStmt)
gfa.py 文件源码 项目:phasm 作者: AbeelLab 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def gfa2_line_to_la(reads: Mapping[str, Read]):
    """Create a function converting a GFA2 edge line to a LocalAlignment.

    The returned function parses the line with ``gfa2_parse_edge``, looks
    up both reads in ``reads`` by segment id without its last character,
    and applies that last character to each read via ``with_orientation``.
    It raises ``ValueError`` for lines that do not start with ``'E'``.
    """
    def mapper(line: str):
        if not line.startswith('E'):
            raise ValueError('Given GFA2 line is not an edge.')

        # The sixth field (tags) is parsed but not used.
        sid1, sid2, arange, brange, alignment, _tags = gfa2_parse_edge(line)

        source = reads[sid1[:-1]]
        target = reads[sid2[:-1]]

        oriented_source = source.with_orientation(sid1[-1])
        oriented_target = target.with_orientation(sid2[-1])
        return LocalAlignment(oriented_source, oriented_target,
                              arange, brange, alignment)

    return mapper
serialization.py 文件源码 项目:aioinflux 作者: plugaai 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def parse_data(data, measurement=None, tag_columns=None, **extra_tags):
    """Converts input data into line protocol format (bytes).

    bytes are returned as-is and str is UTF-8 encoded. A DataFrame is
    passed to ``parse_df`` (and requires ``measurement``), a mapping is
    passed to ``make_line``, and any other iterable is converted item by
    item and joined with newlines. Anything else raises ``ValueError``.
    """
    if isinstance(data, bytes):
        return data
    if isinstance(data, str):
        return data.encode('utf-8')
    if isinstance(data, pd.DataFrame):
        if measurement is None:
            raise ValueError("Missing 'measurement'")
        return parse_df(data, measurement, tag_columns, **extra_tags)
    if isinstance(data, Mapping):
        return make_line(data, measurement, **extra_tags)
    if not isinstance(data, Iterable):
        raise ValueError('Invalid input', data)

    # Generic iterable: convert each item recursively, one line per item.
    lines = [parse_data(item, measurement, tag_columns, **extra_tags)
             for item in data]
    return b'\n'.join(lines)
config.py 文件源码 项目:shanghai 作者: chireiden 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def __getitem__(self, key: str) -> Any:
        """Look up a dot-separated key path in ``self.mapping``.

        Each dot-separated part of ``key`` selects one level of nesting.

        Raises:
            KeyError: if a part is empty, if an intermediate element is not
                a mapping, or if a part is missing from its mapping.
        """
        parts = key.split(".")
        current = self.mapping

        for depth, part in enumerate(parts):
            if not isinstance(current, c_abc.Mapping):
                raise KeyError(f"Element {'.'.join(parts[:depth])!r} is not a mapping")
            if not part:
                raise KeyError(f"Empty sub-key after {'.'.join(parts[:depth])!r}")
            if part not in current:
                raise KeyError(f"Cannot find '{key}'")
            current = current[part]

        return current
config.py 文件源码 项目:shanghai 作者: chireiden 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _parse_servers(self, mapping: Mapping) -> List[Server]:
        """Build the list of servers from the ``servers`` entry of ``mapping``.

        String entries are parsed with ``Server.from_string``; any other
        entry is unpacked as keyword arguments to
        ``Server.with_optional_port``.

        Raises:
            ConfigurationError: if the entry is missing or empty, or is not
                a list.
        """
        servers_conf = mapping.get('servers')
        if not servers_conf:
            raise ConfigurationError(f"Network {self.name!r} has no servers")
        if not isinstance(servers_conf, list):
            raise ConfigurationError(f"Servers of Network {self.name!r} are not a list")

        # Iterate the value that was just validated rather than fetching it
        # from the mapping a second time.
        servers = []
        for server_conf in servers_conf:
            if isinstance(server_conf, str):
                server = Server.from_string(server_conf)
            else:
                server = Server.with_optional_port(**server_conf)
            servers.append(server)
        return servers
asyncpg.py 文件源码 项目:asyncqlio 作者: SunDwarf 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
async def execute(self, sql: str, params: typing.Mapping[str, typing.Any] = None):
        """
        Executes SQL inside the transaction.

        This is a coroutine: the body awaits the connection, so it must be
        declared with ``async def``.

        :param sql: The SQL to execute.
        :param params: The parameters to execute with.
        :return: Whatever the acquired connection's ``execute`` returns.
        :raises IntegrityError: On an integrity-constraint or not-null violation.
        :raises OperationalError: When the object is not in the prerequisite state.
        :raises DatabaseException: On a syntax/access error or a failed transaction.
        """
        # re-parameterize the query into the form passed to the connection
        logger.debug("Executing query {} with params {}".format(sql, params))
        query, params = get_param_query(sql, params)

        try:
            results = await self.acquired_connection.execute(query, *params)
        except (asyncpg.IntegrityConstraintViolationError,
                asyncpg.exceptions.NotNullViolationError) as e:
            raise IntegrityError(*e.args) from e
        except asyncpg.ObjectNotInPrerequisiteStateError as e:
            raise OperationalError(*e.args) from e
        except (asyncpg.SyntaxOrAccessError, asyncpg.InFailedSQLTransactionError) as e:
            raise DatabaseException(*e.args) from e

        return results
sqlite3.py 文件源码 项目:asyncqlio 作者: SunDwarf 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
async def execute(self, sql: str, params: typing.Union[typing.Mapping, typing.Iterable] = None):
        """
        Executes SQL in the current transaction.

        This is a coroutine: the body uses ``async with``, so it must be
        declared with ``async def``.  The SQL is split with
        ``separate_statements`` and each statement is executed in turn.

        :param sql: The SQL to execute.
        :param params: Optional parameters, passed to every statement.
        :return: The result of the last executed statement, or ``None`` if
            the SQL contained no statements.
        :raises IntegrityError: On ``sqlite3.IntegrityError``.
        :raises DatabaseException: On ``sqlite3.OperationalError``.
        """
        logger.debug("Running SQL {} with params {}".format(sql, params))

        # Bound up front so that SQL with no statements returns None instead
        # of failing on an unassigned local.
        res = None

        # lock to ensure nothing else is using the connection at once
        async with self._lock:
            # NOTE(review): threadpool() presumably moves the blocking
            # sqlite3 calls off the event loop -- confirm.
            async with threadpool():
                for stmt in separate_statements(sql):
                    try:
                        if params is None:
                            res = self.connection.execute(stmt)
                        else:
                            res = self.connection.execute(stmt, params)
                    except sqlite3.IntegrityError as e:
                        raise IntegrityError(*e.args) from e
                    except sqlite3.OperationalError as e:
                        raise DatabaseException(*e.args) from e

            return res
sqlite3.py 文件源码 项目:asyncqlio 作者: SunDwarf 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
async def cursor(self, sql: str, params: typing.Union[typing.Mapping, typing.Iterable] = None) \
            -> 'Sqlite3ResultSet':
        """
        Gets a cursor for the specified SQL.

        This is a coroutine: the body uses ``async with``, so it must be
        declared with ``async def``.  The SQL is split with
        ``separate_statements``; a new cursor is created for each statement
        and the one used for the last statement is returned, wrapped in a
        result set.

        :param sql: The SQL to execute.
        :param params: Optional parameters, passed to every statement.
        :raises DatabaseException: On ``sqlite3.OperationalError``.
        """

        logger.debug("Running SQL {} with params {}".format(sql, params))

        # lock to ensure nothing else is using the connection at once
        async with self._lock:
            async with threadpool():
                for stmt in separate_statements(sql):
                    cur = self.connection.cursor()
                    try:
                        if params is None:
                            cur.execute(stmt)
                        else:
                            cur.execute(stmt, params)
                    except sqlite3.OperationalError as e:
                        raise DatabaseException(*e.args) from e

        # NOTE(review): `cur` is only bound inside the loop, so SQL that
        # yields no statements fails here with an unbound local.
        return Sqlite3ResultSet(cur)
query.py 文件源码 项目:asyncqlio 作者: SunDwarf 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def map_many(self, *rows: typing.Mapping[str, typing.Any]):
        """
        Maps many records to one row.

        The first record is turned into a table row with ``map_columns``;
        every following record is then fed to that row's
        ``_update_relationships``.  At least one record is required.
        """
        # The rows are assumed to arrive grouped by the primary key of the
        # left-hand table; right joins are not catered for.
        head = rows[0]
        mapped = self.map_columns(head)

        # Fold the relationship data of the remaining rows into the result.
        for extra in rows[1:]:
            mapped._update_relationships(extra)

        return mapped

    # Helper methods for natural builder-style queries
lti.py 文件源码 项目:CodeGra.de 作者: CodeGra-de 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def __init__(
        self,
        params: t.Mapping[str, str],
        lti_provider: models.LTIProvider = None
    ) -> None:
        """Store the launch parameters and resolve the LTI provider.

        When no ``lti_provider`` is passed, the provider is looked up with
        ``helpers.get_or_404`` using ``params['lti_provider_id']``.  The
        provider's ``key`` and ``secret`` are copied onto this object.
        """
        self.launch_params = params

        provider = lti_provider
        if provider is None:
            # No explicit provider: resolve it from the launch parameters.
            lti_id = params['lti_provider_id']
            provider = helpers.get_or_404(
                models.LTIProvider,
                lti_id,
            )
        self.lti_provider = provider

        self.key = self.lti_provider.key
        self.secret = self.lti_provider.secret

    # TODO support more than just flask
lti.py 文件源码 项目:CodeGra.de 作者: CodeGra-de 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def __init__(
        self,
        operation: str = None,
        score: str = None,
        result_data: t.Mapping[str, str] = None,
        message_identifier: str = None,
        lis_outcome_service_url: str = None,
        lis_result_sourcedid: str = None,
        consumer_key: str = None,
        consumer_secret: str = None,
        post_request: t.Any = None
    ) -> None:
        """Store every argument on the instance under the same name.

        ``outcome_response`` additionally starts out as ``None``.
        """
        # What to send.
        self.operation = operation
        self.score = score
        self.result_data = result_data
        self.message_identifier = message_identifier

        # Where to send it.
        self.lis_outcome_service_url = lis_outcome_service_url
        self.lis_result_sourcedid = lis_result_sourcedid

        # Credentials.
        self.consumer_key = consumer_key
        self.consumer_secret = consumer_secret

        self.post_request = post_request

        # No response is stored at construction time.
        self.outcome_response = None  # type: t.Optional['OutcomeResponse']
models.py 文件源码 项目:CodeGra.de 作者: CodeGra-de 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def get_all_permissions(self) -> t.Mapping[str, bool]:
        """Get the value of every :class:`Permission` whose
        ``course_permission`` flag is ``False`` for this role.

        A permission whose name is in ``self._permissions`` gets the
        opposite of its default value; every other permission gets its
        default value.

        :returns: A mapping from permission name to the value of that
                  permission.
        """
        perms: t.Sequence[Permission] = (
            Permission.query.
            filter_by(  # type: ignore
                course_permission=False
            ).all()
        )
        result: t.MutableMapping[str, bool] = {
            perm.name: (
                not perm.default_value
                if perm.name in self._permissions else perm.default_value
            )
            for perm in perms
        }
        return result
models.py 文件源码 项目:CodeGra.de 作者: CodeGra-de 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def __to_json__(self) -> t.Mapping[str, t.Any]:
        """Creates a JSON serializable representation of this object.

        The result maps each of the following keys to the attribute of the
        same name:

        .. code:: python

            {
                'id':    int, # The id of this user.
                'name':  str, # The full name of this user.
                'email': str, # The email of this user.
                'username': str, # The username of this user.
            }

        :returns: An object as described above.
        """
        exported = ('id', 'name', 'email', 'username')
        return {field: getattr(self, field) for field in exported}


问题


面经


文章

微信
公众号

扫码关注公众号