python类now()的实例源码

trader.py 文件源码 项目:algo-trading-pipeline 作者: NeuralKnot 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def sell_positions(self):
        """Close every open position whose "at" timestamp is over an hour old.

        Open positions are the documents in ``self.position_db`` whose
        ``closed`` field is falsy. For each stale one the sale is logged, the
        position is sold through ``self.web_interface`` if it is still held in
        the market, and the document is marked as closed.
        """
        def is_open(closed):
            return not closed

        open_positions = self.position_db.search(Query().closed.test(is_open))

        for position in open_positions:
            opened_at = arrow.get(position["at"])
            stale_before = arrow.now() - datetime.timedelta(hours=1)
            if not opened_at < stale_before:
                # Still inside the one hour holding window.
                continue

            self.logger.log("Trader/Seller", "informative", "Selling position for contract " + position["contract_id"] + "!")

            # Only place a sell order when the market still reports the position.
            if self.web_interface.have_position_in_market(position["contract_id"]):
                self.web_interface.sell(position["contract_id"], position["side"], position["amount"])

            # The record is closed whether or not a sell order was placed.
            self.position_db.update({ "closed": True }, eids=[position.eid])

    # Make a trade based on the result
data_input.py 文件源码 项目:algo-trading-pipeline 作者: NeuralKnot 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def poll_for_articles(self):
        """Poll every configured news source forever and queue unseen articles.

        Each pass walks ``self.sources``, asks the source's news API instance
        for articles, and queues every article whose title is not yet stored
        in that source's ``articles_db``. Queued titles are recorded together
        with the current time. After a full pass the loop sleeps for
        ``self.config["data_input"]["poll_interval"]`` seconds.

        This method never returns.
        """
        while True:
            for source in self.sources:
                self.logger.log("Data Input", "informative", "Polling: " + source["news_api_name"])

                articles = source["news_api_instance"].get_articles()
                if articles is None:
                    continue

                # The source being iterated already carries its own database,
                # so no parallel index into self.sources is needed.
                articles_db = source["articles_db"]
                for article in articles:
                    # Skip duplicates
                    if articles_db.search(Query().title == article["title"]):
                        continue

                    self.queue_article(article)
                    articles_db.insert({"title": article["title"], "at": str(arrow.now())})

            # Sleep for interval time
            time.sleep(self.config["data_input"]["poll_interval"])

    # Adds the given article to the queue
tools.py 文件源码 项目:jarvis 作者: anqxyr 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _get_new_article(pages):
    """
    Get a random new scp, tale or goi-format article.

    Only pages created less than 30 days ago are considered. A page
    qualifies with a rating of at least 40 points when tagged 'scp', or at
    least 20 points when tagged 'tale' or 'goi-format'.

    Returns a randomly chosen qualifying page, or None when there is none.
    """
    # Arrow's plural ``replace(days=...)`` was deprecated and later removed;
    # ``shift`` is the supported way to move a date relative to now.
    cutoff = arrow.now().shift(days=-30).format('YYYY-MM-DD')
    # NOTE(review): this comparison presumably relies on p.created being an
    # ISO-formatted date string; confirm against the page model.
    recent = [p for p in pages if p.created > cutoff]

    skips = [p for p in recent if 'scp' in p.tags and p.rating >= 40]
    tales = [p for p in recent if 'tale' in p.tags and p.rating >= 20]
    goi = [p for p in recent if 'goi-format' in p.tags and p.rating >= 20]
    # A page matching several tags appears several times in the pool,
    # exactly as before.
    candidates = skips + tales + goi

    return random.choice(candidates) if candidates else None
