python类utcnow()的实例源码

test_retry.py 文件源码 项目:nameko-amqp-retry 作者: nameko 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_event(self, container_factory, rabbit_config):

        from examples.retry import Service

        container = container_factory(Service, rabbit_config)
        container.start()

        timestamp = arrow.utcnow().replace(seconds=+1)

        dispatch = event_dispatcher(rabbit_config)
        with entrypoint_waiter(
            container, 'handle_event', callback=wait_for_result
        ) as result:
            payload = {'timestamp': timestamp.isoformat()}
            dispatch("src_service", "event_type", payload)

        res = result.get()
        assert arrow.get(re.match("Time is (.+)", res).group(1)) >= timestamp
test_retry.py 文件源码 项目:nameko-amqp-retry 作者: nameko 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_message(self, container_factory, rabbit_config):

        from examples.retry import Service

        container = container_factory(Service, rabbit_config)
        container.start()

        timestamp = arrow.utcnow().replace(seconds=+1)

        publish = publisher(rabbit_config)
        with entrypoint_waiter(
            container, 'handle_message', callback=wait_for_result
        ) as result:
            payload = {'timestamp': timestamp.isoformat()}
            publish(payload, routing_key="messages")

        res = result.get()
        assert arrow.get(re.match("Time is (.+)", res).group(1)) >= timestamp
test_aws.py 文件源码 项目:donatemates 作者: donatemates 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_delete_item(self):
        """Method to test receipt index"""
        campaign_table = DynamoTable('campaigns')

        # Add a record
        data = {"campaign_id": "my_campaign",
                "notified_on": arrow.utcnow().isoformat(),
                "campaign_status": "active"
                }
        campaign_table.put_item(data)

        # Verify write
        key = {"campaign_id": "my_campaign"}
        item = campaign_table.get_item(key)
        assert item is not None
        self.assertEqual(item["campaign_status"], "active")

        # Delete
        campaign_table.delete_item(key)

        # Verify it deleted
        item = campaign_table.get_item(key)
        assert item is None
test_aws.py 文件源码 项目:donatemates 作者: donatemates 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_scan_table(self):
        """Method to test scanning a table"""

        def scan_func(items, input_val):
            for item in items:
                if item["campaign_status"]["S"] == "complete":
                    input_val['count'] += 1
            return input_val

        campaign_table = DynamoTable('campaigns')

        # Add a record
        for idx in range(0, 10):
            data = {"campaign_id": "my_campaign_{}".format(idx),
                    "notified_on": arrow.utcnow().isoformat(),
                    "campaign_status": "complete"
                    }
            campaign_table.put_item(data)

        # Scan table
        result = {"count": 0}
        campaign_table.scan_table(scan_func, result, "campaign_status")

        self.assertEqual(result["count"], 10)
jarvis_irc.py 文件源码 项目:jarvis 作者: anqxyr 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def send(bot, text, private=False, notice=False):
    """Send irc message."""
    text = str(text)
    tr = bot._trigger
    jarvis.db.Message.create(
        user=bot.config.core.nick,
        channel=tr.sender,
        time=arrow.utcnow().timestamp,
        text=text)
    mode = 'NOTICE' if notice else 'PRIVMSG'
    recipient = tr.nick if private or notice else tr.sender
    try:
        bot.sending.acquire()
        text = textwrap.wrap(text, width=420)[0]
        bot.write((mode, recipient), text)
    finally:
        bot.sending.release()
notes.py 文件源码 项目:jarvis 作者: anqxyr 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def masstell(inp, *, names, separator, text, users, message):
    """Send a single message to several users."""
    if (names and users) or (text and message):
        return lex.masstell.arg_conflict
    names, text = names or users, text or message
    if not names or not text:
        return lex.masstell.missing_args

    time = arrow.utcnow().timestamp
    db.Tell.insert_many([dict(
        recipient=user,
        sender=inp.user,
        text=text,
        time=time,
        topic=None) for user in set(names)]).execute()
    return lex.tell.send
