python类suppress()的实例源码

signals.py 文件源码 项目:django-notifs 作者: danidee10 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def create_notification(**kwargs):
    """Signal receiver that builds a notification and dispatches it.

    The ``signal`` and ``sender`` entries (both must be present) and the
    optional ``silent`` flag are removed from a copy of ``kwargs``; the
    remaining entries are passed to ``Notification``. The untouched
    ``kwargs`` are handed to every adapter listed in
    ``settings.NOTIFICATION_ADAPTERS``.
    """
    is_silent = kwargs.get('silent', False)

    # Work on a copy so the adapters below still receive the original kwargs.
    model_fields = dict(kwargs)
    del model_fields['signal']
    del model_fields['sender']
    model_fields.pop('silent', None)

    # A silent notification is instantiated but not created through the manager.
    if is_silent:
        notification = Notification(**model_fields)
    else:
        notification = Notification.objects.create(**model_fields)

    # Deliver through each configured custom adapter.
    adapter_paths = getattr(settings, 'NOTIFICATION_ADAPTERS', [])
    for dotted_path in adapter_paths:
        adapter_cls = import_attr(dotted_path)
        adapter_cls(**kwargs).notify()

    if getattr(settings, 'NOTIFICATIONS_WEBSOCKET', False):
        send_to_queue(notification)
db.py 文件源码 项目:quokka_ng 作者: rochacbruno 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def value_set(self, colname, key, filter=None,
                  sort=True, flat=False, **kwargs):
        """Collect the distinct, non-None values stored under ``key``.

        Args:
            colname: name passed to ``self.get_collection``.
            key: item key whose values are gathered.
            filter: optional first positional argument for ``find``.
            sort: when exactly ``True``, return the values sorted.
            flat: when exactly ``True``, each value is treated as an
                iterable and the iterables are chained into one list.
            **kwargs: forwarded to ``find``.

        Returns:
            A list of values; duplicates are removed when the values are
            hashable.
        """
        collection = self.get_collection(colname)
        if filter is None:
            cursor = collection.find(**kwargs)
        else:
            cursor = collection.find(filter, **kwargs)

        values = []
        for item in cursor:
            value = item.get(key)
            if value is not None:
                values.append(value)

        if flat is True:
            values = list(itertools.chain.from_iterable(values))

        # Unhashable values cannot go through a set; keep them as collected.
        try:
            values = list(set(values))
        except TypeError:
            pass

        if sort is True:
            return sorted(values)
        return values
vmf.py 文件源码 项目:srctools 作者: TeamSpen210 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __delitem__(self, key):
        """Delete the keyvalue whose name matches ``key`` case-insensitively.

        Deleting ``targetname`` or ``classname`` also moves this object in
        the map's corresponding lookup table from its current bucket to the
        ``None`` bucket. Nothing happens to ``self.keys`` when no name
        matches.
        """
        folded = key.casefold()

        # Names of the lookup tables on the map, by the key that feeds them.
        tracked = {'targetname': 'by_target', 'classname': 'by_class'}
        if folded in tracked:
            index = getattr(self.map, tracked[folded])
            try:
                index[self.keys.get(folded, None)].remove(self)
            except KeyError:
                # Not registered under the current value; nothing to remove.
                pass
            index[None].add(self)

        # Remove the first stored key that matches, whatever its casing.
        for existing in self.keys:
            if existing.casefold() == folded:
                del self.keys[existing]
                break
common.py 文件源码 项目:aio-pika 作者: mosquito 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def future_with_timeout(loop, timeout, future=None):
    """Return a future that is failed with ``TimeoutError`` after ``timeout``.

    Args:
        loop: event loop to schedule on; falls back to
            ``asyncio.get_event_loop()`` when falsy.
        timeout: delay passed to ``loop.call_later``; a falsy value means no
            timer is scheduled at all.
        future: existing future to guard; a new one is created when falsy.

    Returns:
        The guarded future (the one passed in, or the newly created one).
    """
    loop = loop or asyncio.get_event_loop()
    f = future or create_future(loop=loop)

    # Without a timeout there is nothing to schedule.
    if not timeout:
        return f

    def expire():
        # Only fail the future if nobody completed it in the meantime.
        if not f.done():
            f.set_exception(TimeoutError)

    timer = loop.call_later(timeout, expire)

    def cancel_timer(*_):
        # Best effort: any error while cancelling the timer is ignored.
        try:
            timer.cancel()
        except Exception:
            pass

    f.add_done_callback(cancel_timer)
    return f
