python类sub()的实例源码

process_story.py 文件源码 项目:seq2seq 作者: google 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def process_story(text):
  """Split a raw story into its article body and a single summary string.

  The text is cut at every "@highlight" marker. The piece before the first
  marker is the article; every later piece is one highlight. Highlights are
  joined with "; ". In both results each run of whitespace (including
  newlines) is collapsed to one space and the ends are trimmed.

  Args:
    text: Raw story text, optionally containing "@highlight" markers.

  Returns:
    An (article, summary) tuple of strings. The summary is empty when the
    text contains no marker.
  """
  def _squash(fragment):
    # Collapse whitespace runs to single spaces and trim both ends.
    return re.sub(r"\s+", " ", fragment).strip()

  pieces = [piece.strip() for piece in text.split("@highlight")]
  article = pieces[0]
  summary = "; ".join(pieces[1:])

  return _squash(article), _squash(summary)
postgres_sqlalchemy.py 文件源码 项目:database_assetstore 作者: OpenGeoscience 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def _get_column_info(self, name, format_type, *args, **kwargs):
    """
    Look up column information, creating a type on the fly if needed.

    The lookup itself is delegated to ``gci`` (per the original note, the
    wrapped function from sqlalchemy's dialects/postgresql/base.py).  If the
    result's type is NULLTYPE and the bare type name is neither in
    ``dialect.base.ischema_names`` nor in ``KnownTypes``, a new subclass of
    ``DynamicType`` is created, registered in both tables, and the lookup
    is repeated so the new type is picked up.

    Returns whatever ``gci`` returns for the (possibly repeated) lookup.
    """
    # Reduce the format type to a bare name: drop any parenthesised
    # arguments first, then any '[]' array marker.
    attype = re.sub(r'\[\]', '', re.sub(r'\(.*\)', '', format_type))
    info = gci(self, name, format_type, *args, **kwargs)

    needs_dynamic_type = (
        info['type'] == sqlalchemy.sql.sqltypes.NULLTYPE and
        attype.lower() not in dialect.base.ischema_names and
        attype not in KnownTypes)
    if not needs_dynamic_type:
        return info

    type_name = str(attype)
    newtype = type(type_name, (DynamicType,), {'name': type_name})
    newtype.__visit_name__ = attype
    # Register under the lower-cased name for the dialect and under the
    # original spelling for our own bookkeeping.
    dialect.base.ischema_names[attype.lower()] = newtype
    KnownTypes[attype] = newtype
    return gci(self, name, format_type, *args, **kwargs)
common.py 文件源码 项目:cellranger 作者: 10XGenomics 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def format_name(display_name, prefix, prefixes, prefix_format_func=None):
    """Build a human-readable display name, optionally with a prefix.

    Args:
        display_name: Name template; may contain a '%s' placeholder.
        prefix: Prefix value for this name, or None.
        prefixes: Collection of all candidate prefixes; only its length
            is inspected here.
        prefix_format_func: Optional callable applied to a non-None prefix
            before use. When omitted, a prefix equal to
            cr_constants.MULTI_REFS_PREFIX is replaced by ''.

    Returns:
        The name with underscores turned into spaces and each run of
        whitespace collapsed to a single space. The prefix is applied via
        add_prefix() when there are several prefixes or the name contains
        '%s'.
    """
    if prefix is not None and prefix_format_func is not None:
        prefix = prefix_format_func(prefix)

    # Default multi -> '' if no format func given
    if prefix_format_func is None and prefix == cr_constants.MULTI_REFS_PREFIX:
        prefix = ''

    if len(prefixes) > 1 or '%s' in display_name:
        display_name = add_prefix(prefix, display_name)

    # Replace underscores w/ spaces
    display_name = display_name.replace('_', ' ')

    # Collapse whitespace. The pattern is a raw string so that '\s' is not
    # treated as an (invalid) string escape sequence.
    display_name = re.sub(r'\s+', ' ', display_name)

    return display_name
