def _get_season_string(self, season_int=None):
"""Season string generator.
Generates this month's season name, or the season name from a given season int.
A season int is defined by anilist as "First 2 numbers are the year (16 is 2016).
Last number is the season starting at 1 (3 is Summer)."
Keyword Args:
season_int (int): An optional season int from anilist for a season.
Returns:
The season (winter, spring, summer, or fall) as a string.
"""
monthsToSeasons = {(1,2,3):'winter', (4,5,6):'spring', (7,8,9):'summer', (10,11,12):'fall'}
seasonIntToSeasons = {1:'winter', 2:'spring', 3:'summer', 4:'fall'}
if season_int:
season = seasonIntToSeasons[int(str(season_int)[2:])]
else:
targetDate = date.today()
season = next(val for key, val in monthsToSeasons.items() if targetDate.month in key)
return season
python类today()的实例源码
def _rashid_tibia(self):
"""Get Rashid's Location"""
current_date = date.today()
if calendar.day_name[current_date.weekday()] == "Sunday":
await self.bot.say("On Sundays you can find him in Carlin depot, one floor above.")
elif calendar.day_name[current_date.weekday()] == "Monday":
await self.bot.say("On Mondays you can find him in Svargrond, in Dankwart's tavern, south of the temple.")
elif calendar.day_name[current_date.weekday()] == "Tuesday":
await self.bot.say("On Tuesdays you can find him in Liberty Bay, in Lyonel's tavern, west of the depot.")
elif calendar.day_name[current_date.weekday()] == "Wednesday":
await self.bot.say("On Wednesdays you can find him in Port Hope, in Clyde's tavern, north of the ship.")
elif calendar.day_name[current_date.weekday()] == "Thursday":
await self.bot.say("On Thursdays you can find him in Ankrahmun, in Arito's tavern, above the post office.")
elif calendar.day_name[current_date.weekday()] == "Friday":
await self.bot.say("On Fridays you can find him in Darashia, in Miraia's tavern, south of the guildhalls.")
elif calendar.day_name[current_date.weekday()] == "Saturday":
await self.bot.say("On Saturdays you can find him in Edron, in Mirabell's tavern, above the depot.")
else:
pass
test_cache_project_stats.py 文件源码
项目:FRG-Crowdsourcing
作者: 97amarnathk
项目源码
文件源码
阅读 33
收藏 0
点赞 0
评论 0
def test_stats_hours(self):
    """Test CACHE PROJECT STATS hours works."""
    project = ProjectFactory.create()
    task = TaskFactory.create(n_answers=1)
    # Key of the bucket for the current hour (UTC), formatted with '%H'.
    hour_key = datetime.now(pytz.utc).strftime('%H')
    TaskFactory.create()
    # One task run from TaskRunFactory and one from AnonymousTaskRunFactory.
    TaskRunFactory.create(project=project, task=task)
    AnonymousTaskRunFactory.create(project=project)

    (hours, hours_anon, hours_auth,
     max_hours, max_hours_anon, max_hours_auth) = stats_hours(project.id)

    assert len(hours) == 24, len(hours)
    assert hours[hour_key] == 2, hours[hour_key]
    assert hours_anon[hour_key] == 1, hours_anon[hour_key]
    assert hours_auth[hour_key] == 1, hours_auth[hour_key]
    assert max_hours == 2
    assert max_hours_anon == 1
    assert max_hours_auth == 1
test_cache_project_stats.py 文件源码
项目:FRG-Crowdsourcing
作者: 97amarnathk
项目源码
文件源码
阅读 30
收藏 0
点赞 0
评论 0
def test_stats_hours_with_period(self):
    """Test CACHE PROJECT STATS hours with period works."""
    project = ProjectFactory.create()
    six_days_ago = date.today() - timedelta(days=6)
    task = TaskFactory.create(n_answers=1, created=six_days_ago)
    TaskRunFactory.create(project=project, task=task,
                          created=six_days_ago, finish_time=six_days_ago)
    sixteen_days_ago = date.today() - timedelta(days=16)
    AnonymousTaskRunFactory.create(project=project, created=sixteen_days_ago,
                                   finish_time=sixteen_days_ago)

    (hours, hours_anon, hours_auth,
     max_hours, max_hours_anon, max_hours_auth) = stats_hours(project.id)

    assert len(hours) == 24, len(hours)
    # We use 00 because the timestamps are plain dates, which carry no hour.
    # The failure messages show the same '00' bucket that is asserted on
    # (they previously displayed the unrelated current-hour bucket).
    assert hours['00'] == 1, hours['00']
    assert hours_anon['00'] == 0, hours_anon['00']
    assert hours_auth['00'] == 1, hours_auth['00']
    assert max_hours == 1
    assert max_hours_anon is None
    assert max_hours_auth == 1
