python类timedelta()的实例源码

test_risk_period.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_partial_month(self):
        """Check risk metrics for a returns series that has been cut short.

        Returns are generated for a simulation window starting 1991-01-01,
        the last ten entries are dropped, and the resulting report is
        checked against 60 months.
        """
        first_day = datetime.datetime(1991, 1, 1, tzinfo=pytz.utc)

        # 1992 and 1996 were leap years
        window = datetime.timedelta(days=5 * 365 + 2)
        sim_params90s = SimulationParameters(
            period_start=first_day,
            period_end=first_day + window,
            env=self.env,
        )

        full_returns = factory.create_returns_from_range(sim_params90s)
        # Drop the tail so the returns series ends mid-month.
        truncated = full_returns[:-10]
        report = risk.RiskReport(truncated, sim_params90s, env=self.env)
        self.check_metrics(report, 60, first_day)
trader.py 文件源码 项目:algo-trading-pipeline 作者: NeuralKnot 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def sell_positions(self):
        """Close every open position whose ``at`` timestamp is over an hour old.

        A position is sold through the web interface only when the interface
        reports that it is still held in the market; it is flagged as closed
        in the position database either way.
        """
        def still_open(closed):
            return not closed

        open_docs = self.position_db.search(Query().closed.test(still_open))

        # Sell and remove position if >1hr old
        for position in open_docs:
            opened_at = arrow.get(position["at"])
            if not (opened_at < (arrow.now() - datetime.timedelta(hours=1))):
                continue

            contract = position["contract_id"]
            self.logger.log("Trader/Seller", "informative", "Selling position for contract " + contract + "!")

            if self.web_interface.have_position_in_market(contract):
                self.web_interface.sell(contract, position["side"], position["amount"])

            self.position_db.update({"closed": True}, eids=[position.eid])

    # Make a trade based on the result
sensu_alert_manager.py 文件源码 项目:data_pipeline 作者: Yelp 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def __init__(
        self,
        interval_in_seconds,
        service_name,
        result_dict,
        max_delay_seconds,
        disable=False
    ):
        """Set up the alert manager state.

        Args:
            interval_in_seconds: forwarded to the base class initializer.
            service_name: stored on the instance and used as the prefix of
                the logger name.
            result_dict: passed to ``_setup_ok_result_dict``.
            max_delay_seconds: converted to a ``timedelta`` and kept as the
                maximum delay.
            disable: stored as the ``_disable`` flag.
        """
        super(SensuAlertManager, self).__init__(interval_in_seconds)
        self._service_name = service_name

        # Prepare the result dictionaries (same order as before, since the
        # setup helpers may rely on state set above).
        self._setup_ok_result_dict(result_dict)
        self._setup_delayed_result_dict()
        self._setup_disabled_alert_dict()

        logger_name = '{}.util.sensu_alert_manager'.format(service_name)
        self._log = logging.getLogger(logger_name)

        self._disable = disable
        self._should_send_sensu_disabled_message = False
        max_delay = timedelta(seconds=max_delay_seconds)
        self._max_delay = max_delay
telegram_bare_client.py 文件源码 项目:BitBot 作者: crack00r 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def __init__(self, session, api_id, api_hash,
                 proxy=None, timeout=timedelta(seconds=5)):
        """Store the settings needed to talk to Telegram.

           The API ID is coerced to ``int``; the session (which must be a
           Session instance), the API hash, the optional proxy and the
           timeout are kept as given.
        """
        # Caller-supplied connection settings
        self.session = session
        self.api_id = int(api_id)
        self.api_hash = api_hash
        self.proxy = proxy
        self._timeout = timeout

        module_logger = logging.getLogger(__name__)
        self._logger = module_logger

        # "Exported" senders are cached as 'dc_id: TelegramBareClient'
        # together with their sessions, because recreating them every
        # time is a (somewhat expensive) process.
        self._cached_clients = dict()

        # Filled in later
        self.dc_options = None
        self._sender = None

    # endregion

    # region Connecting
run.py 文件源码 项目:apiTest 作者: wuranxu 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def login():
    """Check the posted credentials and start a 30-minute session.

    Reads ``user`` and ``pwd`` from the submitted form and compares ``pwd``
    with the value ``db.login(user)`` returns.

    Returns:
        The rendered ``index.html`` when the values match, otherwise the
        rendered ``login.html`` with an ``info`` message.
    """
    db = UserDb(app.config['LOCAL_DB'])
    form = request.form
    user = form.get('user')
    pwd = form.get('pwd')
    password = db.login(user)
    del db

    # Nothing stored for this user (presumably: unknown user). This must be
    # checked BEFORE comparing: a request that omits ``pwd`` yields
    # pwd=None, and ``None == None`` would otherwise log the request in.
    if password is None:
        return render_template('login.html', info="??????!")

    if pwd != password:
        return render_template('login.html', info="?????!")

    # Credentials match: make the session permanent ...
    session.permanent = True
    # ... and let it expire after 30 minutes.
    app.permanent_session_lifetime = timedelta(minutes=30)
    session.update(dict(user=user))
    return render_template('index.html')