tools.py 文件源码 项目:jarvis 作者: anqxyr 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _get_post_data(api):
    """
    Decide which article should be posted to twitter next.

    Returns a ``(lex entry, page)`` tuple, or None when an article has
    already been posted today and no new article is waiting.
    """
    tweets = api.user_timeline(count=100)
    tweets = [t for t in tweets if t.source == core.config.twitter.name]
    url_entities = [t.entities['urls'] for t in tweets]
    # A set keeps the membership tests below cheap.
    urls = {u[0]['expanded_url'] for u in url_entities if u}
    posted = [p for p in core.pages if p.url in urls]
    not_posted = [p for p in core.pages if p.url not in urls]

    new = _get_new_article(not_posted)
    if new:
        # post new articles if there are any
        return (lex.post_on_twitter.new, new)

    # Compare calendar dates: comparing the tweet timestamp with the exact
    # current instant is practically never equal, which disabled this guard.
    # NOTE(review): created_at and arrow.now() may be in different timezones
    # (API timestamps are commonly UTC); confirm which day boundary is wanted.
    if tweets and tweets[0].created_at.date() == arrow.now().naive.date():
        # if we posted an old article today already, don't post anything
        return None

    if any('scp' in p.tags for p in posted[:2]):
        # post tale, tale, tale, scp, tale, tale, tale, scp, tale...
        old = _get_old_article(not_posted, scp=False)
    else:
        old = _get_old_article(not_posted, scp=True)
    return (lex.post_on_twitter.old, old)
core.py 文件源码 项目:jarvis 作者: anqxyr 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def cooldown(time):
    """
    Build a decorator that rate-limits a command per channel.

    :param time: minimum number of seconds between two accepted calls
                 coming from the same ``inp.channel``.

    While a channel is cooling down the wrapped function is not called;
    ``inp.multiline`` is set to False and ``lex.cooldown`` is returned.
    """
    def decorator(func):
        # Maps channel -> time of the last accepted call.
        func._cooldown = {}

        @functools.wraps(func)
        def inner(inp, *args, **kwargs):
            now = arrow.now()
            last_call = func._cooldown.get(inp.channel)

            # total_seconds() covers the whole interval; ``.seconds`` is only
            # the sub-day remainder, so a call made a little over a day after
            # the previous one was wrongly treated as still cooling down.
            if last_call is not None and (now - last_call).total_seconds() < time:
                inp.multiline = False
                return lex.cooldown

            func._cooldown[inp.channel] = now
            return func(inp, *args, **kwargs)
        return inner
    return decorator
test_date_utils.py 文件源码 项目:glog-cli 作者: globocom 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_datetime_parser(self):
        """datetime_parser handles relative phrases, absolute strings and Arrow objects."""
        now = arrow.now()
        # Each entry pairs parser input with a function that builds the
        # expected value from the current time. Relative offsets use shift();
        # arrow's plural replace() keywords are deprecated and later removed.
        ts_tuples = [
            ("10 minutes ago", lambda x: x.shift(minutes=-10).replace(microsecond=0, tzinfo='local')),
            ("1 day ago", lambda x: x.shift(days=-1).replace(microsecond=0, tzinfo='local')),
            ("yesterday midnight",
             lambda x: x.shift(days=-1).replace(hour=0, minute=0, second=0, microsecond=0, tzinfo='local')),
            ("1986-04-24 00:51:24+02:00", lambda x: arrow.get("1986-04-24 00:51:24+02:00")),
            ("2001-01-01 01:01:01", lambda x: arrow.get("2001-01-01 01:01:01").replace(tzinfo="local")),
            (now, lambda x: now)]

        for (s, ts) in ts_tuples:
            # assertEquals is a deprecated alias that was removed in Python 3.12.
            self.assertEqual(datetime_parser(s), ts(arrow.now()))

        with self.assertRaises(ValueError):
            datetime_parser("fdjkldfhskl")
future.py 文件源码 项目:slaveo 作者: lamter 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def get_tradeday(self, now):
        """
        Look up the trading flag and trade day that apply at the given moment.

        The calendar row for ``now.date()`` is selected, then the time of day
        picks which pair of columns is returned.

        :param now: datetime to look up
        :return: (trading flag, trade day) taken from the calendar row
        >>> now = datetime.datetime(2016,10, 25, 0, 0, 0)
        >>> futureTradeCalendar.get_tradeday(now)
        (True, Timestamp('2016-10-25 00:00:00'))

        """

        t = now.time()
        # ``.ix`` was removed from pandas; ``.loc`` is the label-based lookup.
        day = self.calendar.loc[now.date()]
        if DAY_LINE < t < NIGHT_LINE:
            # Between the day line and the night line: day session.
            return day.day_trade, day.tradeday
        elif NIGHT_LINE < t:
            # After the night line: night session, reported with next_td.
            return day.night_trade, day.next_td
        else:
            # Everything else, including a time exactly equal to DAY_LINE or
            # NIGHT_LINE, is handled as the midnight session.
            # NOTE(review): confirm the two boundary instants belong here.
            return day.midnight_trade, day.tradeday
