python类utcnow()的实例源码

version_updater.py 文件源码 项目:apex-sigma 作者: lu-ci 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def version_updater(ev):
    if DevMode:
        with open('VERSION', 'r') as version_file:
            current_version_data = yaml.safe_load(version_file)
        beta = current_version_data['beta']
        build_date = arrow.utcnow().timestamp
        major = current_version_data['version']['major']
        minor = current_version_data['version']['minor']
        patch = current_version_data['version']['patch'] + 1
        codename = current_version_data['codename']
        data_out = {
            'beta': beta,
            'build_date': build_date,
            'version': {
                'major': major,
                'minor': minor,
                'patch': patch
            },
            'codename': codename
        }
        with open('VERSION', 'w') as version_out:
            yaml.dump(data_out, version_out, default_flow_style=False)
        ev.log.info('Updated Version File.')
move_log_join.py 文件源码 项目:apex-sigma 作者: lu-ci 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def move_log_join(ev, member):
    try:
        log_channel_id = ev.db.get_settings(member.guild.id, 'LoggingChannel')
    except:
        log_channel_id = None
    if log_channel_id:
        log_channel = discord.utils.find(lambda x: x.id == log_channel_id, member.guild.channels)
        if log_channel:
            response = discord.Embed(color=0x66CC66, timestamp=arrow.utcnow().datetime)
            response.set_author(name=f'A Member Has Joined', icon_url=user_avatar(member))
            response.add_field(name='?? Joining Member', value=f'{member.mention}\n{member.name}#{member.discriminator}')
            new_acc, diff_msg = get_time_difference(member)
            if new_acc:
                response.add_field(name='? Account Is New', value=f'Made {diff_msg.title()}', inline=True)
            else:
                response.add_field(name='?? Account Created', value=f'{diff_msg.title()}', inline=True)
            response.set_footer(text=f'UserID: {member.id}')
            await log_channel.send(embed=response)
        else:
            ev.db.set_settings(member.guild.id, 'LoggingChannel', None)
move_log_leave.py 文件源码 项目:apex-sigma 作者: lu-ci 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def move_log_leave(ev, member):
    try:
        log_channel_id = ev.db.get_settings(member.guild.id, 'LoggingChannel')
    except:
        log_channel_id = None
    if log_channel_id:
        log_channel = discord.utils.find(lambda x: x.id == log_channel_id, member.guild.channels)
        if log_channel:
            response = discord.Embed(color=0xDB0000, timestamp=arrow.utcnow().datetime)
            response.set_author(name=f'A Member Has Left', icon_url=user_avatar(member))
            response.add_field(name='?? Leaving Member', value=f'{member.mention}\n{member.name}#{member.discriminator}')
            new_acc, diff_msg = get_time_difference(member, leave=True)
            response.add_field(name='?? Member Joined', value=f'{diff_msg.title()}', inline=True)
            response.set_footer(text=f'UserID: {member.id}')
            await log_channel.send(embed=response)
        else:
            ev.db.set_settings(member.guild.id, 'LoggingChannel', None)
tasks.py 文件源码 项目:hardware-lab 作者: Buzzvil 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def update_public_url():
    response = requests.get('http://localhost:4040/api/tunnels')

    public_url = response.json()['tunnels'][0]['public_url']
    # TODO: ???? http/https ??? ???? ???? ? ??? ?? ???? - by zune
    public_url = public_url.replace('http://', 'https://')
    logger.info('public url {}'.format(public_url))

    PublicUrl(
        machine_id=get_machine_id(),
        location=settings.AC_LOCATION,
        public_url=public_url,
        updated_at=arrow.utcnow().datetime,
        ttl=arrow.utcnow().shift(days=1).datetime,
        hostname=os.environ.get('HOSTNAME') or socket.gethostname(),
    ).save()
test_utils.py 文件源码 项目:remote-docker 作者: phizaz 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_Job_dict(self):
        import arrow
        a = arrow.utcnow()
        d = {
            'tag': 'tag',
            'hosts': ['host1', 'host2'],
            'using_host': 'host1',
            'step': 'step',
            'docker': 'nvidia-docker',
            'remote_path': 'remote_path',
            'command': 'command',
            'start_time': str(a),
            'container': 'container',
            'oth': dict(a=10),
        }
        job = utils.Job.parse(d)
        self.assertDictEqual(job.dict(), d)