smrt.py 文件源码 项目:csirtg-smrt-py 作者: csirtgadgets 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def clean_indicator(self, i, rule):
        # check for de-fang'd feed
        if rule.replace:
            for e in i:
                if not rule.replace.get(e):
                    continue

                for k, v in rule.replace[e].items():
                    i[e] = i[e].replace(k, v)

        i = normalize_itype(i)

        if isinstance(i, dict):
            i = Indicator(**i)

        if not i.firsttime:
            i.firsttime = i.lasttime

        if not i.reporttime:
            i.reporttime = arrow.utcnow().datetime

        if not i.group:
            i.group = 'everyone'

        return i
test_metric.py 文件源码 项目:cloudwatch-fluent-metrics 作者: awslabs 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_can_add_multiple_timers():
    name1 = 'test_timer_1'
    name2 = 'test_timer_2'
    m = FluentMetric()
    m.with_timer(name1)
    time.sleep(1)
    t = m.get_timer(name1)
    assert t.start < arrow.utcnow()
    assert t.elapsed_in_ms() > 1000 and t.elapsed_in_ms() < 2000

    m.with_timer(name2)
    time.sleep(1)
    u = m.get_timer(name2)
    assert u.start < arrow.utcnow()
    assert u.elapsed_in_ms() > 1000 and u.elapsed_in_ms() < 2000
    assert t.elapsed_in_ms() > 2000
worker.py 文件源码 项目:scriptworker 作者: mozilla-releng 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def main():
    """Scriptworker entry point: get everything set up, then enter the main loop."""
    context, credentials = get_context_from_cmdln(sys.argv[1:])
    log.info("Scriptworker starting up at {} UTC".format(arrow.utcnow().format()))
    cleanup(context)
    conn = aiohttp.TCPConnector(limit=context.config['aiohttp_max_connections'])
    loop = asyncio.get_event_loop()
    with aiohttp.ClientSession(connector=conn) as session:
        context.session = session
        context.credentials = credentials
        while True:
            try:
                loop.run_until_complete(async_main(context))
            except Exception:
                log.critical("Fatal exception", exc_info=1)
                raise
test_artifacts.py 文件源码 项目:scriptworker 作者: mozilla-releng 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_create_artifact(context, fake_session, successful_queue, event_loop):
    path = os.path.join(context.config['artifact_dir'], "one.txt")
    touch(path)
    context.session = fake_session
    expires = arrow.utcnow().isoformat()
    context.temp_queue = successful_queue
    event_loop.run_until_complete(
        create_artifact(context, path, "public/env/one.txt", content_type='text/plain', content_encoding=None, expires=expires)
    )
    assert successful_queue.info == [
        "createArtifact", ('taskId', 'runId', "public/env/one.txt", {
            "storageType": "s3",
            "expires": expires,
            "contentType": "text/plain",
        }), {}
    ]

    # TODO: Assert the content of the PUT request is valid. This can easily be done once MagicMock supports async
    # context managers. See http://bugs.python.org/issue26467 and https://github.com/Martiusweb/asynctest/issues/29.
    context.session.close()
test_gpg.py 文件源码 项目:scriptworker 作者: mozilla-releng 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_create_second_gpg_conf(mocker, tmpdir):
    now = arrow.utcnow()
    with mock.patch.object(arrow, 'utcnow') as p:
        p.return_value = now
        sgpg.create_gpg_conf(
            tmpdir, keyserver=GPG_CONF_PARAMS[0][0], my_fingerprint=GPG_CONF_PARAMS[0][1]
        )
        sgpg.create_gpg_conf(
            tmpdir, keyserver=GPG_CONF_PARAMS[1][0], my_fingerprint=GPG_CONF_PARAMS[1][1]
        )
        with open(os.path.join(tmpdir, "gpg.conf"), "r") as fh:
            assert fh.read() == GPG_CONF_PARAMS[1][2]
        with open(os.path.join(tmpdir, "gpg.conf.{}".format(now.timestamp)), "r") as fh:
            assert fh.read() == GPG_CONF_PARAMS[0][2]