schedule.py 文件源码 项目:pretalx 作者: pretalx 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def unfreeze(self, user=None):
        """Replace the event's WIP schedule with one based on this version.

        Args:
            user: accepted but not used by this method.

        Returns:
            A ``(self, wip_schedule)`` tuple, where ``wip_schedule`` is the
            newly created schedule.

        Raises:
            Exception: if this schedule has no version.
        """
        from pretalx.schedule.models import TalkSlot
        if not self.version:
            raise Exception('Cannot unfreeze schedule version: not released yet.')

        # Talks added to the WIP schedule since this version must be kept,
        # so they are merged with this version's own talks (#72).
        released_ids = self.talks.all().values_list('submission_id', flat=True)
        added_since = self.event.wip_schedule.talks.exclude(
            submission_id__in=released_ids
        )
        talks = added_since.union(self.talks.all())

        wip_schedule = Schedule.objects.create(event=self.event)
        TalkSlot.objects.bulk_create([
            talk.copy_to_schedule(wip_schedule, save=False) for talk in talks
        ])

        # Drop the previous WIP schedule together with its talks.
        self.event.wip_schedule.talks.all().delete()
        self.event.wip_schedule.delete()

        # Clear the ``wip_schedule`` attribute on the event if one is set.
        try:
            del wip_schedule.event.wip_schedule
        except AttributeError:
            pass

        return self, wip_schedule
views.py 文件源码 项目:pretalx 作者: pretalx 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def sort_queryset(self, qs):
        """Order ``qs`` by the ``sort`` GET parameter, if it is allowed.

        A leading ``-`` requests descending order. Keys that are missing,
        empty, or not listed in ``self.sortable_fields`` leave ``qs``
        untouched. ``CharField`` columns are sorted case-insensitively.
        """
        sort_key = self.request.GET.get('sort')
        if not sort_key:
            return qs

        descending = sort_key.startswith('-')
        field_name = sort_key[1:] if descending else sort_key
        if field_name not in self.sortable_fields:
            return qs

        try:
            field = qs.model._meta.get_field(field_name)
            is_text = isinstance(field, CharField)
        except FieldDoesNotExist:
            is_text = False

        if not is_text:
            return qs.order_by(sort_key)

        # TODO: this only sorts direct lookups case insensitively
        # A sorting field like 'speaker__name' will not be found
        ordering = '-key' if descending else 'key'
        return qs.annotate(key=Lower(field_name)).order_by(ordering)
event.py 文件源码 项目:pretalx 作者: pretalx 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def _select_locale(self, request):
        """Activate the language and timezone that apply to ``request``.

        The language comes from the first of user, cookie and browser that
        yields one, falling back to the event's locale when the request has
        an event. The timezone comes from the authenticated user, else the
        event, else ``settings.TIME_ZONE``; an unknown timezone name is
        ignored and nothing is activated for it.
        """
        event = getattr(request, 'event', None)
        supported = event.locales if event else settings.LANGUAGES

        # Try each source in priority order and stop at the first hit.
        language = None
        for resolver in (
            self._language_from_user,
            self._language_from_cookie,
            self._language_from_browser,
        ):
            language = resolver(request, supported)
            if language:
                break
        if event:
            language = language or event.locale

        translation.activate(language)
        request.LANGUAGE_CODE = translation.get_language()

        try:
            if request.user.is_authenticated:
                tzname = request.user.timezone
            elif event:
                tzname = event.timezone
            else:
                tzname = settings.TIME_ZONE
            timezone.activate(pytz.timezone(tzname))
            request.timezone = tzname
        except pytz.UnknownTimeZoneError:
            pass
