def get_lookback(lookback_length):
    """Parse a user-supplied duration string into a datetime.timedelta.

    Parameters
    ----------
    lookback_length : str
        An integer followed by a single time-unit letter, e.g. ``"4w"``,
        ``"10d"``, or ``"12h"`` (weeks, days, hours).

    Returns
    -------
    datetime.timedelta
        The parsed duration.

    Notes
    -----
    On an unknown unit or a non-integer value this calls ``panic``
    (defined elsewhere in the project; presumably aborts execution —
    TODO confirm).
    """
    time_units = {'weeks': 0, 'days': 0, 'hours': 0}
    # First letter of each unit name, for the error messages below.
    time_shortunits = [x[0] for x in time_units.keys()]
    lookback_unit = lookback_length[-1]
    lookback_value = lookback_length[:-1]
    try:
        # Expand the single-letter unit to its full name ('w' -> 'weeks').
        lookback_unit = next(i for i in list(time_units) if
                             i.startswith(lookback_unit))
    except StopIteration:
        # Fixed typo in the user-facing message: "suppported" -> "supported".
        panic("hs_history_lookback time unit {lookback_unit} is not "
              "supported. Please choose one of: "
              "{time_shortunits}.".format(**locals()))
    try:
        time_units[lookback_unit] = int(lookback_value)
    except ValueError as exc:
        panic("hs_history_lookback should be an integer value followed by a "
              "single time unit element from {time_shortunits}.\n"
              "ValueError: {exc}".format(**locals()))
    lookback_timedelta = timedelta(weeks=time_units['weeks'],
                                   days=time_units['days'],
                                   hours=time_units['hours'])
    return lookback_timedelta
Example source code using Python's timedelta() class
def getMaxBuildAge(channel, version_overall = False):
    """Return the maximum age for which a build on *channel* is considered
    current.

    ``version_overall`` widens the window to 9 weeks for the fast-moving
    aurora/nightly channels; unknown channels get a ~1 year window
    (effectively "almost forever").
    """
    import datetime
    week = datetime.timedelta(weeks=1)
    if channel == 'release':
        return 12 * week
    if channel == 'beta':
        return 4 * week
    if channel == 'aurora':
        return 9 * week if version_overall else 2 * week
    if channel == 'nightly':
        return 9 * week if version_overall else 1 * week
    return datetime.timedelta(days=365)
def from_frontend_value(key, value):
    """Returns a `SiteConfiguration` object value for the relevant `key` and
    JSON-serializable `value`, applying any transformation reversed by
    to_frontend_value.

    Raises
    ------
    ValueError
        If `key` is not one of the recognised configuration keys.
    """
    # One function-scope import serves every branch (previously each branch
    # re-imported from datetime individually).
    from datetime import datetime, timedelta

    if key == NICETIES_OPEN:
        return timedelta(days=value)
    elif key == CLOSING_TIME:
        return datetime.strptime(value, '%H:%M').time()
    elif key == CLOSING_BUFFER:
        return timedelta(minutes=value)
    elif key == CACHE_TIMEOUT:
        return timedelta(seconds=value)
    elif key == INCLUDE_FACULTY:
        # Stored as-is (boolean-like JSON value).
        return value
    elif key == INCLUDE_RESIDENTS:
        return value
    else:
        raise ValueError('No such config key!')
def format_time(x):
    """Format a time-like value for display.

    Handles :class:`datetime.datetime` and :class:`datetime.timedelta`
    objects (and the corresponding numpy scalar types) via the
    :func:`xarray.core.formatting.format_timestamp` and
    :func:`xarray.core.formatting.format_timedelta` functions.

    Parameters
    ----------
    x: object
        The value to format. If not a time object, the value is returned

    Returns
    -------
    str or `x`
        Either the formatted time object or the initial `x`"""
    if isinstance(x, (datetime64, datetime)):
        return format_timestamp(x)
    if isinstance(x, (timedelta64, timedelta)):
        return format_timedelta(x)
    if isinstance(x, ndarray):
        # 0-d arrays unwrap to their scalar; otherwise expose a plain list.
        return x[()] if not x.ndim else list(x)
    return x