# generate_key {{{1
smtp_server.py 文件源码 项目:homingbot 作者: homingbot 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def process_message(self, mailfrom, rcpttos, data):
        email = None
        messages = []
        emails = []
        splitted = data.split("\r\n", 1)
        subject = splitted[0] if (len(splitted) > 0) else ""
        body = splitted[1] if (len(splitted) > 1) else ""
        try:
            for recipient in rcpttos:
                num_emails = 0
                res = db_service.custom_query(COUNT_QUERY_TEMPLATE.format(recipient['address'])) #TODO async
                if (res is not None and len(res.current_rows) > 0):
                    num_emails = res.current_rows[0]['system.count(address)']
                msg = Message(address=recipient['address'], message_index=num_emails, body=data, size=len(data), sender=mailfrom, created=arrow.utcnow().datetime)
                messages.append(msg)
            db_service.batch_save(messages, ttl=recipient['ttl(created)']) # Save with the ttl of the email account
        except Exception as e:
            logger.error(e)
            return STATUS_ERROR
        return STATUS_OK
api.py 文件源码 项目:science-gateway-middleware 作者: alan-turing-institute 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def post(self):
        job_json = request.json
        if job_json is None:
            abort(400, message="Message body could not be parsed as JSON")
        # Try parsing Job JSON to Job object
        try:
            job = json_to_job(job_json)
        except:
            abort(400, message="Message body is not valid Job JSON")
        if self.jobs.exists(job.id):
            abort(409, message="Job with ID {} already "
                               "exists".format(job.id))
        else:
            # Set certain job properties when first persisted
            job.creation_datetime = arrow.utcnow()
            job.status = "Draft"
            job = self.jobs.create(job)
            return job_to_json(job), 200, {'Content-Type': 'application/json'}
__init__.py 文件源码 项目:freqtrade 作者: gcarq 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def buy(pair: str, rate: float, amount: float) -> str:
    if _CONF['dry_run']:
        global _DRY_RUN_OPEN_ORDERS
        order_id = 'dry_run_buy_{}'.format(randint(0, 10**6))
        _DRY_RUN_OPEN_ORDERS[order_id] = {
            'pair': pair,
            'rate': rate,
            'amount': amount,
            'type': 'LIMIT_BUY',
            'remaining': 0.0,
            'opened': arrow.utcnow().datetime,
            'closed': arrow.utcnow().datetime,
        }
        return order_id

    return _API.buy(pair, rate, amount)
__init__.py 文件源码 项目:freqtrade 作者: gcarq 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def sell(pair: str, rate: float, amount: float) -> str:
    if _CONF['dry_run']:
        global _DRY_RUN_OPEN_ORDERS
        order_id = 'dry_run_sell_{}'.format(randint(0, 10**6))
        _DRY_RUN_OPEN_ORDERS[order_id] = {
            'pair': pair,
            'rate': rate,
            'amount': amount,
            'type': 'LIMIT_SELL',
            'remaining': 0.0,
            'opened': arrow.utcnow().datetime,
            'closed': arrow.utcnow().datetime,
        }
        return order_id

    return _API.sell(pair, rate, amount)
cooldown.py 文件源码 项目:apex-sigma-core 作者: lu-ci 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def on_cooldown(self, cmd, user):
        if isinstance(user, str):
            cd_name = f'cd_{cmd}_{user}'
        else:
            cd_name = f'cd_{cmd}_{user.id}'
        entry = self.cache.get_cache(cd_name)
        if entry is None:
            entry = await self.cds.find_one({'name': cd_name})
        if entry:
            end_stamp = entry['end_stamp']
            now_stamp = arrow.utcnow().timestamp
            if now_stamp > end_stamp:
                cooldown = False
            else:
                cooldown = True
        else:
            cooldown = False
        return cooldown
cooldown.py 文件源码 项目:apex-sigma-core 作者: lu-ci 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def set_cooldown(self, cmd, user, amount):
        if isinstance(user, str):
            cd_name = f'cd_{cmd}_{user}'
        else:
            cd_name = f'cd_{cmd}_{user.id}'
        entry = self.cache.get_cache(cd_name)
        if entry:
            self.cache.del_cache(cd_name)
        else:
            entry = await self.cds.find_one({'name': cd_name})
        end_stamp = arrow.utcnow().timestamp + amount
        if entry:
            await self.cds.update_one({'name': cd_name}, {'$set': {'end_stamp': end_stamp}})
        else:
            cd_data = {
                'name': cd_name,
                'end_stamp': end_stamp
            }
            await self.cds.insert_one(cd_data)