utils.py 文件源码 项目:remote-docker 作者: phizaz 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def __init__(self, tag, hosts, using_host,
                 remote_path, command, step, docker='docker',
                 start_time=None, container=None, oth=None):
        assert isinstance(hosts, list)
        import arrow
        self.tag = tag
        self.hosts = hosts
        self.using_host = using_host
        self.remote_path = remote_path
        self.command = command
        self.step = step
        self.docker = docker
        if not start_time:
            self.start_time = arrow.utcnow()
        else:
            self.start_time = arrow.get(start_time)
        self.container = container
        self.oth = oth
wffissures.py 文件源码 项目:apex-sigma-plugins 作者: lu-ci 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def wffissures(cmd, message, args):
    fissure_url = 'https://deathsnacks.com/wf/data/activemissions.json'
    async with aiohttp.ClientSession() as session:
        async with session.get(fissure_url) as data:
            fissure_data = await data.read()
            fissure_list = json.loads(fissure_data)
    response = discord.Embed(color=0x66ccff, title='Currently Ongoing Fissures')
    fissure_list = sorted(fissure_list, key=lambda k: k['Modifier'])
    for fis in fissure_list:
        relic_tier = tier_names[fis['Modifier']]
        fis_desc = f'Location: {fis["Node"]}'
        time_left = fis['Expiry']['sec'] - arrow.utcnow().timestamp
        death_time = str(datetime.timedelta(seconds=time_left))
        fis_desc += f'\nDisappears In: {death_time}'
        response.add_field(name=f'{relic_tier} Void Fissure', value=fis_desc, inline=False)
    response.set_footer(text='Timers are not updated live.')
    response.set_thumbnail(url=fissure_icon)
    await message.channel.send(embed=response)
wfalerts.py 文件源码 项目:apex-sigma-plugins 作者: lu-ci 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def wfalerts(cmd, message, args):
    alert_url = 'https://deathsnacks.com/wf/data/alerts_raw.txt'
    async with aiohttp.ClientSession() as session:
        async with session.get(alert_url) as data:
            alert_data = await data.text()
    alert_list = parse_alert_data(alert_data)
    response = discord.Embed(color=0xFFCC66)
    response.set_author(name='Currently Ongoing Alerts')
    for alert in alert_list:
        alert_desc = f'Levels: {alert["levels"]["low"]} - {alert["levels"]["high"]}'
        alert_desc += f'\nLocation: {alert["node"]} ({alert["planet"]})'
        alert_desc += f'\nReward: {alert["rewards"]["credits"]}cr'
        time_left = alert['stamps']['end'] - arrow.utcnow().timestamp
        death_time = str(datetime.timedelta(seconds=time_left))
        if alert['rewards']['item']:
            alert_desc += f'\nItem: **{alert["rewards"]["item"]}**'
        alert_desc += f'\nDisappears In: {death_time}'
        response.add_field(name=f'Type: {alert["faction"]} {alert["type"]}', value=f'{alert_desc}', inline=False)
        response.set_thumbnail(url='https://i.imgur.com/99ennZD.png')
        response.set_footer(text='Timers are not updated live.')
    await message.channel.send(embed=response)
stats_logger.py 文件源码 项目:apex-sigma-plugins 作者: lu-ci 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def population_insert_clock(ev):
    while True:
        if not ev.bot.cool_down.on_cooldown(ev.name, 'stats_logger'):
            ev.bot.cool_down.set_cooldown(ev.name, 'stats_logger', 3600)
            collection = 'StatisticsLogs'
            database = ev.bot.cfg.db.database
            server_count = len(list(ev.bot.guilds))
            member_count = len(list(ev.bot.get_all_members()))
            channel_count = len(list(ev.bot.get_all_channels()))
            command_count = ev.db[database].CommandStats.count()
            stat_data = {
                'stamp': arrow.utcnow().timestamp,
                'guilds': server_count,
                'users': member_count,
                'channels': channel_count,
                'commands': command_count
            }
            ev.db[database][collection].insert_one(stat_data)
        await asyncio.sleep(300)