invoke_test.py 文件源码 项目:taf 作者: taf3 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def parse_json(self):
        """Build the execution object described by the JSON config data.

        Returns:
            An instance of the first execution class whose command key is
            present in the parsed JSON.

        Raises:
            InvokeError: if the data is not valid JSON, or if none of the
                known command keys yields an execution object.
        """
        raw = self._get_config_data()

        try:
            parsed = json.loads(raw.value)  # pylint: disable=no-member
        except ValueError:
            raise InvokeError('Failed to parse JSON data')

        # Checked in this order; the first command that can be built wins.
        handlers = (
            ('ab', ApacheBenchExecution),
            ('nginx', NginxExecution),
            ('netperf', NetperfExecution),
            ('iperf', IperfExecution),
        )

        for name, handler in handlers:
            try:
                return handler(parsed[name], self)
            except KeyError:
                # Covers a missing key as well as a KeyError raised while
                # the execution object is being constructed.
                continue

        raise InvokeError('No command found to parse')
pytest_returns.py 文件源码 项目:taf 作者: taf3 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def pytest_report_teststatus(self, report):
        """Override the reported status of a test's ``call`` phase.

        The hook yields once to receive the outcome object and then, for
        ``call`` reports only, stores ``(status, letter, msg)`` on
        ``outcome.result``. For passed tests the message is replaced by
        ``report.retval`` when the report has that attribute.
        """
        outcome = yield
        if report.when != 'call':
            return

        if report.passed:
            result = self.RESULT_PASSED
        elif report.skipped:
            result = self.RESULT_SKIPPED
        else:  # report.failed
            result = self.RESULT_FAILED

        status, letter, msg = self.get_result(result)

        if report.passed:
            # Show the test's return value instead of 'PASSED'.
            msg = getattr(report, 'retval', msg)

        outcome.result = status, letter, msg
tkview.py 文件源码 项目:chronophore 作者: mesbahamin 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _show_feedback_label(self, message, seconds=None):
        """Show ``message`` in the feedback label and schedule its removal.

        The label is cleared through a callback registered with ``after()``
        rather than a thread, because threads can cause problems in Tk.

        Args:
            message: text to display.
            seconds: how long to show it; defaults to
                ``CONFIG['MESSAGE_DURATION']``.
        """
        duration = CONFIG['MESSAGE_DURATION'] if seconds is None else seconds

        # Cancel a pending clear callback so rapid input does not cause
        # flickering or inconsistent timing. There is none on the first call.
        try:
            self.root.after_cancel(self.clear_feedback)
        except AttributeError:
            pass

        logger.debug('Label feedback: "{}"'.format(message))
        self.feedback.set(message)

        def clear():
            return self.feedback.set("")

        self.clear_feedback = self.root.after(1000 * duration, clear)
goldbot.py 文件源码 项目:goldmine 作者: Armored-Dragon 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
async def on_guild_join(self, guild):
        """Send the bot introduction message when invited.

        The message goes to the guild's default channel. If that is
        forbidden, every channel of the guild is tried in order until one
        accepts the message; a warning is logged when none does. Nothing is
        sent when running as a selfbot.
        """
        self.logger.info('New guild: ' + guild.name)
        if self.selfbot:
            return

        try:
            await self.send_message(guild.default_channel, join_msg)
        except discord.Forbidden:
            pass
        else:
            return

        # Iterate the channels directly: indexing with a manual counter ran
        # one past the end of the list and raised IndexError instead of
        # reaching the warning below.
        for channel in list(guild.channels):
            try:
                await self.send_message(channel, join_msg)
            except (discord.Forbidden, discord.HTTPException):
                continue
            return

        self.logger.warning('Couldn\'t announce join to guild ' + guild.name)