def __init__(self, quote, d=None, m=None, y=None, strike=None,
             strict=False, source='google'):
    """Initialise the option for ``quote`` and optionally select a strike.

    Args:
        quote: Ticker symbol; upper-cased before use.
        d, m, y: Day, month and year forwarded to the parent initialiser
            (presumably the expiration date - confirm against the parent
            class). Each defaults to the matching field of today's date,
            resolved when the object is created. The previous
            ``d=date.today().day`` style defaults were evaluated once at
            definition time and went stale in long-running processes.
        strike: Optional strike price to select.
        strict: Forwarded to the parent initialiser. When true, an
            unlisted ``strike`` raises instead of falling back to the
            closest listed strike.
        source: Forwarded to the parent initialiser.

    Raises:
        LookupError: ``strict`` is true and ``strike`` is not listed.
    """
    today = date.today()
    d = today.day if d is None else d
    m = today.month if m is None else m
    y = today.year if y is None else y

    quote = quote.upper()
    kw = {'d': d, 'm': m, 'y': y, 'strict': strict, 'source': source}
    super().__init__(quote, self.__class__.Option_type, **kw)
    # Time to expiration expressed in years (365-day convention).
    self.T = (self._expiration - date.today()).days/365
    self.q = self.underlying.dy
    self.ticker = quote
    self.strike = None
    # Skip rows whose 'p' field is the placeholder '-'.
    self.strikes = tuple(parse(dic['strike']) for dic in self.data
                         if dic.get('p') != '-')
    if not strike:
        return
    if strike in self.strikes:
        self.set_strike(strike)
    elif strict:
        raise LookupError('No options listed for given strike price.')
    else:
        closest_strike = min(self.strikes, key=lambda x: abs(x - strike))
        print('No option for given strike, using %s instead' % closest_strike)
        self.set_strike(closest_strike)
def __init__(self, week_range):
    """Download the user's Toggl data and build a per-day time log.

    Args:
        week_range: Number of weeks, counted back from today, to fetch.

    After construction ``self.time_log`` maps each date string in the
    range to a ``LogEntry`` whose time has been converted with
    ``toHours()``. ``self.date_range`` is used as the loop cursor and ends
    up one day past the last logged date.
    """
    credentials = (API_KEY, 'api_token')
    self.date_range = date.today() - timedelta(weeks=week_range)

    profile = requests.get('https://www.toggl.com/api/v8/me?with_related_data=true', auth=credentials)
    self.user_obj = profile.json()
    # list of (name, id) pairs, one per project
    self.projects = [(project['name'], project['id'])
                     for project in self.user_obj['data']['projects']]

    # get last [week_range] weeks of entries
    entries_url = ('https://www.toggl.com/api/v8/time_entries?start_date='
                   + str(self.date_range) + 'T00:00:00' + TIMEZONE_ENCODED)
    self.time_entries = requests.get(entries_url, auth=credentials).json()

    # One empty LogEntry per day from the start date through today.
    self.time_log = OrderedDict()
    while self.date_range <= date.today():
        self.time_log[str(self.date_range)] = LogEntry(self.date_range)
        self.date_range += timedelta(days=1)

    for entry in self.time_entries:
        day_key = entry['start'].split('T')[0]  # split date from time
        day_log = self.time_log[day_key]
        day_log.time = day_log.time + entry['duration']

    # Only after every entry has been summed (many days have several)
    # is each day's total converted to hours.
    for day_log in self.time_log.values():
        day_log.toHours()