calendar.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def itermonthdates(self, year, month):
        """
        Return an iterator for one month. The iterator will yield datetime.date
        values and will always iterate through complete weeks, so it will yield
        dates outside the specified month.

        Iteration stops early, without raising, if the final week would run
        past the largest date ``datetime.date`` can represent.
        """
        date = datetime.date(year, month, 1)
        # Go back to the beginning of the week
        days = (date.weekday() - self.firstweekday) % 7
        date -= datetime.timedelta(days=days)
        oneday = datetime.timedelta(days=1)
        while True:
            yield date
            try:
                date += oneday
            except OverflowError:
                # Adding one day fails after datetime.MAXYEAR (December 9999);
                # end the iteration instead of leaking the error to the caller.
                break
            # Stop once we have left the month AND completed the week.
            if date.month != month and date.weekday() == self.firstweekday:
                break
__main__.py 文件源码 项目:awsmfa 作者: dcoker 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def use_testing_credentials(args, credentials):
    """Write fake session credentials instead of calling AWS.

    The access key id and secret are copied from the identity profile in
    ``credentials``; the session token is a fixed placeholder and the
    expiration is five minutes from now (UTC, offset-aware).
    """
    print("Skipping AWS API calls because AWSMFA_TESTING_MODE is set.",
          file=sys.stderr)

    # AWS returns offset-aware UTC times, so we fake that in order to
    # verify consistent code paths between py2 and py3 datetime.
    utc_now = datetime.datetime.now(tz=pytz.utc)
    fake_expiration = utc_now + datetime.timedelta(minutes=5)

    access_key_id = credentials.get(args.identity_profile,
                                    'aws_access_key_id')
    secret_access_key = credentials.get(args.identity_profile,
                                        'aws_secret_access_key')
    fake_credentials = {
        'AccessKeyId': access_key_id,
        'SecretAccessKey': secret_access_key,
        'SessionToken': "420",
        'Expiration': fake_expiration,
    }

    print_expiration_time(fake_expiration)
    update_credentials_file(
        args.aws_credentials,
        args.target_profile,
        args.identity_profile,
        credentials,
        fake_credentials,
    )
generic.py 文件源码 项目:socialhome 作者: jaywink 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def is_dst(zonename):
    """Check if current time in a time zone is in dst.

    From: http://stackoverflow.com/a/19778845/1489738

    Args:
        zonename: time zone name understood by ``pytz.timezone``.

    Returns:
        True when the zone's current DST offset is non-zero.
    """
    tz = pytz.timezone(zonename)
    # Ask for the current time directly as an aware datetime in ``tz``
    # rather than building a naive UTC value via the deprecated
    # ``datetime.utcnow()`` and converting it afterwards.
    local_now = datetime.datetime.now(tz)
    return local_now.dst() != datetime.timedelta(0)
previews.py 文件源码 项目:socialhome 作者: jaywink 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def fetch_og_preview(content, urls):
    """Return the OpenGraph cache entry for the first usable url.

    Each url is tried in order. An entry modified within the last 7 days is
    reused; otherwise the page is fetched and a new entry is created. The
    entry is linked to ``content`` before being returned. ``False`` is
    returned when no url produced an entry.
    """
    def link_to_content(entry):
        # Point the content row at the cache entry and hand the entry back.
        Content.objects.filter(id=content.id).update(opengraph=entry)
        return entry

    wanted_keys = ("title", "site_name", "description", "image")

    for url in urls:
        # See first if recently cached already
        recently_cached = OpenGraphCache.objects.filter(
            url=url, modified__gte=now() - datetime.timedelta(days=7),
        ).exists()
        if recently_cached:
            return link_to_content(OpenGraphCache.objects.get(url=url))

        try:
            og = OpenGraph(url=url, parser="lxml")
        except AttributeError:
            continue

        # Skip pages that expose none of the fields we store.
        if not og or not any(key in og for key in wanted_keys):
            continue

        try:
            if "title" in og:
                title = og.title
            elif "site_name" in og:
                title = og.site_name
            else:
                title = ""
            description = og.description if "description" in og else ""
            # No image for content flagged as NSFW.
            image = og.image if "image" in og and not content.is_nsfw else ""
            try:
                with transaction.atomic():
                    opengraph = OpenGraphCache.objects.create(
                        url=url,
                        title=truncate_letters(safe_text(title), 250),
                        description=safe_text(description),
                        image=safe_text(image),
                    )
            except DataError:
                continue
        except IntegrityError:
            # Some other process got ahead of us
            return link_to_content(OpenGraphCache.objects.get(url=url))

        return link_to_content(opengraph)
    return False