pipelines.py 文件源码 项目:housebot 作者: jbkopecky 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def process_item(self, item, domain):
        """Store a scraped item, or drop it when it has been seen before.

        A first sighting stores the price, main record, tag list and
        description, commits, and returns the item. A repeat sighting stores a
        new price only when the last sighting is older than the limit derived
        from ``self.time_scale``, and always raises DropItem.
        """
        now = arrow.now()
        seen = self.check_seen_before(item)

        if len(seen) == 0:
            # First sighting: persist everything about the item.
            self.insert_item_price(item, now.timestamp)
            self.insert_item_main(item)
            self.insert_item_tag_list(item)
            self.insert_item_description(item)
            self.conn.commit()
            return item

        last_seen = max(seen)
        time_limit = now.replace(**self.time_scale).timestamp
        if last_seen < time_limit:
            # Old enough to be worth another price sample.
            self.insert_item_price(item, now.timestamp)
        raise DropItem("Already seen %s, %s" % (item['url'], arrow.get(last_seen).humanize()))
logger.py 文件源码 项目:InstaBot 作者: nickpettican 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __init__(self, header, backupUnfollows, bucketUnfollow):
        """Initialise the logger state, ensure the log directory exists, print the header.

        :param header: text printed once the logger is set up
        :param backupUnfollows: stored unchanged on the instance
        :param bucketUnfollow: stored unchanged on the instance
        """
        # initialise the logger variables

        self.path = 'cache/log/'
        self.log_temp = ''
        self.new_line = True
        self.backupUnfollows = backupUnfollows
        self.bucketUnfollow = bucketUnfollow
        self.today = arrow.now().format('DD_MM_YYYY')

        if not path.isdir(self.path):
            makedirs(self.path)

        self.init_log_name()

        # Call form of print: valid on Python 3 and, for a single argument,
        # prints the same text on Python 2.
        print(header)
analyze.py 文件源码 项目:freqtrade 作者: gcarq 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get_signal(pair: str, signal: SignalType) -> bool:
    """
    Report whether the newest analysed candle of a pair raises the given signal.
    :param pair: pair in format BTC_ANT or BTC-ANT
    :param signal: signal whose ``value`` names the dataframe column to check
    :return: True if the newest row has the signal column set to 1; False if
             there is no data or the newest row is older than 10 minutes
    """
    dataframe = analyze_ticker(pair)
    if dataframe.empty:
        return False

    newest = dataframe.iloc[-1]

    # Reject data that is out of date
    newest_at = arrow.get(newest['date'])
    oldest_allowed = arrow.now() - timedelta(minutes=10)
    if newest_at < oldest_allowed:
        return False

    triggered = newest[signal.value] == 1
    logger.debug('%s_trigger: %s (pair=%s, signal=%s)', signal.value, newest['date'], pair, triggered)
    return triggered
predict_mult_tar.py 文件源码 项目:RNN_LSTM_Stock_Model 作者: als5ev 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get_stock_data(stock_ticker, num_days_back, minimum_days):
    """Download recent daily price data for a ticker from Quandl's WIKI dataset.

    :param stock_ticker: ticker symbol appended to "WIKI/"
    :param num_days_back: number of calendar days before today to start from
    :param minimum_days: minimum number of rows that must be returned
    :return: 2-D array with the columns Open, High, Low, Volume, Close
    :raises quandl.errors.quandl_error.NotFoundError: when fewer than
        ``minimum_days`` rows are available
    """
    print("GETTING STOCK DATA")

    # One "now" for both ends of the range. shift() replaces arrow's
    # deprecated and later removed plural replace(days=...).
    today = arrow.now()
    end_date = today.format("YYYY-MM-DD")
    start_date = today.shift(days=-num_days_back).format("YYYY-MM-DD")

    # NOTE(review): credential committed in source; it should be loaded from
    # configuration or the environment, and this key rotated.
    quandl_api_key = "DqEaArDZQP8SfgHTd_Ko"
    quandl.ApiConfig.api_key = quandl_api_key

    source = "WIKI/" + stock_ticker

    print("    Retrieving data from quandl API...")
    data = quandl.get(source, start_date=start_date, end_date=end_date)
    # DataFrame.as_matrix() was removed from pandas; .values returns the same array.
    data = data[["Open", "High", "Low", "Volume", "Close"]].values

    if len(data) < minimum_days:
        raise quandl.errors.quandl_error.NotFoundError

    return data