stats_processing.py 文件源码 项目:apex-sigma-core 作者: lu-ci 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def add_cmd_stat(db, cmd, message, args):
    if not message.author.bot:
        if message.guild:
            channel_id = message.channel.id
            guild_id = message.guild.id
        else:
            channel_id = None
            guild_id = None
        stat_data = {
            'command': cmd.name,
            'args': args,
            'author': message.author.id,
            'channel': channel_id,
            'guild': guild_id,
            'timestamp': {
                'created': arrow.get(message.created_at).float_timestamp,
                'executed': arrow.utcnow().float_timestamp
            }
        }
        await db[db.db_cfg.database]['CommandStats'].insert_one(stat_data)
wfalerts.py 文件源码 项目:apex-sigma-core 作者: lu-ci 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def wfalerts(cmd, message, args):
    alert_url = 'https://deathsnacks.com/wf/data/alerts_raw.txt'
    async with aiohttp.ClientSession() as session:
        async with session.get(alert_url) as data:
            alert_data = await data.text()
    alert_list = parse_alert_data(alert_data)
    response = discord.Embed(color=0xFFCC66)
    response.set_author(name='Currently Ongoing Alerts')
    for alert in alert_list:
        alert_desc = f'Levels: {alert["levels"]["low"]} - {alert["levels"]["high"]}'
        alert_desc += f'\nLocation: {alert["node"]} ({alert["planet"]})'
        alert_desc += f'\nReward: {alert["rewards"]["credits"]}cr'
        time_left = alert['stamps']['end'] - arrow.utcnow().timestamp
        death_time = str(datetime.timedelta(seconds=time_left))
        if alert['rewards']['item']:
            alert_desc += f'\nItem: **{alert["rewards"]["item"]}**'
        alert_desc += f'\nDisappears In: {death_time}'
        response.add_field(name=f'Type: {alert["faction"]} {alert["type"]}', value=f'{alert_desc}', inline=False)
        response.set_thumbnail(url='https://i.imgur.com/99ennZD.png')
        response.set_footer(text='Timers are not updated live.')
    await message.channel.send(embed=response)
server_data_fill.py 文件源码 项目:apex-sigma-core 作者: lu-ci 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def server_data_fill(ev):
    global finished
    if ev.bot.guilds:
        ev.log.info('Filling server details...')
        start_stamp = arrow.utcnow().float_timestamp
        srv_coll = ev.db[ev.db.db_cfg.database].ServerDetails
        await srv_coll.drop()
        server_list = []
        for guild in ev.bot.guilds:
            srv_data = await generate_server_data(guild)
            server_list.append(srv_data)
            if len(server_list) >= 1000:
                await srv_coll.insert_many(server_list)
                server_list = []
                await asyncio.sleep(0.5)
        if server_list:
            await srv_coll.insert_many(server_list)
        end_stamp = arrow.utcnow().float_timestamp
        diff = round(end_stamp - start_stamp, 3)
        finished = True
        ev.log.info(f'Server detail filler finished in {diff}s')
purge.py 文件源码 项目:apex-sigma-core 作者: lu-ci 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def generate_log_embed(message, target, channel, deleted):
    response = discord.Embed(color=0x696969, timestamp=arrow.utcnow().datetime)
    response.set_author(name=f'#{channel.name} Has Been Pruned', icon_url=user_avatar(message.author))
    if target:
        target_text = f'{target.mention}\n{target.name}#{target.discriminator}'
    else:
        target_text = 'No Filter'
    response.add_field(name='?? Prune Details',
                       value=f'Amount: {len(deleted)} Messages\nTarget: {target_text}', inline=True)
    author = message.author
    response.add_field(name='?? Responsible',
                       value=f'{author.mention}\n{author.name}#{author.discriminator}',
                       inline=True)
    response.set_footer(text=f'ChannelID: {channel.id}')
    return response