models.py 文件源码 项目:env-diff 作者: jacebrowning 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def from_code(cls, *lines, index=0):
        """Create a variable from the source line at ``lines[index]``.

        The line must match ``cls.RE_ENV_SET`` (from its start) or contain a
        match for ``cls.RE_QUOTED_CAPITALS``; otherwise ``None`` is returned.
        When the line ends with ``{``, up to two following lines are appended
        to the context.

        Returns:
            ``cls(name, context=...)`` for a matching line, else ``None``.
        """
        line = lines[index].strip()

        match = cls.RE_ENV_SET.match(line)
        if not match:
            match = cls.RE_QUOTED_CAPITALS.search(line)
        if not match:
            log.debug("Skipped line %s without variable: %r", index + 1, line)
            return None

        name = match.group('name')

        context = line
        if context.endswith('{'):
            # Stop quietly as soon as the input runs out of lines.
            for offset in (1, 2):
                try:
                    context += lines[index + offset].strip()
                except IndexError:
                    break

        variable = cls(name, context=context)
        log.info("Loaded variable: %s", variable)

        return variable
minesweeper.py 文件源码 项目:Chiaki-Nanami 作者: Ikusaba-san 项目源码 文件源码 阅读 72 收藏 0 点赞 0 评论 0
async def run(self):
        """Run the game loop and return its result.

        The game screen's ``interact`` coroutine and ``run_loop`` are both
        started as tasks; only the latter is awaited. On the way out, whether
        the loop finished or raised, the interaction task is cancelled and a
        delayed deletion of the game message is scheduled.
        """
        self._interaction = asyncio.ensure_future(self._game_screen.interact(timeout=None, delete_after=False))
        self._runner = asyncio.ensure_future(self.run_loop())
        # await self._game_screen.wait_until_ready()
        try:
            return await self._runner
        finally:
            # For some reason having all these games hanging around causes lag.
            # Until I properly make a delete_after on the paginator I'll have to
            # settle with this hack.
            async def task():
                await asyncio.sleep(30)
                # The deletion is best effort: HTTP errors are ignored.
                with contextlib.suppress(discord.HTTPException):
                    await self._game_screen._message.delete()

            self.ctx.bot.loop.create_task(task())
            self._interaction.cancel()
meta.py 文件源码 项目:Chiaki-Nanami 作者: Ikusaba-san 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
async def _inrole(ctx, *roles, members, final='and'):
        """Show a paginated, sorted list of ``members`` for the given roles.

        Args:
            ctx: invocation context; ``ctx.author`` is highlighted in the list.
            *roles: roles named in the title; the embed colour is the
                per-channel average of their colours.
            members: members to list.
            final: joining word passed to ``human_join`` for the title.
        """
        joined_roles = human_join(map(str, roles), final=final)
        truncated_title = truncate(f'Members in {pluralize(role=len(roles))} {joined_roles}', 256, '...')

        # Sum each RGB channel across all roles, then divide by the role count.
        total_color = map(sum, zip(*(role.colour.to_rgb() for role in roles)))
        average_color = discord.Colour.from_rgb(*map(round, (c / len(roles) for c in total_color)))

        if members:
            entries = sorted(map(str, members))
            # Make the author's name bold (assuming they have that role).
            # We have to do it after the list was built, otherwise the author's name
            # would be at the top.
            with contextlib.suppress(ValueError):
                index = entries.index(str(ctx.author))
                entries[index] = f'**{entries[index]}**'
        else:
            entries = ('There are no members :(', )

        pages = ListPaginator(ctx, entries, colour=average_color, title=truncated_title)
        await pages.interact()
afk.py 文件源码 项目:Chiaki-Nanami 作者: Ikusaba-san 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
async def _get_afk_embed(self, member):
        """Build the AFK embed for ``member``.

        Returns:
            ``None`` when ``self.afks`` has no message for the member;
            otherwise an embed carrying the AFK message, the member's
            name, avatar and ID.
        """
        message = self.afks.get(member.id)
        if message is None:
            return None

        avatar = member.avatar_url
        colour = await user_color(member)
        title = f"{member.display_name} is AFK"

        embed = (discord.Embed(description=message, colour=colour)
                .set_author(name=title, icon_url=avatar)
                .set_footer(text=f"ID: {member.id}")
                )

        # Use the newest queue entry as the timestamp; an empty queue
        # leaves the embed without one.
        with contextlib.suppress(IndexError):
            embed.timestamp = self.user_message_queues[member.id][-1]
        return embed