common.py 文件源码 项目:cellranger 作者: 10XGenomics 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def format_description(description, prefix, prefixes, prefix_format_func=None):
    """Build a human-readable description, substituting an optional prefix.

    Args:
        description: Description template; may contain one '%s'
            placeholder that receives the prefix.
        prefix: Prefix value for this description, or None.
        prefixes: Collection of all candidate prefixes; only its length
            is inspected here.
        prefix_format_func: Optional callable applied to a non-None prefix
            before use. When omitted, a prefix equal to
            cr_constants.MULTI_REFS_PREFIX is replaced by ''.

    Returns:
        The description with the placeholder filled in (with '' unless
        there is more than one prefix and the prefix is truthy) and each
        run of whitespace collapsed to a single space.
    """
    if prefix is not None and prefix_format_func is not None:
        prefix = prefix_format_func(prefix)

    # Default multi -> '' if no format func given
    if prefix_format_func is None and prefix == cr_constants.MULTI_REFS_PREFIX:
        prefix = ''

    if '%s' in description:
        # Escape every percent sign that does not start a '%s' placeholder.
        # A lookahead is used so that a '%' at the very end of the text is
        # escaped too; otherwise the formatting below would fail on it.
        description = re.sub(r'%(?!s)', '%%', description)

        # Only add the prefix if there are multiple possibilities
        s = str(prefix) if len(prefixes) > 1 and prefix else ''
        description = description % s

    # Collapse whitespace
    description = re.sub(r'\s+', ' ', description)

    return description
containers.py 文件源码 项目:data_pipeline 作者: Yelp 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def __init__(self, additional_compose_file=None, additional_services=None):
        """Record the compose project settings and the service list.

        :param additional_compose_file: stored unchanged on the instance.
        :param additional_services: optional iterable of service names
            appended after the built-in services.
        """
        # To resolve docker client server version mismatch issue.
        os.environ["COMPOSE_API_VERSION"] = "auto"

        # Project name: the current directory's name reduced to lowercase
        # letters and digits, followed by the login name of the user.
        cwd_name = os.path.basename(os.getcwd())
        sanitized = re.sub(r'[^a-z0-9]', '', cwd_name.lower())
        self.project = "{}{}".format(sanitized, getpass.getuser())

        self.additional_compose_file = additional_compose_file

        services = ["zookeeper", "schematizer", "kafka"]
        if additional_services is not None:
            services.extend(additional_services)
        self.services = services

        # Capture, at construction time, whether the dependent testing
        # containers are already up.  Containers are slow to start, so the
        # intent is to start and stop them only when they were not running
        # beforehand; for long test sessions it is best to start them
        # out-of-band and leave them up.
        self.containers_already_running = self._are_containers_already_running()
tl_object.py 文件源码 项目:BitBot 作者: crack00r 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def infer_id(self):
        """Return the CRC32 of this object's cleaned-up representation.

        The representation is obtained from ``self.__repr__(ignore_id=True)``,
        normalised with the substitutions below, and encoded as ASCII
        before the checksum is taken.
        """
        text = self.__repr__(ignore_id=True)

        # Plain substitutions, applied in this exact order.
        substitutions = (
            (':bytes ', ':string '),
            ('?bytes ', '?string '),
            ('<', ' '),
            ('>', ''),
            ('{', ''),
            ('}', ''),
        )
        for old, new in substitutions:
            text = text.replace(old, new)

        # Drop every " name:flags.N?true" argument from the text.
        text = re.sub(r' \w+:flags\.\d+\?true', r'', text)
        return crc32(text.encode('ascii'))
jobs.py 文件源码 项目:jobs 作者: josiahcarlson 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def _check_inputs_and_outputs(fcn):
    """Decorator that validates and normalises inputs/outputs for ``fcn``.

    The wrapper takes ``(conn, inputs, outputs, identifier, *a, **kw)`` and
    calls ``fcn(conn, locks, graph, str(identifier), *a, **kw)`` where:

    * ``locks`` is the stringified inputs, an empty-string separator, then
      the stringified outputs;
    * ``graph`` is ``['', '']`` unless a truthy ``history`` keyword is
      given (it is removed from ``kw``), in which case it is built from the
      EDGE_RE-substituted names, again falling back to ``['', '']`` when
      every substituted input and output starts with ``'test.'``.

    Inputs and outputs must be a list, tuple or set and must not contain
    the empty string; this is enforced with assertions.
    """
    @functools.wraps(fcn)
    def call(conn, inputs, outputs, identifier, *a, **kw):
        for names in (inputs, outputs):
            assert isinstance(names, (list, tuple, set)), names
        for names in (inputs, outputs):
            assert '' not in names, names

        # this is for actually locking inputs/outputs
        inputs = [str(item) for item in inputs]
        outputs = [str(item) for item in outputs]
        locks = inputs + [''] + outputs

        graph = ['', '']
        if kw.pop('history', None):
            igraph = [EDGE_RE.sub('*', inp) for inp in inputs]
            ograph = [EDGE_RE.sub('*', out) for out in outputs]
            graph_id = EDGE_RE.sub('*', str(identifier))
            if not all(x.startswith('test.') for x in igraph + ograph):
                graph = igraph + [''] + ograph + ['', graph_id]

        return fcn(conn, locks, graph, str(identifier), *a, **kw)
    return call