calendar.py 文件源码 项目:vishleva.com 作者: webmalc 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __init__(self, begin=None, end=None):
        """Build the calendar window and load the events that fall inside it.

        :param begin: optional first day as a 'YYYY-MM-DD' string
        :param end: optional last day as a 'YYYY-MM-DD' string

        A missing bound is derived from the other one using
        ``settings.EVENTS_CALENDAR_PERIOD`` days. With neither bound the
        window starts at the beginning of the current day. Both bounds are
        stored as UTC datetimes.
        """
        def day_start_utc(day):
            # Parse a date string in the site timezone and return the start
            # of that day converted to UTC.
            parsed = arrow.Arrow.strptime(day, '%Y-%m-%d', settings.TIME_ZONE)
            return parsed.floor('day').to('UTC').datetime

        if begin:
            # The former "if begin else arrow.now()" fallback on this line
            # could never run inside this branch and has been dropped.
            self.begin = day_start_utc(begin)
        elif end:
            self.begin = day_start_utc(end) - timezone.timedelta(days=settings.EVENTS_CALENDAR_PERIOD)
        else:
            self.begin = arrow.now().floor('day').to('UTC').datetime

        if end:
            self.end = day_start_utc(end)
        else:
            self.end = self.begin + timezone.timedelta(days=settings.EVENTS_CALENDAR_PERIOD)

        self.events = Event.objects.get_by_dates(begin=self.begin, end=self.end)
test_models.py 文件源码 项目:vishleva.com 作者: webmalc 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_get_for_closing(self):
        """get_for_closing returns the past event and leaves out the future one."""
        today = arrow.now().floor('day').to('UTC').datetime
        three_days = timezone.timedelta(days=3)
        four_days = timezone.timedelta(days=4)

        # NOTE(review): this event ends one day before it begins; confirm the
        # two offsets are not swapped.
        past_event = Event()
        past_event.begin = today - three_days
        past_event.end = today - four_days
        past_event.title = 'test_title_now'
        past_event.status = 'open'
        past_event.save()

        # Clearing the id makes save() store the copy as a separate row.
        future_event = copy.copy(past_event)
        future_event.id = None
        future_event.begin = today + three_days
        future_event.end = today + four_days
        future_event.save()

        closing = Event.objects.get_for_closing()

        self.assertIn(past_event, closing)
        self.assertNotIn(future_event, closing)
        self.assertEqual(closing.count(), 2)

        for candidate in closing:
            self.assertLess(candidate.end, today)
            self.assertGreaterEqual(candidate.paid, candidate.total)
test_models.py 文件源码 项目:vishleva.com 作者: webmalc 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_calendar(self):
        """A default Calendar spans the configured period and places events in matching slots."""
        calendar = Calendar()
        today = arrow.now().floor('day').to('UTC').datetime
        period_end = today + timezone.timedelta(days=settings.EVENTS_CALENDAR_PERIOD)
        self.assertEqual(today, calendar.begin)
        self.assertEqual(period_end, calendar.end)

        event = Event()
        event.begin = today + timezone.timedelta(days=3)
        event.end = today + timezone.timedelta(days=4)
        event.title = 'test_title_now'
        event.status = 'open'
        event.save()

        def covers(slot):
            # True when the slot's date lies inside the event's half-open range.
            return event.begin <= slot.date < event.end

        days = calendar.get_days()
        self.assertEqual(settings.EVENTS_CALENDAR_PERIOD, len(days))

        for day in days:
            if covers(day):
                self.assertIn(event, day.events)
            for hour in day.hours:
                if covers(hour):
                    self.assertIn(event, hour.events)
models.py 文件源码 项目:vishleva.com 作者: webmalc 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_for_notification(self, begin=None, end=None, hours=24):
        """
        Return open, not yet notified events beginning inside a time window.

        :param begin: from datetime (defaults to the current time)
        :type begin: datetime.datetime
        :param end: to datetime (defaults to ``begin`` plus ``hours``)
        :type end: datetime.datetime
        :param hours: window length in hours, used only when ``end`` is omitted
        :type hours: int
        :return: Events
        :rtype: queryset
        """
        # timezone.now() follows the project's USE_TZ setting, whereas
        # timezone.datetime.now() always produced a naive local datetime.
        begin = begin if begin else timezone.now()
        end = end if end else begin + timezone.timedelta(hours=hours)
        queryset = self.get_queryset()
        return queryset.filter(
            notified_at__isnull=True,
            status='open',
            begin__gte=begin,
            begin__lte=end)