ddmbot.py 文件源码 项目:ddmbot 作者: Budovi 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
async def on_voice_state_update(self, before, after):
        """Track listeners as users enter or leave the bot's voice channel.

        Updates about the bot's own user only trigger a reconnect when it is
        no longer in ``self._voice_channel``. For other users, joining the
        channel adds them as a listener (after an optional welcome message)
        and leaving removes them.
        """
        if before == self._client.user:
            if after.voice.voice_channel != self._voice_channel:
                log.warning('Client was disconnected from the voice channel')
                await self.connect_voice()
            return
        # joining
        if before.voice.voice_channel != self._voice_channel and after.voice.voice_channel == self._voice_channel:
            # An IgnoredUserError from either call skips the rest of the block.
            with suppress(database.bot.IgnoredUserError):
                if await self._database.interaction_check(int(after.id)):
                    await self._send_welcome_message(after)
                await self._users.add_listener(int(after.id), direct=False)

        # leaving
        elif before.voice.voice_channel == self._voice_channel and after.voice.voice_channel != self._voice_channel:
            try:
                await self._users.remove_listener(int(after.id), direct=False)
            except ValueError:
                log.warning('Tried to remove {0} (ID: {0.id}) from listeners but the user was not listed'.format(after))
backup.py 文件源码 项目:labrat 作者: sdhutchins 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def backup(self):
        """Backup files from source directory to destination.

        Logging goes to a ``directory_backup_<timestamp>.log`` file. A failed
        copy is logged as an error and does not raise; the success message is
        logged only when the copy completed.
        """

        logzero.logfile("directory_backup_%s.log" %
                        str(d.now().strftime(self._f2)))
        sep = 50*'-'
        log.info("#%s" % sep)
        log.info("The script name is %s" % os.path.basename(__file__))
        log.info("The date and time is currently %s" %
                 str(d.now().strftime(self._f1)))

        # TODO Add SameFileError from shutil
        # TODO Add threading
        try:
            copytree(self.sourcedir, self.backupdir)
        except OSError as err:
            # Stay best effort, but report the actual failure instead of
            # logging the OSError class on every run.
            log.error("Backup of %s failed: %s" % (self.sourcedir, err))
        else:
            log.info('%s has been backed up.' % self.sourcedir)
def __init__(self, attack=False, interface=False, stdout=None, stderr=None, **kwargs):
        """Configure the instance for one attack mode.

        Args:
            attack: attack name; must be in ``self._allowed_attacks``.
            interface: stored on the instance as ``self.interface``.
            stdout: output target; ``DEVNULL`` is used when falsy.
            stderr: error target; ``DEVNULL`` is used when falsy.
            **kwargs: forwarded to the parent initializer, with
                ``kwargs[attack]`` set to ``True``.

        Raises:
            WrongArgument: if ``attack`` is not an allowed attack.
        """
        self.stdout = stdout or DEVNULL
        self.stderr = stderr or DEVNULL
        self.interface = interface

        if attack not in self._allowed_attacks:
            raise WrongArgument

        self.attack = attack
        # Attack-specific arguments live in an optional
        # ``_allowed_arguments_<attack>`` attribute.
        extra = getattr(self, "_allowed_arguments_{}".format(attack), ())

        # Build a per-instance list. Extending the inherited sequence in
        # place would modify the class attribute shared by every instance,
        # so entries would pile up with each object created.
        allowed = list(self._allowed_arguments)
        allowed.extend(extra)
        allowed.append((attack, False))
        self._allowed_arguments = allowed

        kwargs[attack] = True
        # NOTE(review): super(self.__class__, self) recurses forever if this
        # class is ever subclassed; left as is because the enclosing class
        # is not visible here.
        super(self.__class__, self).__init__(**kwargs)