previews.py 文件源码 项目:socialhome 作者: jaywink 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def fetch_oembed_preview(content, urls):
    """Return the oEmbed cache entry for the first usable url.

    Each url is tried in order. An entry modified within the last 7 days is
    reused; otherwise the embed markup is fetched, its width is forced to
    100% and its height removed, and a new entry is created. The entry is
    linked to ``content`` before being returned. ``False`` is returned when
    no url produced an entry.
    """
    def link_to_content(entry):
        # Point the content row at the cache entry and hand the entry back.
        Content.objects.filter(id=content.id).update(oembed=entry)
        return entry

    for url in urls:
        # See first if recently cached already
        recently_cached = OEmbedCache.objects.filter(
            url=url, modified__gte=now() - datetime.timedelta(days=7),
        ).exists()
        if recently_cached:
            return link_to_content(OEmbedCache.objects.get(url=url))

        # Fetch oembed
        # This probably has little effect since we fetch these on the backend...
        # But, DNT is always good to communicate if possible :)
        options = {"dnt": "true"} if url.startswith("https://twitter.com/") else {}
        try:
            markup = PyEmbed(discoverer=OEmbedDiscoverer()).embed(url, **options)
        except (PyEmbedError, PyEmbedDiscoveryError, PyEmbedConsumerError, ValueError):
            continue
        if not markup:
            continue

        # Ensure width is 100% not fixed
        markup = re.sub(r'width="[0-9]*"', 'width="100%"', markup)
        markup = re.sub(r'height="[0-9]*"', "", markup)

        try:
            with transaction.atomic():
                entry = OEmbedCache.objects.create(url=url, oembed=markup)
        except IntegrityError:
            # Some other process got ahead of us
            return link_to_content(OEmbedCache.objects.get(url=url))

        return link_to_content(entry)
    return False
test_models.py 文件源码 项目:socialhome 作者: jaywink 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_edited_is_true_for_newly_created_content_after_15_minutes_grace_period(self):
        """Saving 16 minutes after creation makes ``edited`` true."""
        content = self.public_content
        after_grace_period = content.created + datetime.timedelta(minutes=16)
        with freeze_time(after_grace_period):
            content.save()
            self.assertTrue(content.edited)
test_models.py 文件源码 项目:socialhome 作者: jaywink 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_dict_for_view_edited_post(self):
        """``dict_for_view`` marks the humanized timestamp as edited.

        The content is saved 16 minutes after creation, then the full dict
        returned for the owning user is compared field by field.
        """
        content = self.public_content
        with freeze_time(content.created + datetime.timedelta(minutes=16)):
            content.save()
            actual = content.dict_for_view(self.user)

            def content_url(route):
                # Every content route here is keyed by the content's pk.
                return reverse(route, kwargs={"pk": content.id})

            expected = {
                "author": content.author_id,
                "author_guid": content.author.guid,
                "author_handle": content.author.handle,
                "author_home_url": content.author.home_url,
                "author_image": content.author.safer_image_url_small,
                "author_is_local": bool(content.author.user),
                "author_name": content.author.handle,
                "author_profile_url": content.author.get_absolute_url(),
                "content_type": content.content_type.string_value,
                "delete_url": content_url("content:delete"),
                "detail_url": content.get_absolute_url(),
                "formatted_timestamp": content.timestamp,
                "guid": content.guid,
                "has_shared": False,
                "humanized_timestamp": "%s (edited)" % content.humanized_timestamp,
                "id": content.id,
                "is_authenticated": True,
                "is_author": True,
                "is_following_author": False,
                "parent": "",
                "profile_id": content.author.id,
                "rendered": content.rendered,
                "reply_count": 0,
                "reply_url": content_url("content:reply"),
                "shares_count": 0,
                "slug": content.slug,
                "through": content.id,
                "update_url": content_url("content:update"),
            }
            self.assertEqual(actual, expected)
test_previews.py 文件源码 项目:socialhome 作者: jaywink 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_if_cached_already_but_older_than_7_days_then_fetch(self, og):
        """A cache entry created 8 days ago does not prevent a new fetch."""
        first_url = self.urls[0]
        eight_days_ago = datetime.date.today() - datetime.timedelta(days=8)
        with freeze_time(eight_days_ago):
            OpenGraphCacheFactory(url=first_url)
        fetch_og_preview(self.content, self.urls)
        og.assert_called_once_with(url=first_url, parser="lxml")