playlist_adder.py 文件源码 项目:apex-sigma 作者: lu-ci 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
async def yt_playlist_adder(sid, cmd, req, playlist_obj):
    """Queue the short videos of a playlist for a server.

    Videos longer than 600 seconds are skipped and at most 200 videos are
    queued.

    :param sid: identifier passed to ``music.add_to_queue``
    :param cmd: command object exposing the ``music`` backend
    :param req: requester stored with every queued item
    :param playlist_obj: iterable of items with ``duration`` and ``videoid``
    :return: number of videos that were queued
    """
    # The body awaits, so this must be a coroutine function; ``await`` inside
    # a plain ``def`` is a syntax error.
    music = cmd.music
    counter = 0
    for item in playlist_obj:
        # Fold "H:M:S" (also "M:S" or "S") into seconds instead of requiring
        # exactly three fields.
        total_time = 0
        for part in item.duration.split(':'):
            total_time = total_time * 60 + int(part)

        if total_time <= 600:
            counter += 1
            data = {
                'url': 'https://www.youtube.com/watch?v=' + item.videoid,
                'type': 0,
                'requester': req,
                'sound': item,
                'timestamp': arrow.now().timestamp
            }
            await music.add_to_queue(sid, data)
        if counter >= 200:
            break
    return counter
GoogleFinanceQuoteAdapter.py 文件源码 项目:paperbroker 作者: philipodonnell 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def get_options(self, underlying_asset=None, expiration_date=None):
        """Return quotes for the underlying's options that expire on the given date.

        :param underlying_asset: asset whose NASDAQ option chain is fetched
        :param expiration_date: expiry date; options are kept when its YYMMDD
            form occurs in the option symbol
        :return: list of OptionQuote objects, each also stored in the cache
        """
        oc = OptionChain('NASDAQ:' + asset_factory(underlying_asset).symbol)
        underlying_quote = self.get_quote(underlying_asset)

        # Neither value depends on the option, so compute them once rather
        # than once per option.
        expiry_code = arrow.get(expiration_date).format('YYMMDD')
        quote_date = arrow.now().format('YYYY-MM-DD')

        out = []
        for option in (oc.calls + oc.puts):
            if expiry_code not in option['s']:
                continue

            # '-' is the placeholder used when no bid or ask is available.
            quote = OptionQuote(quote_date=quote_date,
                                asset=option['s'],
                                bid=float(option['b']) if option['b'] != '-' else None,
                                ask=float(option['a']) if option['a'] != '-' else None,
                                underlying_price=underlying_quote.price)
            self._set_cache(quote)
            out.append(quote)

        return out





# the code below is from https://github.com/makmac213/python-google-option-chain
application.py 文件源码 项目:xmind2testlink 作者: tobyqin 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def save_file(file):
    """Store an uploaded file and record the outcome on ``g``.

    An allowed file is saved under ``UPLOAD_FOLDER`` and recorded with
    ``insert_record``. When that name already exists, a timestamp suffix is
    added to it. A missing or unnamed upload sets ``g.error``. Any other file
    is appended to ``g.invalid_files``.
    """
    if file and allowed_file(file.filename):
        filename = secure_filename(file.filename)
        upload_to = join(app.config['UPLOAD_FOLDER'], filename)

        if exists(upload_to):
            # filename[:-6] strips the six characters of a ".xmind" suffix.
            filename = '{}_{}.xmind'.format(filename[:-6], arrow.now().strftime('%Y%m%d_%H%M%S'))
            upload_to = join(app.config['UPLOAD_FOLDER'], filename)

        file.save(upload_to)
        insert_record(filename)
        g.is_success = True

    elif file is None or file.filename == '':
        # A request without any file part used to fail with AttributeError
        # on ``file.filename``; report it like an empty selection instead.
        g.is_success = False
        g.error = "Please select a file!"

    else:
        g.is_success = False
        g.invalid_files.append(file.filename)