extension.py 文件源码 项目:flask-openapi 作者: remcohaszing 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _process_rule(self, rule):
        """Build the path description dict for one URL rule.

        The view function registered for ``rule.endpoint`` contributes a body
        parameter (when a schema can be extracted) plus its ``responses``,
        description, ``deprecated`` flag and sorted ``tags``, each only when
        available.
        """
        view_func = self.app.view_functions[rule.endpoint]
        path = {}

        schema = self._extract_schema(view_func)
        if schema:
            body_parameter = {'in': 'body', 'name': 'payload', 'schema': schema}
            path['parameters'] = [body_parameter]

        try:
            path['responses'] = view_func.responses
        except AttributeError:
            pass

        add_optional(path, 'description', self._extract_description(view_func))
        add_optional(path, 'deprecated', getattr(view_func, 'deprecated', None))

        try:
            path['tags'] = sorted(view_func.tags)
        except AttributeError:
            pass

        return path
__main__.py 文件源码 项目:gateway 作者: wasp 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def serve_many(workers=1):
    """Run ``serve`` in several daemon subprocesses until a stop signal.

    Args:
        workers: requested number of processes; capped at the CPU count.

    SIGINT and SIGTERM set an event that ends the wait loop, after which
    every subprocess is terminated and joined.
    """
    # thank you sanic
    worker_count = min(workers, multiprocessing.cpu_count())
    stop_requested = multiprocessing.Event()

    def request_stop(*_):
        return stop_requested.set()

    signal(SIGINT, request_stop)
    signal(SIGTERM, request_stop)

    serve_kwargs = {'reuse_port': True}
    processes = []
    for _ in range(worker_count):
        # noinspection PyArgumentList
        process = multiprocessing.Process(
            target=serve, kwargs=serve_kwargs, daemon=True,
        )
        process.start()
        print('Started subprocess:', process.name, process.pid)
        processes.append(process)

    # Poll until a signal arrives; any Exception ends the wait as well.
    try:
        while not stop_requested.is_set():
            time.sleep(0.5)
    except Exception:
        pass

    for process in processes:
        process.terminate()
    for process in processes:
        process.join()
run.py 文件源码 项目:falsy 作者: pingf 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def run(coro, loop=None):
    """Run ``coro`` to completion alongside the curl loop and return its result.

    Args:
        coro: awaitable to run.
        loop: event loop to use; a new uvloop loop is created when ``None``.

    The loop is installed as the current event loop and gets
    ``exception_handler`` as its exception handler. The curl loop task is
    cancelled, and awaited, once ``coro`` finishes or raises.
    """
    async def drive():
        pump = aio.ensure_future(curl_loop())
        try:
            return (await coro), pump
        finally:
            pump.cancel()
            # Wait for the cancellation to complete.
            try:
                await pump
            except aio.CancelledError:
                pass

    if loop is None:
        loop = uvloop.new_event_loop()
    aio.set_event_loop(loop)
    loop.set_exception_handler(exception_handler)

    result, _ = loop.run_until_complete(drive())
    return result
converter.py 文件源码 项目:pyha 作者: gasparka 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def redbaron_pyfor_to_vhdl(red_node):
    """Rewrite every ``for`` node under ``red_node`` to iterate by index.

    Loops whose target contains a call preceded by ``range`` are left
    untouched. In every other loop, nodes matching the loop variable are
    replaced by ``<target>[_i_]`` and the loop variable becomes ``_i_``.

    Returns:
        The same ``red_node``, modified in place.
    """
    def rewrite_loop(for_node):
        # A loop over range(...) is skipped. Any error raised while
        # inspecting the target is treated as "not a range loop".
        try:
            if for_node.target('call')[0].previous.value == 'range':
                return for_node
        except Exception:
            pass

        iterable = for_node.target
        loop_var = for_node.iterator

        matches = for_node(loop_var.__class__.__name__, value=loop_var.value)
        matches.map(lambda x: x.replace(f'{iterable}[_i_]'))

        for_node.iterator = '_i_'
        return for_node

    for for_node in red_node.find_all('for'):
        rewrite_loop(for_node)

    return red_node
