python类today()的实例源码

anilist.py 文件源码 项目:aniping 作者: kuruoujou 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _get_season_string(self, season_int=None):
        """Return a season name for an anilist season int or for the current month.

        Anilist describes a season int as "First 2 numbers are the year
        (16 is 2016). Last number is the season starting at 1 (3 is Summer)."

        Keyword Args:
            season_int (int): Optional anilist season int. When it is falsy
                (``None`` or ``0``) today's month decides the season instead.

        Returns:
            The season ('winter', 'spring', 'summer' or 'fall') as a string.

        Raises:
            KeyError: If the part after the two year digits is not 1-4.
            ValueError: If nothing follows the two year digits.
        """
        season_names = ('winter', 'spring', 'summer', 'fall')
        if not season_int:
            # Months 1-3 are winter, 4-6 spring, 7-9 summer, 10-12 fall.
            return season_names[(date.today().month - 1) // 3]
        names_by_number = dict(enumerate(season_names, start=1))
        # Strip the two leading year digits; the remainder is the season number.
        season_number = int(str(season_int)[2:])
        return names_by_number[season_number]
tibia.py 文件源码 项目:Jumper-Cogs 作者: Redjumpman 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
async def _rashid_tibia(self):
        """Announce where Rashid can be found on the current weekday.

        Looks up today's weekday name and sends the matching location message
        through ``self.bot.say``. The function is a coroutine because it
        awaits ``self.bot.say``; ``await`` is a syntax error in a plain ``def``.
        """
        locations = {
            "Sunday": "On Sundays you can find him in Carlin depot, one floor above.",
            "Monday": "On Mondays you can find him in Svargrond, in Dankwart's tavern, south of the temple.",
            "Tuesday": "On Tuesdays you can find him in Liberty Bay, in Lyonel's tavern, west of the depot.",
            "Wednesday": "On Wednesdays you can find him in Port Hope, in Clyde's tavern, north of the ship.",
            "Thursday": "On Thursdays you can find him in Ankrahmun, in Arito's tavern, above the post office.",
            "Friday": "On Fridays you can find him in Darashia, in Miraia's tavern, south of the guildhalls.",
            "Saturday": "On Saturdays you can find him in Edron, in Mirabell's tavern, above the depot.",
        }
        today_name = calendar.day_name[date.today().weekday()]
        message = locations.get(today_name)
        # calendar.day_name follows the current locale, so a non-English
        # locale yields no match; stay silent then, as the original did.
        if message is not None:
            await self.bot.say(message)
test_cache_project_stats.py 文件源码 项目:FRG-Crowdsourcing 作者: 97amarnathk 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def test_stats_hours(self):
        """stats_hours counts both fresh task runs under the current UTC hour."""
        project = ProjectFactory.create()
        answered_task = TaskFactory.create(n_answers=1)
        utc_now = datetime.now(pytz.utc)
        TaskFactory.create()
        TaskRunFactory.create(project=project, task=answered_task)
        AnonymousTaskRunFactory.create(project=project)
        stats = stats_hours(project.id)
        (hours, hours_anon, hours_auth,
         max_hours, max_hours_anon, max_hours_auth) = stats
        # Every per-hour dict is keyed by the zero-padded hour string.
        hour_key = utc_now.strftime('%H')
        assert len(hours) == 24, len(hours)
        assert hours[hour_key] == 2, hours[hour_key]
        assert hours_anon[hour_key] == 1, hours_anon[hour_key]
        assert hours_auth[hour_key] == 1, hours_auth[hour_key]
        assert max_hours == 2
        assert max_hours_anon == 1
        assert max_hours_auth == 1
test_cache_project_stats.py 文件源码 项目:FRG-Crowdsourcing 作者: 97amarnathk 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_stats_hours_with_period(self):
        """Test CACHE PROJECT STATS hours with period works.

        One task run is created 6 days ago and one anonymous task run 16 days
        ago; the assertions expect only the first one to be counted.
        """
        pr = ProjectFactory.create()
        d = date.today() - timedelta(days=6)
        task = TaskFactory.create(n_answers=1, created=d)
        TaskRunFactory.create(project=pr, task=task, created=d, finish_time=d)
        d = date.today() - timedelta(days=16)
        AnonymousTaskRunFactory.create(project=pr, created=d, finish_time=d)
        hours, hours_anon, hours_auth, max_hours, \
            max_hours_anon, max_hours_auth = stats_hours(pr.id)
        assert len(hours) == 24, len(hours)
        # The timestamps above are plain dates (no time of day), so the runs
        # are expected in the '00' bucket. The failure messages report that
        # same bucket rather than the current hour.
        assert hours['00'] == 1, hours['00']
        assert hours_anon['00'] == 0, hours_anon['00']
        assert hours_auth['00'] == 1, hours_auth['00']
        assert max_hours == 1
        assert max_hours_anon is None
        assert max_hours_auth == 1
wallstreet.py 文件源码 项目:wallstreet 作者: mcdallas 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __init__(self, quote, d=None, m=None, y=None, strike=None, strict=False,
                 source='google'):
        """Load the option chain for ``quote`` and optionally select a strike.

        Args:
            quote: Ticker symbol; upper-cased before use.
            d, m, y: Day, month and year handed to the parent initialiser.
                Each one left as ``None`` is filled in from today's date at
                call time (a ``date.today()`` default in the signature would
                be frozen when the module is imported).
            strike: Strike to select. If it is not listed, the closest listed
                strike is used unless ``strict`` is true.
            strict: Also forwarded to the parent initialiser. When true, an
                unlisted ``strike`` raises instead of falling back.
            source: Forwarded to the parent initialiser.

        Raises:
            LookupError: ``strict`` is true and ``strike`` is not listed.
        """
        today = date.today()
        d = today.day if d is None else d
        m = today.month if m is None else m
        y = today.year if y is None else y

        quote = quote.upper()
        kw = {'d': d, 'm': m, 'y': y, 'strict': strict, 'source': source}
        super().__init__(quote, self.__class__.Option_type, **kw)

        # Time to expiration expressed in years (days / 365).
        self.T = (self._expiration - today).days/365
        self.q = self.underlying.dy  # presumably the dividend yield -- confirm
        self.ticker = quote
        self.strike = None
        # Rows whose 'p' field is '-' are skipped.
        self.strikes = tuple(parse(dic['strike']) for dic in self.data
                             if dic.get('p') != '-')
        if strike:
            if strike in self.strikes:
                self.set_strike(strike)
            elif strict:
                raise LookupError('No options listed for given strike price.')
            else:
                closest_strike = min(self.strikes, key=lambda x: abs(x - strike))
                print('No option for given strike, using %s instead' % closest_strike)
                self.set_strike(closest_strike)
toggl_hammer.py 文件源码 项目:toggl-hammer 作者: david-cako 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __init__(self, week_range):
        """Download the Toggl user, projects and recent entries, then total each day.

        Args:
            week_range: How many weeks back from today the log should start.
        """
        credentials = (API_KEY, 'api_token')
        self.date_range = date.today() - timedelta(weeks=week_range)
        self.user_obj = requests.get('https://www.toggl.com/api/v8/me?with_related_data=true', auth=credentials).json()
        # (name, id) pair for every project in the user data
        self.projects = [(project['name'], project['id']) for project in self.user_obj['data']['projects']]
        # request the entries starting at midnight of the first day in range
        entries_url = 'https://www.toggl.com/api/v8/time_entries?start_date=' + \
                str(self.date_range) + 'T00:00:00' + TIMEZONE_ENCODED
        self.time_entries = requests.get(entries_url, auth=credentials).json()
        self.time_log = OrderedDict()
        # one LogEntry per calendar day; self.date_range doubles as the cursor
        # and is left pointing one day past today when the loop ends
        while not self.date_range > date.today():
            self.time_log[str(self.date_range)] = LogEntry(self.date_range)
            self.date_range += timedelta(days=1)
        for item in self.time_entries:
            day_key = item['start'].split('T')[0]  # date part of the timestamp
            day_log = self.time_log[day_key]
            day_log.time = day_log.time + item['duration']
        # convert only after every entry has been added (days can hold several)
        for day_log in self.time_log.values():
            day_log.toHours()
upload.py 文件源码 项目:quokka_ng 作者: rochacbruno 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def dated_path(obj, file_data):
    """Build a ``<prefix>/<year>/<month>/<filename>`` path for an upload.

    The prefix is ``obj.model_name`` when that attribute can be read and
    ``"undefined"`` otherwise. The file name is the uploaded name with a
    random 16-bit number inserted before the extension, passed through
    ``secure_filename``. Year and month come from today's date and are not
    zero-padded.

    :param obj: object whose ``model_name`` supplies the path prefix.
    :param file_data: upload object exposing a ``filename`` attribute.
    :return: the relative path as a string.
    """
    try:
        prefix = getattr(obj, 'model_name')
    except Exception:
        # Best effort: any error while reading the attribute falls back to a
        # generic prefix. Only Exception is caught so that KeyboardInterrupt
        # and SystemExit still propagate.
        prefix = "undefined"

    parts = op.splitext(file_data.filename)
    rand = random.getrandbits(16)
    filename = u"{name}_{rand}{ext}".format(
        rand=rand, name=parts[0], ext=parts[1]
    )
    filename = secure_filename(filename)
    today = date.today()
    path = u"{prefix}/{t.year}/{t.month}/{filename}".format(
        prefix=prefix, t=today, filename=filename
    )
    return path
plugin.py 文件源码 项目:enigma2 作者: OpenLD 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def CheckDate(self):
        """Start the upgrade check, or ask first when the image is over 30 days old."""
        self.CheckDateDone = True
        version = getEnigmaVersionString()
        # The version string is read as a date: chars 0-3 year, 5-6 month, 8-9 day.
        year, month, day = int(version[0:4]), int(version[5:7]), int(version[8:10])
        deadline = date(year, month, day) + timedelta(days=30)
        message = _("Your image is out of date!\n\n"
                "After such a long time, there is a risk that your %s %s will not\n"
                "boot after online-update, or will show disfunction in running Image.\n\n"
                "A new flash will increase the stability\n\n"
                "An online update is done at your own risk !!\n\n\n"
                "Do you still want to update?") % (getMachineBrand(), getMachineName())

        if deadline <= date.today():
            # Too old: log it and let the user decide through a message box.
            print("[SOFTWAREMANAGER] Your image is to old (%s), you need to flash new !!" % getEnigmaVersionString())
            self.session.openWithCallback(self.checkDateCallback, MessageBox, message, default = False)
            return

        self.updating = True
        self.activityTimer.start(100, False)
        self.ipkg.startCmd(IpkgComponent.CMD_UPGRADE_LIST)
tests.py 文件源码 项目:pyconjp-website 作者: pyconjp 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_session_detail(self):
        """Smoke test: the session detail page answers with HTTP 200."""
        Conference.objects.get_or_create(id=settings.CONFERENCE_ID)
        # Build the smallest Section -> Schedule -> Day -> Session chain.
        section = Section.objects.create(conference=current_conference())
        schedule = Schedule.objects.create(section=section)
        day = Day.objects.create(schedule=schedule, date=date.today())
        session = Session.objects.create(day=day)
        detail_url = reverse("schedule_session_detail", args=(session.pk,))
        response = self.client.get(detail_url)
        self.assertEqual(200, response.status_code)
data_generator.py 文件源码 项目:py_exercise 作者: GoverSky 项目源码 文件源码 阅读 55 收藏 0 点赞 0 评论 0
def id(self, state=False):
        """Generate a random 18-character ID number with a matching check character.

        Layout: the district code from ``self.dist_code['code']`` (expected to
        be 6 digits), an 8-digit birth date (YYYYMMDD, year 1930-2017), a
        3-digit sequence number, and one check character. The weights and the
        mod-11 check table are those of the Chinese resident ID number scheme.

        Args:
            state: When true, return a dict that also carries
                ``self.dist_code['state']``.

        Returns:
            The ID string, or ``{'state': ..., 'id': ...}`` when ``state`` is true.

        Raises:
            IndexError: If the district code makes the body longer than 17 digits.
        """
        distcode = self.dist_code
        # Pick the year first, then a day inside that year, so the month/day
        # is always valid for it (no Feb 29 in a non-leap year).
        year = randint(1930, 2017)
        first_day = date(year, 1, 1)
        days_in_year = (date(year + 1, 1, 1) - first_day).days
        birthday = first_day + timedelta(days=randint(0, days_in_year - 1))

        number = distcode['code']  # district code
        number += birthday.strftime('%Y%m%d')  # birth date
        number += str(randint(100, 999))  # sequence number

        weight = [7, 9, 10, 5, 8, 4, 2, 1, 6, 3, 7, 9, 10, 5, 8, 4, 2]
        # Check character indexed by (weighted sum mod 11); remainder 8 maps to '4'.
        check_chars = '10X98765432'
        total = sum(int(digit) * weight[pos] for pos, digit in enumerate(number))
        number += check_chars[total % 11]

        if state:
            return {'state': distcode['state'], 'id': number}
        return number
cardapio_notifications.py 文件源码 项目:CardapioParser 作者: gustavoavena 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def update_or_create_token(token, vegetariano):
    """
    Write a device token to Firebase with today's date and the menu preference.

    The record stored under ``tokens/<token>`` replaces whatever was there.

    :param token: device token to register or refresh.
    :param vegetariano: the user's menu preference, stored as given.
    :return: True once the write call has returned.
    """
    last_used = date.today().strftime("%y-%m-%d")  # note: two-digit year
    record = {"last_used": last_used, "vegetariano": vegetariano}

    tokens = setup_firebase().child('tokens')
    tokens.child(token).set(record)

    print("Device token {} registrado com sucesso.".format(token))
    return True
cardapio_notifications.py 文件源码 项目:CardapioParser 作者: gustavoavena 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def main():
    """Trigger the notification for the next meal when run inside a meal window.

    Hours 13-15 UTC select lunch and 19-21 UTC select dinner; at any other
    hour a message is printed and nothing is sent.
    """
    # The hour is in UTC. Brazil time = UTC - 3h (possibly different during
    # daylight saving time).
    hour = datetime.utcnow().hour

    if 13 <= hour <= 15:
        refeicao = "almoço"
    elif 19 <= hour <= 21:
        refeicao = "jantar"
    else:
        print("Tentativa de envio de notificação em horário impropio.")
        return

    mandar_proxima_refeicao(refeicao)
date_services.py 文件源码 项目:CardapioParser 作者: gustavoavena 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def next_weekdays(next, start_date_string=None):
    """Return the first ``next`` weekdays (Monday-Friday) from a start date.

    The start date itself is included when it is a weekday.

    :param next: how many weekday dates to return.
    :param start_date_string: start date formatted with ``URL_DATE_FORMAT``.
        ``None`` (the default) means today, resolved on every call; a
        ``date.today()`` default in the signature would be frozen at import
        time and go stale in a long-running process.
    :return: list of date strings formatted with ``URL_DATE_FORMAT``.
    """
    if start_date_string is None:
        start_date_string = date.today().strftime(URL_DATE_FORMAT)
    start_date = datetime.strptime(start_date_string, URL_DATE_FORMAT)

    date_strings = []
    offset = 0
    while len(date_strings) < next:
        candidate = start_date + timedelta(days=offset)
        if candidate.weekday() < 5:  # 0-4 are Monday to Friday
            date_strings.append(candidate.strftime(URL_DATE_FORMAT))
        offset += 1

    return date_strings
utils.py 文件源码 项目:ArticlePusher 作者: aforwardz 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def save_today_new_articles(self, sub_class, name, new_dict):
        """
        Remove articles already stored under the old key, then store the rest.

        ``new_dict`` is modified in place: every key found in the dict stored
        under the old key is popped from it. The result is written under the
        updated key with a 10-day expiry.

        :param sub_class: passed, together with ``name``, to the key generators
        :param name: site name
        :param new_dict: article dict of today (mutated)
        :return: None
        """
        new_key = self.generate_updated_key(sub_class, name)
        old_key = self.generate_old_key(sub_class, name)
        if self.client.exists(old_key):
            # SECURITY NOTE(review): eval() executes whatever text is stored
            # under old_key. If that store can be written by anything
            # untrusted, switch to a safe format such as json or
            # ast.literal_eval.
            old_dict = eval(self.client.get(old_key).decode('utf-8'))
            for key in old_dict:
                try:
                    new_dict.pop(key)
                except KeyError:
                    # The old article is absent from today's dict.
                    util_logger.info('(?•??•?)? ? | {team} has updated article ${key}% today!'.format(
                        team=name, key=key
                    ))
        self.client.set(new_key, new_dict, ex=timedelta(days=10))
bot.py 文件源码 项目:TycotBot 作者: Programeiros 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def msgDia():
    """Send the daily good-morning message and a GIF to the group chat.

    The greeting addresses a randomly chosen nickname. Monday, Wednesday and
    Friday add an extra sentence; on every other day the greeting is sent
    without one (previously ``hoje`` was unbound on those days and the call
    raised ``UnboundLocalError``).
    """
    inst_keyboard = keyboard()
    msg = ['Coders', 'Jedi\'s', 'Programeiros']
    # Feel free to change the phrases. Keys are date.weekday() values (0 = Monday).
    frases = {
        0: ' Força que hoje é só o primeiro dia da semana!',
        2: ' Estamos no meio da semana. Tycot deseja-lhes muita sabedoria e paciência',
        4: ' Hoje é sexta! Não façam besteira ou vão perder o FDS!',
    }
    hoje = frases.get(date.today().weekday(), '')
    # The group ID is hard-coded because this function runs outside a message
    # handler, where the chat ID is not available.
    bot.sendMessage(-1001068576090, parse_mode='HTML',
        text='<i>Bom dia {}!{}</i>'.format(random.choice(msg), hoje), reply_markup = inst_keyboard.keyboard_sugestao())
    bot.sendVideo(-1001068576090,
        'https://media.giphy.com/media/W4IY7zQdRh7Ow/giphy.gif')
clubblueprint.py 文件源码 项目:oclubs 作者: SHSIDers 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def newhm_submit(club):
    '''Store a HongMei activity plan submitted through the form.'''
    form = request.form
    contents = form['contents']
    try:
        actdate = date(int(form['year']), int(form['month']), int(form['day']))
    except ValueError:
        # NOTE(review): actdate is read below, so fail() presumably aborts
        # the request instead of returning -- confirm.
        fail('Please input valid date to submit.', 'newhm')
    # Reject empty contents and dates earlier than today.
    if contents == '' or actdate < date.today():
        fail('Please input contents or correct date to submit.', 'newhm')
    if form_is_valid():
        activity = Activity.new()
        activity.name = contents
        activity.club = club
        activity.description = FormattedText.emptytext()
        activity.date = actdate
        activity.time = ActivityTime.HONGMEI
        activity.location = 'HongMei Elementary School'
        activity.cas = 1
        activity.post = FormattedText.emptytext()
        activity.selections = []
        activity.create()
    return redirect(url_for('.newhm', club=club.callsign))
test_swagger_spec.py 文件源码 项目:klue-client-server 作者: erwan-lemonnier 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_model_to_json(self):
        """model_to_json serialises a nested model and renders dates as ISO strings."""
        # safe_load: yaml.load() without an explicit Loader is rejected by
        # current PyYAML, and the spec is plain data.
        swagger_dict = yaml.safe_load(Tests.yaml_complex_model)
        spec = ApiSpec(swagger_dict)

        Foo = spec.definitions['Foo']
        Bar = spec.definitions['Bar']

        # Read the date once so input and expectation agree even if the test
        # runs across midnight.
        today = date.today()

        f = Foo(
            token='abcd',
            bar=Bar(
                a=1,
                b=today
            )
        )

        print("foo: " + pprint.pformat(f))

        j = spec.model_to_json(f)
        self.assertDictEqual(j, {
            'token': 'abcd',
            'bar': {
                'a': 1,
                'b': today.isoformat()
            }
        })
test_swagger_spec.py 文件源码 项目:klue-client-server 作者: erwan-lemonnier 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_json_to_model(self):
        """json_to_model rebuilds the nested model and parses the ISO date string."""
        # safe_load: yaml.load() without an explicit Loader is rejected by
        # current PyYAML, and the spec is plain data.
        swagger_dict = yaml.safe_load(Tests.yaml_complex_model)
        spec = ApiSpec(swagger_dict)

        # Read the date once so input and expectation agree even if the test
        # runs across midnight.
        today = date.today()

        j = {
            'token': 'abcd',
            'bar': {
                'a': 1,
                'b': today.isoformat()
            }
        }

        m = spec.json_to_model('Foo', j)

        self.assertEqual(m.token, 'abcd')
        b = m.bar
        self.assertEqual(b.__class__.__name__, 'Bar')
        self.assertEqual(b.a, 1)
        # The date comes back as a value whose str() carries a midnight time.
        self.assertEqual(str(b.b), str(today) + " 00:00:00")
test_date.py 文件源码 项目:immobilus 作者: pokidovea 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_nested_context_manager():
    """Nested immobilus blocks freeze date.today() and unwind in order."""
    outer_day = date(2016, 1, 1)
    inner_day = date(2014, 10, 12)

    # Before any freezing, today is neither of the two fixed dates.
    assert date.today() != outer_day
    assert date.today() != inner_day

    with immobilus('2016-01-01'):
        assert date.today() == outer_day

        with immobilus('2014-10-12'):
            # The inner block overrides the outer one.
            assert date.today() == inner_day

        # Leaving the inner block restores the outer frozen date.
        assert date.today() == outer_day

    # Leaving the outer block restores the real clock.
    assert date.today() != outer_day
    assert date.today() != inner_day
test_adapters.py 文件源码 项目:plone.server 作者: plone 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def test_simple(self):
        """Each sample value resolves an IValueToJson adapter without raising."""
        samples = (
            'foobar',
            ['foobar'],
            PersistentList(['foobar']),
            ('foobar',),
            frozenset(['foobar']),
            {'foobar'},
            {'foo': 'bar'},
            PersistentMapping({'foo': 'bar'}),
            datetime.utcnow(),
            date.today(),
            time(),
        )
        for sample in samples:
            # The lookup itself is the check; nothing is asserted on the result.
            getAdapter(sample, interface=IValueToJson)
serializers.py 文件源码 项目:timed-backend 作者: adfinis-sygroup 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def get_date(self, instance):
        """Return ``instance.date``, filling it with the last reported day if unset.

        When ``instance.date`` is ``None`` it is set to the latest date before
        today found among the user's absences and reports, or to ``date.min``
        when there are none, and that value is returned.
        """
        if instance.date is not None:
            return instance.date

        user = instance.id
        today = date.today()

        def latest(model):
            # Newest date strictly before today for this user, else date.min.
            newest = model.objects.filter(
                user=user, date__lt=today).aggregate(date=Max('date'))
            return newest['date'] or date.min

        instance.date = max(latest(Absence), latest(Report))
        return instance.date
twitter.py 文件源码 项目:Quantrade 作者: quant-trade 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def heatmap_to_twitter():
    """Post one heatmap status per broker, but only on the 2nd of the month.

    The image URL is attached when the matching file exists under
    ``STATIC_ROOT``; otherwise the status goes out without media. Any
    exception is caught and printed instead of being raised.
    """
    try:
        if date.today().day != 2:
            return

        api = connect()
        for broker in Brokers.objects.all():
            name_parts = (broker.slug, 'AI50', '1440', 'AI50', 'longs')
            image_filename = join(
                settings.STATIC_ROOT, 'collector', 'images', 'heatmap',
                '{0}=={1}=={2}=={3}=={4}.png'.format(*name_parts))

            media = None
            if isfile(image_filename):
                media = "https://quantrade.co.uk/static/collector/images/heatmap/{0}=={1}=={2}=={3}=={4}.png".\
                    format(*name_parts)

            status = "Results including last month index performance for {}.".format(broker.title)

            api.PostUpdate(status=status, media=media)
            print(colored.green("Heatmap posted."))
    except Exception as e:
        print(colored.red("At heatmap_to_twitter {}".format(e)))
send_signals.py 文件源码 项目:Quantrade 作者: quant-trade 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
async def gen_time_data(df):
    """Collect calendar fields for today and for the last row of ``df``.

    A coroutine, because it awaits ``get_prev_day`` and ``get_prev_mo``
    (``await`` is a syntax error in a plain ``def``).

    :param df: DataFrame whose index holds timestamps; it gains a ``ts``
        column with the index converted by ``to_datetime``.
    :return: ``(t, df)``, where ``t`` maps ``ye``/``mo``/``to_day``/``dow``
        to today's fields, ``prev_day``/``prev_mo`` to the awaited helper
        results, ``end_prev_day`` to ``[30, 31]`` and the ``df_*`` keys to
        the fields of the last row's timestamp.
    """
    t = {}
    now = date.today()
    t["ye"] = now.year
    t["mo"] = now.month
    t["to_day"] = now.day
    t["dow"] = now.weekday()
    t["prev_day"] = await get_prev_day(d=t["to_day"], mo=t["mo"])
    t["prev_mo"] = await get_prev_mo(mo=t["mo"])
    t["end_prev_day"] = [30, 31]
    df['ts'] = df.index
    df['ts'] = to_datetime(df['ts'])
    # .iloc[-1] is the positional "last row"; the removed .ix indexer could
    # also interpret -1 as a label on an integer index.
    last = df['ts'].iloc[-1].to_pydatetime()
    t["df_year"] = last.year
    t["df_month"] = last.month
    t["df_day"] = last.day
    t["df_weekday"] = last.weekday()

    return t, df
plugin.py 文件源码 项目:taemin 作者: ningirsu 项目源码 文件源码 阅读 44 收藏 0 点赞 0 评论 0
def get_day(cls, day):
        """Translate a day word into a date.

        "aujourd'hui"/"today" gives today and "demain"/"tomorrow" gives the
        day after. A key of ``cls.WEEK_DAY`` gives the first date from today
        (inclusive) whose ``weekday()`` equals the mapped value. Anything
        else gives ``None``.
        """
        one_day = timedelta(days=1)

        if day in ("aujourd'hui", "today"):
            return date.today()

        if day in ("demain", "tomorrow"):
            return date.today() + one_day

        if day not in cls.WEEK_DAY:
            return None

        # Step forward one day at a time until the weekday matches.
        candidate = date.today()
        while candidate.weekday() != cls.WEEK_DAY[day]:
            candidate += one_day
        return candidate
gateway.py 文件源码 项目:rqalpha-mod-vnpy 作者: ricequant 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def connect_and_sync_data(self):
        """Connect, refresh the cached data once per day, then subscribe.

        The instrument, account, position and order queries run only when
        ``_data_update_date`` differs from today. The date is stamped after
        them and before the commission query. Finally the gateway subscribes
        and registers ``on_universe_changed`` on the event bus.
        """
        self._connect()
        self.on_log('??????')

        needs_sync = self._data_update_date != date.today()
        if needs_sync:
            for run_query in (self._qry_instrument, self._qry_account,
                              self._qry_position, self._qry_order):
                run_query()
            # Stamp the date before the commission query, as in the original order.
            self._data_update_date = date.today()
            self._qry_commission()

        self._subscribe_all()
        self.on_log('???????')

        event_bus = self._env.event_bus
        event_bus.add_listener(EVENT.POST_UNIVERSE_CHANGED, self.on_universe_changed)
gateway.py 文件源码 项目:rqalpha-mod-vnpy 作者: ricequant 项目源码 文件源码 阅读 47 收藏 0 点赞 0 评论 0
def on_trade(self, trade_dict):
        """Handle a trade report: cache it, or fill its order and publish it.

        When ``_data_update_date`` is not today the trade is only cached.
        Otherwise a trade whose id is already in the account's
        ``_backward_trade_set`` is ignored; any other trade is turned into a
        ``Trade``, filled into its order (created on the fly when the order
        id is unknown) and published as an ``EVENT.TRADE`` event.
        """
        self.on_debug('????: %s' % str(trade_dict))

        if self._data_update_date != date.today():
            self._cache.cache_trade(trade_dict)
            return

        account = Environment.get_instance().get_account(trade_dict.order_book_id)
        if trade_dict.trade_id in account._backward_trade_set:
            return

        try:
            order = self.order_objects[trade_dict.order_id]
        except KeyError:
            # Unknown order id: build an order from the trade's own fields.
            order = Order.__from_create__(trade_dict.order_book_id,
                                          trade_dict.amount, trade_dict.side, trade_dict.style,
                                          trade_dict.position_effect)

        commission = cal_commission(trade_dict, order.position_effect)
        trade = Trade.__from_create__(
            trade_dict.order_id, trade_dict.price, trade_dict.amount,
            trade_dict.side, trade_dict.position_effect, trade_dict.order_book_id,
            trade_id=trade_dict.trade_id, commission=commission,
            frozen_price=trade_dict.price)

        order.fill(trade)
        self._env.event_bus.publish_event(RqEvent(EVENT.TRADE, account=account, trade=trade))
models.py 文件源码 项目:teamreporter 作者: agilentia 项目源码 文件源码 阅读 40 收藏 0 点赞 0 评论 0
def can_issue_daily(self):
        """
        Tell whether a ``DailyReport`` can be issued for this ``Report`` now.

        All of the following must hold:
            - the report occurs today (``occurs_today``),
            - no daily report exists yet for today's date,
            - the team has at least one user,
            - at least one active question exists,
            - ``survey_send_time`` is not later than the current time.

        :return: whether daily report can be generated
        :rtype: bool
        """
        issued_today = self.dailyreport_set.filter(date=date.today()).exists()
        has_members = self.team.users.exists()
        has_active_questions = self.question_set.filter(active=True).exists()
        conditions = (
            self.occurs_today,
            has_members,
            has_active_questions,
            self.survey_send_time <= now().time(),
            not issued_today,
        )
        return all(conditions)
privacy_prefs.py 文件源码 项目:python_mozetl 作者: mozilla 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def etl_job(sc, sqlContext, submission_date=None, save=True):
    """Transform one day of shield-study pings and optionally write them as parquet.

    :param sc: passed to ``records`` when loading the pings.
    :param sqlContext: passed to both transform functions.
    :param submission_date: ``YYYYMMDD`` string; defaults to yesterday.
    :param save: when true, overwrite the ``submission_date=`` partition
        under the S3 path with a single-partition parquet output.
    :return: the union of the transformed event pings and state pings.
    """
    s3_path = 's3n://telemetry-parquet/harter/privacy_prefs_shield/v2'
    if submission_date is None:
        yesterday = date.today() - timedelta(1)
        submission_date = yesterday.strftime("%Y%m%d")

    source = Dataset.from_source("telemetry")
    pings = source.where(
        docType="shield-study",
        submissionDate=submission_date,
        appName="Firefox",
    ).records(sc)

    event_rows = transform_event_pings(sqlContext, pings)
    state_rows = transform_state_pings(sqlContext, pings)
    transformed_pings = event_rows.union(state_rows)

    if save:
        path = s3_path + '/submission_date={}'.format(submission_date)
        writer = transformed_pings.repartition(1).write.mode('overwrite')
        writer.parquet(path)

    return transformed_pings
utils.py 文件源码 项目:python_mozetl 作者: mozilla 项目源码 文件源码 阅读 45 收藏 0 点赞 0 评论 0
def shield_etl_boilerplate(transform_func, s3_path):
    """Build an ETL job for shield-study pings around ``transform_func``.

    :param transform_func: called as ``transform_func(sqlContext, pings)``;
        its result is what the job writes and returns.
    :param s3_path: base path; the job writes to
        ``<s3_path>/submission_date=<date>``.
    :return: ``etl_job(sc, sqlContext, submission_date=None, save=True)``.
    """
    def etl_job(sc, sqlContext, submission_date=None, save=True):
        # No date given: process yesterday's submissions.
        if submission_date is None:
            yesterday = date.today() - timedelta(1)
            submission_date = yesterday.strftime("%Y%m%d")

        source = Dataset.from_source("telemetry")
        pings = source.where(
            docType="shield-study",
            submissionDate=submission_date,
            appName="Firefox",
        ).records(sc)

        transformed_pings = transform_func(sqlContext, pings)

        if save:
            path = s3_path + '/submission_date={}'.format(submission_date)
            writer = transformed_pings.repartition(1).write.mode('overwrite')
            writer.parquet(path)

        return transformed_pings

    return etl_job
utils.py 文件源码 项目:python_mozetl 作者: mozilla 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def testpilot_etl_boilerplate(transform_func, s3_path):
    def etl_job(sc, sqlContext, submission_date=None, save=True):
        if submission_date is None:
            submission_date = (date.today() - timedelta(1)).strftime("%Y%m%d")

        pings = Dataset.from_source(
            "telemetry"
        ).where(
            docType="testpilottest",
            submissionDate=submission_date,
            appName="Firefox",
        ).records(sc)

        transformed_pings = transform_func(sqlContext, pings)

        if save:
            # path = 's3://telemetry-parquet/testpilot/txp_pulse/v1/submission_date={}'
            path = s3_path + '/submission_date={}'.format(submission_date)
            transformed_pings.repartition(1).write.mode('overwrite').parquet(path)

        return transformed_pings

    return etl_job


问题


面经


文章

微信
公众号

扫码关注公众号