def test_partial_month(self):
    """Metrics over a range truncated mid-month still report whole months."""
    start = datetime.datetime(
        year=1991, month=1, day=1, hour=0, minute=0, tzinfo=pytz.utc)
    # Five years covering the 1992 and 1996 leap years.
    total_days = 365 * 5 + 2
    end = start + datetime.timedelta(days=total_days)
    sim_params90s = SimulationParameters(
        period_start=start,
        period_end=end,
        env=self.env,
    )
    returns = factory.create_returns_from_range(sim_params90s)
    # Drop the last ten entries so the series ends mid-month.
    returns = returns[:-10]
    metrics = risk.RiskReport(returns, sim_params90s, env=self.env)
    total_months = 60
    self.check_metrics(metrics, total_months, start)
Example source code using Python's timedelta() class
def sell_positions(self):
    """Sell and close out any open position that is more than an hour old."""
    query = Query()
    # Positions whose 'closed' flag is still falsy are considered open.
    open_positions = self.position_db.search(query.closed.test(lambda closed: not closed))
    for position in open_positions:
        # Sell and remove position if >1hr old
        if arrow.get(position["at"]) < (arrow.now() - datetime.timedelta(hours=1)):
            self.logger.log("Trader/Seller", "informative", "Selling position for contract " + position["contract_id"] + "!")
            if self.web_interface.have_position_in_market(position["contract_id"]):
                self.web_interface.sell(position["contract_id"], position["side"], position["amount"])
            self.position_db.update({ "closed": True }, eids=[position.eid])
# Make a trade based on the result
def __init__(
    self,
    interval_in_seconds,
    service_name,
    result_dict,
    max_delay_seconds,
    disable=False
):
    """Initialize the Sensu alert manager.

    :param interval_in_seconds: how often the periodic base task fires;
        passed straight to the superclass constructor.
    :param service_name: name of the monitored service; also used to build
        the logger name below.
    :param result_dict: base Sensu result payload handed to
        ``_setup_ok_result_dict`` — schema defined elsewhere; verify there.
    :param max_delay_seconds: converted to a ``timedelta`` and stored as the
        maximum tolerated delay before alerting.
    :param disable: when True, alerting is disabled (see ``_disable``).
    """
    super(SensuAlertManager, self).__init__(interval_in_seconds)
    self._service_name = service_name
    # NOTE(review): the _setup_* helpers run before _disable/_max_delay are
    # assigned — assuming they don't read those attributes; confirm before
    # reordering anything here.
    self._setup_ok_result_dict(result_dict)
    self._setup_delayed_result_dict()
    self._setup_disabled_alert_dict()
    self._log = logging.getLogger('{}.util.sensu_alert_manager'.format(service_name))
    self._disable = disable
    # Tracks whether a "disabled" message still needs to be sent to Sensu.
    self._should_send_sensu_disabled_message = False
    self._max_delay = timedelta(seconds=max_delay_seconds)
def __init__(self, session, api_id, api_hash,
             proxy=None, timeout=timedelta(seconds=5)):
    """Initializes the Telegram client with the specified API ID and Hash.
    Session must always be a Session instance, and an optional proxy
    can also be specified to be used on the connection.
    """
    self.session = session
    self.api_id = int(api_id)
    self.api_hash = api_hash
    self.proxy = proxy
    self._timeout = timeout
    self._logger = logging.getLogger(__name__)

    # Cache of "exported" senders, mapping 'dc_id: TelegramBareClient',
    # together with their sessions — recreating them is a somewhat
    # expensive process, so reuse them instead.
    self._cached_clients = {}

    # These will be set later
    self.dc_options = None
    self._sender = None
# endregion
# region Connecting
def login():
    """Handle a login attempt from the posted form.

    Looks the user up in the local DB, compares the stored password with the
    submitted one, and on success starts a 30-minute permanent session.

    Returns the rendered index page on success, otherwise the login page
    with an informational message.
    """
    db = UserDb(app.config['LOCAL_DB'])
    form = request.form
    user = form.get('user')
    pwd = form.get('pwd')
    password = db.login(user)
    del db
    if password is None:
        # BUGFIX: the unknown-user case must be checked BEFORE comparing
        # passwords.  Previously `pwd == password` ran first, so a request
        # missing the 'pwd' field (form.get -> None) matched the None
        # password of a non-existent user and logged in successfully.
        return render_template('login.html', info="??????!")
    elif pwd == password:
        # Mark the session permanent so it survives browser restarts,
        # and cap its lifetime at 30 minutes.
        session.permanent = True
        app.permanent_session_lifetime = timedelta(minutes=30)
        session.update(dict(user=user))
        return render_template('index.html')
    else:
        # Known user, wrong password.
        return render_template('login.html', info="?????!")