cifar10.py 文件源码 项目:ml 作者: hohoins 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def _activation_summary(x):
  """Record summaries for an activation tensor.

  Writes a histogram of the activations and a scalar holding the fraction
  of zero entries (tf.nn.zero_fraction), both named after the tensor's op.

  Args:
    x: Tensor
  Returns:
    nothing
  """
  # Strip a leading '<TOWER_NAME>_<digits>/' from the op name so that, in a
  # multi-GPU training session, summaries are not split per tower.
  tower_prefix = '%s_[0-9]*/' % TOWER_NAME
  tensor_name = re.sub(tower_prefix, '', x.op.name)

  tf.summary.histogram(tensor_name + '/activations', x)
  tf.summary.scalar(tensor_name + '/sparsity', tf.nn.zero_fraction(x))
cifar10.py 文件源码 项目:ml 作者: hohoins 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _activation_summary(x):
  """Record summaries for an activation tensor.

  Writes a histogram of the activations and a scalar holding the fraction
  of zero entries (tf.nn.zero_fraction), both named after the tensor's op.

  Args:
    x: Tensor
  Returns:
    nothing
  """
  # Strip a leading '<TOWER_NAME>_<digits>/' from the op name so that, in a
  # multi-GPU training session, summaries are not split per tower.
  tower_prefix = '%s_[0-9]*/' % TOWER_NAME
  tensor_name = re.sub(tower_prefix, '', x.op.name)

  tf.summary.histogram(tensor_name + '/activations', x)
  tf.summary.scalar(tensor_name + '/sparsity', tf.nn.zero_fraction(x))
filelist.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def glob_to_re(pattern):
    """Convert a shell-style glob into a regular-expression string.

    The glob is first translated with 'fnmatch.translate()'.  In that
    result '?' and '*' have become '.' and '.*', which would also match a
    slash; every non-escaped dot is therefore rewritten to '[^/]' so that
    wildcards never cross a path separator.  Only '/' is treated as a
    special character, i.e. this is Unix-only.

    Returns the regular expression as a string.
    """
    translated = fnmatch.translate(pattern)

    # A dot counts as non-escaped when it is preceded by an even number
    # (possibly zero) of backslashes; those backslashes are kept via \1.
    unescaped_dot = re.compile(r'((?<!\\)(\\\\)*)\.')
    return unescaped_dot.sub(r'\1[^/]', translated)
util.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def subst_vars(s, local_vars):
    """Perform shell/Perl-style variable substitution on 's'.

    Every occurrence of '$' followed by a name is considered a variable,
    and is replaced by the value found in the 'local_vars' dictionary
    (converted with str()), or in 'os.environ' if it is not in
    'local_vars'.  'check_environ()' is called first so that it can
    check/augment 'os.environ'.

    Returns the substituted string.  Raises ValueError for any variable
    found in neither 'local_vars' nor 'os.environ'.
    """
    check_environ()

    def _subst(match, local_vars=local_vars):
        var_name = match.group(1)
        if var_name in local_vars:
            return str(local_vars[var_name])
        else:
            return os.environ[var_name]

    try:
        return re.sub(r'\$([a-zA-Z_][a-zA-Z_0-9]*)', _subst, s)
    except KeyError as var:
        # 'except ... as' and the call form of 'raise' are valid on both
        # Python 2.6+ and Python 3; the comma forms are Python 2 only.
        raise ValueError("invalid variable '$%s'" % var)

# subst_vars ()
utils.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def formataddr(pair):
    """Build a header-ready address string from (realname, email_address).

    This is the inverse of parseaddr(): the result is suitable for an
    RFC 2822 From, To or Cc header.

    When the real name is false, the email address is returned unmodified.
    Otherwise every match of 'escapesre' in the name is prefixed with a
    backslash, and the name is wrapped in double quotes if 'specialsre'
    matches it.
    """
    name, address = pair
    if not name:
        return address

    # Decide on quoting before escaping, using the name as given.
    quotes = '"' if specialsre.search(name) else ''
    escaped = escapesre.sub(r'\\\g<0>', name)
    return '%s%s%s <%s>' % (quotes, escaped, quotes, address)