statnett.py 文件源码 项目:electricitymap 作者: tmrowco 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def fetch_exchange_by_bidding_zone(bidding_zone1='DK1', bidding_zone2='NO2', session=None):
    """Fetch the physical flow between two bidding zones from Statnett.

    :param bidding_zone1: one zone of the pair
    :param bidding_zone2: the other zone of the pair
    :param session: optional requests session to reuse
    :return: dict with 'sortedBiddingZones', 'netFlow', 'datetime' and 'source'
    :raises IndexError: when the response has no flow for the zone pair

    'netFlow' is the reported value when it flows out of the alphabetically
    first zone, and the negated value otherwise.
    """
    bidding_zone_a, bidding_zone_b = sorted([bidding_zone1, bidding_zone2])
    r = session or requests.session()
    timestamp = arrow.now().timestamp * 1000
    url = 'http://driftsdata.statnett.no/restapi/PhysicalFlowMap/GetFlow?Ticks=%d' % timestamp
    response = r.get(url)
    obj = response.json()

    # A list is built because filter() returns a non-indexable iterator on
    # Python 3. Indexing still raises IndexError when nothing matches.
    zone_pair = {bidding_zone_a, bidding_zone_b}
    matches = [x for x in obj
               if {x['OutAreaElspotId'], x['InAreaElspotId']} == zone_pair]
    exchange = matches[0]

    return {
        'sortedBiddingZones': '->'.join([bidding_zone_a, bidding_zone_b]),
        'netFlow': exchange['Value'] if bidding_zone_a == exchange['OutAreaElspotId'] else -1 * exchange['Value'],
        'datetime': arrow.get(obj[0]['MeasureDate'] / 1000).datetime,
        # NOTE(review): spelled "stattnet" while the URL host is "statnett";
        # kept as is because consumers may match on this exact value.
        'source': 'driftsdata.stattnet.no',
    }
ENTSOE.py 文件源码 项目:electricitymap 作者: tmrowco 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def fetch_generation_forecast(country_code, session=None, now=None):
    """Fetch the ENTSOE generation forecast for a country.

    Returns a list of dicts with 'countryCode', 'datetime', 'value' and
    'source', or None when nothing could be parsed.
    """
    if not session:
        session = requests.session()
    domain = ENTSOE_DOMAIN_MAPPINGS[country_code]
    # Grab the generation forecast
    raw = query_generation_forecast(domain, session, now)
    parsed = parse_generation_forecast(raw)
    if not parsed:
        return None

    values, datetimes = parsed
    return [
        {
            'countryCode': country_code,
            'datetime': datetimes[idx].datetime,
            'value': value,
            'source': 'entsoe.eu'
        }
        for idx, value in enumerate(values)
    ]
ENTSOE.py 文件源码 项目:electricitymap 作者: tmrowco 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def fetch_consumption_forecast(country_code, session=None, now=None):
    """Fetch the ENTSOE consumption forecast for a country.

    Returns a list of dicts with 'countryCode', 'datetime', 'value' and
    'source', or None when nothing could be parsed.
    """
    if not session:
        session = requests.session()
    domain = ENTSOE_DOMAIN_MAPPINGS[country_code]
    # Grab the consumption forecast
    raw = query_consumption_forecast(domain, session, now)
    parsed = parse_consumption_forecast(raw)
    if not parsed:
        return None

    values, datetimes = parsed
    return [
        {
            'countryCode': country_code,
            'datetime': datetimes[idx].datetime,
            'value': value,
            'source': 'entsoe.eu'
        }
        for idx, value in enumerate(values)
    ]