simulation_interface.py 文件源码 项目:pyha 作者: gasparka 项目源码 文件源码 阅读 70 收藏 0 点赞 0 评论 0
def flush_pipeline(func):
    """Decorator that compensates for the pipeline delay of ``self.model``.

    Inputs: the first positional argument is appended ``delay`` more times
    as dummy samples, to flush out pipeline values.
    Outputs: the first ``delay`` items of the result are dropped, as these
    are initial pipeline values.

    ``delay`` is read from ``self.model._delay``; when that attribute is
    missing or zero, ``func`` is called unchanged.
    """

    @wraps(func)
    def flush_pipeline_wrap(self, *args, **kwargs):
        try:
            delay = self.model._delay
        except AttributeError:  # no delay declared
            delay = 0

        if delay == 0:
            return func(self, *args, **kwargs)

        padded = list(args)
        for _ in range(delay):
            padded.append(padded[0])

        return func(self, *padded, **kwargs)[delay:]

    return flush_pipeline_wrap
utils.py 文件源码 项目:django-rest-witchcraft 作者: shosca 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def field_kwargs(self):
        """Derive field keyword arguments from the property and its column.

        Always sets ``label``, ``help_text``, ``required`` and ``allow_null``.
        Entries that depend on the column type (``enum_class`` or
        ``choices``, ``max_digits``, ``decimal_places``, ``max_length``) are
        added only when the type has the matching attribute.
        """
        label = capfirst(self.property.key.replace('_', ' '))
        kwargs = {'label': label, 'help_text': self.property.doc}

        # Types that have an ``enum_class`` attribute: use the class when it
        # is set, otherwise the plain ``enums`` values.
        try:
            enum_class = self.column.type.enum_class
            if enum_class is not None:
                kwargs['enum_class'] = enum_class
            else:
                kwargs['choices'] = self.column.type.enums
        except AttributeError:
            pass

        # Simple one-to-one copies from optional column type attributes.
        optional_attributes = (
            ('max_digits', 'precision'),
            ('decimal_places', 'scale'),
            ('max_length', 'length'),
        )
        for option, attribute in optional_attributes:
            try:
                kwargs[option] = getattr(self.column.type, attribute)
            except AttributeError:
                pass

        nullable = self.column.nullable
        kwargs['required'] = not nullable
        kwargs['allow_null'] = nullable

        return kwargs
pack.py 文件源码 项目:mccurse 作者: khardix 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __exit__(self: 'FileChange', *exc) -> None:
        """Finish the change: commit on success, roll back on failure.

        On success the new file is recorded in the destination and the
        temporary file is removed. On failure the new file is deleted, the
        temporary file is renamed back to the old path and the old file is
        restored in the source. Each step runs only when its validity and
        change flags are set.
        """

        if not any(exc):  # Success -- change metadata
            if self.__valid_destination:
                self.destination[self.new_file.mod.id] = self.new_file

            # Clean temporary file
            if self.__valid_source and self.__file_change:
                self.tmp_path.unlink()
            return

        # Failure -- rollback
        if self.__valid_destination and self.__file_change:
            # Ignore missing file on deletion
            try:
                self.new_path.unlink()
            except FileNotFoundError:
                pass

        if self.__valid_source:
            if self.__file_change:
                self.tmp_path.rename(self.old_path)
            if self.__store_change:
                self.source[self.old_file.mod.id] = self.old_file
update.py 文件源码 项目:c3nav 作者: c3nav 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def save(self, **kwargs):
        """Save the update and write the changed geometries to disk.

        Args:
            **kwargs: forwarded to the parent ``save``.

        Raises:
            TypeError: when saving an existing object that was already
                processed, or that is not marked as processed.

        For a new object, a cache update (and, when Celery is available,
        the map update task) is scheduled for after the transaction commits.
        """
        new = self.pk is None
        if not new and (self.was_processed or not self.processed):
            raise TypeError

        super().save(**kwargs)

        filename = self._changed_geometries_filename()
        with suppress(FileExistsError):
            os.mkdir(os.path.dirname(filename))

        from c3nav.mapdata.utils.cache.changes import changed_geometries
        # Close the file deterministically instead of leaking the handle.
        with open(filename, 'wb') as f:
            pickle.dump(changed_geometries, f)

        if new:
            transaction.on_commit(
                lambda: cache.set('mapdata:last_update', self.to_tuple, 300)
            )
            if settings.HAS_CELERY:
                transaction.on_commit(
                    lambda: process_map_updates.delay()
                )