def dated_path(obj, file_data):
    """Build a relative path ``<prefix>/<year>/<month>/<filename>``.

    Args:
        obj: Object whose ``model_name`` attribute, when present, is used
            as the path prefix; otherwise the prefix is "undefined".
        file_data: Object exposing the uploaded file name as ``filename``.

    Returns:
        The path as a string. The file name is the original stem, an
        underscore, a random 16-bit integer and the original extension,
        passed through ``secure_filename``. Year and month come from
        today's date (the month is not zero-padded).
    """
    # Only a missing attribute falls back to the default. The previous
    # ``except BaseException`` also swallowed KeyboardInterrupt/SystemExit.
    prefix = getattr(obj, 'model_name', "undefined")
    stem, ext = op.splitext(file_data.filename)
    rand = random.getrandbits(16)
    filename = secure_filename(
        u"{name}_{rand}{ext}".format(rand=rand, name=stem, ext=ext)
    )
    today = date.today()
    return u"{prefix}/{t.year}/{t.month}/{filename}".format(
        prefix=prefix, t=today, filename=filename
    )
def CheckDate(self):
    """Start the upgrade listing, or ask for confirmation if the image is old.

    The image date is read from ``getEnigmaVersionString()`` by slicing
    characters [0:4], [5:7] and [8:10] as year, month and day. If that
    date plus 30 days is still in the future the upgrade list is
    requested directly; otherwise a confirmation ``MessageBox`` is opened
    with ``self.checkDateCallback`` as its callback.
    """
    # Check if the image is not too old for an update (max 30 days)
    self.CheckDateDone = True
    version = getEnigmaVersionString()
    built_on = date(int(version[0:4]), int(version[5:7]), int(version[8:10]))
    update_deadline = built_on + timedelta(days=30)
    message = _("Your image is out of date!\n\n"
                "After such a long time, there is a risk that your %s %s will not\n"
                "boot after online-update, or will show disfunction in running Image.\n\n"
                "A new flash will increase the stability\n\n"
                "An online update is done at your own risk !!\n\n\n"
                "Do you still want to update?") % (getMachineBrand(), getMachineName())
    if update_deadline <= date.today():
        # Parenthesised single-argument form prints the same text on
        # Python 2 and Python 3.
        print("[SOFTWAREMANAGER] Your image is to old (%s), you need to flash new !!" % getEnigmaVersionString())
        self.session.openWithCallback(self.checkDateCallback, MessageBox, message, default=False)
        return
    self.updating = True
    self.activityTimer.start(100, False)
    self.ipkg.startCmd(IpkgComponent.CMD_UPGRADE_LIST)
def test_session_detail(self):
    """Smoke test: the session detail URL answers with status 200."""
    # really minimal test
    Conference.objects.get_or_create(id=settings.CONFERENCE_ID)
    # Build the chain section -> schedule -> day -> session.
    section = Section.objects.create(conference=current_conference())
    schedule = Schedule.objects.create(section=section)
    day = Day.objects.create(schedule=schedule, date=date.today())
    session = Session.objects.create(day=day)

    detail_url = reverse("schedule_session_detail", args=(session.pk,))
    response = self.client.get(detail_url)
    self.assertEqual(200, response.status_code)
def id(self, state=False):
    """Generate a random 18-character resident identity number.

    Layout: district code from ``self.dist_code['code']`` (expected to be
    6 digits), an 8-digit birth date (YYYYMMDD), a 3-digit sequence code
    and one check character.

    Args:
        state: When true, return a dict with the district's ``state`` and
            the generated ``id``; otherwise return only the number.

    Returns:
        str, or ``{'state': ..., 'id': ...}`` when ``state`` is true.
    """
    distcode = self.dist_code
    number = distcode['code']  # district code
    # Pick the birth year first and then a day inside that year, so the
    # date is always a real calendar date (the month/day used to be drawn
    # independently of the year, allowing Feb 29 in non-leap years).
    year = randint(1930, 2017)
    year_start = date(year, 1, 1)
    days_in_year = (date(year + 1, 1, 1) - year_start).days
    birthday = year_start + timedelta(days=randint(0, days_in_year - 1))
    number += birthday.strftime('%Y%m%d')  # birth date
    number += str(randint(100, 999))  # sequence code
    # Position weights for the first 17 digits.
    weights = (7, 9, 10, 5, 8, 4, 2, 1, 6, 3, 7, 9, 10, 5, 8, 4, 2)
    # Check character indexed by (weighted sum mod 11). Remainder 8 maps
    # to '4'; the previous table mapped it to '5'.
    check_chars = '10X98765432'
    total = sum(int(digit) * weight for digit, weight in zip(number, weights))
    number += check_chars[total % 11]  # check character
    if state:
        return {'state': distcode['state'], 'id': number}
    return number