user_data_fill.py 文件源码 项目:apex-sigma-plugins 作者: lu-ci 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def user_data_fill(ev):
    ev.log.info('Filling member details...')
    threads = ThreadPoolExecutor(2)
    start_stamp = arrow.utcnow().float_timestamp
    ev.bot.cool_down.set_cooldown(ev.name, 'member_details', 3600)
    mem_coll = ev.db[ev.db.db_cfg.database].UserDetails
    mem_coll.drop()
    for x in range(0, ev.bot.shard_count):
        shard_start = arrow.utcnow().float_timestamp
        member_list = []
        for guild in ev.bot.guilds:
            if guild.shard_id == x:
                for member in guild.members:
                    mem_data = await generate_member_data(member)
                    member_list.append(mem_data)
        task = functools.partial(mem_coll.insert, member_list)
        await ev.bot.loop.run_in_executor(threads, task)
        shard_end = arrow.utcnow().float_timestamp
        shard_diff = round(shard_end - shard_start, 3)
        ev.log.info(f'Filled Shard #{x} Members in {shard_diff}s.')
    end_stamp = arrow.utcnow().float_timestamp
    diff = round(end_stamp - start_stamp, 3)
    ev.log.info(f'Member detail filler finished in {diff}s.')
version_updater.py 文件源码 项目:apex-sigma-plugins 作者: lu-ci 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def version_updater(ev):
    if ev.bot.cfg.pref.dev_mode:
        with open('info/version.yml', 'r') as version_file:
            current_version_data = yaml.safe_load(version_file)
        beta = current_version_data['beta']
        build_date = arrow.utcnow().timestamp
        major = current_version_data['version']['major']
        minor = current_version_data['version']['minor']
        patch = current_version_data['version']['patch'] + 1
        codename = current_version_data['codename']
        data_out = {
            'beta': beta,
            'build_date': build_date,
            'version': {
                'major': major,
                'minor': minor,
                'patch': patch
            },
            'codename': codename
        }
        with open('info/version.yml', 'w') as version_out:
            yaml.dump(data_out, version_out, default_flow_style=False)
        ev.log.info('Updated Version File.')
statistics.py 文件源码 项目:apex-sigma-plugins 作者: lu-ci 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def statistics(cmd, message, args):
    sigma_image = 'https://i.imgur.com/mGyqMe1.png'
    sigma_title = 'Apex Sigma: Statistics'
    support_url = 'https://discordapp.com/invite/aEUCHwX'
    role_count = 0
    for guild in cmd.bot.guilds:
        role_count += len(guild.roles)
    time_dif = arrow.utcnow().timestamp - cmd.bot.start_time.timestamp
    command_rate = str(cmd.bot.command_count / time_dif)[:5]
    message_rate = str(cmd.bot.message_count / time_dif)[:5]
    pop_text = f'Servers: **{len(cmd.bot.guilds)}**'
    pop_text += f'\nChannels: **{len(list(cmd.bot.get_all_channels()))}**'
    pop_text += f'\nRoles: **{role_count}**'
    pop_text += f'\nMembers: **{len(list(cmd.bot.get_all_members()))}**'
    exec_text = f'Commands: **{cmd.bot.command_count}**'
    exec_text += f'\nCommand Rate: **{command_rate}/s**'
    exec_text += f'\nMessages: **{cmd.bot.message_count}**'
    exec_text += f'\nMessage Rate: **{message_rate}/s**'
    response = discord.Embed(color=0x1B6F5F, timestamp=cmd.bot.start_time.datetime)
    response.set_author(name=sigma_title, icon_url=sigma_image, url=support_url)
    response.add_field(name='Population', value=pop_text)
    response.add_field(name='Usage', value=exec_text)
    response.set_footer(text=f'Tracking since {cmd.bot.start_time.humanize()}')
    await message.channel.send(embed=response)
