def version_updater(ev):
if DevMode:
with open('VERSION', 'r') as version_file:
current_version_data = yaml.safe_load(version_file)
beta = current_version_data['beta']
build_date = arrow.utcnow().timestamp
major = current_version_data['version']['major']
minor = current_version_data['version']['minor']
patch = current_version_data['version']['patch'] + 1
codename = current_version_data['codename']
data_out = {
'beta': beta,
'build_date': build_date,
'version': {
'major': major,
'minor': minor,
'patch': patch
},
'codename': codename
}
with open('VERSION', 'w') as version_out:
yaml.dump(data_out, version_out, default_flow_style=False)
ev.log.info('Updated Version File.')
python类utcnow()的实例源码
def move_log_join(ev, member):
try:
log_channel_id = ev.db.get_settings(member.guild.id, 'LoggingChannel')
except:
log_channel_id = None
if log_channel_id:
log_channel = discord.utils.find(lambda x: x.id == log_channel_id, member.guild.channels)
if log_channel:
response = discord.Embed(color=0x66CC66, timestamp=arrow.utcnow().datetime)
response.set_author(name=f'A Member Has Joined', icon_url=user_avatar(member))
response.add_field(name='?? Joining Member', value=f'{member.mention}\n{member.name}#{member.discriminator}')
new_acc, diff_msg = get_time_difference(member)
if new_acc:
response.add_field(name='? Account Is New', value=f'Made {diff_msg.title()}', inline=True)
else:
response.add_field(name='?? Account Created', value=f'{diff_msg.title()}', inline=True)
response.set_footer(text=f'UserID: {member.id}')
await log_channel.send(embed=response)
else:
ev.db.set_settings(member.guild.id, 'LoggingChannel', None)
def move_log_leave(ev, member):
try:
log_channel_id = ev.db.get_settings(member.guild.id, 'LoggingChannel')
except:
log_channel_id = None
if log_channel_id:
log_channel = discord.utils.find(lambda x: x.id == log_channel_id, member.guild.channels)
if log_channel:
response = discord.Embed(color=0xDB0000, timestamp=arrow.utcnow().datetime)
response.set_author(name=f'A Member Has Left', icon_url=user_avatar(member))
response.add_field(name='?? Leaving Member', value=f'{member.mention}\n{member.name}#{member.discriminator}')
new_acc, diff_msg = get_time_difference(member, leave=True)
response.add_field(name='?? Member Joined', value=f'{diff_msg.title()}', inline=True)
response.set_footer(text=f'UserID: {member.id}')
await log_channel.send(embed=response)
else:
ev.db.set_settings(member.guild.id, 'LoggingChannel', None)
def update_public_url():
response = requests.get('http://localhost:4040/api/tunnels')
public_url = response.json()['tunnels'][0]['public_url']
# TODO: ???? http/https ??? ???? ???? ? ??? ?? ???? - by zune
public_url = public_url.replace('http://', 'https://')
logger.info('public url {}'.format(public_url))
PublicUrl(
machine_id=get_machine_id(),
location=settings.AC_LOCATION,
public_url=public_url,
updated_at=arrow.utcnow().datetime,
ttl=arrow.utcnow().shift(days=1).datetime,
hostname=os.environ.get('HOSTNAME') or socket.gethostname(),
).save()
def test_Job_dict(self):
import arrow
a = arrow.utcnow()
d = {
'tag': 'tag',
'hosts': ['host1', 'host2'],
'using_host': 'host1',
'step': 'step',
'docker': 'nvidia-docker',
'remote_path': 'remote_path',
'command': 'command',
'start_time': str(a),
'container': 'container',
'oth': dict(a=10),
}
job = utils.Job.parse(d)
self.assertDictEqual(job.dict(), d)
def __init__(self, tag, hosts, using_host,
remote_path, command, step, docker='docker',
start_time=None, container=None, oth=None):
assert isinstance(hosts, list)
import arrow
self.tag = tag
self.hosts = hosts
self.using_host = using_host
self.remote_path = remote_path
self.command = command
self.step = step
self.docker = docker
if not start_time:
self.start_time = arrow.utcnow()
else:
self.start_time = arrow.get(start_time)
self.container = container
self.oth = oth
def wffissures(cmd, message, args):
fissure_url = 'https://deathsnacks.com/wf/data/activemissions.json'
async with aiohttp.ClientSession() as session:
async with session.get(fissure_url) as data:
fissure_data = await data.read()
fissure_list = json.loads(fissure_data)
response = discord.Embed(color=0x66ccff, title='Currently Ongoing Fissures')
fissure_list = sorted(fissure_list, key=lambda k: k['Modifier'])
for fis in fissure_list:
relic_tier = tier_names[fis['Modifier']]
fis_desc = f'Location: {fis["Node"]}'
time_left = fis['Expiry']['sec'] - arrow.utcnow().timestamp
death_time = str(datetime.timedelta(seconds=time_left))
fis_desc += f'\nDisappears In: {death_time}'
response.add_field(name=f'{relic_tier} Void Fissure', value=fis_desc, inline=False)
response.set_footer(text='Timers are not updated live.')
response.set_thumbnail(url=fissure_icon)
await message.channel.send(embed=response)
def wfalerts(cmd, message, args):
alert_url = 'https://deathsnacks.com/wf/data/alerts_raw.txt'
async with aiohttp.ClientSession() as session:
async with session.get(alert_url) as data:
alert_data = await data.text()
alert_list = parse_alert_data(alert_data)
response = discord.Embed(color=0xFFCC66)
response.set_author(name='Currently Ongoing Alerts')
for alert in alert_list:
alert_desc = f'Levels: {alert["levels"]["low"]} - {alert["levels"]["high"]}'
alert_desc += f'\nLocation: {alert["node"]} ({alert["planet"]})'
alert_desc += f'\nReward: {alert["rewards"]["credits"]}cr'
time_left = alert['stamps']['end'] - arrow.utcnow().timestamp
death_time = str(datetime.timedelta(seconds=time_left))
if alert['rewards']['item']:
alert_desc += f'\nItem: **{alert["rewards"]["item"]}**'
alert_desc += f'\nDisappears In: {death_time}'
response.add_field(name=f'Type: {alert["faction"]} {alert["type"]}', value=f'{alert_desc}', inline=False)
response.set_thumbnail(url='https://i.imgur.com/99ennZD.png')
response.set_footer(text='Timers are not updated live.')
await message.channel.send(embed=response)
def population_insert_clock(ev):
while True:
if not ev.bot.cool_down.on_cooldown(ev.name, 'stats_logger'):
ev.bot.cool_down.set_cooldown(ev.name, 'stats_logger', 3600)
collection = 'StatisticsLogs'
database = ev.bot.cfg.db.database
server_count = len(list(ev.bot.guilds))
member_count = len(list(ev.bot.get_all_members()))
channel_count = len(list(ev.bot.get_all_channels()))
command_count = ev.db[database].CommandStats.count()
stat_data = {
'stamp': arrow.utcnow().timestamp,
'guilds': server_count,
'users': member_count,
'channels': channel_count,
'commands': command_count
}
ev.db[database][collection].insert_one(stat_data)
await asyncio.sleep(300)
def user_data_fill(ev):
ev.log.info('Filling member details...')
threads = ThreadPoolExecutor(2)
start_stamp = arrow.utcnow().float_timestamp
ev.bot.cool_down.set_cooldown(ev.name, 'member_details', 3600)
mem_coll = ev.db[ev.db.db_cfg.database].UserDetails
mem_coll.drop()
for x in range(0, ev.bot.shard_count):
shard_start = arrow.utcnow().float_timestamp
member_list = []
for guild in ev.bot.guilds:
if guild.shard_id == x:
for member in guild.members:
mem_data = await generate_member_data(member)
member_list.append(mem_data)
task = functools.partial(mem_coll.insert, member_list)
await ev.bot.loop.run_in_executor(threads, task)
shard_end = arrow.utcnow().float_timestamp
shard_diff = round(shard_end - shard_start, 3)
ev.log.info(f'Filled Shard #{x} Members in {shard_diff}s.')
end_stamp = arrow.utcnow().float_timestamp
diff = round(end_stamp - start_stamp, 3)
ev.log.info(f'Member detail filler finished in {diff}s.')
def version_updater(ev):
if ev.bot.cfg.pref.dev_mode:
with open('info/version.yml', 'r') as version_file:
current_version_data = yaml.safe_load(version_file)
beta = current_version_data['beta']
build_date = arrow.utcnow().timestamp
major = current_version_data['version']['major']
minor = current_version_data['version']['minor']
patch = current_version_data['version']['patch'] + 1
codename = current_version_data['codename']
data_out = {
'beta': beta,
'build_date': build_date,
'version': {
'major': major,
'minor': minor,
'patch': patch
},
'codename': codename
}
with open('info/version.yml', 'w') as version_out:
yaml.dump(data_out, version_out, default_flow_style=False)
ev.log.info('Updated Version File.')
def statistics(cmd, message, args):
sigma_image = 'https://i.imgur.com/mGyqMe1.png'
sigma_title = 'Apex Sigma: Statistics'
support_url = 'https://discordapp.com/invite/aEUCHwX'
role_count = 0
for guild in cmd.bot.guilds:
role_count += len(guild.roles)
time_dif = arrow.utcnow().timestamp - cmd.bot.start_time.timestamp
command_rate = str(cmd.bot.command_count / time_dif)[:5]
message_rate = str(cmd.bot.message_count / time_dif)[:5]
pop_text = f'Servers: **{len(cmd.bot.guilds)}**'
pop_text += f'\nChannels: **{len(list(cmd.bot.get_all_channels()))}**'
pop_text += f'\nRoles: **{role_count}**'
pop_text += f'\nMembers: **{len(list(cmd.bot.get_all_members()))}**'
exec_text = f'Commands: **{cmd.bot.command_count}**'
exec_text += f'\nCommand Rate: **{command_rate}/s**'
exec_text += f'\nMessages: **{cmd.bot.message_count}**'
exec_text += f'\nMessage Rate: **{message_rate}/s**'
response = discord.Embed(color=0x1B6F5F, timestamp=cmd.bot.start_time.datetime)
response.set_author(name=sigma_title, icon_url=sigma_image, url=support_url)
response.add_field(name='Population', value=pop_text)
response.add_field(name='Usage', value=exec_text)
response.set_footer(text=f'Tracking since {cmd.bot.start_time.humanize()}')
await message.channel.send(embed=response)
def timeconvert(cmd, message, args):
if args:
conv_input = ' '.join(args).split('>')
if len(conv_input) == 2:
from_pieces = conv_input[0].split()
if len(from_pieces) == 2:
from_time = from_pieces[0]
from_zone = from_pieces[1]
to_zone = conv_input[1]
from_string = f'{arrow.utcnow().format("YYYY-MM-DD")} {from_time}:00'
from_arrow = arrow.get(arrow.get(from_string).datetime, from_zone)
to_arrow = from_arrow.to(to_zone)
time_out = to_arrow.format('YYYY-MM-DD HH:mm:ss (ZZ GMT)')
response = discord.Embed(color=0x696969, title=f'?? {time_out}')
else:
response = discord.Embed(color=0xBE1931, title='? Invalid first argument.')
else:
response = discord.Embed(color=0xBE1931, title='? Invalid input arguments.')
else:
response = discord.Embed(color=0xBE1931, title='? Nothing inputted.')
await message.channel.send(embed=response)
def get_count_of_votes_of_user(user, limit_on_today=False):
"""
Returns the count of marked ones of the user
:param user: User
:param limit_on_today: Boolean
:return: Int, Int
"""
if not user:
return 0
db_arg = DBDiscussionSession.query(MarkedArgument).filter(ClickedArgument.author_uid == user.uid)
db_stat = DBDiscussionSession.query(MarkedStatement).filter(ClickedStatement.author_uid == user.uid)
if limit_on_today:
today = arrow.utcnow().to('Europe/Berlin').format('YYYY-MM-DD')
db_arg = db_arg.filter(MarkedArgument.timestamp >= today)
db_stat = db_stat.filter(MarkedStatement.timestamp >= today)
db_arg = db_arg.all()
db_stat = db_stat.all()
return len(db_arg), len(db_stat)
def get_count_of_clicks(user, limit_on_today=False):
"""
Returns the count of clicks of the user
:param user: User
:param limit_on_today: Boolean
:return: Int, Int
"""
if not user:
return 0
db_arg = DBDiscussionSession.query(ClickedArgument).filter(ClickedArgument.author_uid == user.uid)
db_stat = DBDiscussionSession.query(ClickedStatement).filter(ClickedStatement.author_uid == user.uid)
if limit_on_today:
today = arrow.utcnow().to('Europe/Berlin').format('YYYY-MM-DD')
db_arg = db_arg.filter(ClickedArgument.timestamp >= today)
db_stat = db_stat.filter(ClickedStatement.timestamp >= today)
db_arg = db_arg.all()
db_stat = db_stat.all()
return len(db_arg), len(db_stat)
def my_handler(event, context):
utc = arrow.utcnow()
local = utc.to('US/Pacific')
date_to_print = local.humanize()
message = 'Hello world lambda1! at {}'.format(date_to_print)
return {
'message': message
}
def test_rpc(self, container_factory, rabbit_config):
from examples.retry import Service
container = container_factory(Service, rabbit_config)
container.start()
timestamp = arrow.utcnow().replace(seconds=+1)
with ServiceRpcProxy('service', rabbit_config) as service_rpc:
res = service_rpc.method(timestamp.isoformat())
assert arrow.get(re.match("Time is (.+)", res).group(1)) >= timestamp
def test_decorated_rpc(self, container_factory, rabbit_config):
from examples.retry import Service
container = container_factory(Service, rabbit_config)
container.start()
timestamp = arrow.utcnow().replace(seconds=+1)
with ServiceRpcProxy('service', rabbit_config) as service_rpc:
res = service_rpc.decorated_method(timestamp.isoformat())
assert arrow.get(re.match("Time is (.+)", res).group(1)) >= timestamp
def generate_message(self):
return "Time is {}".format(arrow.utcnow())