def update_or_create_token(token, vegetariano):
    """
    Register a device token, or refresh its "last_used" and/or "vegetariano" fields.

    :param token: token to register or update.
    :param vegetariano: the user's menu preference.
    :return: True when no error occurs during the process.
    """
    last_used = date.today().strftime("%y-%m-%d")
    payload = {"last_used": last_used, "vegetariano": vegetariano}
    # The record is written at tokens/<token>, replacing what is there.
    token_ref = setup_firebase().child('tokens').child(token)
    token_ref.set(payload)
    print("Device token {} registrado com sucesso.".format(token))
    return True
def main():
    """Send the notification for the next meal when run in a meal window.

    Lunch ("almoço") is announced between 13h and 15h UTC and dinner
    ("jantar") between 19h and 21h UTC; at any other hour a message is
    printed and nothing is sent.
    """
    hour = datetime.utcnow().hour
    # Times are in UTC! Brazil time = UTC - 3h (may differ during daylight
    # saving time).
    if 13 <= hour <= 15:
        refeicao = "almoço"
    elif 19 <= hour <= 21:
        refeicao = "jantar"
    else:
        print("Tentativa de envio de notificação em horário impropio.")
        return
    mandar_proxima_refeicao(refeicao)
def next_weekdays(next, start_date_string=None):
    """Return the next ``next`` weekdays (Monday-Friday) as date strings.

    Args:
        next: Number of weekdays to return.
        start_date_string: First candidate date, formatted with
            ``URL_DATE_FORMAT``. Defaults to today, resolved on every
            call. The previous default was computed once at import time,
            so a long-running process kept using its start-up date.

    Returns:
        A list of ``next`` strings in ``URL_DATE_FORMAT``, in
        chronological order, starting at the start date if it is itself
        a weekday.
    """
    if start_date_string is None:
        start_date = datetime.combine(date.today(), datetime.min.time())
    else:
        start_date = datetime.strptime(start_date_string, URL_DATE_FORMAT)
    date_strings = []
    day = start_date
    while len(date_strings) < next:
        # weekday(): 0-4 are Monday-Friday, 5-6 the weekend.
        if day.weekday() < 5:
            date_strings.append(day.strftime(URL_DATE_FORMAT))
        day += timedelta(days=1)
    return date_strings
def save_today_new_articles(self, sub_class, name, new_dict):
    """
    Store today's articles for a site, minus those already stored under the old key.

    Every key of the previously stored dict is removed from ``new_dict``
    (in place); what remains is saved under the updated key with a
    10-day expiry. A previously stored key that is missing from
    ``new_dict`` is only logged.

    :param sub_class: forwarded, with ``name``, to the key generators
    :param name: site name
    :param new_dict: article dict of today (mutated in place)
    :return: None
    """
    new_key = self.generate_updated_key(sub_class, name)
    old_key = self.generate_old_key(sub_class, name)
    if self.client.exists(old_key):
        # SECURITY NOTE(review): eval() executes whatever text is stored
        # under old_key. Kept because the stored format is produced
        # elsewhere; consider ast.literal_eval or JSON once writer and
        # reader can be changed together.
        old_dict = eval(self.client.get(old_key).decode('utf-8'))
        for key in old_dict:
            if key in new_dict:
                del new_dict[key]
            else:
                # Explicit membership test instead of a bare ``except:``
                # around pop(), which hid every other error as well.
                util_logger.info('(?•??•?)? ? | {team} has updated article ${key}% today!'.format(
                    team=name, key=key
                ))
    self.client.set(new_key, new_dict, ex=timedelta(days=10))
