python类rrule()的实例源码

conversation.py 文件源码 项目:facebook-message-analysis 作者: szheng17 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def get_date_to_message_statistic(self, message_statistic):
        """
        Maps each date between the date of the first message and the date of
        the last message, inclusive, to the sum of the values of a message
        statistic over all messages from that date. 

        Args:
            message_statistic: A function mapping a Message object to an int or
                a float.

        Returns:
            date_to_message_statistic: A dict mapping a date object between the
                date of the first message and the date of the last message to
                the sum of the values of message_statistic over all messages in
                self.messages from that date.
        """
        start_date = self.messages[0].timestamp.date()
        end_date = self.messages[-1].timestamp.date()
        date_range = [dt.date() for dt in rrule(DAILY, dtstart=start_date, until=end_date)]
        date_to_message_statistic = {d: 0 for d in date_range}
        for message in self.messages:
            date_to_message_statistic[message.timestamp.date()] += message_statistic(message)
        return date_to_message_statistic
tradingcalendar_bmf.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def get_early_closes(start, end):
    # TSX closed at 1:00 PM on december 24th.

    start = canonicalize_datetime(start)
    end = canonicalize_datetime(end)

    early_close_rules = []

    early_close_rules.append(quarta_cinzas)

    early_close_ruleset = rrule.rruleset()

    for rule in early_close_rules:
        early_close_ruleset.rrule(rule)
    early_closes = early_close_ruleset.between(start, end, inc=True)

    early_closes.sort()
    return pd.DatetimeIndex(early_closes)
test_imports.py 文件源码 项目:Dshield 作者: ywjt 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def testRRuleAll(self):
        from dateutil.rrule import rrule
        from dateutil.rrule import rruleset
        from dateutil.rrule import rrulestr
        from dateutil.rrule import YEARLY, MONTHLY, WEEKLY, DAILY
        from dateutil.rrule import HOURLY, MINUTELY, SECONDLY
        from dateutil.rrule import MO, TU, WE, TH, FR, SA, SU

        rr_all = (rrule, rruleset, rrulestr,
                  YEARLY, MONTHLY, WEEKLY, DAILY,
                  HOURLY, MINUTELY, SECONDLY,
                  MO, TU, WE, TH, FR, SA, SU)

        for var in rr_all:
            self.assertIsNot(var, None)

        # In the public interface but not in all
        from dateutil.rrule import weekday
        self.assertIsNot(weekday, None)
tzcron.py 文件源码 项目:tzcron 作者: bloomberg 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __next__(self):
        """
        Returns the next occurrence or raises StopIteration
        This method adds some extra validation for the returned
        iteration that are not natively handled by rrule
        """
        while True:
            next_it = next(self.__rrule_iterator)
            next_it = self.t_zone.localize(next_it, is_dst=None)

            if not all([filt(next_it) for filt in self.filters]):
                continue

            return next_it


# Private helpers
tzcron.py 文件源码 项目:tzcron 作者: bloomberg 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def parse_cron(expression):
    """parses a cron expression into a dict"""
    try:
        minute, hour, monthday, month, weekday, _ = expression.split(' ')
    except ValueError:
        raise InvalidExpression("Invalid number of items in expression: {}"
                                .format(expression))
    result = dict()
    result["bysecond"] = [0]
    if minute != "*":
        result["byminute"] = MinuteParser.parse(minute)
    if hour != "*":
        result["byhour"] = HourParser.parse(hour)
    if monthday != "*":
        result["bymonthday"] = MonthDayParser.parse(monthday)
    if month != "*":
        result["bymonth"] = MonthParser.parse(month)
    if weekday != "*":
        # rrule uses 0 to 6 for monday to sunday
        result["byweekday"] = [d - 1 for d in WeekDayParser.parse(weekday)]

    return result