def itermonthdates(self, year, month):
    """
    Return an iterator for one month. The iterator will yield datetime.date
    values and will always iterate through complete weeks, so it will yield
    dates outside the specified month.
    """
    current = datetime.date(year, month, 1)
    # Rewind to the first day of the week that contains the 1st.
    offset = (current.weekday() - self.firstweekday) % 7
    current -= datetime.timedelta(days=offset)
    one_day = datetime.timedelta(days=1)
    while True:
        yield current
        current += one_day
        # Stop once we've left the month AND completed the current week.
        if current.month != month and current.weekday() == self.firstweekday:
            return
def use_testing_credentials(args, credentials):
    """Write fake, short-lived credentials instead of calling AWS."""
    print("Skipping AWS API calls because AWSMFA_TESTING_MODE is set.",
          file=sys.stderr)
    # AWS returns offset-aware UTC times, so we fake that in order to
    # verify consistent code paths between py2 and py3 datetime.
    expiration = (datetime.datetime.now(tz=pytz.utc) +
                  datetime.timedelta(minutes=5))
    identity = args.identity_profile
    fake_credentials = {
        'AccessKeyId': credentials.get(identity, 'aws_access_key_id'),
        'SecretAccessKey': credentials.get(identity, 'aws_secret_access_key'),
        'SessionToken': "420",
        'Expiration': expiration,
    }
    print_expiration_time(expiration)
    update_credentials_file(args.aws_credentials, args.target_profile,
                            identity, credentials, fake_credentials)
def is_dst(zonename):
    """Check if current time in a time zone is in dst.

    :param zonename: IANA time zone name, e.g. ``"Europe/Helsinki"``.
    :returns: True if that zone is currently observing daylight saving time.

    From: http://stackoverflow.com/a/19778845/1489738
    """
    # Use the stdlib zoneinfo module (3.9+) instead of the deprecated
    # datetime.utcnow() + pytz.localize()/astimezone() dance; the resulting
    # aware "now" is identical, and dst() semantics are unchanged.
    from zoneinfo import ZoneInfo
    now = datetime.datetime.now(tz=ZoneInfo(zonename))
    return now.dst() != datetime.timedelta(0)
def fetch_og_preview(content, urls):
    """Fetch first opengraph entry for a list of urls.

    Tries each url in order; the first one that yields usable opengraph
    data is cached, attached to ``content`` and returned.  Returns the
    OpenGraphCache instance on success, False when no url worked.
    """
    for url in urls:
        # See first if recently cached already (cache entries are
        # considered fresh for 7 days).
        if OpenGraphCache.objects.filter(url=url, modified__gte=now() - datetime.timedelta(days=7)).exists():
            opengraph = OpenGraphCache.objects.get(url=url)
            Content.objects.filter(id=content.id).update(opengraph=opengraph)
            return opengraph
        try:
            og = OpenGraph(url=url, parser="lxml")
        except AttributeError:
            # Parsing failed for this url — try the next one.
            continue
        # Skip responses that carry none of the fields we can display.
        if not og or ("title" not in og and "site_name" not in og and "description" not in og and "image" not in og):
            continue
        try:
            # Fall back from title to site_name; suppress the image for
            # NSFW content.
            title = og.title if "title" in og else og.site_name if "site_name" in og else ""
            description = og.description if "description" in og else ""
            image = og.image if "image" in og and not content.is_nsfw else ""
            try:
                with transaction.atomic():
                    opengraph = OpenGraphCache.objects.create(
                        url=url,
                        title=truncate_letters(safe_text(title), 250),
                        description=safe_text(description),
                        image=safe_text(image),
                    )
            except DataError:
                # Values didn't fit the DB columns — try the next url.
                continue
        except IntegrityError:
            # Some other process got ahead of us
            opengraph = OpenGraphCache.objects.get(url=url)
            Content.objects.filter(id=content.id).update(opengraph=opengraph)
            return opengraph
        Content.objects.filter(id=content.id).update(opengraph=opengraph)
        return opengraph
    return False