def msgDia():
    """Send the daily "good morning" message and a GIF to the group chat.

    The greeting addresses a randomly chosen nickname and, on Mondays,
    Wednesdays and Fridays, appends a phrase for that day.
    """
    inst_keyboard = keyboard()
    msg = ['Coders', 'Jedi\'s', 'Programeiros']
    hj = date.today().weekday()
    # Feel free to change the phrases. I am not that creative. ^^
    frases = {
        0: ' Força que hoje é só o primeiro dia da semana!',
        2: ' Estamos no meio da semana. Tycot deseja-lhes muita sabedoria e paciência',
        4: ' Hoje é sexta! Não façam besteira ou vão perder o FDS!',
    }
    # Days without a phrase get an empty suffix. Previously `hoje` was left
    # unbound on those days and formatting the text raised NameError.
    hoje = frases.get(hj, '')
    # Unfortunately, since this runs outside the handler, the group ID
    # cannot be looked up. If anyone knows how, feel free.
    chat_id = -1001068576090
    bot.sendMessage(chat_id, parse_mode='HTML',
                    text='<i>Bom dia {}!{}</i>'.format(random.choice(msg), hoje),
                    reply_markup=inst_keyboard.keyboard_sugestao())
    bot.sendVideo(chat_id,
                  'https://media.giphy.com/media/W4IY7zQdRh7Ow/giphy.gif')
def newhm_submit(club):
    '''Input HongMei plan into database'''
    contents = request.form['contents']
    try:
        # Convert in the order year, month, day; a non-numeric field or an
        # impossible date both raise ValueError.
        year, month, day = (int(request.form[field])
                            for field in ('year', 'month', 'day'))
        actdate = date(year, month, day)
    except ValueError:
        # NOTE(review): fail() presumably aborts the request - confirm.
        fail('Please input valid date to submit.', 'newhm')
    if contents == '' or actdate < date.today():
        fail('Please input contents or correct date to submit.', 'newhm')
    if form_is_valid():
        activity = Activity.new()
        activity.name = contents
        activity.club = club
        activity.description = FormattedText.emptytext()
        activity.date = actdate
        activity.time = ActivityTime.HONGMEI
        activity.location = 'HongMei Elementary School'
        activity.cas = 1
        activity.post = FormattedText.emptytext()
        activity.selections = []
        activity.create()
    return redirect(url_for('.newhm', club=club.callsign))
def test_model_to_json(self):
    """A nested model instance serialises to the expected JSON dict."""
    # safe_load: yaml.load() without an explicit Loader is deprecated and
    # is a TypeError on PyYAML >= 6; the spec is plain YAML data.
    swagger_dict = yaml.safe_load(Tests.yaml_complex_model)
    spec = ApiSpec(swagger_dict)

    Foo = spec.definitions['Foo']
    Bar = spec.definitions['Bar']

    # Capture the date once so input and expectation cannot fall on
    # different days if the test runs across midnight.
    today = date.today()
    f = Foo(
        token='abcd',
        bar=Bar(
            a=1,
            b=today
        )
    )

    print("foo: " + pprint.pformat(f))

    j = spec.model_to_json(f)
    self.assertDictEqual(j, {
        'token': 'abcd',
        'bar': {
            'a': 1,
            'b': today.isoformat()
        }
    })
def test_json_to_model(self):
    """A JSON dict is converted into nested model instances."""
    # safe_load: yaml.load() without an explicit Loader is deprecated and
    # is a TypeError on PyYAML >= 6; the spec is plain YAML data.
    swagger_dict = yaml.safe_load(Tests.yaml_complex_model)
    spec = ApiSpec(swagger_dict)

    # Capture the date once so input and expectation cannot fall on
    # different days if the test runs across midnight.
    today = date.today()
    j = {
        'token': 'abcd',
        'bar': {
            'a': 1,
            'b': today.isoformat()
        }
    }

    m = spec.json_to_model('Foo', j)

    self.assertEqual(m.token, 'abcd')
    b = m.bar
    self.assertEqual(b.__class__.__name__, 'Bar')
    self.assertEqual(b.a, 1)
    # The assertion expects the ISO date to come back rendered with a
    # midnight time component.
    self.assertEqual(str(b.b), str(today) + " 00:00:00")