icalendar.py 文件源码 项目:python-card-me 作者: tBaxter 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def gettzinfo(self):
        # workaround for dateutil failing to parse some experimental properties
        good_lines = ('rdate', 'rrule', 'dtstart', 'tzname', 'tzoffsetfrom',
                      'tzoffsetto', 'tzid')
        # serialize encodes as utf-8, cStringIO will leave utf-8 alone
        buffer = six.StringIO()
        # allow empty VTIMEZONEs
        if len(self.contents) == 0:
            return None

        def customSerialize(obj):
            if isinstance(obj, Component):
                foldOneLine(buffer, u"BEGIN:" + obj.name)
                for child in obj.lines():
                    if child.name.lower() in good_lines:
                        child.serialize(buffer, 75, validate=False)
                for comp in obj.components():
                    customSerialize(comp)
                foldOneLine(buffer, u"END:" + obj.name)
        customSerialize(self)
        buffer.seek(0)  # tzical wants to read a stream
        return tz.tzical(buffer).get()
utils.py 文件源码 项目:p2-odoo 作者: Pdosplusplus 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def workDays(start_date):

    feriados= 5, 6

    laborales = [dia for dia in range(7) if dia not in feriados]

    end_date = datetime.now().date()

    date_ini = datetime.strptime(str(start_date), FORMA_DATE)

    date_fin = datetime.strptime(str(end_date), FORMA_DATE)

    totalDias= rrule.rrule(rrule.DAILY,
                        dtstart=date_ini, 
                        until=date_fin, 
                        byweekday=laborales)

    return totalDias.count()
utils.py 文件源码 项目:p2-odoo 作者: Pdosplusplus 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def workDays(start_date, end_date):

    feriados= 5, 6

    laborales = [dia for dia in range(7) if dia not in feriados]

    date_ini = datetime.strptime(str(start_date), FORMA_DATE)

    date_fin = datetime.strptime(str(end_date), FORMA_DATE)

    totalDias= rrule.rrule(rrule.DAILY,
                        dtstart=date_ini, 
                        until=date_fin, 
                        byweekday=laborales)

    return totalDias.count()
icalendar.py 文件源码 项目:vobject 作者: eventable 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def gettzinfo(self):
        # workaround for dateutil failing to parse some experimental properties
        good_lines = ('rdate', 'rrule', 'dtstart', 'tzname', 'tzoffsetfrom',
                      'tzoffsetto', 'tzid')
        # serialize encodes as utf-8, cStringIO will leave utf-8 alone
        buffer = six.StringIO()
        # allow empty VTIMEZONEs
        if len(self.contents) == 0:
            return None
        def customSerialize(obj):
            if isinstance(obj, Component):
                foldOneLine(buffer, u"BEGIN:" + obj.name)
                for child in obj.lines():
                    if child.name.lower() in good_lines:
                        child.serialize(buffer, 75, validate=False)
                for comp in obj.components():
                    customSerialize(comp)
                foldOneLine(buffer, u"END:" + obj.name)
        customSerialize(self)
        buffer.seek(0)  # tzical wants to read a stream
        return tz.tzical(buffer).get()
icalendar.py 文件源码 项目:vobject 作者: eventable 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def gettzinfo(self):
        # workaround for dateutil failing to parse some experimental properties
        good_lines = ('rdate', 'rrule', 'dtstart', 'tzname', 'tzoffsetfrom',
                      'tzoffsetto', 'tzid')
        # serialize encodes as utf-8, cStringIO will leave utf-8 alone
        buffer = six.StringIO()
        # allow empty VTIMEZONEs
        if len(self.contents) == 0:
            return None

        def customSerialize(obj):
            if isinstance(obj, Component):
                foldOneLine(buffer, u"BEGIN:" + obj.name)
                for child in obj.lines():
                    if child.name.lower() in good_lines:
                        child.serialize(buffer, 75, validate=False)
                for comp in obj.components():
                    customSerialize(comp)
                foldOneLine(buffer, u"END:" + obj.name)
        customSerialize(self)
        buffer.seek(0)  # tzical wants to read a stream
        return tz.tzical(buffer).get()