# noinspection PyBroadException
statistics.py 文件源码 项目:apex-sigma-core 作者: lu-ci 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def statistics(cmd, message, args):
    sigma_image = 'https://i.imgur.com/mGyqMe1.png'
    sigma_title = 'Apex Sigma: Statistics'
    support_url = 'https://discordapp.com/invite/aEUCHwX'
    role_count = 0
    for guild in cmd.bot.guilds:
        role_count += len(guild.roles)
    time_dif = arrow.utcnow().timestamp - cmd.bot.start_time.timestamp
    command_rate = str(cmd.bot.command_count / time_dif)[:5]
    message_rate = str(cmd.bot.message_count / time_dif)[:5]
    pop_text = f'Servers: **{len(cmd.bot.guilds)}**'
    pop_text += f'\nChannels: **{len(list(cmd.bot.get_all_channels()))}**'
    pop_text += f'\nRoles: **{role_count}**'
    pop_text += f'\nMembers: **{len(list(cmd.bot.get_all_members()))}**'
    exec_text = f'Commands: **{cmd.bot.command_count}**'
    exec_text += f'\nCommand Rate: **{command_rate}/s**'
    exec_text += f'\nMessages: **{cmd.bot.message_count}**'
    exec_text += f'\nMessage Rate: **{message_rate}/s**'
    response = discord.Embed(color=0x1B6F5F, timestamp=cmd.bot.start_time.datetime)
    response.set_author(name=sigma_title, icon_url=sigma_image, url=support_url)
    response.add_field(name='Population', value=pop_text)
    response.add_field(name='Usage', value=exec_text)
    response.set_footer(text=f'Tracking since {cmd.bot.start_time.humanize()}')
    await message.channel.send(embed=response)
expiration_clock.py 文件源码 项目:apex-sigma-core 作者: lu-ci 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def cycler(ev):
    poll_coll = ev.db[ev.db.db_cfg.database].ShadowPolls
    while True:
        now = arrow.utcnow().timestamp
        poll_files = await poll_coll.find({'settings.expires': {'$lt': now}, 'settings.active': True}).to_list(None)
        for poll_file in poll_files:
            poll_id = poll_file['id']
            poll_file['settings'].update({'active': False})
            await ev.db[ev.db.db_cfg.database].ShadowPolls.update_one({'id': poll_id}, {'$set': poll_file})
            author = discord.utils.find(lambda x: x.id == poll_file['origin']['author'], ev.bot.get_all_members())
            if author:
                response = discord.Embed(color=0xff3333, title=f'? Your poll {poll_file["id"]} has expired.')
                try:
                    await author.send(embed=response)
                except discord.Forbidden:
                    pass
        await asyncio.sleep(1)
voting.py 文件源码 项目:Chaos 作者: Chaosthebot 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_vote_weight(api, username, contributors):
    """ for a given username, determine the weight that their -1 or +1 vote
    should be scaled by """
    user = users.get_user(api, username)

    # we don't want new spam malicious spam accounts to have an influence on the project
    # if they've got a PR merged, they get a free pass
    if user["login"] not in contributors:
        # otherwise, check their account age
        now = arrow.utcnow()
        created = arrow.get(user["created_at"])
        age = (now - created).total_seconds()
        if age < settings.MIN_VOTER_AGE:
            return 0

    if username.lower() == "smittyvb":
        return 0.50002250052

    return 1
test_views.py 文件源码 项目:vishleva.com 作者: webmalc 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_calendar(self):
        """
        Test calendar page
        """
        now = arrow.utcnow()
        event = Event()
        event.begin = now.replace(hours=+24).datetime
        event.end = now.replace(hours=+36).datetime
        event.title = 'test_title_now'
        event.status = 'open'
        event.save()

        self._login_superuser()
        url = reverse('admin:events_calendar')
        response = self.client.get(url)
        self._is_succesful(
            response, title='Events calendar | Django site admin')
        self.assertContains(response, 'test_title_now')
        self.assertContains(
            response, 'events-calendar-date">' + now.format('D MMM').lower())