cookielib.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def join_header_words(lists):
    """Join lists of (key, value) pairs into a single header value.

    This is (almost) the inverse of split_header_words.  Each inner list
    becomes one "; "-separated element and the elements are joined with
    ", ".  A pair whose value is None contributes only its key.  A value
    that is not made up purely of word characters is escaped and wrapped
    in double quotes.  Empty inner lists are skipped.

    >>> join_header_words([[("text/plain", None), ("charset", "iso-8859/1")]])
    'text/plain; charset="iso-8859/1"'
    >>> join_header_words([[("text/plain", None)], [("charset", "iso-8859/1")]])
    'text/plain, charset="iso-8859/1"'

    """
    def _render(key, value):
        # A bare key when there is no value, otherwise "key=value" with
        # the value quoted whenever it is not a plain word.
        if value is None:
            return key
        if not re.search(r"^\w+$", value):
            escaped = HEADER_JOIN_ESCAPE_RE.sub(r"\\\1", value)  # escape " and \
            value = '"%s"' % escaped
        return "%s=%s" % (key, value)

    headers = []
    for pairs in lists:
        attr = [_render(k, v) for k, v in pairs]
        if attr:
            headers.append("; ".join(attr))
    return ", ".join(headers)
comment.py 文件源码 项目:Instagram 作者: Fastcampus-WPS-5th 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def make_html_content_and_add_tags(self):
        """Render hashtags in ``self.content`` as links and attach the tags.

        For each ``#word`` found in the content, the matching Tag is
        fetched or created, one occurrence of the hashtag is replaced by
        an anchor pointing at the 'post:hashtag_post_list' URL, and the
        Tag is added to ``self.tags`` if it is not already there.  The
        result is stored in ``self.html_content`` and only that field is
        saved.
        """
        hashtag_pattern = re.compile(r'(#\w+)')
        # Work on a copy so that self.content itself stays untouched.
        rendered = self.content

        for tag_name in re.findall(hashtag_pattern, self.content):
            bare_name = tag_name.replace('#', '')
            # get_or_create returns (object, created); the flag is unused.
            tag, _ = Tag.objects.get_or_create(name=bare_name)

            change_tag = '<a href="{url}" class="hash-tag">{tag_name}</a>'.format(
                url=reverse('post:hashtag_post_list',
                            kwargs={'tag_name': bare_name}),
                tag_name=tag_name
            )
            # Replace a single occurrence that is not followed by '<' or a
            # word character, so hashtags already wrapped in an anchor and
            # longer hashtags sharing this prefix are left alone.
            rendered = re.sub(r'{}(?![<\w])'.format(tag_name), change_tag, rendered, count=1)

            if not self.tags.filter(pk=tag.pk).exists():
                self.tags.add(tag)

        self.html_content = rendered
        super().save(update_fields=['html_content'])
test_unit.py 文件源码 项目:ln2sql 作者: FerreroJeremy 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _clean_output(s):
    """Reduce raw program output to a single-line 'SELECT ...;' statement.

    Keeps the text between the first and second occurrence of "SELECT"
    (everything before the first one, i.e. the table schema, is dropped),
    strips color codes, turns newlines into spaces and cuts the text at
    the first ';'.  Raises IndexError when the input has no "SELECT".
    """
    after_select = s.split("SELECT")[1]  # drop the table schema
    without_colors = re.sub("\\033.*?m", "", after_select)  # drop color codes
    single_line = without_colors.replace('\n', ' ')
    statement = single_line.split(';')[0]  # drop ';' and whatever follows
    # Put back the "SELECT" and ';' consumed by the splits above.
    return "SELECT" + statement + ';'
test_thesaurus.py 文件源码 项目:ln2sql 作者: FerreroJeremy 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def _clean_output(s):
    """Reduce raw program output to a single-line 'SELECT ...;' statement.

    Keeps the text between the first and second occurrence of "SELECT"
    (everything before the first one, i.e. the table schema, is dropped),
    strips color codes, turns newlines into spaces and cuts the text at
    the first ';'.  Raises IndexError when the input has no "SELECT".
    """
    after_select = s.split("SELECT")[1]  # drop the table schema
    without_colors = re.sub("\\033.*?m", "", after_select)  # drop color codes
    single_line = without_colors.replace('\n', ' ')
    statement = single_line.split(';')[0]  # drop ';' and whatever follows
    # Put back the "SELECT" and ';' consumed by the splits above.
    return "SELECT" + statement + ';'