def test_nested_context_manager():
    """Nested ``immobilus`` blocks restore the enclosing date on exit."""
    outer_day = date(2016, 1, 1)
    inner_day = date(2014, 10, 12)

    # Outside any block, today is neither of the test dates.
    assert date.today() != outer_day
    assert date.today() != inner_day

    with immobilus('2016-01-01'):
        assert date.today() == outer_day

        with immobilus('2014-10-12'):
            assert date.today() == inner_day

        # Leaving the inner block brings back the outer date.
        assert date.today() == outer_day

    # Leaving the outer block brings back the real date.
    assert date.today() != outer_day
    assert date.today() != inner_day
def test_simple(self):
    """Look up an ``IValueToJson`` adapter for each basic value type.

    The test only performs the lookups; it passes as long as none of
    them raises.
    """
    samples = (
        'foobar',
        ['foobar'],
        PersistentList(['foobar']),
        ('foobar',),
        frozenset(['foobar']),
        {'foobar'},
        {'foo': 'bar'},
        PersistentMapping({'foo': 'bar'}),
        datetime.utcnow(),
        date.today(),
        time(),
    )
    for sample in samples:
        getAdapter(sample, interface=IValueToJson)
def get_date(self, instance):
    """Return the instance's date, defaulting to the last reported day.

    When ``instance.date`` is unset, the most recent ``Absence`` or
    ``Report`` date before today for the user is stored on the instance
    and returned (``date.min`` when neither exists).
    """
    user = instance.id
    if instance.date is not None:
        return instance.date

    today = date.today()

    def latest_before_today(model):
        # Newest date strictly before today, or date.min if there is none.
        newest = model.objects.filter(
            user=user, date__lt=today).aggregate(date=Max('date'))
        return newest['date'] or date.min

    # calculate last reported day if no specific date is set
    instance.date = max(latest_before_today(Absence),
                        latest_before_today(Report))
    return instance.date
def heatmap_to_twitter():
    """Post each broker's AI50 heatmap as a status update on day 2 of a month.

    Does nothing on any other day of the month. For every broker the
    status is posted with the heatmap URL as media when the image file
    exists under ``settings.STATIC_ROOT``, otherwise without media. Any
    exception is caught and printed rather than raised.
    """
    try:
        if date.today().day != 2:
            return
        api = connect()
        for broker in Brokers.objects.all():
            name_parts = (broker.slug, 'AI50', '1440', 'AI50', 'longs')
            image_filename = join(
                settings.STATIC_ROOT, 'collector', 'images', 'heatmap',
                '{0}=={1}=={2}=={3}=={4}.png'.format(*name_parts))
            # Attach the public URL only when the image exists on disk.
            media = None
            if isfile(image_filename):
                media = "https://quantrade.co.uk/static/collector/images/heatmap/{0}=={1}=={2}=={3}=={4}.png".format(*name_parts)
            status = "Results including last month index performance for {}.".format(broker.title)
            api.PostUpdate(status=status, media=media)
            print(colored.green("Heatmap posted."))
    except Exception as e:
        print(colored.red("At heatmap_to_twitter {}".format(e)))
