def get_price_quote(self, d=None, column='adj_close'):
"""
Get the price of an Asset.
:param date or datetime d: The datetime of when to retrieve the price quote from.
(default: ``date.today()``)
:param str column: The header of the column to use to get the price quote from.
(default: ``adj_close``
:return: namedtuple with price and the the datetime
:rtype: namedtuple
"""
quote = namedtuple('Quote', 'price time')
if d is None:
df = web.get_quote_yahoo(self.ticker)
d = date.today()
time = dt_utils.parse_date(df['time'][0]).time()
dt = datetime.combine(d, time=time)
return quote(price=df['last'], time=dt)
else:
price = self.ohlcv.ix[d][column][0]
return quote(price=price, time=d)
python类combine()的实例源码
def test_combine(self):
d = date(2002, 3, 4)
t = time(18, 45, 3, 1234)
expected = self.theclass(2002, 3, 4, 18, 45, 3, 1234)
combine = self.theclass.combine
dt = combine(d, t)
self.assertEqual(dt, expected)
dt = combine(time=t, date=d)
self.assertEqual(dt, expected)
self.assertEqual(d, dt.date())
self.assertEqual(t, dt.time())
self.assertEqual(dt, combine(dt.date(), dt.time()))
self.assertRaises(TypeError, combine) # need an arg
self.assertRaises(TypeError, combine, d) # need two args
self.assertRaises(TypeError, combine, t, d) # args reversed
self.assertRaises(TypeError, combine, d, t, 1) # too many args
self.assertRaises(TypeError, combine, "date", "time") # wrong types
self.assertRaises(TypeError, combine, d, "time") # wrong type
self.assertRaises(TypeError, combine, "date", t) # wrong type
def create_future_events(self, date_stop=None):
if not self.date_stop and not date_stop:
raise ValidationError(_("Stop date should be specified."))
date_stop = min(filter(None, [date_stop, self.date_stop]))
current_date = max(self.time_start, timezone.now())
last_event = self.events.order_by('-date_start').first()
if last_event:
current_date = max(current_date, last_event.date_start + timedelta(days=1))
current_date = make_aware(datetime.combine(current_date, time.min))
added_events = []
for day in iter_daterange(current_date, date_stop):
if not getattr(self, 'on_day%d' % day.weekday()):
continue
event = self.gen_future_event(day)
event.full_clean()
event.save(force_insert=True)
added_events.append(event)
return added_events
def format_date(utc, isoformat=False):
"""Parse Twitter's UTC date into UTC or local time."""
u = datetime.strptime(utc.replace('+0000','UTC'), '%a %b %d %H:%M:%S %Z %Y')
# This is the least painful way I could find to create a non-naive
# datetime including a UTC timezone. Alternative suggestions
# welcome.
unew = datetime.combine(u.date(), time(u.time().hour,
u.time().minute, u.time().second, tzinfo=UTC))
# Convert to localtime
unew = unew.astimezone(Local)
if isoformat:
return unew.isoformat()
else:
return unew.strftime('%Y-%m-%d %H:%M:%S %Z')
def format_date(utc, isoformat=False):
"""Parse Twitter's UTC date into UTC or local time."""
u = datetime.strptime(utc.replace('+0000','UTC'), '%a %b %d %H:%M:%S %Z %Y')
# This is the least painful way I could find to create a non-naive
# datetime including a UTC timezone. Alternative suggestions
# welcome.
unew = datetime.combine(u.date(), time(u.time().hour,
u.time().minute, u.time().second, tzinfo=UTC))
# Convert to localtime
unew = unew.astimezone(Local)
if isoformat:
return unew.isoformat()
else:
return unew.strftime('%Y-%m-%d %H:%M:%S %Z')
payments.py 文件源码
项目:money-to-prisoners-send-money
作者: ministryofjustice
项目源码
文件源码
阅读 20
收藏 0
点赞 0
评论 0
def get_govuk_capture_time(self, govuk_payment):
try:
capture_submit_time = parse_datetime(
govuk_payment['settlement_summary'].get('capture_submit_time', '')
)
captured_date = parse_date(
govuk_payment['settlement_summary'].get('captured_date', '')
)
if captured_date is not None:
capture_submit_time = (
capture_submit_time or timezone.now()
).astimezone(timezone.utc)
if capture_submit_time.date() < captured_date:
return datetime.combine(
captured_date, time.min
).replace(tzinfo=timezone.utc)
elif capture_submit_time.date() > captured_date:
return datetime.combine(
captured_date, time.max
).replace(tzinfo=timezone.utc)
else:
return capture_submit_time
except (KeyError, TypeError):
pass
raise GovUkPaymentStatusException(
'Capture date not yet available for payment %s' % govuk_payment['reference']
)
def __get_occur_time_list(self, weekday_table, course_time_table):
occur_time_list = []
for split_time_string in self.__occur_time_str.split():
weekday_chinese_str = split_time_string[0]
try:
weekday = weekday_table[weekday_chinese_str]
except KeyError:
continue
occur_indexes = re.findall(pattern=r'[0-9]+', string=split_time_string)
start_index = int(occur_indexes[0]) - 1
end_index = int(occur_indexes[1]) - 1
occur_time_list_in_a_day = [datetime.combine(date=weekday, time=course_time_table[i])
for i in range(start_index, end_index + 1)]
occur_time_list.append(occur_time_list_in_a_day)
return occur_time_list
def get_user_free_intervals(account, day, start = time(0,0,0), end = time(23,59,59)):
busy_intervals = []
weekday = day.strftime("%a")[0:2] # Get the first two characters of weekday string
tasks = account.tasks.filter(category__in=[0,3,5,6], repeat__contains=weekday, )
busy_intervals += [(task.start.time(), task.end.time()) for task in tasks]
# all_non_repeat_tasks = account.tasks.filter(category=6, start__year=day.year, start__month=day.month, start__day=day.day)
all_non_repeat_tasks = account.tasks.filter(category=6, start=day)
busy_intervals += [(task.start.time(), task.end.time()) for task in all_non_repeat_tasks]
intervals = combine(busy_intervals) # union
# compute the complement of all the intervals
free_intervals = complement(intervals, first=start, last=end)
return free_intervals
# Copied from http://nullege.com/codes/search/Intervals.complement
def Create(user, title, due=None):
if not due:
tz = user.get_timezone()
local_now = tools.local_time(tz)
task_prefs = user.get_setting_prop(['tasks', 'preferences'], {})
same_day_hour = tools.safe_number(task_prefs.get('same_day_hour', 16), default=16, integer=True)
due_hour = tools.safe_number(task_prefs.get('due_hour', 22), default=22, integer=True)
schedule_for_same_day = local_now.hour < same_day_hour
dt_due = local_now
if due_hour > 23:
due_hour = 0
dt_due += timedelta(days=1)
if due_hour < 0:
due_hour = 0
time_due = time(due_hour, 0)
due = datetime.combine(dt_due.date(), time_due)
if not schedule_for_same_day:
due += timedelta(days=1)
if due:
due = tools.server_time(tz, due)
return Task(title=tools.capitalize(title), dt_due=due, parent=user.key)
def _tasks_request(self):
tasks = Task.Recent(self.user)
tasks_undone = []
n_done = Task.CountCompletedSince(self.user, datetime.combine(datetime.today(), time(0,0)))
for task in tasks:
if not task.is_done():
tasks_undone.append(task.title)
if n_done:
text = "You've completed %d %s for today." % (n_done, tools.pluralize('task', n_done))
else:
text = "You haven't completed any tasks yet."
if tasks_undone:
text += " You still need to do %s." % tools.english_list(tasks_undone)
if not n_done and not tasks_undone:
text += " Try adding tasks by saying 'add task Q2 planning'"
return text
def to_jalali(cls, year, month=None, day=None, hour=None, minute=None,
second=None, microsecond=None, tzinfo=None):
if month is None and isinstance(year, dt):
month = year.month
day = year.day
hour = year.hour
minute = year.minute
second = year.second
microsecond = year.microsecond
tzinfo = year.tzinfo
year = year.year
j_date = JalaliDate.to_jalali(year, month, day)
return cls.combine(j_date,
_time(hour=hour, minute=minute, second=second,
microsecond=microsecond,
tzinfo=tzinfo))
def __add__(self, other):
if not isinstance(other, timedelta):
return NotImplemented
delta = timedelta(self.toordinal(),
hours=self._hour,
minutes=self._minute,
seconds=self._second,
microseconds=self._microsecond)
delta += other
hour, rem = divmod(delta.seconds, 3600)
minute, second = divmod(rem, 60)
if 0 < delta.days <= _MAXORDINAL:
return JalaliDateTime.combine(JalaliDate.fromordinal(delta.days),
_time(hour, minute, second,
delta.microseconds,
tzinfo=self._tzinfo))
raise OverflowError("result out of range")
def parse_datetime(datetimestring):
'''
Parses ISO 8601 date-times into datetime.datetime objects.
This function uses parse_date and parse_time to do the job, so it allows
more combinations of date and time representations, than the actual
ISO 8601:2004 standard allows.
'''
try:
datestring, timestring = datetimestring.split('T')
except ValueError:
raise ISO8601Error("ISO 8601 time designator 'T' missing. Unable to"
" parse datetime string %r" % datetimestring)
tmpdate = parse_date(datestring)
tmptime = parse_time(timestring)
return datetime.combine(tmpdate, tmptime)
def get_queryset(self):
"""returns actions"""
start_date = self.request.GET.get('start')
end_date = self.request.GET.get('end')
if not start_date or not end_date:
raise ParseError("Period frame missing")
queryset = self.model.objects.all()
queryset = self._apply_in_action_type_lookup(queryset)
queryset = self._apply_in_charge_lookup(queryset)
try:
start_date = self._parse_date(start_date)
end_date = self._parse_date(end_date)
except ValueError:
raise ParseError("Invalid period frame")
start_datetime = datetime.combine(start_date, time.min)
end_datetime = datetime.combine(end_date, time.max)
if end_datetime < start_datetime:
return self.model.objects.none()
queryset = queryset.filter(
Q(planned_date__lte=start_datetime, end_datetime__gte=end_datetime) | # starts before, ends after period
Q(planned_date__gte=start_datetime, end_datetime__lte=end_datetime) | # starts and ends during period
Q(planned_date__lte=start_datetime, end_datetime__gte=start_datetime) | # starts before, ends during
Q(planned_date__lte=end_datetime, end_datetime__gte=end_datetime) | # starts during period, ends after
Q(
planned_date__gte=start_datetime,
end_datetime__isnull=True,
planned_date__lte=end_datetime
) # no end, starts during period
)
return queryset
def _get_datetimes(self):
"""return selected date times"""
start_date, end_date = get_date_bounds(self.value)
return datetime.combine(start_date, time.min), datetime.combine(end_date, time.max)
def form_valid(self, form):
"""create a new sale"""
analysis_code = form.cleaned_data['analysis_code']
planned_date = datetime.combine(form.cleaned_data['date'], time.min)
amount = form.cleaned_data['amount']
vat_rate = form.cleaned_data['vat_rate']
action = Action.objects.create(type=analysis_code.action_type, planned_date=planned_date)
if action.sale:
action.sale.analysis_code = analysis_code
action.sale.save()
SaleItem.objects.create(
sale=action.sale, pre_tax_price=amount, text=analysis_code.name, vat_rate=vat_rate, quantity=Decimal(1)
)
return super(AddExtraSaleView, self).form_valid(form)
def download_files(start_date, number_of_days, lon, lat, target_directory):
print('Downloading files...')
if not os.path.exists(f'{target_directory}/originals'):
os.makedirs(f'{target_directory}/originals')
target = '%s/originals/{}' % target_directory
pathlib.Path(target_directory).mkdir(parents=True, exist_ok=True)
start_datetime = datetime.combine(start_date, time.min)
first_day = int(start_datetime.timestamp() / 86400) # days since epoch
BASE_URL = 'https://oceancolor.gsfc.nasa.gov/cgi/browse.pl'
GET_FILE_URL = 'https://oceandata.sci.gsfc.nasa.gov/cgi/getfile/'
url = BASE_URL + '?sub=level1or2list&sen=am&per=DAY&day={}&prm=CHL&n={}&s={}&w={}&e={}'
for d in range(first_day, first_day + number_of_days):
_url = url.format(d, lat[1], lat[0], lon[0], lon[1])
_data = requests.get(_url)
if _data:
content = _data.content
all_a_href = re.findall(r'(?<=<a href=")[^"]*', str(content))
for a_href in all_a_href:
# if 'getfile' in a_href and any((True for x in ['OC', 'SST'] if x in a_href)):
if 'file' in a_href:
try:
response = requests.get(BASE_URL + a_href, timeout=(3, 60))
for link in re.findall(r'(?<=<a href=")[^"]*', str(response.content)):
if 'LAC_OC.nc' in link:
filename = link.split('/')[-1]
r = requests.get(link)
if not os.path.exists(target.format(filename)):
with open(target.format(filename), 'wb') as f:
f.write(r.content)
print('downloaded file {}'.format(filename))
except Exception as e:
print('Failed to download file due to: {}'.format(e))
print('Done downloading files...')
def read(self):
out = {k: None for k in self.FIELDS + self.EXTRA_FIELD}
start = datetime.now()
self.ser.reset_input_buffer()
while not all(out.values()):
line = self.readline()
if (datetime.now() - start).total_seconds() > self.timeout:
break
line = re.sub(r'[\x00-\x1F]|\r|\n|\t|\$', "", line)
cmd = line.split(',')[0]
if cmd not in ['GNGGA', 'GNRMC']:
continue
try:
msg = pynmea2.parse(line)
for key in out:
if hasattr(msg, key):
out[key] = getattr(msg, key)
except pynmea2.ParseError as e:
print("Parse error:", e)
if out['datestamp'] is not None and out['timestamp'] is not None:
timestamp = datetime.combine(out['datestamp'], out['timestamp']).replace(tzinfo=timezone.utc)
out['timestamp'] = timestamp.isoformat()
else:
del out['timestamp']
if out[self.FIELDS[-1]] is not None:
out[self.FIELDS[-1]] *= self.KNOTS_PER_KMPH
if out.get('latitude') is not None and out.get('longitude') is not None:
if out['latitude'] != 0.0 and out['longitude'] != 0.0:
out['pos'] = {
'type': 'Point',
'coordinates': [out['longitude'], out['latitude']]
}
del out['latitude']
del out['longitude']
for f in self.EXTRA_FIELD:
if f in out:
del out[f]
return out
def test_datetime_date_support(self):
today = date.today()
self.DatetimeTest.objects.create(test_id=2, created_at=today)
dt2 = self.DatetimeTest.objects(test_id=2).first()
self.assertEqual(dt2.created_at.isoformat(), datetime(today.year, today.month, today.day).isoformat())
result = self.DatetimeTest.objects.all().allow_filtering().filter(test_id=2).first()
self.assertEqual(result.created_at, datetime.combine(today, datetime.min.time()))
result = self.DatetimeTest.objects.all().allow_filtering().filter(test_id=2, created_at=today).first()
self.assertEqual(result.created_at, datetime.combine(today, datetime.min.time()))
def strftime(self, _date, _format):
"""Convert date to string according to format
:param date _date: date to convert
:param str _format: format to use (same as for datetime.strftime)
:return str: converted date
"""
_datetime = datetime.combine(_date, datetime.min.time())
return _datetime.strftime(_format)