dispycos.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def __init__(self, node, platform='', cpus=0, memory=0, disk=0):
        """Store a node address pattern together with resource attributes.

        :param node: host name or address, or a pattern containing '*'.
            Without a '*', it is resolved with socket.getaddrinfo() and
            replaced by the first returned address; for IPv6 results,
            leading zeros are stripped and runs of ':' are collapsed to
            '::'.  If resolution fails, the node is treated as invalid.
        :param platform: stored lower-cased as ``self.platform``.
        :param cpus: stored as ``self.cpus``.
        :param memory: stored as ``self.memory``.
        :param disk: stored as ``self.disk``.

        ``self.ip_rex`` is set to the node with every '.' escaped and every
        '*' turned into '.*', or to '' (with a warning logged) when the
        node is invalid.
        """
        # Keep the caller's value: 'node' is overwritten below and the
        # warning should report what was actually requested.
        requested = node
        if '*' not in node:
            try:
                info = socket.getaddrinfo(node, None)[0]
                ip_addr = info[4][0]
                if info[0] == socket.AF_INET6:
                    ip_addr = re.sub(r'^0*', '', ip_addr)
                    ip_addr = re.sub(r':0*', ':', ip_addr)
                    ip_addr = re.sub(r'::+', '::', ip_addr)
                node = ip_addr
            except Exception:
                # Best effort: any lookup failure marks the node invalid,
                # but KeyboardInterrupt/SystemExit are no longer swallowed.
                node = ''

        if node:
            self.ip_rex = node.replace('.', '\\.').replace('*', '.*')
        else:
            logger.warning('node "%s" is invalid', requested)
            self.ip_rex = ''
        self.platform = platform.lower()
        self.cpus = cpus
        self.memory = memory
        self.disk = disk
router.py 文件源码 项目:django-openapi-gen 作者: Ecognize 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def make_regex(self, path, named = False):
        """Convert a path into a URL regex string.

        When ``named`` is true, matches of SWAGGER_PARAMS_REGEX in the path
        are first replaced with DJANGO_PARAMS_STRING.  In every case,
        matches of URL_SLASHES_REGEX are then replaced with
        DJANGO_URL_SUBSTRING and the result is returned.
        """
        regex = path
        if named:
            regex = re.sub(SWAGGER_PARAMS_REGEX, DJANGO_PARAMS_STRING, regex)

        return re.sub(URL_SLASHES_REGEX, DJANGO_URL_SUBSTRING, regex)

    #: create handler ready for urlization
linux-soft-exploit-suggester.py 文件源码 项目:linux-soft-exploit-suggester 作者: belane 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def cleanName(soft_name):
    """ Strip common suffix strings from a package name.

    For every entry of 'badpackages', each match of '-' followed by that
    entry (interpreted as a regular expression) is removed from the name.
    Returns the cleaned name.
    """
    cleaned = soft_name
    for badword in badpackages:
        pattern = r'-' + badword
        cleaned = re.sub(pattern, '', cleaned)
    return cleaned
previews.py 文件源码 项目:socialhome 作者: jaywink 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def fetch_oembed_preview(content, urls):
    """Fetch the first available oembed for a list of urls.

    The urls are tried in order.  A url with a cache entry modified within
    the last 7 days reuses that entry; otherwise the oembed is fetched,
    its fixed width is replaced by 100%, its height attribute is removed,
    and it is stored in the cache.  The content row is updated to point at
    the cache entry, which is returned.  Urls whose fetch fails or yields
    nothing are skipped.  Returns False when no url produced an oembed.
    """
    def _attach(cache_entry):
        # Link the content row to the cache entry and hand the entry back.
        Content.objects.filter(id=content.id).update(oembed=cache_entry)
        return cache_entry

    for url in urls:
        # See first if recently cached already
        fresh_since = now() - datetime.timedelta(days=7)
        if OEmbedCache.objects.filter(url=url, modified__gte=fresh_since).exists():
            return _attach(OEmbedCache.objects.get(url=url))

        # Fetch oembed.  Sending DNT for Twitter probably has little effect
        # since these are fetched on the backend, but it is communicated
        # where possible.
        options = {"dnt": "true"} if url.startswith("https://twitter.com/") else {}
        try:
            markup = PyEmbed(discoverer=OEmbedDiscoverer()).embed(url, **options)
        except (PyEmbedError, PyEmbedDiscoveryError, PyEmbedConsumerError, ValueError):
            continue
        if not markup:
            continue

        # Ensure width is 100% not fixed
        markup = re.sub(r'width="[0-9]*"', 'width="100%"', markup)
        markup = re.sub(r'height="[0-9]*"', "", markup)

        try:
            with transaction.atomic():
                cache_entry = OEmbedCache.objects.create(url=url, oembed=markup)
        except IntegrityError:
            # Some other process got ahead of us
            cache_entry = OEmbedCache.objects.get(url=url)
        return _attach(cache_entry)

    return False


问题


面经


文章

微信
公众号

扫码关注公众号