tests.py 文件源码 项目:vobject 作者: eventable 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_timezone_serializing():
        """
        Serializing with timezones test
        """
        tzs = dateutil.tz.tzical("test_files/timezones.ics")
        pacific = tzs.get('US/Pacific')
        cal = base.Component('VCALENDAR')
        cal.setBehavior(icalendar.VCalendar2_0)
        ev = cal.add('vevent')
        ev.add('dtstart').value = datetime.datetime(2005, 10, 12, 9,
                                                    tzinfo=pacific)
        evruleset = rruleset()
        evruleset.rrule(rrule(WEEKLY, interval=2, byweekday=[2,4],
                              until=datetime.datetime(2005, 12, 15, 9)))
        evruleset.rrule(rrule(MONTHLY, bymonthday=[-1,-5]))
        evruleset.exdate(datetime.datetime(2005, 10, 14, 9, tzinfo=pacific))
        ev.rruleset = evruleset
        ev.add('duration').value = datetime.timedelta(hours=1)

        apple = tzs.get('America/Montreal')
        ev.dtstart.value = datetime.datetime(2005, 10, 12, 9, tzinfo=apple)
dates.py 文件源码 项目:agdc_statistics 作者: GeoscienceAustralia 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def date_sequence(start, end, stats_duration, step_size):
    """
    Generate a sequence of time span tuples

    :seealso:
        Refer to `dateutil.parser.parse` for details on date parsing.

    :param str start: Start date of first interval
    :param str end: End date. The end of the last time span may extend past this date.
    :param str stats_duration: What period of time should be grouped
    :param str step_size: How far apart should the start dates be
    :return: sequence of (start_date, end_date) tuples
    """
    step_size, freq = parse_interval(step_size)
    stats_duration = parse_duration(stats_duration)
    for start_date in rrule(freq, interval=step_size, dtstart=start, until=end):
        end_date = start_date + stats_duration
        if end_date <= end:
            yield start_date, start_date + stats_duration
conversation.py 文件源码 项目:facebook-message-analysis 作者: szheng17 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def get_month_to_message_statistic(self, message_statistic):
        """
        Maps each month between the month of the first message and the month of
        the last message, inclusive, to the sum of the values of a message
        statistic over all messages from that month. 

        Args:
            message_statistic: A function mapping a Message object to an int or
                a float.

        Returns:
            month_to_message_statistic: A dict mapping a string of the form
                'YYYY-MM' representing a month between the month of the first
                message and the month of the last message to the sum of the
                values of message_statistic over all messages in self.messages
                from that month.
        """
        start_dt = self.messages[0].timestamp
        end_dt = self.messages[-1].timestamp
        start_date_month_start = datetime(start_dt.year, start_dt.month, 1)
        end_date_month_start = datetime(end_dt.year, end_dt.month, 1)
        dt_month_range = rrule(MONTHLY, dtstart=start_date_month_start,
                               until=end_date_month_start)
        month_range = [dt.date() for dt in dt_month_range]
        month_to_message_statistic = {dt.strftime(self.MONTH_FORMAT): 0 for dt in month_range}
        for message in self.messages:
            month_str = message.timestamp.strftime(self.MONTH_FORMAT)
            month_to_message_statistic[month_str] += message_statistic(message)
        return month_to_message_statistic
tradingcalendar_tse.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def get_early_closes(start, end):
    # TSX closed at 1:00 PM on december 24th.

    start = canonicalize_datetime(start)
    end = canonicalize_datetime(end)

    start = max(start, datetime(1993, 1, 1, tzinfo=pytz.utc))
    end = max(end, datetime(1993, 1, 1, tzinfo=pytz.utc))

    # Not included here are early closes prior to 1993
    # or unplanned early closes

    early_close_rules = []

    christmas_eve = rrule.rrule(
        rrule.MONTHLY,
        bymonth=12,
        bymonthday=24,
        byweekday=(rrule.MO, rrule.TU, rrule.WE, rrule.TH, rrule.FR),
        cache=True,
        dtstart=start,
        until=end
    )
    early_close_rules.append(christmas_eve)

    early_close_ruleset = rrule.rruleset()

    for rule in early_close_rules:
        early_close_ruleset.rrule(rule)
    early_closes = early_close_ruleset.between(start, end, inc=True)

    early_closes.sort()
    return pd.DatetimeIndex(early_closes)