quality.py 文件源码 项目:electricitymap 作者: tmrowco 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def validate_production(obj, country_code):
    """Sanity-check a production data point returned by a parser.

    :param obj: dict with at least 'datetime' and 'countryCode', and normally
        a 'production' dict mapping a mode to its value (or None)
    :param country_code: country the data point is expected to belong to
    :raises Exception: on missing keys, a 'datetime' that is not exactly a
        ``datetime.datetime``, a country mismatch, a future timestamp, or
        when none of coal/oil/unknown is reported for a non-exempt country
    :raises ValueError: when any production value is negative
    """
    if 'datetime' not in obj:
        raise Exception('datetime was not returned for %s' % country_code)
    if 'countryCode' not in obj:
        raise Exception('countryCode was not returned for %s' % country_code)
    if type(obj['datetime']) != datetime.datetime:
        raise Exception('datetime %s is not valid for %s' % (obj['datetime'], country_code))
    if obj.get('countryCode', None) != country_code:
        raise Exception("Country codes %s and %s don't match" % (obj.get('countryCode', None), country_code))
    if arrow.get(obj['datetime']) > arrow.now():
        raise Exception("Data from %s can't be in the future" % country_code)

    # Fetched once with a default so an exempt country that has no
    # 'production' key no longer fails with KeyError in the loop below.
    production = obj.get('production', {})
    if production.get('unknown', None) is None and \
        production.get('coal', None) is None and \
        production.get('oil', None) is None and \
        country_code not in ['CH', 'NO', 'AUS-TAS']:
        raise Exception("Coal or oil or unknown production value is required for %s" % (country_code))
    # items() exists on both Python 2 and 3; iteritems() is Python 2 only.
    for k, v in production.items():
        if v is None:
            continue
        if v < 0:
            raise ValueError('%s: key %s has negative value %s' % (country_code, k, v))
AR.py 文件源码 项目:electricitymap 作者: tmrowco 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def get_datetime(session=None):
    """
    Generation data is updated hourly.  Requests the page and reads the hour
    from its selected option.
    Returns a dict whose 'datetime' entry is today's date in UTC-3 at that
    hour, with zero minutes and seconds.
    """

    #Argentina does not currently observe daylight savings time.  This may change from year to year!
    #https://en.wikipedia.org/wiki/Time_in_Argentina
    http = session or requests.Session()
    page = http.get(url)
    timesoup = BeautifulSoup(page.content, 'html.parser')
    selected_option = timesoup.find("option", selected = "selected", value = "1" )
    find_hour = selected_option.getText()

    current_hour = arrow.now('UTC-3').floor('hour')
    latest = current_hour.replace(hour = int(find_hour), minute = 0, second = 0)

    return {'datetime': latest.datetime}
BO.py 文件源码 项目:electricitymap 作者: tmrowco 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def fetch_generation_forecast(country_code = 'BO', session=None):
    """Fetch today's hourly generation forecast for Bolivia.

    Returns a list of 24 entries, one per hour 1..24, each produced by
    ``fetch_hourly_generation_forecast``.
    """
    # NOTE(review): country_code is accepted but 'BO' is always used below.
    #Today's date in the Bolivian timezone
    formatted_date = arrow.now(tz=tz_bo).format('YYYY-MM-DD')

    #Url to request: base path plus the date
    url = 'http://www.cndc.bo/media/archivos/graf/gene_hora/despacho_diario.php?fechag=' + formatted_date

    #Request and parse the page
    http = session or requests.session()
    obj = webparser(http.get(url))

    #One entry per hour of the day
    return [
        fetch_hourly_generation_forecast('BO', obj, hour, formatted_date)
        for hour in range(1, 25)
    ]
DO.py 文件源码 项目:electricitymap 作者: tmrowco 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def merge_production(thermal, total):
    """
    Takes thermal generation and total generation and merges them using 'datetime' key.
    Returns a list of the merged dicts sorted by their original 'datetime'
    value. That value is then replaced by a datetime of today's midnight
    ('America/Dominica') shifted by (value - 1) hours.
    """

    merged = defaultdict(dict)
    for series in (thermal, total):
        for entry in series:
            merged[entry['datetime']].update(entry)

    ordered = sorted(merged.values(), key=itemgetter("datetime"))

    for entry in ordered:
        # The incoming 'datetime' is an hour number; hour 1 maps to midnight.
        midnight = arrow.now('America/Dominica').floor('day')
        entry['datetime'] = midnight.shift(hours=int(entry['datetime']) - 1).datetime

    return ordered
pending.py 文件源码 项目:virt-backup 作者: Anthony25 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def add_disks(self, *dev_disks):
        """
        Add disks to the backup by dev name

        .. warning::

            Adding a disk during a backup is not recommended, as the current
            disks list could be inaccurate. It will pull the informations
            about the current disks attached to the domain, but the backup
            process creates temporary external snapshots, changing the current
            disks attached. This should not be an issue when the backingStore
            property will be correctly handled, but for now it is.

        :param dev_disks: dev names of the new disks to backup. If none is
                          indicated, all disks of the domain are used.
        """
        available = self._get_self_domain_disks()
        if not dev_disks:
            self.disks = available

        for name in dev_disks:
            # Disks that are already tracked keep their current entry.
            if name not in self.disks:
                self.disks[name] = available[name]