async def gen_time_data(df):
    """Collect today's date parts and those of the frame's last row.

    Declared as a coroutine because the body awaits ``get_prev_day`` and
    ``get_prev_mo``; a plain ``def`` containing ``await`` does not
    compile.

    Args:
        df: DataFrame whose index can be converted by ``to_datetime``.
            A ``ts`` column holding the converted index is added in place.

    Returns:
        ``(t, df)`` where ``t`` is a dict with today's year, month, day
        and weekday, the awaited previous day and month, the constant
        list ``[30, 31]`` and the year, month, day and weekday of the
        last ``ts`` value.
    """
    t = {}
    now = date.today()
    t["ye"] = now.year
    t["mo"] = now.month
    t["to_day"] = now.day
    t["dow"] = now.weekday()
    t["prev_day"] = await get_prev_day(d=t["to_day"], mo=t["mo"])
    t["prev_mo"] = await get_prev_mo(mo=t["mo"])
    t["end_prev_day"] = [30, 31]

    df['ts'] = df.index
    df['ts'] = to_datetime(df['ts'])
    # .iloc[-1] is the positional "last row". The .ix indexer used here
    # before has been removed from pandas.
    last = df['ts'].iloc[-1].to_pydatetime()
    t["df_year"] = last.year
    t["df_month"] = last.month
    t["df_day"] = last.day
    t["df_weekday"] = last.weekday()
    return t, df
def get_day(cls, day):
    """Translate a day keyword into a date.

    "aujourd'hui"/"today" gives today and "demain"/"tomorrow" gives
    tomorrow. A key of ``cls.WEEK_DAY`` gives the first date from today
    onwards (today included) whose weekday equals the mapped value. Any
    other input gives ``None``.
    """
    today = date.today()
    if day in ("aujourd'hui", "today"):
        return today
    if day in ("demain", "tomorrow"):
        return date.today() + timedelta(days=1)
    if day not in cls.WEEK_DAY:
        return None
    # Walk forward one day at a time until the weekday matches.
    candidate = date.today()
    while candidate.weekday() != cls.WEEK_DAY[day]:
        candidate += timedelta(days=1)
    return candidate
def connect_and_sync_data(self):
    """Connect, refresh the day's data if needed, and subscribe.

    The queries run only when ``self._data_update_date`` is not today, so
    they happen at most once per calendar day. The update date is
    stamped after the order query and before the commission query.
    """
    self._connect()
    self.on_log('??????')

    if self._data_update_date != date.today():
        daily_queries = (
            self._qry_instrument,
            self._qry_account,
            self._qry_position,
            self._qry_order,
        )
        for run_query in daily_queries:
            run_query()
        self._data_update_date = date.today()
        self._qry_commission()

    self._subscribe_all()
    self.on_log('???????')

    self._env.event_bus.add_listener(EVENT.POST_UNIVERSE_CHANGED, self.on_universe_changed)
def on_trade(self, trade_dict):
    """Handle a trade report.

    While the day's data has not been refreshed yet
    (``self._data_update_date`` is not today) the report is only cached.
    Otherwise, unless its trade id is already in the account's
    ``_backward_trade_set``, a ``Trade`` is built, applied to the
    matching order (created on the fly when the order id is unknown) and
    published as an ``EVENT.TRADE`` event.
    """
    self.on_debug('????: %s' % str(trade_dict))

    if self._data_update_date != date.today():
        self._cache.cache_trade(trade_dict)
        return

    account = Environment.get_instance().get_account(trade_dict.order_book_id)
    if trade_dict.trade_id in account._backward_trade_set:
        return

    try:
        order = self.order_objects[trade_dict.order_id]
    except KeyError:
        # Unknown order id: rebuild the order from the trade's own fields.
        order = Order.__from_create__(
            trade_dict.order_book_id, trade_dict.amount, trade_dict.side,
            trade_dict.style, trade_dict.position_effect)

    commission = cal_commission(trade_dict, order.position_effect)
    trade = Trade.__from_create__(
        trade_dict.order_id, trade_dict.price, trade_dict.amount,
        trade_dict.side, trade_dict.position_effect,
        trade_dict.order_book_id, trade_id=trade_dict.trade_id,
        commission=commission, frozen_price=trade_dict.price)
    order.fill(trade)
    trade_event = RqEvent(EVENT.TRADE, account=account, trade=trade)
    self._env.event_bus.publish_event(trade_event)
def can_issue_daily(self):
    """
    Tell whether a ``DailyReport`` can be issued for this ``Report`` now.

    All of the following must hold:

    - the report occurs today,
    - the team has at least one user,
    - there is at least one active question,
    - the survey send time has been reached,
    - no daily report exists for today yet.

    :return: whether daily report can be generated
    :rtype: bool
    """
    issued_today = self.dailyreport_set.filter(date=date.today()).exists()
    has_members = self.team.users.exists()
    has_active_questions = self.question_set.filter(active=True).exists()
    send_time_reached = self.survey_send_time <= now().time()
    conditions = (
        self.occurs_today,
        has_members,
        has_active_questions,
        send_time_reached,
        not issued_today,
    )
    return all(conditions)