test_notify_supervisors_shorttime.py 文件源码 项目:timed-backend 作者: adfinis-sygroup 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_notify_supervisors(db, mailoutbox):
    """Test time range 2017-7-17 till 2017-7-23."""
    start = date(2017, 7, 14)
    # supervisee with short time
    supervisee = UserFactory.create()
    supervisor = UserFactory.create()
    supervisee.supervisors.add(supervisor)

    EmploymentFactory.create(user=supervisee,
                             start_date=start,
                             percentage=100)
    workdays = rrule(DAILY, dtstart=start, until=date.today(),
                     # range is excluding last
                     byweekday=range(MO.weekday, FR.weekday + 1))
    for dt in workdays:
        ReportFactory.create(user=supervisee, date=dt,
                             duration=timedelta(hours=7))

    call_command('notify_supervisors_shorttime')

    # checks
    assert len(mailoutbox) == 1
    mail = mailoutbox[0]
    assert mail.to == [supervisor.email]
    body = mail.body
    assert 'Time range: 17.07.2017 - 23.07.2017\nRatio: 0.9' in body
    expected = (
        '{0} 35.0/42.5 (Ratio 0.82 Delta -7.5 Balance -9.0)'
    ).format(
        supervisee.get_full_name()
    )
    assert expected in body
events.py 文件源码 项目:wagtail_room_booking 作者: Tamriel 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get_rrule_object(self):
        if self.rule is not None:
            params = self.rule.get_params()
            frequency = self.rule.rrule_frequency()
            return rrule.rrule(frequency, dtstart=self.start, **params)
tzcron.py 文件源码 项目:tzcron 作者: bloomberg 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def process(expression, start_date, end_date=None):
    """Given a cron expression and a start/end date returns an rrule
    Works with "naive" datetime objects.
    """
    if start_date.tzinfo or (end_date and end_date.tzinfo):
        raise TypeError("Timezones are forbidden in this land.")

    arguments = parse_cron(expression)

    # as rrule will strip out microseconds, we need to do this hack :)
    # we could use .after but that changes the iface
    # The idea is, as the cron expresion works at minute level, it is fine to
    # set the start time one second after the minute. The key is not to generate
    # the current minute.
    # Ex: if start time is 05:00.500 you should not generate 05:00
    if start_date.second == 0 and start_date.microsecond != 0:
        start_date = start_date + dt.timedelta(0, 1)

    arguments["dtstart"] = start_date
    if end_date:
        arguments["until"] = end_date

    # TODO: This can be optimized to values bigger than minutely
    # by checking if the minutes and hours are provided.
    # After hours (rrule.DAILY) it gets trickier as we have multiple
    # parameters affecting the recurrence (weekday/ month-day)
    return rrule.rrule(rrule.MINUTELY, **arguments)
_performance.py 文件源码 项目:bigfishtrader 作者: xingetouzi 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _workdays(start, end, day_off=None):
    if day_off is None:
        day_off = 5, 6
    workdays = [x for x in range(7) if x not in day_off]
    days = rrule.rrule(rrule.DAILY, dtstart=start, until=end, byweekday=workdays)
    return days.count()
tests.py 文件源码 项目:python-card-me 作者: tBaxter 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_importing(self):
        cal = get_test_file("standard_test.ics")
        c = base.readOne(cal, validate=True)
        self.assertEqual(
            str(c.vevent.valarm.trigger),
            "<TRIGGER{}-1 day, 0:00:00>"
        )

        self.assertEqual(
            str(c.vevent.dtstart.value),
            "2002-10-28 14:00:00-08:00"
        )
        self.assertTrue(
            isinstance(c.vevent.dtstart.value, datetime.datetime)
        )
        self.assertEqual(
            str(c.vevent.dtend.value),
            "2002-10-28 15:00:00-08:00"
        )
        self.assertTrue(
            isinstance(c.vevent.dtend.value, datetime.datetime)
        )
        self.assertEqual(
            c.vevent.dtstamp.value,
            datetime.datetime(2002, 10, 28, 1, 17, 6, tzinfo=tzutc())
        )

        vevent = c.vevent.transformFromNative()
        self.assertEqual(
            str(vevent.rrule),
            "<RRULE{}FREQ=Weekly;COUNT=10>"
        )