test_previews.py 文件源码 项目:socialhome 作者: jaywink 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_cache_updated_if_previous_found_older_than_7_days(self, embed):
        """A cache entry created 8 days ago does not prevent a new embed call."""
        first_url = self.urls[0]
        eight_days_ago = datetime.date.today() - datetime.timedelta(days=8)
        with freeze_time(eight_days_ago):
            OEmbedCacheFactory(url=first_url)
        fetch_oembed_preview(self.content, self.urls)
        embed.assert_called_once_with(first_url)
models.py 文件源码 项目:socialhome 作者: jaywink 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def edited(self):
        """Tell whether this Content counts as edited.

        Creation can involve several saves (for example for oEmbed or
        OpenGraph), and remote content may be delivered more than once within
        a short time (for example via relay and original node). A modification
        therefore only counts as an edit when it happens more than 15 minutes
        after creation.

        TODO: it would make sense to store an "edited" flag on the model itself.
        """
        grace_period = datetime.timedelta(minutes=15)
        grace_ends = self.created + grace_period
        return self.modified > grace_ends
test_timer_collector.py 文件源码 项目:django-performance-testing 作者: PaesslerAG 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_captures_and_measures_elapsed_time(seconds):
    """The collector reports exactly one result equal to the ticked time."""
    with capture_result_collected() as captured, \
            freeze_time('2016-09-22 15:57:01') as frozen_time, \
            TimeCollector():
        # Advance the frozen clock while the collector is active.
        frozen_time.tick(timedelta(seconds=seconds))
    assert len(captured.calls) == 1
    measured = captured.calls[0]['results'][0].value
    assert pytest.approx(seconds) == measured
test_timelimit.py 文件源码 项目:django-performance-testing 作者: PaesslerAG 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_can_limit_elapsed_seconds(seconds):
    """A zero-second limit is violated by any ticked time."""
    with freeze_time('2016-09-22 15:57:01') as frozen_time, \
            pytest.raises(LimitViolationError) as excinfo:
        with TimeLimit(total=0):
            # Advance the frozen clock inside the limited section.
            frozen_time.tick(timedelta(seconds=seconds))
    expected_msg = \
        'Too many ({}) total elapsed seconds (limit: 0)'.format(seconds)
    assert excinfo.value.base_error_msg == expected_msg
test_integrates_with_django_testrunner.py 文件源码 项目:django-performance-testing 作者: PaesslerAG 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def code_that_fails(self):
        """Advance the frozen clock by five seconds."""
        five_seconds = timedelta(seconds=5)
        self.frozen_time.tick(five_seconds)
test_template_limit_blocks.py 文件源码 项目:django-performance-testing 作者: PaesslerAG 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def __str__(self):
        """Advance the frozen clock by the render time, then describe it."""
        render_time = timedelta(seconds=self.render_in_seconds)
        self.frozen_time.tick(render_time)
        message = 'rendered slowly in {} seconds'
        return message.format(self.render_in_seconds)
iam_checks.py 文件源码 项目:ThreatPrep 作者: ThreatResponse 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test(self):
        """Fail the check when the account has been unused for too long.

        The most recent "last used" timestamp is taken from the active
        access keys and, when a password is enabled, the password. The check
        fails if no timestamp is available or if the most recent one is
        older than ``ACCOUNT_INACTIVE_DAYS`` days; otherwise it passes.
        Sets ``self.status`` and, on failure, ``self.reason``.
        """
        user = self.user_dict

        # Collect the raw report values for every credential that is enabled.
        raw_values = []
        if user['access_key_1_active'] == 'true':
            raw_values.append(user['access_key_1_last_used_date'])
        if user['access_key_2_active'] == 'true':
            raw_values.append(user['access_key_2_last_used_date'])
        if user['password_enabled'] in ['true', 'not_supported']:
            raw_values.append(user['password_last_used'])

        # The report uses placeholders instead of a timestamp for
        # credentials that were never used ('N/A', 'no_information').
        # Passing those to the date parser would raise ValueError.
        placeholders = ('N/A', 'no_information')
        last_used_times = [
            dateutil.parser.parse(raw)
            for raw in raw_values
            if raw not in placeholders
        ]

        if not last_used_times:
            self.reason = 'Account has never been used'
            self.status = common.CheckState.FAIL
            return

        last_used = max(last_used_times)
        # Stamp the naive UTC "now" with last_used's tzinfo so the two can be
        # subtracted; this is only exact when last_used is naive or in UTC.
        now = datetime.datetime.utcnow()
        now = now.replace(tzinfo=last_used.tzinfo)
        delta = datetime.timedelta(days=config.config['ACCOUNT_INACTIVE_DAYS'])
        difference = now - last_used
        if delta < difference:
            self.reason = 'Account last used {0} days ago.'.format(difference.days)
            self.status = common.CheckState.FAIL
        else:
            self.status = common.CheckState.PASS


问题


面经


文章

微信
公众号

扫码关注公众号