time.py 文件源码 项目:dogbot 作者: slice 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def convert(self, ctx: DogbotContext, argument: str) -> str:
        cog: 'Time' = ctx.command.instance

        # resolve another user's timezone
        try:
            member = await MemberConverter().convert(ctx, argument)
            timezone = await cog.get_timezone_for(member)
            if timezone:
                return timezone
        except BadArgument:
            pass

        # hippo checking
        blacklisted = list('`\\<>@')
        if any(character in argument for character in blacklisted) or len(argument) > 30:
            raise BadArgument("That doesn't look like a timezone.")

        # actually check if it's a valid timezone with arrow's parser
        try:
            arrow.utcnow().to(argument)
        except arrow.parser.ParserError:
            raise BadArgument('Invalid timezone.')

        return argument
stats.py 文件源码 项目:apex-sigma 作者: lu-ci 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def add_cmd_stat(db, cmd, message, args):
    if not message.author.bot:
        command_data = {
            'name': cmd.name,
        }
        for key in ['global', 'sfw', 'admin', 'partner', 'pmable']:
            command_data[key] = cmd.perm[key]
        if message.guild:
            channel_id = message.channel.id
            guild_id = message.guild.id
        else:
            channel_id = None
            guild_id = None
        stat_data = {
            'command': command_data,
            'args': args,
            'author': message.author.id,
            'channel': channel_id,
            'guild': guild_id,
            'timestamp': arrow.utcnow().timestamp
        }
        db.insert_one('CommandStats', stat_data)
refactor.py 文件源码 项目:apex-sigma 作者: lu-ci 项目源码 文件源码 阅读 42 收藏 0 点赞 0 评论 0
def refactor_users_node(db, usrgen):
    usrs = list(usrgen)
    db['UserList'].drop()
    db.log.info('UserList Dropped And Starting Refactoring Process...')
    start_time = arrow.utcnow().timestamp
    usercount = 0
    for user in usrs:
        usercount += 1
        user_ava = user_avatar(user)
        data = {
            'UserID': user.id,
            'UserName': user.name,
            'Avatar': user_ava,
            'Discriminator': user.discriminator
        }
        db['UserList'].insert_one(data)
        await asyncio.sleep(0.001)
    elapsed_time = arrow.utcnow().timestamp - start_time
    db.log.info(f'{usercount} Users Updated in {elapsed_time} seconds.')
refactor.py 文件源码 项目:apex-sigma 作者: lu-ci 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def refactor_servers_node(db, servers):
    srvs = servers
    db['ServerList'].drop()
    db.log.info('ServerList Dropped And Starting Refactoring Process...')
    start_time = arrow.utcnow().timestamp
    for server in srvs:
        owner = server.owner
        if owner:
            owner_name = owner.name
            owner_id = owner.id
        else:
            owner_name = 'None'
            owner_id = 'Unknown'
        data = {
            'ServerID': server.id,
            'Icon': server.icon_url,
            'ServerName': server.name,
            'Owner': owner_name,
            'OwnerID': owner_id
        }
        db['ServerList'].insert_one(data)
        await asyncio.sleep(0.001)
    elapsed_time = arrow.utcnow().timestamp - start_time
    db.log.info(f'{len(servers)} Servers Updated in {elapsed_time} seconds.')
wffissures.py 文件源码 项目:apex-sigma 作者: lu-ci 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def wffissures(cmd, message, args):
    fissure_url = 'https://deathsnacks.com/wf/data/activemissions.json'
    async with aiohttp.ClientSession() as session:
        async with session.get(fissure_url) as data:
            fissure_data = await data.read()
            fissure_list = json.loads(fissure_data)
    response = discord.Embed(color=0x66ccff)
    response.set_author(name='Current Ongoing Fissures', icon_url='https://i.imgur.com/vANGxqe.png')
    for fis in fissure_list:
        relic_tier = tier_names[fis['Modifier']]
        fis_desc = f'Location: {fis["Node"]}'
        time_left = fis['Expiry']['sec'] - arrow.utcnow().timestamp
        death_time = str(datetime.timedelta(seconds=time_left))
        fis_desc += f'\nDisappears In: {death_time}'
        response.add_field(name=f'{relic_tier} Void Fissure', value=fis_desc)
    response.set_footer(text='Timers are not updated live.')
    await message.channel.send(embed=response)


问题


面经


文章

微信
公众号

扫码关注公众号