tests.py 文件源码 项目:python-card-me 作者: tBaxter 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_recurring_component(self):
        vevent = RecurringComponent(name='VEVENT')

        # init
        self.assertTrue(vevent.isNative)

        # rruleset should be None at this point.
        # No rules have been passed or created.
        self.assertEqual(vevent.rruleset, None)

        # Now add start and rule for recurring event
        vevent.add('dtstart').value = datetime.datetime(2005, 1, 19, 9)
        vevent.add('rrule').value =u"FREQ=WEEKLY;COUNT=2;INTERVAL=2;BYDAY=TU,TH"
        self.assertEqual(
            list(vevent.rruleset),
            [datetime.datetime(2005, 1, 20, 9, 0), datetime.datetime(2005, 2, 1, 9, 0)]
        )
        self.assertEqual(
            list(vevent.getrruleset(addRDate=True)),
            [datetime.datetime(2005, 1, 19, 9, 0), datetime.datetime(2005, 1, 20, 9, 0)]
        )

        # Also note that dateutil will expand all-day events (datetime.date values)
        # to datetime.datetime value with time 0 and no timezone.
        vevent.dtstart.value = datetime.date(2005,3,18)
        self.assertEqual(
            list(vevent.rruleset),
            [datetime.datetime(2005, 3, 29, 0, 0), datetime.datetime(2005, 3, 31, 0, 0)]
        )
        self.assertEqual(
            list(vevent.getrruleset(True)),
            [datetime.datetime(2005, 3, 18, 0, 0), datetime.datetime(2005, 3, 29, 0, 0)]
        )
models.py 文件源码 项目:Odin 作者: HackSoftware 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def duration_in_weeks(self):
        weeks = rrule.rrule(
            rrule.WEEKLY,
            dtstart=self.start_date,
            until=self.end_date
        )
        return weeks.count()
utils.py 文件源码 项目:p2-odoo 作者: Pdosplusplus 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def diff_days():

    start_date = datetime.now().date()

    end_date = "2018-03-10"

    date_ini = datetime.strptime(str(start_date), FORMA_DATE)

    date_fin = datetime.strptime(str(end_date), FORMA_DATE)

    totalDias= rrule.rrule(rrule.DAILY,
                        dtstart=date_ini, 
                        until=date_fin)

    return totalDias.count()
tournaments.py 文件源码 项目:Penny-Dreadful-Tools 作者: PennyDreadfulMTG 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def get_all_next_tournament_dates(start, index=0):
    until = start + timedelta(days=7)
    pdsat_time = ['Saturday', rrule.rrule(rrule.WEEKLY, byhour=13, byminute=30, bysecond=0, dtstart=start, until=until, byweekday=rrule.SA)[index]]
    pds_time = ['Sunday', rrule.rrule(rrule.WEEKLY, byhour=13, byminute=30, bysecond=0, dtstart=start, until=until, byweekday=rrule.SU)[index]]
    pdm_time = ['Monday', rrule.rrule(rrule.WEEKLY, byhour=19, byminute=0, bysecond=0, dtstart=start, until=until, byweekday=rrule.MO)[index]]
    pdt_time = ['Thursday', rrule.rrule(rrule.WEEKLY, byhour=19, byminute=0, bysecond=0, dtstart=start, until=until, byweekday=rrule.TH)[index]]
    return [pdsat_time, pds_time, pdm_time, pdt_time]