util.py 文件源码 项目:tfatool 作者: TadLeonard 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _parse_date(date_els):
    """Convert two or three date elements into integer (year, month, day) values.

    Two elements are read as year-month or month-year (day 1), or otherwise
    as month-day of the current year. Three elements are read as
    year-month-day or month-day-year. Anything else raises ValueError.
    """
    count = len(date_els)

    if count == 2:
        first, second = date_els
        if _is_year(first):
            ymd = (first, second, 1)  # 1st of month assumed
        elif _is_year(second):
            ymd = (second, first, 1)  # 1st of month assumed
        else:
            ymd = (arrow.now().year, first, second)  # assumed M/D of this year
        return map(int, ymd)

    if count == 3:
        first, second, third = date_els
        if _is_year(first):
            return map(int, (first, second, third))
        if _is_year(third):
            return map(int, (third, first, second))

    # Wrong number of elements, or three elements without a recognisable year.
    raise ValueError("Date '{}' can't be understood".format(date_els))
server.py 文件源码 项目:ohlife_replacement_server 作者: paulx3 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def protected_save():
    """
    Once approved, save the session's text to the entries table.

    Creates today's entry for the user, or appends the text on a new line
    when today's entry already exists.
    """
    today = arrow.now().format('YYYY-MM-DD')
    text = flask.session["entry"]
    user_id = flask.session["user_real_id"]
    file_name = flask.session["file_name"]
    print(text)
    print(user_id)

    existing = Entries.query.filter_by(user_id=user_id, time=today).first()
    if existing is not None:
        # only the first photo attachment will be saved
        existing.text = existing.text + "\n" + text
    else:
        db.session.add(Entries(time=today, text=text, user_id=user_id, file_name=file_name))

    db.session.commit()
    return "Save Success"
todoist.py 文件源码 项目:kino-bot 作者: DongjunLee 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def __get_event_counts(self):
        """Count today's added, completed and updated events in the Todoist activity log.

        "Today" is the current day in the Asia/Seoul timezone.

        Returns a tuple (added, completed, updated).
        """
        activity_log_list = self.todoist_api.activity.get()
        tallies = {'added': 0, 'completed': 0, 'updated': 0}

        day_start, day_end = arrow.now().to('Asia/Seoul').span('day')

        for log in activity_log_list:
            happened_at = arrow.get(
                log['event_date'],
                'DD MMM YYYY HH:mm:ss Z').to('Asia/Seoul')
            if not (day_start <= happened_at <= day_end):
                continue

            # Event types other than the three tracked ones are ignored.
            kind = log['event_type']
            if kind in tallies:
                tallies[kind] += 1

        return tallies['added'], tallies['completed'], tallies['updated']
arrow.py 文件源码 项目:kino-bot 作者: DongjunLee 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def is_between(start_time: tuple, end_time: tuple, now=None) -> bool:
        """Tell whether ``now`` lies inside a same-day time window.

        :param start_time: (hour, minute) the window opens, or None for the
            start of the day
        :param end_time: (hour, minute) the window closes; (24, 0) or None
            means the end of the day
        :param now: moment to test; defaults to ``datetime.datetime.now()``
        :return: True when start <= now <= end on the date of ``now``

        Windows that wrap past midnight (start later than end) are not
        supported and yield False, as before.
        """
        if start_time is None and end_time is None:
            return True

        if now is None:
            now = datetime.datetime.now()

        # A single missing bound means "open on that side"; it used to raise
        # TypeError when the None was unpacked.
        start_h, start_m = (0, 0) if start_time is None else start_time
        end_h, end_m = (24, 0) if end_time is None else end_time

        start = now.replace(
            hour=start_h,
            minute=start_m,
            second=0,
            microsecond=0)

        if end_h == 24 and end_m == 0:
            # 24:00 is the very end of the day. Stopping at 23:59:00 left the
            # final minute of the day outside the window.
            end = now.replace(hour=23, minute=59, second=59, microsecond=999999)
        else:
            end = now.replace(hour=end_h, minute=end_m, second=0, microsecond=0)

        return start <= now <= end


问题


面经


文章

微信
公众号

扫码关注公众号