def fetch_oembed_preview(content, urls):
    """Fetch first oembed content for a list of urls.

    Tries each url in order; the first url that yields oembed markup is
    cached, attached to ``content`` and returned.  Returns the OEmbedCache
    instance on success, False when no url worked.
    """
    for url in urls:
        # See first if recently cached already (entries fresh for 7 days)
        if OEmbedCache.objects.filter(url=url, modified__gte=now()-datetime.timedelta(days=7)).exists():
            oembed = OEmbedCache.objects.get(url=url)
            Content.objects.filter(id=content.id).update(oembed=oembed)
            return oembed
        # Fetch oembed
        options = {}
        if url.startswith("https://twitter.com/"):
            # This probably has little effect since we fetch these on the backend...
            # But, DNT is always good to communicate if possible :)
            options = {"dnt": "true"}
        try:
            oembed = PyEmbed(discoverer=OEmbedDiscoverer()).embed(url, **options)
        except (PyEmbedError, PyEmbedDiscoveryError, PyEmbedConsumerError, ValueError):
            continue
        if not oembed:
            continue
        # Ensure width is 100% not fixed; drop fixed heights entirely.
        oembed = re.sub(r'width="[0-9]*"', 'width="100%"', oembed)
        oembed = re.sub(r'height="[0-9]*"', "", oembed)
        try:
            with transaction.atomic():
                oembed = OEmbedCache.objects.create(url=url, oembed=oembed)
        except IntegrityError:
            # Some other process got ahead of us
            oembed = OEmbedCache.objects.get(url=url)
            Content.objects.filter(id=content.id).update(oembed=oembed)
            return oembed
        Content.objects.filter(id=content.id).update(oembed=oembed)
        return oembed
    return False
def test_edited_is_true_for_newly_created_content_after_15_minutes_grace_period(self):
    """A save after the 15-minute grace period marks the content edited."""
    past_grace_period = self.public_content.created + datetime.timedelta(minutes=16)
    with freeze_time(past_grace_period):
        self.public_content.save()
        self.assertTrue(self.public_content.edited)
def test_dict_for_view_edited_post(self):
    """An edited post's view dict carries the '(edited)' timestamp suffix."""
    content = self.public_content
    with freeze_time(content.created + datetime.timedelta(minutes=16)):
        content.save()
        expected = {
            "author": content.author_id,
            "author_guid": content.author.guid,
            "author_handle": content.author.handle,
            "author_home_url": content.author.home_url,
            "author_image": content.author.safer_image_url_small,
            "author_is_local": bool(content.author.user),
            "author_name": content.author.handle,
            "author_profile_url": content.author.get_absolute_url(),
            "content_type": content.content_type.string_value,
            "delete_url": reverse("content:delete", kwargs={"pk": content.id}),
            "detail_url": content.get_absolute_url(),
            "formatted_timestamp": content.timestamp,
            "guid": content.guid,
            "has_shared": False,
            "humanized_timestamp": "%s (edited)" % content.humanized_timestamp,
            "id": content.id,
            "is_authenticated": True,
            "is_author": True,
            "is_following_author": False,
            "parent": "",
            "profile_id": content.author.id,
            "rendered": content.rendered,
            "reply_count": 0,
            "reply_url": reverse("content:reply", kwargs={"pk": content.id}),
            "shares_count": 0,
            "slug": content.slug,
            "through": content.id,
            "update_url": reverse("content:update", kwargs={"pk": content.id}),
        }
        self.assertEqual(content.dict_for_view(self.user), expected)
def test_if_cached_already_but_older_than_7_days_then_fetch(self, og):
    """A cache entry older than a week is ignored and a fresh fetch happens."""
    eight_days_ago = datetime.date.today() - datetime.timedelta(days=8)
    with freeze_time(eight_days_ago):
        OpenGraphCacheFactory(url=self.urls[0])
    fetch_og_preview(self.content, self.urls)
    og.assert_called_once_with(url=self.urls[0], parser="lxml")