tests.py 文件源码 项目:vobject 作者: eventable 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_importing(self):
        """
        Test importing ics
        """
        cal = get_test_file("standard_test.ics")
        c = base.readOne(cal, validate=True)
        self.assertEqual(
            str(c.vevent.valarm.trigger),
            "<TRIGGER{}-1 day, 0:00:00>"
        )

        self.assertEqual(
            str(c.vevent.dtstart.value),
            "2002-10-28 14:00:00-08:00"
        )
        self.assertTrue(
            isinstance(c.vevent.dtstart.value, datetime.datetime)
        )
        self.assertEqual(
            str(c.vevent.dtend.value),
            "2002-10-28 15:00:00-08:00"
        )
        self.assertTrue(
            isinstance(c.vevent.dtend.value, datetime.datetime)
        )
        self.assertEqual(
            c.vevent.dtstamp.value,
            datetime.datetime(2002, 10, 28, 1, 17, 6, tzinfo=tzutc())
        )

        vevent = c.vevent.transformFromNative()
        self.assertEqual(
            str(vevent.rrule),
            "<RRULE{}FREQ=Weekly;COUNT=10>"
        )
tests.py 文件源码 项目:vobject 作者: eventable 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_recurring_component(self):
        """
        Test recurring events
        """
        vevent = RecurringComponent(name='VEVENT')

        # init
        self.assertTrue(vevent.isNative)

        # rruleset should be None at this point.
        # No rules have been passed or created.
        self.assertEqual(vevent.rruleset, None)

        # Now add start and rule for recurring event
        vevent.add('dtstart').value = datetime.datetime(2005, 1, 19, 9)
        vevent.add('rrule').value =u"FREQ=WEEKLY;COUNT=2;INTERVAL=2;BYDAY=TU,TH"
        self.assertEqual(
            list(vevent.rruleset),
            [datetime.datetime(2005, 1, 20, 9, 0), datetime.datetime(2005, 2, 1, 9, 0)]
        )
        self.assertEqual(
            list(vevent.getrruleset(addRDate=True)),
            [datetime.datetime(2005, 1, 19, 9, 0), datetime.datetime(2005, 1, 20, 9, 0)]
        )

        # Also note that dateutil will expand all-day events (datetime.date values)
        # to datetime.datetime value with time 0 and no timezone.
        vevent.dtstart.value = datetime.date(2005,3,18)
        self.assertEqual(
            list(vevent.rruleset),
            [datetime.datetime(2005, 3, 29, 0, 0), datetime.datetime(2005, 3, 31, 0, 0)]
        )
        self.assertEqual(
            list(vevent.getrruleset(True)),
            [datetime.datetime(2005, 3, 18, 0, 0), datetime.datetime(2005, 3, 29, 0, 0)]
        )
calendar_ope.py 文件源码 项目:python_utils 作者: Jayhello 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def get_all_day_v1():
    from datetime import datetime
    d1 = '20171030'
    d2 = '20171102'
    for dt in rrule.rrule(rrule.DAILY,
                          dtstart=datetime.strptime(d1, '%Y%m%d'),
                          until=datetime.strptime(d2, '%Y%m%d')):
        print dt.strftime('%Y%m%d')

    # 20171030 20171031 20171101 20171102
web_tools.py 文件源码 项目:Landsat578 作者: dgketchum 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def landsat_overpass_time(lndst_path_row, start_date, satellite):

    delta = timedelta(days=20)
    end = start_date + delta

    if satellite == 'LT5':

        if start_date > datetime(2013, 6, 1):
            raise InvalidDateForSatelliteError('The date requested is after L5 deactivation')

        reference_time = get_l5_overpass_data(lndst_path_row[0], lndst_path_row[1], start_date)
        return reference_time

    else:
        if satellite == 'LE7':
            sat_abv = 'L7'
        elif satellite == 'LC8':
            sat_abv = 'L8'

        base = 'https://landsat.usgs.gov/landsat/all_in_one_pending_acquisition/'
        for day in rrule(DAILY, dtstart=start_date, until=end):

            tail = '{}/Pend_Acq/y{}/{}/{}.txt'.format(sat_abv, day.year,
                                                      day.strftime('%b'),
                                                      day.strftime('%b-%d-%Y'))

            url = '{}{}'.format(base, tail)
            r = requests.get(url).text

            for line in r.splitlines():
                l = line.split()
                try:
                    if l[0] == str(lndst_path_row[0]):
                        if l[1] == str(lndst_path_row[1]):
                            # dtime is in GMT
                            time_str = '{}-{}'.format(day.year, l[2])
                            ref_time = datetime.strptime(time_str, '%Y-%j-%H:%M:%S')

                            return ref_time

                except IndexError:
                    pass
                except TypeError:
                    pass

        raise OverpassNotFoundError('Did not find overpass data, check your dates...')