geometry.py 文件源码 项目:c3nav 作者: c3nav 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def clean_cut_polygon(polygon: Polygon) -> Polygon:
    """Rebuild ``polygon`` from the rings produced by ``cut_ring``.

    The exterior is cut first; exactly one of the resulting rings must be
    counter-clockwise and becomes the new exterior. All remaining rings,
    plus the cut rings of the original interiors, become the interiors.
    Any ``c3nav_cache`` attribute on the input polygon is removed.

    Raises:
        ValueError: if the cut exterior does not contain exactly one
            counter-clockwise ring.
    """
    rings = list(cut_ring(polygon.exterior))
    ccw_indexes = [i for i, ring in enumerate(rings) if ring.is_ccw]

    try:
        delattr(polygon, 'c3nav_cache')
    except AttributeError:
        pass

    if len(ccw_indexes) != 1:
        raise ValueError('Invalid cut polygon!')
    exterior = rings.pop(ccw_indexes[0])

    for ring in polygon.interiors:
        rings.extend(cut_ring(ring))

    return Polygon(exterior, rings)
server.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def shutdown(self, timeout=15.0):
        """Worker process is about to exit, we need cleanup everything and
        stop accepting requests. It is especially important for keep-alive
        connections.

        Args:
            timeout: how long to wait for the request handler before it is
                cancelled; a falsy value cancels it immediately.
        """
        handler = self._request_handler
        if handler is None:
            return
        self._closing = True

        # Idle keep-alive connections are force-closed, and so is any
        # handler when no waiting time was granted.
        idle_keep_alive = self._request_count > 1 and not self._reading_request
        if idle_keep_alive or not timeout:
            handler.cancel()
            return

        canceller = self._loop.call_later(timeout, handler.cancel)
        try:
            yield from handler
        except asyncio.CancelledError:
            pass
        canceller.cancel()
methods.py 文件源码 项目:steemdata-mongo 作者: SteemData 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def upsert_comment_chain(mongo, identifier, recursive=False):
    """ Upsert given comments and its parent(s).

    Args:
        mongo: mongodb instance
        identifier: Post identifier
        recursive: (Defaults to False). If True, recursively update all parent comments, incl. root post.

    Returns:
        The result of the ``Posts`` update when ``identifier`` is a root
        post; ``None`` for comments and for posts that do not exist.
    """
    try:
        post = Post(identifier)
        if not post.is_comment():
            return mongo.Posts.update({'identifier': post.identifier}, post.export(), upsert=True)

        mongo.Comments.update({'identifier': post.identifier}, post.export(), upsert=True)
        parent_identifier = '@%s/%s' % (post.parent_author, post.parent_permlink)
        if recursive:
            upsert_comment_chain(mongo, parent_identifier, recursive)
        else:
            upsert_comment(mongo, parent_identifier)
    except PostDoesNotExist:
        # A missing post anywhere in this step is skipped silently.
        pass
proxypool_common.py 文件源码 项目:retr 作者: aikipooh 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def load(self, arg):
        """Always load all proxies, we'll be able to prune and rearrange later.

        Args:
            arg: either a file name (``str``) of a tab-separated file with
                ``proxy`` and optional ``status`` columns, or an iterable of
                proxies, each of which is marked as good (``'G'``).

        A missing file leaves ``self.master_plist`` empty.
        """
        self.master_plist = OrderedDict()

        if not isinstance(arg, str):  # Iterable of proxies
            for address in arg:
                entry = proxy(address, 'G')  # Mark as good
                self.master_plist[entry.p] = entry
            lg.info('Loaded {} proxies from iterable'.format(len(self.master_plist)))
            return

        # File name
        lg.info('Loading {}'.format(arg))
        status_counts = Counter()
        try:
            with open(arg) as f:
                for row in reader(f, delimiter='\t'):  # (proxy, status)
                    status = row[1] if len(row) > 1 else ''
                    status_counts[status] += 1
                    entry = proxy(row[0], status)
                    self.master_plist[entry.p] = entry
        except FileNotFoundError:
            pass
        lg.info('Loaded {} {}'.format(arg, status_counts))


问题


面经


文章

微信
公众号

扫码关注公众号