def __init__(self, description, default=relativedelta()):
    """
    default: may either be provided as a:
    * datetime.date object
    * string in ISO format (YYYY-mm-dd)
    * datetime.timedelta object. The date will be current - delta
    * dateutil.relativedelta object. The date will be current - delta
    If not specified, it will be the current date.
    Note that dateutil is not in the Python standard library. It provides a simpler
    API to specify a duration in days, weeks, months, etc. You can install it with pip.
    """
    self.description = description
    self.default = to_date_model(default)
    # Options understood by the Bootstrap date picker widget:
    # https://bootstrap-datepicker.readthedocs.io/en/stable/#
    picker_config = {
        'data-date-format': 'yyyy-mm-dd',
        'data-date-orientation': 'left bottom',
        'data-date-autoclose': 'true',
    }
    picker_config['value'] = self.default.iso()
    self.attributes = picker_config
def update_job_statuses(db, jobs):
    """Poll slurm accounting (``sacct``) and push each job's state into the
    tracker via ``Tracker.update_job_status``.

    Only the last 30 days of accounting data are queried. Jobs were
    submitted with a "gasp_" name prefix, which is stripped (``[5:]``)
    before matching against ``job.id``. When a job was re-run, only its
    last accounting record is kept.

    Parameters
    ----------
    db : database handle passed through to ``Tracker.update_job_status``.
    jobs : iterable of job objects with an ``id`` attribute.
    """
    # job_names_param = ",".join("gasp_" + x.id for x in jobs)
    p = subprocess.Popen(["/usr/cluster/bin/sacct", "-u", pwd.getpwuid(os.getuid())[0], "--format", "jobid,state,exitcode,jobname", "--noheader", "-P", "-S", (datetime.date.today() - datetime.timedelta(days=30)).strftime("%Y-%m-%d")], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    squeue_out, squeue_err = p.communicate()
    # (Removed an unused ``fake_data`` sample string that was never read.)
    # only keep last record for jobs that were re-run
    slurm_jobs_found = dict()
    for line in squeue_out.rstrip().split("\n"):
        if line:
            slurm_job = line.strip().split("|")
            # strip off "gasp_" prefix from the job name (field 3)
            slurm_jobs_found[slurm_job[3][5:]] = slurm_job
    for slurm_job in slurm_jobs_found.values():
        for j in jobs:
            if slurm_job[3][5:] == j.id:
                Tracker.update_job_status(db, j, slurm_job[1], slurm_job[2])
                break
def schedule2time(schedule):
    """Convert a retention *schedule* into concrete time windows.

    Each schedule entry is a mapping with 'delta' and 'period' strings
    (parsed by ``parse_period``, defined elsewhere). Windows are counted
    backward from now: each entry's window ends where the previous one
    started, and starts ``period`` before now.

    Returns a list of dicts with keys: name, start, end, period, delta.
    """
    times = []
    start = now = datetime.utcnow()
    for s in schedule:
        name = '{delta}/{period}'.format(**s)
        delta = parse_period(s['delta'])
        period = parse_period(s['period'])
        # note2: we are counting backward
        end = start
        start = now - period
        times.append({"name": name, "start": start, "end": end, "period": period, "delta": delta})
    # (Removed ``delta_prev``: it was assigned but never read.)
    return times
# find the names of existing machines
def stop(self):
    """
    To be called when the job is completed. Can be called multiple times, will only be applied once.
    Returns
    -------
    state : EStates
    status : EStatuses
    duration : datetime.timedelta
        Duration of the job
    """
    # Refresh the recorded duration on every call, even repeated ones.
    self._set_duration()
    if self.state is not AJob.EStates.COMPLETED:
        # Decide the final status BEFORE the state transition below, since
        # progress() also persists the job.
        self.status = AJob.EStatuses.FAILURE if self.has_failed() else AJob.EStatuses.SUCCESS
        self.progress(AJob.EStates.COMPLETED) # This will also save the job
        logger.debug(
            "{} terminated in {}s with status '{}'".format(self, self.duration, self.status.label))
    # Already-completed jobs fall through: state/status are left untouched.
    return self.state, self.status, self.duration
def __init__(self):
    """Fetch new-stock (IPO) listings via tushare and record the covered
    date range.

    NOTE(review): Python 2 code (bare ``print`` statement below); ``ts``
    is assumed to be the tushare module imported elsewhere — confirm.
    """
    self.ipo=ts.new_stocks()
    #print ipo.info()
    # Parse the listing-date column into datetime64 values.
    self.ipo['ipo_date']=self.ipo['ipo_date'].astype('datetime64')
    #print ipo.info()
    # start = last row, end = first row: assumes rows are sorted
    # newest-first — TODO confirm against tushare's output order.
    self.start=self.ipo['ipo_date'].values[-1]
    self.end=self.ipo['ipo_date'].values[0]
    print type(self.end)
    # Disabled scratch code kept from the original author:
    #ipo['ipo_date']=ipo['ipo_date'].astype('datetime64')
    #self.start_d=datetime.datetime.strptime(self.start,'%Y-%m-%d')
    #self.end_d=datetime.datetime.strptime(self.end,'%Y-%m-%d')
    #print type(self.start_d)
    #period=self.start_d+datetime.timedelta(days=30)
    #print period.strftime('%Y-%m-%d')
    #print ipo[ipo['ipo_date']<np.datetime64(period)]
def add_timedelta(self, delta):
    """
    Add timedelta duration to the instance.

    Only the time portion (seconds/microseconds) of a timedelta is
    meaningful for a Time value; a delta carrying a day component is
    rejected.

    :param delta: The timedelta instance
    :type delta: datetime.timedelta

    :rtype: Time

    :raises TypeError: if ``delta`` has a non-zero ``days`` component.
    """
    if delta.days:
        # Message fixed: previous text was missing the verb ("Cannot
        # timedelta with days..."). Exception type is unchanged.
        raise TypeError('Cannot add timedelta with days to Time.')
    return self.add(
        seconds=delta.seconds,
        microseconds=delta.microseconds
    )
def subtract_timedelta(self, delta):
    """
    Remove timedelta duration from the instance.

    Only the time portion (seconds/microseconds) of a timedelta is
    meaningful for a Time value; a delta carrying a day component is
    rejected.

    :param delta: The timedelta instance
    :type delta: datetime.timedelta

    :rtype: Time

    :raises TypeError: if ``delta`` has a non-zero ``days`` component.
    """
    if delta.days:
        # Message fixed: previous text was missing the verb ("Cannot
        # timedelta with days..."). Exception type is unchanged.
        raise TypeError('Cannot subtract timedelta with days from Time.')
    return self.subtract(
        seconds=delta.seconds,
        microseconds=delta.microseconds
    )
Source file: test_timeseries.py
Project: PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda
Author: SignalMedia
Project source code
File source code
Views: 47
Favorites: 0
Likes: 0
Comments: 0
def test_class_ops_dateutil(self):
    """Timestamp's class constructors should agree with datetime's
    equivalents (to within one second), using dateutil's UTC tzinfo."""
    tm._skip_if_no_dateutil()
    from dateutil.tz import tzutc

    def _check(left, right):
        # Compare at whole-second resolution to tolerate skew between
        # the two constructor calls.
        left_secs = int(np.round(Timestamp(left).value / 1e9))
        right_secs = int(np.round(Timestamp(right).value / 1e9))
        self.assertEqual(left_secs, right_secs)

    _check(Timestamp.now(), datetime.now())
    _check(Timestamp.now('UTC'), datetime.now(tzutc()))
    _check(Timestamp.utcnow(), datetime.utcnow())
    _check(Timestamp.today(), datetime.today())

    current_time = calendar.timegm(datetime.now().utctimetuple())
    _check(Timestamp.utcfromtimestamp(current_time),
           datetime.utcfromtimestamp(current_time))
    _check(Timestamp.fromtimestamp(current_time),
           datetime.fromtimestamp(current_time))

    date_component = datetime.utcnow()
    time_component = (date_component + timedelta(minutes=10)).time()
    _check(Timestamp.combine(date_component, time_component),
           datetime.combine(date_component, time_component))
Source file: test_timeseries.py
Project: PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda
Author: SignalMedia
Project source code
File source code
Views: 30
Favorites: 0
Likes: 0
Comments: 0
def test_date_range_normalize(self):
    """date_range(normalize=False) must preserve the anchor's time of day."""
    anchor = datetime.today()
    periods = 50

    # 2-day frequency anchored at "now": values step exactly 2 days apart.
    rng = date_range(anchor, periods=periods, normalize=False, freq='2D')
    step = timedelta(2)
    expected = np.array([anchor + i * step for i in range(periods)],
                        dtype='M8[ns]')
    self.assert_numpy_array_equal(rng, expected)

    # Business-day frequency anchored at 08:15: every value keeps that time.
    rng = date_range('1/1/2000 08:15', periods=periods, normalize=False,
                     freq='B')
    expected_time = time(8, 15)
    for val in rng:
        self.assertEqual(val.time(), expected_time)
Source file: test_timeseries.py
Project: PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda
Author: SignalMedia
Project source code
File source code
Views: 29
Favorites: 0
Likes: 0
Comments: 0
def test_timedelta(self):
    # Shifting a DatetimeIndex by a timedelta and back must round-trip
    # and preserve the index frequency.
    # this is valid too
    index = date_range('1/1/2000', periods=50, freq='B')
    shifted = index + timedelta(1)
    back = shifted + timedelta(-1)
    self.assertTrue(tm.equalContents(index, back))
    self.assertEqual(shifted.freq, index.freq)
    self.assertEqual(shifted.freq, back.freq)
    # Subtracting a timedelta must equal adding its negation.
    result = index - timedelta(1)
    expected = index + timedelta(-1)
    self.assertTrue(result.equals(expected))
    # GH4134, buggy with timedeltas
    rng = date_range('2013', '2014')
    s = Series(rng)
    # Offset / np.timedelta64 arithmetic must agree whether applied to
    # the index directly or through a Series.
    result1 = rng - pd.offsets.Hour(1)
    result2 = DatetimeIndex(s - np.timedelta64(100000000))
    result3 = rng - np.timedelta64(100000000)
    result4 = DatetimeIndex(s - pd.offsets.Hour(1))
    self.assertTrue(result1.equals(result4))
    self.assertTrue(result2.equals(result3))
def testReadTimeout(self):
    """Read timeouts must be honoured: group_read should return None
    after roughly the requested timeout rather than blocking forever.
    """
    tunnel = KNXIPTunnel(gwip)
    tunnel.connect()
    # Read from some random address and hope nothing responds here
    tick = datetime.now()
    res = tunnel.group_read(37000, timeout=1)
    tock = datetime.now()
    diff = tock - tick # the result is a datetime.timedelta object
    # Must wait at least the 1s timeout, with up to 2s of scheduling slack.
    self.assertTrue(diff.total_seconds() >= 1 and diff.total_seconds() < 3)
    self.assertIsNone(res)
    # Read from some random address and hope nothing responds here
    tick = datetime.now()
    res = tunnel.group_read(37000, timeout=5)
    tock = datetime.now()
    diff = tock - tick # the result is a datetime.timedelta object
    # Same check with a 5s timeout (1s of slack).
    self.assertTrue(diff.total_seconds() >= 5 and diff.total_seconds() < 6)
    self.assertIsNone(res)
    tunnel.disconnect()
def test_lookup_symbol(self):
    # Incrementing by two so that start and end dates for each
    # generated Asset don't overlap (each Asset's end_date is the
    # day after its start date.)
    dates = pd.date_range('2013-01-01', freq='2D', periods=5, tz='UTC')
    # Five assets all sharing the symbol 'existing', distinguished only
    # by their (non-overlapping) date ranges.
    df = pd.DataFrame.from_records(
        [
            {
                'sid': i,
                'symbol': 'existing',
                'start_date': date.value,
                'end_date': (date + timedelta(days=1)).value,
                'exchange': 'NYSE',
            }
            for i, date in enumerate(dates)
        ]
    )
    self.env.write_data(equities_df=df)
    finder = self.asset_finder_type(self.env.engine)
    for _ in range(2): # Run checks twice to test for caching bugs.
        with self.assertRaises(SymbolNotFound):
            finder.lookup_symbol('NON_EXISTING', dates[0])
        # Without an as-of date the shared symbol is ambiguous.
        with self.assertRaises(MultipleSymbolsFound):
            finder.lookup_symbol('EXISTING', None)
        for i, date in enumerate(dates):
            # Verify that we correctly resolve multiple symbols using
            # the supplied date
            result = finder.lookup_symbol('EXISTING', date)
            self.assertEqual(result.symbol, 'EXISTING')
            self.assertEqual(result.sid, i)
def test_offset(self):
    """Test the offset method of FutureChain."""
    cl = FutureChain(self.asset_finder, lambda: '2005-12-01', 'CL')

    # Offsetting forward advances as_of_date accordingly.
    self.assertEqual(
        cl.offset('3 days').as_of_date,
        cl.as_of_date + pd.Timedelta(days=3)
    )

    # Offsetting backward accepts a str, datetime.timedelta, or
    # pd.Timedelta; all three spellings must be equivalent.
    backward_specs = [
        '-1000 days',
        timedelta(days=-1000),
        pd.Timedelta('-1000 days'),
    ]
    for spec in backward_specs:
        self.assertEqual(
            cl.offset(spec).as_of_date,
            cl.as_of_date + pd.Timedelta(days=-1000)
        )

    # An offset of zero should give the original chain.
    self.assertEqual(cl[0], cl.offset(0)[0])
    self.assertEqual(cl[0], cl.offset("0 days")[0])

    # A string that doesn't represent a time delta should raise a
    # ValueError.
    with self.assertRaises(ValueError):
        cl.offset("blah")
def eligible_replays(replays, *, age=None):
    """Yield the replays suitable for training.

    Parameters
    ----------
    replays : iterable[Replay]
        The replays to filter.
    age : datetime.timedelta, optional
        Only count replays less than this age old.

    Yields
    ------
    replay : Replay
        The eligible replays in the directory.

    Notes
    -----
    The same beatmap may appear more than once if there are multiple
    replays for this beatmap.
    """
    for replay in replays:
        if age is not None and datetime.utcnow() - replay.timestamp > age:
            continue

        # ignore plays with mods that are not representative of user skill
        disqualified = (
            replay.mode != GameMode.standard or
            replay.failed or
            replay.autoplay or
            replay.auto_pilot or
            replay.cinema or
            replay.relax or
            len(replay.beatmap.hit_objects) < 2
        )
        if not disqualified:
            yield replay
def utcoffset(self, dt):
    """ Offset from UTC.
    >>> UTC.utcoffset(None)
    datetime.timedelta(0)
    """
    # UTC is, by definition, at zero offset from itself; ZERO is a
    # module-level timedelta(0) constant.
    return ZERO
def dst(self, dt):
    """ DST is not in effect.
    >>> UTC.dst(None)
    datetime.timedelta(0)
    """
    # UTC never observes daylight saving time, so the adjustment is
    # always the module-level timedelta(0) constant.
    return ZERO
def __init__(self, description, timeuuid, source, source_elapsed, thread_name):
    """Build a trace-event record.

    The event time is recovered from the version-1 UUID via
    ``unix_time_from_uuid1``; ``source_elapsed`` is a microsecond count
    that is stored as a timedelta (or None when absent).
    """
    self.description = description
    self.datetime = datetime.utcfromtimestamp(unix_time_from_uuid1(timeuuid))
    self.source = source
    self.thread_name = thread_name
    if source_elapsed is None:
        self.source_elapsed = None
    else:
        self.source_elapsed = timedelta(microseconds=source_elapsed)
async def info(self, ctx):
    """Information about the bot.

    Builds an embed with shard/guild/user/channel counts and uptime and
    edits it into the initial status message.

    NOTE(review): the body awaits coroutines, so this command must be a
    coroutine — restored the missing ``async`` keyword (the original
    ``def`` with ``await`` inside is a SyntaxError on Python 3).
    """
    msg = await ctx.send('Getting statistics...')
    shards = self.bot.shard_count
    guilds = len(list(self.bot.guilds))
    users = str(len([m for m in set(self.bot.get_all_members())]))
    channels = str(len([m for m in set(self.bot.get_all_channels())]))
    # await msg.edit("Getting uptime...")
    # self.bot.uptime presumably holds a perf_counter() reading taken at
    # startup — TODO confirm against the bot's initialisation code.
    up = abs(self.bot.uptime - int(time.perf_counter()))
    up = str(datetime.timedelta(seconds=up))
    data = discord.Embed(title="__**Information**__",
                         colour=discord.Colour(value=11735575))
    data.add_field(name="Version", value="2.5-beta", inline=False)
    data.add_field(name="Shard ID", value=ctx.message.guild.shard_id)
    data.add_field(name="Total Shards", value=shards)
    data.add_field(name="Total Servers", value=guilds)
    # data.add_field(name="Servers (total)", value=total_guilds)
    data.add_field(name="Users", value=users)
    data.add_field(name="Channels", value=channels)
    data.add_field(name="Uptime", value="{}".format(up))
    data.add_field(name="Support Development",
                   value="Donate on [Patreon](https://www.patreon.com/franc_ist) or [PayPal](https://paypal.me/MLAutomod/5)")
    data.set_footer(
        text="Made with \U00002665 by Francis#6565. Support server: https://discord.gg/yp8WpMh")
    try:
        await msg.edit(content=None, embed=data)
        statsd.increment('bot.commands.run', 1)
    except discord.HTTPException:
        logger.exception("Missing embed links perms")
        statsd.increment('bot.commands.errored', 1)
        await ctx.send("Looks like the bot doesn't have embed links perms... It kinda needs these, so I'd suggest adding them!")
async def uptime(self, ctx):
    """Shows the bot's uptime.

    NOTE(review): restored the missing ``async`` keyword — the body
    awaits ``ctx.send``, which is a SyntaxError inside a plain ``def``
    on Python 3.
    """
    # self.bot.uptime presumably holds a perf_counter() reading taken at
    # startup — TODO confirm against the bot's initialisation code.
    up = abs(self.bot.uptime - int(time.perf_counter()))
    up = str(datetime.timedelta(seconds=up))
    await ctx.send("`Uptime: {}`".format(up))
    statsd.increment('bot.commands.run', 1)
def get_cur_runtime(self):
    """Return how long the test case has been running.

    Computed as wall-clock now minus ``self.test_start_time``; the result
    is a ``datetime.timedelta``.
    """
    elapsed = datetime.now() - self.test_start_time
    return elapsed
# Sorter
def total_seconds(td):
    """
    Given a timedelta (*td*) return a number representing the equivalent
    of Python 2.7's :meth:`datetime.timedelta.total_seconds` (a backport
    for interpreters that lack the method).
    """
    # Fold days into seconds, scale to microseconds, then divide back.
    whole_seconds = td.seconds + td.days * 24 * 3600
    return (whole_seconds * 10**6 + td.microseconds) / 10**6
# NOTE: This is something I'm investigating as a way to use the new go_async
# module. A work-in-progress. Ignore for now...
def timeout(self):
    """
    A `property` that controls how long a key lasts before being
    automatically removed. May be given as a `datetime.timedelta`
    object or a string like "1d" or "30s" (passed through the
    `convert_to_timedelta` function).
    """
    try:
        return self._timeout
    except AttributeError:
        # Lazily initialise on first access; default is a 1-hour timeout.
        self._timeout = timedelta(hours=1)
        return self._timeout
def interval(self):
    """
    A `property` that controls how often expired keys are checked for.
    May be given as milliseconds (integer), a `datetime.timedelta`
    object, or a string like "1d" or "30s" (passed through the
    `convert_to_timedelta` function).
    """
    try:
        return self._interval
    except AttributeError:
        # Lazily initialise on first access; default is every 10 seconds.
        self._interval = 10000
        return self._interval
def interval(self, value):
    # Property setter. Accepts "1d"/"30s"-style strings
    # (NOTE(review): `basestring` implies Python 2), a timedelta, or a
    # raw millisecond count.
    if isinstance(value, basestring):
        value = convert_to_timedelta(value)
    if isinstance(value, timedelta):
        value = total_seconds(value) * 1000 # PeriodicCallback uses ms
    self._interval = value
    # Restart the PeriodicCallback so the new interval takes effect
    # immediately.
    if hasattr(self, '_key_watcher'):
        self._key_watcher.stop()
    self._key_watcher = PeriodicCallback(
        self._timeout_checker, value, io_loop=self.io_loop)
def memorized_timedelta(seconds):
    '''Create only one instance of each distinct timedelta'''
    # Module-level _timedelta_cache interns one timedelta per distinct
    # seconds value so repeated lookups share a single object.
    if seconds not in _timedelta_cache:
        _timedelta_cache[seconds] = timedelta(seconds=seconds)
    return _timedelta_cache[seconds]
def memorized_datetime(seconds):
    '''Create only one instance of each distinct datetime'''
    if seconds not in _datetime_cache:
        # NB. We can't just do datetime.utcfromtimestamp(seconds) as this
        # fails with negative values under Windows (Bug #90096)
        _datetime_cache[seconds] = _epoch + timedelta(seconds=seconds)
    return _datetime_cache[seconds]