get_all_tracking_stats_for_date_range.py 文件源码 项目:nba_db 作者: dblackrun 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def main():
    logging.basicConfig(filename='logs/tracking_stats.log',level=logging.ERROR, format='%(asctime)s %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p')
    config=json.loads(open('config.json').read())

    command_line_args = sys.argv

    dates = utils.validate_dates(command_line_args)
    start_date = dates[0]
    end_date = dates[1]

    username = config['username']
    password = config['password']
    host = config['host']
    database = config['database']

    engine = create_engine('mysql://'+username+':'+password+'@'+host+'/'+database)

    DBSession = scoped_session(sessionmaker(autoflush=True,autocommit=False,bind=engine))

    hustle_stats_queue = Queue()
    team_tracking_queue = Queue()
    player_tracking_queue = Queue()
    passes_made_queue = Queue()
    # Create worker threads
    for x in range(8):
        hustle_stats_worker = sportvu.HustleStatsWorker(hustle_stats_queue, DBSession)
        team_tracking_worker = sportvu.TeamStatWorker(team_tracking_queue, DBSession)
        player_tracking_worker = sportvu.PlayerStatWorker(player_tracking_queue, DBSession)
        passes_made_worker = sportvu.PlayerPassesStatWorker(passes_made_queue, DBSession)
        # Setting daemon to True will let the main thread exit even though the workers are blocking
        hustle_stats_worker.daemon = True
        team_tracking_worker.daemon = True
        player_tracking_worker.daemon = True
        passes_made_worker.daemon = True
        hustle_stats_worker.start()
        team_tracking_worker.start()
        player_tracking_worker.start()
        passes_made_worker.start()
    # Put the tasks into the queue as a tuple
    for dt in rrule(DAILY, dtstart=start_date, until=end_date):
        date = dt.strftime("%m/%d/%Y")
        game_team_map, player_team_game_map, daily_game_ids = sportvu.get_player_game_team_maps_and_daily_game_ids(date)
        if len(daily_game_ids) > 0:
            season = utils.get_season_from_game_id(daily_game_ids[0])
            season_type = utils.get_season_type_from_game_id(daily_game_ids[0])
            if season_type != None:
                # hustle stats begin in 2015-16 playoffs
                hustle_stats_queue.put((game_team_map, player_team_game_map, date, season, season_type))
                for stat_type in constants.SPORTVU_GAME_LOG_STAT_TYPE_TABLE_MAPS.keys():
                    team_tracking_queue.put((stat_type, date, season, season_type, game_team_map, constants.SPORTVU_GAME_LOG_STAT_TYPE_TABLE_MAPS[stat_type]['Team']))
                    player_tracking_queue.put((stat_type, date, season, season_type, player_team_game_map, constants.SPORTVU_GAME_LOG_STAT_TYPE_TABLE_MAPS[stat_type]['Player']))
                for player_id in player_team_game_map.keys():
                    passes_made_queue.put((player_id, date, season, season_type, player_team_game_map))
    # Causes the main thread to wait for the queue to finish processing all the tasks
    hustle_stats_queue.join()
    team_tracking_queue.join()
    player_tracking_queue.join()
    passes_made_queue.join()

    DBSession.remove()
