def get_date_to_message_statistic(self, message_statistic):
    """
    Maps each date between the date of the first message and the date of
    the last message, inclusive, to the sum of the values of a message
    statistic over all messages from that date.

    Args:
        message_statistic: A function mapping a Message object to an int
            or a float.

    Returns:
        date_to_message_statistic: A dict mapping a date object between
            the date of the first message and the date of the last message
            to the sum of the values of message_statistic over all
            messages in self.messages from that date.
    """
    start_date = self.messages[0].timestamp.date()
    end_date = self.messages[-1].timestamp.date()
    date_range = [dt.date()
                  for dt in rrule(DAILY, dtstart=start_date, until=end_date)]
    date_to_message_statistic = {d: 0 for d in date_range}
    for message in self.messages:
        date_to_message_statistic[message.timestamp.date()] += message_statistic(message)
    return date_to_message_statistic
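The idiom worth noting here: rrule(DAILY, dtstart=..., until=...) enumerates every calendar day between two dates, both endpoints included, so dates with no messages still get a zero bucket. A minimal standalone sketch of just that part (the Message/self.messages context is not needed):

from datetime import date
from dateutil.rrule import rrule, DAILY

# rrule accepts plain date objects for dtstart/until and yields one
# datetime per day, endpoints included.
start, end = date(2017, 7, 14), date(2017, 7, 16)
buckets = {dt.date(): 0 for dt in rrule(DAILY, dtstart=start, until=end)}
print(sorted(buckets))
# [datetime.date(2017, 7, 14), datetime.date(2017, 7, 15), datetime.date(2017, 7, 16)]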
Python usage examples for dateutil.rrule.DAILY
def testRRuleAll(self):
    from dateutil.rrule import rrule
    from dateutil.rrule import rruleset
    from dateutil.rrule import rrulestr
    from dateutil.rrule import YEARLY, MONTHLY, WEEKLY, DAILY
    from dateutil.rrule import HOURLY, MINUTELY, SECONDLY
    from dateutil.rrule import MO, TU, WE, TH, FR, SA, SU

    rr_all = (rrule, rruleset, rrulestr,
              YEARLY, MONTHLY, WEEKLY, DAILY,
              HOURLY, MINUTELY, SECONDLY,
              MO, TU, WE, TH, FR, SA, SU)

    for var in rr_all:
        self.assertIsNot(var, None)

    # In the public interface but not in __all__
    from dateutil.rrule import weekday
    self.assertIsNot(weekday, None)
test_notify_supervisors_shorttime.py (project: timed-backend, author: adfinis-sygroup)
def test_notify_supervisors(db, mailoutbox):
    """Test time range 2017-7-17 till 2017-7-23."""
    start = date(2017, 7, 14)
    # supervisee with short time
    supervisee = UserFactory.create()
    supervisor = UserFactory.create()
    supervisee.supervisors.add(supervisor)

    EmploymentFactory.create(user=supervisee,
                             start_date=start,
                             percentage=100)

    workdays = rrule(DAILY, dtstart=start, until=date.today(),
                     # range is excluding last
                     byweekday=range(MO.weekday, FR.weekday + 1))
    for dt in workdays:
        ReportFactory.create(user=supervisee, date=dt,
                             duration=timedelta(hours=7))

    call_command('notify_supervisors_shorttime')

    # checks
    assert len(mailoutbox) == 1
    mail = mailoutbox[0]
    assert mail.to == [supervisor.email]
    body = mail.body
    assert 'Time range: 17.07.2017 - 23.07.2017\nRatio: 0.9' in body
    expected = (
        '{0} 35.0/42.5 (Ratio 0.82 Delta -7.5 Balance -9.0)'
    ).format(
        supervisee.get_full_name()
    )
    assert expected in body
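The byweekday argument above deserves a note: MO.weekday is 0 and FR.weekday is 4, and because Python's range excludes its upper bound, FR.weekday + 1 is needed to keep Friday (hence the "range is excluding last" comment). A self-contained sketch of that filter:

from datetime import datetime
from dateutil.rrule import rrule, DAILY, MO, FR

# Mon 2017-07-17 through Sun 2017-07-23; range(0, 5) keeps Mon..Fri only.
workdays = rrule(DAILY, dtstart=datetime(2017, 7, 17), until=datetime(2017, 7, 23),
                 byweekday=range(MO.weekday, FR.weekday + 1))
print([d.strftime('%a') for d in workdays])  # ['Mon', 'Tue', 'Wed', 'Thu', 'Fri']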
def rrule_frequency(self):
    compatibility_dict = {
        'DAILY': DAILY,
        'MONTHLY': MONTHLY,
        'WEEKLY': WEEKLY,
        'YEARLY': YEARLY,
        'HOURLY': HOURLY,
        'MINUTELY': MINUTELY,
        'SECONDLY': SECONDLY,
    }
    return compatibility_dict[self.frequency]
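A hypothetical usage sketch (the Schedule class and its frequency field are stand-ins, not part of the snippet): an object whose frequency is one of the string keys above can hand the mapped constant straight to rrule.

from datetime import datetime
from dateutil.rrule import rrule, WEEKLY

class Schedule:
    frequency = 'WEEKLY'  # assumed model field holding one of the keys above

    def rrule_frequency(self):  # same mapping idea, trimmed to one key
        return {'WEEKLY': WEEKLY}[self.frequency]

rule = rrule(Schedule().rrule_frequency(), dtstart=datetime(2017, 1, 2), count=3)
print(list(rule))  # 2017-01-02, 2017-01-09, 2017-01-16, all at 00:00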
def process(expression, start_date, end_date=None):
    """Given a cron expression and a start/end date, returns an rrule.

    Works with "naive" datetime objects.
    """
    if start_date.tzinfo or (end_date and end_date.tzinfo):
        raise TypeError("Timezones are forbidden in this land.")

    arguments = parse_cron(expression)

    # As rrule will strip out microseconds, we need this hack :)
    # We could use .after, but that changes the interface.
    # The idea: since the cron expression works at minute granularity, it is
    # fine to push the start time one second past the minute. The key is not
    # to generate the current minute.
    # Ex: if the start time is 05:00.500 you should not generate 05:00.
    if start_date.second == 0 and start_date.microsecond != 0:
        start_date = start_date + dt.timedelta(0, 1)

    arguments["dtstart"] = start_date
    if end_date:
        arguments["until"] = end_date

    # TODO: This can be optimized for frequencies coarser than minutely by
    # checking whether the minutes and hours are provided. Above hours
    # (rrule.DAILY) it gets trickier, as multiple parameters affect the
    # recurrence (weekday / month-day).
    return rrule.rrule(rrule.MINUTELY, **arguments)
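A hedged usage sketch; parse_cron is assumed to translate the cron expression into rrule keyword arguments (byminute, byhour, ...), which is all the function above requires of it:

from itertools import islice
import datetime as dt

rule = process("*/15 * * * *", dt.datetime(2017, 7, 14, 5, 0))
print(list(islice(rule, 3)))
# Expected (assuming parse_cron maps */15 to byminute=[0, 15, 30, 45]):
# 05:00, 05:15 and 05:30 on 2017-07-14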
def landsat_overpass_time(lndst_path_row, start_date, satellite):
    delta = timedelta(days=20)
    end = start_date + delta

    if satellite == 'LT5':
        if start_date > datetime(2013, 6, 1):
            raise InvalidDateForSatelliteError(
                'The date requested is after L5 deactivation')
        reference_time = get_l5_overpass_data(lndst_path_row[0],
                                              lndst_path_row[1], start_date)
        return reference_time

    else:
        if satellite == 'LE7':
            sat_abv = 'L7'
        elif satellite == 'LC8':
            sat_abv = 'L8'
        base = 'https://landsat.usgs.gov/landsat/all_in_one_pending_acquisition/'
        for day in rrule(DAILY, dtstart=start_date, until=end):
            tail = '{}/Pend_Acq/y{}/{}/{}.txt'.format(sat_abv, day.year,
                                                      day.strftime('%b'),
                                                      day.strftime('%b-%d-%Y'))
            url = '{}{}'.format(base, tail)
            r = requests.get(url).text
            for line in r.splitlines():
                l = line.split()
                try:
                    if l[0] == str(lndst_path_row[0]):
                        if l[1] == str(lndst_path_row[1]):
                            # dtime is in GMT
                            time_str = '{}-{}'.format(day.year, l[2])
                            ref_time = datetime.strptime(time_str,
                                                         '%Y-%j-%H:%M:%S')
                            return ref_time
                except (IndexError, TypeError):
                    pass

    raise OverpassNotFoundError('Did not find overpass data, check your dates...')
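One detail worth isolating: the '%Y-%j-%H:%M:%S' format string parses a day-of-year (ordinal) timestamp, since the third column of the acquisition file is evidently the ordinal day plus a time. A quick standalone check:

from datetime import datetime

# %j is the zero-padded ordinal day of the year (001-366).
print(datetime.strptime('2017-195-18:30:00', '%Y-%j-%H:%M:%S'))
# 2017-07-14 18:30:00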
get_all_tracking_stats_for_date_range.py (project: nba_db, author: dblackrun)
def main():
    logging.basicConfig(filename='logs/tracking_stats.log', level=logging.ERROR,
                        format='%(asctime)s %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p')
    config = json.loads(open('config.json').read())
    command_line_args = sys.argv
    dates = utils.validate_dates(command_line_args)
    start_date = dates[0]
    end_date = dates[1]

    username = config['username']
    password = config['password']
    host = config['host']
    database = config['database']
    engine = create_engine('mysql://' + username + ':' + password + '@' + host + '/' + database)
    DBSession = scoped_session(sessionmaker(autoflush=True, autocommit=False, bind=engine))

    hustle_stats_queue = Queue()
    team_tracking_queue = Queue()
    player_tracking_queue = Queue()
    passes_made_queue = Queue()

    # Create worker threads
    for x in range(8):
        hustle_stats_worker = sportvu.HustleStatsWorker(hustle_stats_queue, DBSession)
        team_tracking_worker = sportvu.TeamStatWorker(team_tracking_queue, DBSession)
        player_tracking_worker = sportvu.PlayerStatWorker(player_tracking_queue, DBSession)
        passes_made_worker = sportvu.PlayerPassesStatWorker(passes_made_queue, DBSession)
        # Setting daemon to True will let the main thread exit even though
        # the workers are blocking
        hustle_stats_worker.daemon = True
        team_tracking_worker.daemon = True
        player_tracking_worker.daemon = True
        passes_made_worker.daemon = True
        hustle_stats_worker.start()
        team_tracking_worker.start()
        player_tracking_worker.start()
        passes_made_worker.start()

    # Put the tasks into the queues as tuples
    for dt in rrule(DAILY, dtstart=start_date, until=end_date):
        date = dt.strftime("%m/%d/%Y")
        game_team_map, player_team_game_map, daily_game_ids = sportvu.get_player_game_team_maps_and_daily_game_ids(date)
        if len(daily_game_ids) > 0:
            season = utils.get_season_from_game_id(daily_game_ids[0])
            season_type = utils.get_season_type_from_game_id(daily_game_ids[0])
            if season_type is not None:
                # hustle stats begin in 2015-16 playoffs
                hustle_stats_queue.put((game_team_map, player_team_game_map, date, season, season_type))
                for stat_type in constants.SPORTVU_GAME_LOG_STAT_TYPE_TABLE_MAPS.keys():
                    team_tracking_queue.put((stat_type, date, season, season_type, game_team_map,
                                             constants.SPORTVU_GAME_LOG_STAT_TYPE_TABLE_MAPS[stat_type]['Team']))
                    player_tracking_queue.put((stat_type, date, season, season_type, player_team_game_map,
                                               constants.SPORTVU_GAME_LOG_STAT_TYPE_TABLE_MAPS[stat_type]['Player']))
                for player_id in player_team_game_map.keys():
                    passes_made_queue.put((player_id, date, season, season_type, player_team_game_map))

    # Causes the main thread to wait for the queues to finish processing all the tasks
    hustle_stats_queue.join()
    team_tracking_queue.join()
    player_tracking_queue.join()
    passes_made_queue.join()
    DBSession.remove()
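All of these scripts share the same daemon-thread/queue pattern, so a minimal sketch of its mechanics may help. The worker function here is a stand-in for the sportvu/game worker classes, which, given the .daemon and .start() usage above, are evidently Thread subclasses:

import threading
from queue import Queue

q = Queue()

def worker():
    while True:
        item = q.get()
        try:
            print('processing', item)  # real work goes here
        finally:
            q.task_done()  # join() only returns once every item is marked done

for _ in range(4):
    threading.Thread(target=worker, daemon=True).start()

for item in range(10):
    q.put(item)

q.join()  # blocks until task_done() has been called for every put()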
get_tracking_shot_stats_for_date_range.py (project: nba_db, author: dblackrun)
def main():
    logging.basicConfig(filename='logs/tracking_shot_stats.log', level=logging.ERROR,
                        format='%(asctime)s %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p')
    config = json.loads(open('config.json').read())
    command_line_args = sys.argv
    dates = utils.validate_dates(command_line_args)
    start_date = dates[0]
    end_date = dates[1]

    username = config['username']
    password = config['password']
    host = config['host']
    database = config['database']
    engine = create_engine('mysql://' + username + ':' + password + '@' + host + '/' + database)
    DBSession = scoped_session(sessionmaker(autoflush=True, autocommit=False, bind=engine))

    # get data
    for dt in rrule(DAILY, dtstart=start_date, until=end_date):
        date = dt.strftime("%m/%d/%Y")
        game_team_map, player_team_game_map, daily_game_ids = sportvu.get_player_game_team_maps_and_daily_game_ids(date)
        if len(daily_game_ids) > 0:
            team_queue = Queue()
            player_queue = Queue()
            # Create worker threads
            for x in range(5):
                team_worker = sportvu.TeamShotWorker(team_queue, DBSession)
                player_worker = sportvu.PlayerShotWorker(player_queue, DBSession)
                # Setting daemon to True will let the main thread exit even
                # though the workers are blocking
                team_worker.daemon = True
                player_worker.daemon = True
                team_worker.start()
                player_worker.start()
            # Put the tasks into the queues as tuples
            for close_def_dist_range in constants.CLOSE_DEF_DIST_RANGES:
                for shot_clock_range in constants.SHOT_CLOCK_RANGES:
                    for shot_dist_range in constants.SHOT_DIST_RANGES:
                        for touch_time_range in constants.TOUCH_TIME_RANGES:
                            for dribble_range in constants.DRIBBLE_RANGES:
                                for general_range in constants.GENERAL_RANGES:
                                    team_queue.put((date, daily_game_ids, game_team_map,
                                                    close_def_dist_range, shot_clock_range,
                                                    shot_dist_range, touch_time_range,
                                                    dribble_range, general_range))
                                    player_queue.put((date, daily_game_ids, player_team_game_map,
                                                      close_def_dist_range, shot_clock_range,
                                                      shot_dist_range, touch_time_range,
                                                      dribble_range, general_range))
            # Causes the main thread to wait for the queues to finish
            # processing all the tasks
            team_queue.join()
            player_queue.join()
    DBSession.remove()
def main():
    logging.basicConfig(filename='logs/games.log', level=logging.ERROR,
                        format='%(asctime)s %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p')
    config = json.loads(open('config.json').read())
    command_line_args = sys.argv
    dates = utils.validate_dates(command_line_args)
    start_date = dates[0]
    end_date = dates[1]

    username = config['username']
    password = config['password']
    host = config['host']
    database = config['database']
    engine = create_engine('mysql://' + username + ':' + password + '@' + host + '/' + database)
    DBSession = scoped_session(sessionmaker(autoflush=True, autocommit=False, bind=engine))

    game_ids = []
    for dt in rrule(DAILY, dtstart=start_date, until=end_date):
        date_response = utils.get_scoreboard_response_for_date(dt.strftime("%m/%d/%Y"))
        date_data = utils.get_array_of_dicts_from_response(date_response, constants.SCOREBOARD_DATA_INDEX)
        for game_data in date_data:
            game_ids.append(game_data['GAME_ID'])

    if len(game_ids) > 0:
        game_data_queue = Queue()
        # Create worker threads
        for x in range(8):
            game_worker = game.GameWorker(game_data_queue, DBSession)
            # Setting daemon to True will let the main thread exit even
            # though the workers are blocking
            game_worker.daemon = True
            game_worker.start()
        # Put the tasks into the queue as tuples
        for game_id in game_ids:
            season = utils.get_season_from_game_id(game_id)
            season_type = utils.get_season_type_from_game_id(game_id)
            if season_type is not None:
                game_data_queue.put((game_id, season, season_type))
        # Causes the main thread to wait for the queue to finish processing
        # all the tasks
        game_data_queue.join()
    DBSession.remove()