def etl_job(sc, sqlContext, submission_date=None, save=True):
    """Transform one day of Firefox "shield-study" telemetry pings.

    Args:
        sc: Passed to ``records`` to load the pings.
        sqlContext: Passed to both transform functions.
        submission_date: Day to process as a ``YYYYMMDD`` string;
            defaults to yesterday.
        save: When true, write the result as a single-partition parquet
            dataset under the ``submission_date=<date>`` directory of the
            S3 path, overwriting existing data.

    Returns:
        The union of the transformed event pings and state pings.
    """
    s3_path = 's3n://telemetry-parquet/harter/privacy_prefs_shield/v2'

    if submission_date is None:
        yesterday = date.today() - timedelta(1)
        submission_date = yesterday.strftime("%Y%m%d")

    selection = Dataset.from_source("telemetry").where(
        docType="shield-study",
        submissionDate=submission_date,
        appName="Firefox",
    )
    pings = selection.records(sc)

    event_pings = transform_event_pings(sqlContext, pings)
    state_pings = transform_state_pings(sqlContext, pings)
    transformed_pings = event_pings.union(state_pings)

    if save:
        path = s3_path + '/submission_date={}'.format(submission_date)
        writer = transformed_pings.repartition(1).write.mode('overwrite')
        writer.parquet(path)

    return transformed_pings
def shield_etl_boilerplate(transform_func, s3_path):
    """Build an ETL job for "shield-study" pings.

    Args:
        transform_func: Called as ``transform_func(sqlContext, pings)``;
            its result is what the job saves and returns.
        s3_path: Base path under which ``submission_date=<date>``
            directories are written.

    Returns:
        ``etl_job(sc, sqlContext, submission_date=None, save=True)``.
    """
    def etl_job(sc, sqlContext, submission_date=None, save=True):
        """Run the job for ``submission_date`` (default: yesterday)."""
        if submission_date is None:
            yesterday = date.today() - timedelta(1)
            submission_date = yesterday.strftime("%Y%m%d")

        selection = Dataset.from_source("telemetry").where(
            docType="shield-study",
            submissionDate=submission_date,
            appName="Firefox",
        )
        pings = selection.records(sc)
        transformed_pings = transform_func(sqlContext, pings)

        if save:
            # One parquet partition per day, replacing earlier output.
            path = s3_path + '/submission_date={}'.format(submission_date)
            writer = transformed_pings.repartition(1).write.mode('overwrite')
            writer.parquet(path)

        return transformed_pings

    return etl_job
def testpilot_etl_boilerplate(transform_func, s3_path):
    """Build an ETL job for "testpilottest" pings.

    Args:
        transform_func: Called as ``transform_func(sqlContext, pings)``;
            its result is what the job saves and returns.
        s3_path: Base path under which ``submission_date=<date>``
            directories are written, e.g.
            ``s3://telemetry-parquet/testpilot/txp_pulse/v1``.

    Returns:
        ``etl_job(sc, sqlContext, submission_date=None, save=True)``.
    """
    def etl_job(sc, sqlContext, submission_date=None, save=True):
        """Run the job for ``submission_date`` (default: yesterday)."""
        if submission_date is None:
            yesterday = date.today() - timedelta(1)
            submission_date = yesterday.strftime("%Y%m%d")

        selection = Dataset.from_source("telemetry").where(
            docType="testpilottest",
            submissionDate=submission_date,
            appName="Firefox",
        )
        pings = selection.records(sc)
        transformed_pings = transform_func(sqlContext, pings)

        if save:
            # One parquet partition per day, replacing earlier output.
            path = s3_path + '/submission_date={}'.format(submission_date)
            writer = transformed_pings.repartition(1).write.mode('overwrite')
            writer.parquet(path)

        return transformed_pings

    return etl_job