get_tracking_shot_stats_for_date_range.py 文件源码 项目:nba_db 作者: dblackrun 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def main():
    logging.basicConfig(filename='logs/tracking_shot_stats.log',level=logging.ERROR, format='%(asctime)s %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p')
    config=json.loads(open('config.json').read())

    command_line_args = sys.argv

    dates = utils.validate_dates(command_line_args)
    start_date = dates[0]
    end_date = dates[1]

    username = config['username']
    password = config['password']
    host = config['host']
    database = config['database']

    engine = create_engine('mysql://'+username+':'+password+'@'+host+'/'+database)

    DBSession = scoped_session(sessionmaker(autoflush=True,autocommit=False,bind=engine))

    # get data
    for dt in rrule(DAILY, dtstart=start_date, until=end_date):
        date = dt.strftime("%m/%d/%Y")

        game_team_map, player_team_game_map, daily_game_ids = sportvu.get_player_game_team_maps_and_daily_game_ids(date)

        if len(daily_game_ids) > 0:
            team_queue = Queue()
            player_queue = Queue()
            # Create worker threads
            for x in range(5):
                team_worker = sportvu.TeamShotWorker(team_queue, DBSession)
                player_worker = sportvu.PlayerShotWorker(player_queue, DBSession)
                # Setting daemon to True will let the main thread exit even though the workers are blocking
                team_worker.daemon = True
                player_worker.daemon = True
                team_worker.start()
                player_worker.start()
            # Put the tasks into the queue as a tuple
            for close_def_dist_range in constants.CLOSE_DEF_DIST_RANGES:
                for shot_clock_range in constants.SHOT_CLOCK_RANGES:
                    for shot_dist_range in constants.SHOT_DIST_RANGES:
                        for touch_time_range in constants.TOUCH_TIME_RANGES:
                            for dribble_range in constants.DRIBBLE_RANGES:
                                for general_range in constants.GENERAL_RANGES:
                                    team_queue.put((date, daily_game_ids, game_team_map, close_def_dist_range, shot_clock_range, shot_dist_range, touch_time_range, dribble_range, general_range))
                                    player_queue.put((date, daily_game_ids, player_team_game_map, close_def_dist_range, shot_clock_range, shot_dist_range, touch_time_range, dribble_range, general_range))
            # Causes the main thread to wait for the queue to finish processing all the tasks
            team_queue.join()
            player_queue.join()

    DBSession.remove()
get_all_game_stats_for_date_range.py 文件源码 项目:nba_db 作者: dblackrun 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def main():
    logging.basicConfig(filename='logs/games.log',level=logging.ERROR, format='%(asctime)s %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p')
    config=json.loads(open('config.json').read())

    command_line_args = sys.argv

    dates = utils.validate_dates(command_line_args)
    start_date = dates[0]
    end_date = dates[1]

    username = config['username']
    password = config['password']
    host = config['host']
    database = config['database']

    engine = create_engine('mysql://'+username+':'+password+'@'+host+'/'+database)

    DBSession = scoped_session(sessionmaker(autoflush=True,autocommit=False,bind=engine))

    game_ids = []
    for dt in rrule(DAILY, dtstart=start_date, until=end_date):
        date_response = utils.get_scoreboard_response_for_date(dt.strftime("%m/%d/%Y"))
        date_data = utils.get_array_of_dicts_from_response(date_response, constants.SCOREBOARD_DATA_INDEX)
        for game_data in date_data:
            game_ids.append(game_data['GAME_ID'])

    if len(game_ids) > 0:
        game_data_queue = Queue()
        # Create worker threads
        for x in range(8):
            game_worker = game.GameWorker(game_data_queue, DBSession)
            # Setting daemon to True will let the main thread exit even though the workers are blocking
            game_worker.daemon = True
            game_worker.start()
        # Put the tasks into the queue as a tuple
        for game_id in game_ids:
            season = utils.get_season_from_game_id(game_id)
            season_type = utils.get_season_type_from_game_id(game_id)
            if season_type != None:
                game_data_queue.put((game_id, season, season_type))
        # Causes the main thread to wait for the queue to finish processing all the tasks
        game_data_queue.join()

    DBSession.remove()


问题


面经


文章

微信
公众号

扫码关注公众号