def test_cache_updated_if_previous_found_older_than_7_days(self, embed):
    """A stale (8-day-old) oEmbed cache entry triggers a re-fetch."""
    eight_days_ago = datetime.date.today() - datetime.timedelta(days=8)
    with freeze_time(eight_days_ago):
        OEmbedCacheFactory(url=self.urls[0])
    fetch_oembed_preview(self.content, self.urls)
    embed.assert_called_once_with(self.urls[0])
def edited(self):
    """Determine whether Content has been edited.

    Because we do multiple saves in some cases on creation, for example for
    oEmbed or OpenGraph, and a remote content could be delivered multiple
    times within a short time period, for example via relay and original
    node, we allow 15 minutes before deciding that the content has been
    edited.

    TODO: it would make sense to store an "edited" flag on the model itself.
    """
    grace_period = datetime.timedelta(minutes=15)
    return self.modified > self.created + grace_period
test_timer_collector.py — file source code
Project: django-performance-testing
Author: PaesslerAG
Project source code
File source code
Views: 26
Bookmarks: 0
Likes: 0
Comments: 0
def test_captures_and_measures_elapsed_time(seconds):
    """TimeCollector reports the seconds the frozen clock advanced inside it."""
    with capture_result_collected() as captured:
        with freeze_time('2016-09-22 15:57:01') as frozen_time:
            with TimeCollector():
                frozen_time.tick(timedelta(seconds=seconds))
    assert len(captured.calls) == 1
    measured = captured.calls[0]['results'][0].value
    assert pytest.approx(seconds) == measured
def test_can_limit_elapsed_seconds(seconds):
    """A zero-second TimeLimit raises as soon as any time elapses."""
    with freeze_time('2016-09-22 15:57:01') as frozen_time:
        with pytest.raises(LimitViolationError) as excinfo:
            with TimeLimit(total=0):
                frozen_time.tick(timedelta(seconds=seconds))
    expected_msg = 'Too many ({}) total elapsed seconds (limit: 0)'.format(seconds)
    assert excinfo.value.base_error_msg == expected_msg
test_integrates_with_django_testrunner.py — file source code
Project: django-performance-testing
Author: PaesslerAG
Project source code
File source code
Views: 24
Bookmarks: 0
Likes: 0
Comments: 0
def code_that_fails(self):
    """Advance the frozen clock by five seconds to trip the time limit."""
    elapsed = timedelta(seconds=5)
    self.frozen_time.tick(elapsed)
test_template_limit_blocks.py — file source code
Project: django-performance-testing
Author: PaesslerAG
Project source code
File source code
Views: 31
Bookmarks: 0
Likes: 0
Comments: 0
def __str__(self):
    """Simulate a slow render by ticking the frozen clock, then describe it."""
    delay = timedelta(seconds=self.render_in_seconds)
    self.frozen_time.tick(delay)
    return 'rendered slowly in {} seconds'.format(self.render_in_seconds)
def test(self):
    """Fail the check when the account hasn't been used recently.

    Collects the most recent use across both access keys and the console
    password, then compares it against the configured inactivity window.
    """
    user = self.user_dict
    usage_times = []
    if user['access_key_1_active'] == 'true':
        usage_times.append(
            dateutil.parser.parse(user['access_key_1_last_used_date'])
        )
    if user['access_key_2_active'] == 'true':
        usage_times.append(
            dateutil.parser.parse(user['access_key_2_last_used_date'])
        )
    if (user['password_enabled'] in ['true', 'not_supported'] and
            user['password_last_used'] != 'no_information'):
        usage_times.append(
            dateutil.parser.parse(user['password_last_used'])
        )
    if not usage_times:
        self.reason = 'Account has never been used'
        self.status = common.CheckState.FAIL
        return
    last_used = max(usage_times)
    # Make "now" comparable with the (possibly tz-aware) parsed timestamps.
    now = datetime.datetime.utcnow().replace(tzinfo=last_used.tzinfo)
    inactive_window = datetime.timedelta(days=config.config['ACCOUNT_INACTIVE_DAYS'])
    difference = now - last_used
    if inactive_window < difference:
        self.reason = 'Account last used {0} days ago.'.format(difference.days)
        self.status = common.CheckState.FAIL
    else:
        self.status = common.CheckState.PASS