timeconvert.py 文件源码 项目:apex-sigma-plugins 作者: lu-ci 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def timeconvert(cmd, message, args):
    if args:
        conv_input = ' '.join(args).split('>')
        if len(conv_input) == 2:
            from_pieces = conv_input[0].split()
            if len(from_pieces) == 2:
                from_time = from_pieces[0]
                from_zone = from_pieces[1]
                to_zone = conv_input[1]
                from_string = f'{arrow.utcnow().format("YYYY-MM-DD")} {from_time}:00'
                from_arrow = arrow.get(arrow.get(from_string).datetime, from_zone)
                to_arrow = from_arrow.to(to_zone)
                time_out = to_arrow.format('YYYY-MM-DD HH:mm:ss (ZZ GMT)')
                response = discord.Embed(color=0x696969, title=f'?? {time_out}')
            else:
                response = discord.Embed(color=0xBE1931, title='? Invalid first argument.')
        else:
            response = discord.Embed(color=0xBE1931, title='? Invalid input arguments.')
    else:
        response = discord.Embed(color=0xBE1931, title='? Nothing inputted.')
    await message.channel.send(embed=response)
user.py 文件源码 项目:dbas 作者: hhucn 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_count_of_votes_of_user(user, limit_on_today=False):
    """
    Returns the count of marked ones of the user

    :param user: User
    :param limit_on_today: Boolean
    :return: Int, Int
    """
    if not user:
        return 0

    db_arg = DBDiscussionSession.query(MarkedArgument).filter(ClickedArgument.author_uid == user.uid)
    db_stat = DBDiscussionSession.query(MarkedStatement).filter(ClickedStatement.author_uid == user.uid)

    if limit_on_today:
        today = arrow.utcnow().to('Europe/Berlin').format('YYYY-MM-DD')
        db_arg = db_arg.filter(MarkedArgument.timestamp >= today)
        db_stat = db_stat.filter(MarkedStatement.timestamp >= today)

    db_arg = db_arg.all()
    db_stat = db_stat.all()

    return len(db_arg), len(db_stat)
user.py 文件源码 项目:dbas 作者: hhucn 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def get_count_of_clicks(user, limit_on_today=False):
    """
    Returns the count of clicks of the user

    :param user: User
    :param limit_on_today: Boolean
    :return: Int, Int
    """
    if not user:
        return 0

    db_arg = DBDiscussionSession.query(ClickedArgument).filter(ClickedArgument.author_uid == user.uid)
    db_stat = DBDiscussionSession.query(ClickedStatement).filter(ClickedStatement.author_uid == user.uid)

    if limit_on_today:
        today = arrow.utcnow().to('Europe/Berlin').format('YYYY-MM-DD')
        db_arg = db_arg.filter(ClickedArgument.timestamp >= today)
        db_stat = db_stat.filter(ClickedStatement.timestamp >= today)

    db_arg = db_arg.all()
    db_stat = db_stat.all()

    return len(db_arg), len(db_stat)
main.py 文件源码 项目:ardy 作者: avara1986 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def my_handler(event, context):

    utc = arrow.utcnow()
    local = utc.to('US/Pacific')
    date_to_print = local.humanize()

    message = 'Hello world lambda1! at {}'.format(date_to_print)
    return {
        'message': message
    }
test_retry.py 文件源码 项目:nameko-amqp-retry 作者: nameko 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_rpc(self, container_factory, rabbit_config):

        from examples.retry import Service

        container = container_factory(Service, rabbit_config)
        container.start()

        timestamp = arrow.utcnow().replace(seconds=+1)

        with ServiceRpcProxy('service', rabbit_config) as service_rpc:
            res = service_rpc.method(timestamp.isoformat())
        assert arrow.get(re.match("Time is (.+)", res).group(1)) >= timestamp
test_retry.py 文件源码 项目:nameko-amqp-retry 作者: nameko 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_decorated_rpc(self, container_factory, rabbit_config):

        from examples.retry import Service

        container = container_factory(Service, rabbit_config)
        container.start()

        timestamp = arrow.utcnow().replace(seconds=+1)

        with ServiceRpcProxy('service', rabbit_config) as service_rpc:
            res = service_rpc.decorated_method(timestamp.isoformat())
        assert arrow.get(re.match("Time is (.+)", res).group(1)) >= timestamp
retry.py 文件源码 项目:nameko-amqp-retry 作者: nameko 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def generate_message(self):
        return "Time is {}".format(arrow.utcnow())


问题


面经


文章

微信
公众号

扫码关注公众号