def __compile_policy_value(self, str_value, value_dict):
    """Substitute placeholders in a policy value and evaluate the result.

    Args:
        str_value: Raw policy value. Returned unchanged when it contains no
            ``$`` character.
        value_dict: Mapping of keys to substitution values. Each key is
            turned into its placeholder text by ``self.__get_compiled_key``.
            ``datetime``/``date`` values are converted to a timestamp with
            ``time.mktime`` (after ``self.__get_localtime``) before being
            substituted.

    Returns:
        ``str_value`` itself when there is nothing to compile; otherwise
        ``self.__get_localtime`` applied to the integer value of the
        evaluated expression.
    """
    value = str_value
    if '$' not in value:
        return value
    self.__logger.debug("Compiling value %s", value)
    for key, val in value_dict.items():
        self.__logger.debug("Searching for key %s", key)
        # isinstance() replaces the former `id(type) and type(val) in (...)`
        # test: the `id(type)` part was always truthy, and isinstance also
        # accepts subclasses of datetime/date.
        if isinstance(val, (datetime, date)):
            self.__logger.debug("Value in dictionary: %s -> %s", key, val)
            val = time.mktime(self.__get_localtime(val).timetuple())
            self.__logger.debug("Timestamp converted: %s -> %s", key, val)
        value = value.replace(self.__get_compiled_key(key), str(val))
    self.__logger.debug("Value after convertsion: %s", value)
    # SECURITY(review): eval() executes whatever expression the policy string
    # contains. Only safe if policy values come from a trusted source --
    # confirm, or replace with a restricted arithmetic evaluator.
    int_value = int(eval(value))
    compiled_value = self.__get_localtime(int_value)
    self.__logger.debug("Compiled value: %s", compiled_value)
    return compiled_value
# python类date()的实例源码 (example source code using Python's date() class)
def get_date_to_message_statistic(self, message_statistic):
    """
    Maps each date between the date of the first message and the date of
    the last message, inclusive, to the sum of the values of a message
    statistic over all messages from that date.
    Args:
        message_statistic: A function mapping a Message object to an int or
            a float.
    Returns:
        date_to_message_statistic: A dict mapping a date object between the
            date of the first message and the date of the last message to
            the sum of the values of message_statistic over all messages in
            self.messages from that date. Empty when there are no messages.
    """
    if not self.messages:
        # Without messages there is no first/last date to span; indexing
        # self.messages[0] would raise IndexError.
        return {}
    # The range is taken from the first and last elements, so this relies on
    # self.messages being ordered by timestamp (assumption -- confirm).
    first_day = self.messages[0].timestamp.date()
    last_day = self.messages[-1].timestamp.date()
    date_to_message_statistic = {
        dt.date(): 0
        for dt in rrule(DAILY, dtstart=first_day, until=last_day)
    }
    for message in self.messages:
        date_to_message_statistic[message.timestamp.date()] += message_statistic(message)
    return date_to_message_statistic
def _coerce_datetime(maybe_dt):
    """Return ``maybe_dt`` as a ``datetime.datetime``.

    A ``datetime.datetime`` is returned untouched. A ``datetime.date`` or a
    3-element tuple/list of ``(year, month, day)`` becomes a datetime for
    that day with ``tzinfo=pytz.utc``. Anything else raises ``TypeError``.
    """
    if isinstance(maybe_dt, datetime.datetime):
        return maybe_dt

    if isinstance(maybe_dt, datetime.date):
        parts = (maybe_dt.year, maybe_dt.month, maybe_dt.day)
    elif isinstance(maybe_dt, (tuple, list)) and len(maybe_dt) == 3:
        parts = maybe_dt
    else:
        raise TypeError('Cannot coerce %s into a datetime.datetime'
                        % type(maybe_dt).__name__)

    year, month, day = parts
    return datetime.datetime(year=year, month=month, day=day, tzinfo=pytz.utc)
def get_last_trading_day_of_month(self, dt, env):
    """Return the date of the trading day preceding the 1st of next month.

    Stores ``dt.month`` on ``self.month`` and the computed date on
    ``self.last_day`` before returning it. ``env.previous_trading_day`` is
    asked for the trading day before the first day of the month after ``dt``.
    """
    self.month = dt.month

    # December rolls over into January of the following year; every other
    # month simply advances by one within the same year.
    if dt.month != 12:
        next_year, next_month = dt.year, dt.month + 1
    else:
        next_year, next_month = dt.year + 1, 1

    first_of_next_month = dt.replace(year=next_year, month=next_month, day=1)
    self.last_day = env.previous_trading_day(first_of_next_month).date()
    return self.last_day
# Stateful rules
def _to_ts(val):
    """Convert ``val`` to a number of seconds relative to ``EPOCH``.

    ints and floats are returned as-is. ``datetime``/``date`` values are
    converted by subtracting ``EPOCH``. Other values are matched against
    ``TS_RE`` (numeric timestamp text) and then ``DT_RE`` (date/time text);
    a generic ``Exception`` is raised when neither pattern matches.
    """
    if isinstance(val, (int, float)):
        return val
    if isinstance(val, (datetime, date)):
        return (val-EPOCH).total_seconds()

    ts_match = TS_RE.match(val)
    if ts_match:
        return float(ts_match.group(0))

    dt_match = DT_RE.match(val)
    if not dt_match:
        raise Exception("Value %r is not a timestamp, datetime, or date"%(val,))

    # Keep only the groups that actually matched; validity of the pieces is
    # left to the datetime constructor.
    pieces = [group for group in dt_match.groups() if group]
    if len(pieces) == 7:
        # The 7th piece is the fractional part: cut or zero-pad it to exactly
        # six digits so it can be used as microseconds.
        pieces[6] = pieces[6][:6].ljust(6, '0')
    moment = datetime(*[int(piece) for piece in pieces])
    return (moment-EPOCH).total_seconds()
def itermonthdates(self, year, month):
    """
    Return an iterator for one month. The iterator will yield datetime.date
    values and will always iterate through complete weeks, so it will yield
    dates outside the specified month.

    Iteration stops cleanly at ``datetime.date.max`` instead of raising
    ``OverflowError`` when the final week of December 9999 would extend
    past the largest representable date.
    """
    date = datetime.date(year, month, 1)
    # Go back to the beginning of the week
    days = (date.weekday() - self.firstweekday) % 7
    date -= datetime.timedelta(days=days)
    oneday = datetime.timedelta(days=1)
    while True:
        yield date
        try:
            date += oneday
        except OverflowError:
            # Adding one day fails once date.max has been yielded.
            break
        # Stop at the first week boundary that lies outside the month.
        if date.month != month and date.weekday() == self.firstweekday:
            break
def test_order_execute_single_no_remaining(self, fulfill):
    """Executing a one-item order must fail when no inventory is left.

    A sale is recorded for the product and its quantity is set to 1 before
    an order for 1 unit is executed; the order is expected to raise
    ``NoMoreInventoryError`` and never reach ``fulfill``.
    ``fulfill`` is presumably a mock injected by a patch decorator outside
    this block -- confirm.
    """
    self.product.sale_set.create(
        price=100,
        member=self.member
    )
    self.product.start_date = datetime.date(year=2017, month=1, day=1)
    self.product.quantity = 1
    order = Order(self.member, self.room)
    item = OrderItem(self.product, order, 1)
    order.items.add(item)
    with self.assertRaises(NoMoreInventoryError):
        order.execute()
    # `was_not_called()` is not a Mock assertion: a Mock auto-creates that
    # attribute and the call always succeeds. assert_not_called() actually
    # verifies that fulfilment never happened.
    fulfill.assert_not_called()
def test_order_execute_multi_some_remaining(self, fulfill):
    """Executing a two-item order must fail when only part can be served.

    A sale is recorded for the product and its quantity is set to 2 before
    an order for 2 units is executed; the order is expected to raise
    ``NoMoreInventoryError`` and never reach ``fulfill``.
    ``fulfill`` is presumably a mock injected by a patch decorator outside
    this block -- confirm.
    """
    self.product.sale_set.create(
        price=100,
        member=self.member
    )
    self.product.start_date = datetime.date(year=2017, month=1, day=1)
    self.product.quantity = 2
    order = Order(self.member, self.room)
    item = OrderItem(self.product, order, 2)
    order.items.add(item)
    with self.assertRaises(NoMoreInventoryError):
        order.execute()
    # `was_not_called()` is not a Mock assertion: a Mock auto-creates that
    # attribute and the call always succeeds. assert_not_called() actually
    # verifies that fulfilment never happened.
    fulfill.assert_not_called()
def index(request):
    """Render ``msdn/index.html`` with the latest files and families.

    The context holds the ten most recently posted files, the ten product
    families with the most recent file posting date, all product groups
    ordered by name, the total file count, and a flag that shows the FCU
    banner until 2017-11-10.
    """
    banner_cutoff = datetime.date(2017, 11, 10)
    families_by_latest_file = ProductFamily.objects.annotate(
        last_posted_date=Max('file__posted_date')
    ).order_by('-last_posted_date')

    context = {
        'show_fcu_banner': datetime.date.today() < banner_cutoff,
        'latest_files': File.objects.order_by("-posted_date")[:10],
        'latest_updated_families': families_by_latest_file[:10],
        'groups': ProductGroup.objects.order_by("name"),
        'total_count': File.objects.count(),
    }
    return render(request, 'msdn/index.html', context)
def test_activity_sets_sla_triaged_at():
    """sla_triaged_at is set by the first triaging activity and then kept."""
    report = new_report()
    report.save()
    assert report.sla_triaged_at is None

    three_hours = datetime.timedelta(hours=3)

    # A plain comment must not set sla_triaged_at.
    comment_time = now()
    report.activities.create(id=1, type='activity-comment', created_at=comment_time)
    assert report.sla_triaged_at is None

    # A "not applicable" activity sets it to that activity's creation time.
    triage_time = comment_time + three_hours
    report.activities.create(id=2, type='activity-bug-not-applicable', created_at=triage_time)
    assert report.sla_triaged_at == triage_time

    # A later activity that would also set the date must not overwrite it.
    resolved_time = triage_time + three_hours
    report.activities.create(id=3, type='activity-bug-resolved', created_at=resolved_time)
    assert report.sla_triaged_at == triage_time
def load_all_url_files(_dir, file_name_prefix):
    """Collect URLs from every ``<prefix>*.txt`` file in a directory.

    Each non-blank line contributes its first tab-separated field (stripped
    of surrounding whitespace). Progress is printed per file.

    Args:
        _dir: Directory to scan (not recursive).
        file_name_prefix: Prefix of the file names to load; it is combined
            with ``*.txt`` into an ``fnmatch`` pattern.

    Returns:
        A list of URL strings in the order they were read. Files are visited
        in ``os.listdir`` order.
    """
    url_list = []
    pattern = file_name_prefix + '*.txt'
    for file_name in os.listdir(_dir):
        if not fnmatch.fnmatch(file_name, pattern):
            continue
        file_name = osp.join(_dir, file_name)
        # Single-argument print(...) behaves the same on Python 2 and 3.
        print('load URLs from file: ' + file_name)
        loaded = 0
        # The context manager closes the file even if reading fails midway.
        with open(file_name, 'r') as fp_urls:
            for line in fp_urls:
                line = line.strip()
                if line:
                    url_list.append(line.split('\t')[0].strip())
                    loaded += 1
        print(str(loaded) + ' URLs loaded')
    return url_list
########### End of Functions to Load downloaded urls ###########
############## Functions to get date/time strings ############
def _check_annotations(value):
    """
    Recursively check that value is either of a "simple" type (number, string,
    date/time) or is a (possibly nested) dict, list or numpy array containing
    only simple types.

    The accepted simple types are those in ``ALLOWED_ANNOTATION_TYPES``.
    Returns ``None`` when the value is acceptable and raises ``ValueError``
    otherwise.
    """
    if isinstance(value, np.ndarray):
        # Arrays are validated through their element type, not per element.
        if not issubclass(value.dtype.type, ALLOWED_ANNOTATION_TYPES):
            # Trailing space keeps the two literals from running together.
            raise ValueError("Invalid annotation. NumPy arrays with dtype %s "
                             "are not allowed" % value.dtype.type)
    elif isinstance(value, dict):
        # Only the values of a dict are checked, not its keys.
        for element in value.values():
            _check_annotations(element)
    elif isinstance(value, (list, tuple)):
        for element in value:
            _check_annotations(element)
    elif not isinstance(value, ALLOWED_ANNOTATION_TYPES):
        raise ValueError("Invalid annotation. Annotations of type %s are not "
                         "allowed" % type(value))
def _check_annotations(value):
    """
    Recursively check that value is either of a "simple" type (number, string,
    date/time) or is a (possibly nested) dict, list or numpy array containing
    only simple types.

    The accepted simple types are those in ``ALLOWED_ANNOTATION_TYPES``.
    Returns ``None`` when the value is acceptable and raises ``ValueError``
    otherwise.
    """
    if isinstance(value, np.ndarray):
        # Arrays are validated through their element type, not per element.
        if not issubclass(value.dtype.type, ALLOWED_ANNOTATION_TYPES):
            # Trailing space keeps the two literals from running together.
            raise ValueError("Invalid annotation. NumPy arrays with dtype %s "
                             "are not allowed" % value.dtype.type)
    elif isinstance(value, dict):
        # Only the values of a dict are checked, not its keys.
        for element in value.values():
            _check_annotations(element)
    elif isinstance(value, (list, tuple)):
        for element in value:
            _check_annotations(element)
    elif not isinstance(value, ALLOWED_ANNOTATION_TYPES):
        raise ValueError("Invalid annotation. Annotations of type %s are not "
                         "allowed" % type(value))
def __init__(self, value):
    """
    Build the date from one of the supported inputs:
        - integer_type: absolute days from epoch (1970, 1, 1). Can be negative.
        - datetime.date: built-in date (datetime.datetime is accepted too)
        - string_type: a string time of the form "yyyy-mm-dd"

    Any other input raises TypeError.
    """
    if isinstance(value, six.integer_types):
        # Integers are stored directly as the day offset.
        self.days_from_epoch = value
        return
    if isinstance(value, (datetime.date, datetime.datetime)):
        self._from_timetuple(value.timetuple())
        return
    if isinstance(value, six.string_types):
        self._from_datestring(value)
        return
    raise TypeError('Date arguments must be a whole number, datetime.date, or string')
def test_date():
    """Check the x-axis labels rendered for a simple dateline."""
    points = [
        (date(2013, 1, 2), 300),
        (date(2013, 1, 12), 412),
        (date(2013, 2, 2), 823),
        (date(2013, 2, 22), 672),
    ]
    chart = DateLine(truncate_label=1000)
    chart.add('dates', points)
    rendered = chart.render_pyquery()

    # Only the part of each label before the first space is compared.
    x_labels = [
        label.split(' ')[0]
        for label in rendered(".axis.x text").map(texts)
    ]
    assert x_labels == [
        '2013-01-12',
        '2013-01-24',
        '2013-02-04',
        '2013-02-16']
def test_date_xrange():
    """Check the x-axis labels of a dateline with an explicit xrange."""
    points = [
        (date(2013, 1, 2), 300),
        (date(2013, 1, 12), 412),
        (date(2013, 2, 2), 823),
        (date(2013, 2, 22), 672),
    ]
    chart = DateLine(truncate_label=1000)
    chart.add('dates', points)
    chart.xrange = (date(2013, 1, 1), date(2013, 3, 1))
    rendered = chart.render_pyquery()

    # Only the part of each label before the first space is compared.
    x_labels = [
        label.split(' ')[0]
        for label in rendered(".axis.x text").map(texts)
    ]
    assert x_labels == [
        '2013-01-01',
        '2013-01-12',
        '2013-01-24',
        '2013-02-04',
        '2013-02-16',
        '2013-02-27']
def test_date_labels():
    """Check that explicit x_labels are the labels rendered on a dateline."""
    points = [
        (date(2013, 1, 2), 300),
        (date(2013, 1, 12), 412),
        (date(2013, 2, 2), 823),
        (date(2013, 2, 22), 672),
    ]
    chart = DateLine(truncate_label=1000)
    chart.add('dates', points)
    chart.x_labels = [date(2013, month, 1) for month in (1, 2, 3)]
    rendered = chart.render_pyquery()

    # Only the part of each label before the first space is compared.
    x_labels = [
        label.split(' ')[0]
        for label in rendered(".axis.x text").map(texts)
    ]
    assert x_labels == [
        '2013-01-01',
        '2013-02-01',
        '2013-03-01']
def bind_processor(self, dialect):
    """Return a callable that renders a date for storage.

    The returned function passes ``None`` through, formats ``datetime.date``
    instances with ``self._storage_format`` (read once, when this method is
    called) using ``year``/``month``/``day`` keys, and raises ``TypeError``
    for any other input. ``dialect`` is not used.
    """
    date_type = datetime.date
    template = self._storage_format

    def process(value):
        if value is None:
            return None
        if not isinstance(value, date_type):
            raise TypeError("SQLite Date type only accepts Python "
                            "date objects as input.")
        fields = {
            'year': value.year,
            'month': value.month,
            'day': value.day,
        }
        return template % fields

    return process
def result_processor(self, dialect, coltype):
    """Return a callable that converts a fetched value into a date.

    The returned function reduces ``datetime.datetime`` values to their
    date, parses strings with ``self._reg`` (raising ``ValueError`` when the
    pattern does not match), and returns every other value unchanged.
    ``dialect`` and ``coltype`` are not used.
    """
    def process(value):
        if isinstance(value, datetime.datetime):
            return value.date()
        if not isinstance(value, util.string_types):
            return value
        matched = self._reg.match(value)
        if not matched:
            raise ValueError(
                "could not parse %r as a date value" % (value, ))
        # Groups that did not participate in the match count as 0.
        parts = [int(group or 0) for group in matched.groups()]
        return datetime.date(*parts)

    return process
def _expression_adaptations(self):
    """Map each operator to the result type per right-hand operand type.

    Returns a dict keyed by ``operators.add`` / ``operators.sub`` whose
    values map an operand type to the type of the resulting expression.
    """
    own_type = self.__class__

    on_add = {
        Integer: own_type,
        Interval: DateTime,
        Time: DateTime,
    }
    on_sub = {
        # date - integer = date
        Integer: own_type,
        # date - date = integer.
        Date: Integer,
        Interval: DateTime,
        # date - datetime = interval,
        # this one is not in the PG docs
        # but works
        DateTime: Interval,
    }
    return {
        operators.add: on_add,
        operators.sub: on_sub,
    }
def test():
    """Runs several groups of tests of the database engine.

    The groups run in a fixed order because later groups reuse the tables
    returned by earlier ones (``persons``, ``orders``, ``northwind``). When
    the function finishes, all of its local variables are copied into the
    module globals.
    """
    # Test simple statements in SQL.
    persons = test_basic_sql()
    # Test various ways to select rows.
    test_row_selection(persons)
    # Test the four different types of joins in SQL.
    orders = test_all_joins(persons)
    # Test unstructured ways of joining tables together.
    test_table_addition(persons, orders)
    # Test creation and manipulation of databases.
    test_database_support()
    # Load and run some test on the sample Northwind database.
    northwind = test_northwind()
    # Test different date operations that can be performed.
    test_date_functionality()
    # Test various functions that operate on specified column.
    test_column_functions()
    # Skipped when test_northwind() returned a falsy value.
    if northwind:
        # Test ability to select columns with function processing.
        test_generic_column_functions(persons, northwind)
    # Test Database2 instances that support transactions.
    nw2 = test_transactional_database()
    # Allow for interaction at the end of the test: every local above
    # (persons, orders, northwind, nw2) becomes a module-level name, so the
    # local variable names are part of this function's observable behavior.
    globals().update(locals())
def test_date_functionality():
    "Exercises date and datetime columns: insert, query, retype, update."
    # Build an orderz table whose OrderDate column holds plain dates.
    orderz = Table(('OrderId', int), ('ProductName', str), ('OrderDate', date))
    initial_rows = (
        (1, 'Geitost', date(2008, 11, 11)),
        (2, 'Camembert Pierrot', date(2008, 11, 9)),
        (3, 'Mozzarella di Giovanni', date(2008, 11, 11)),
        (4, 'Mascarpone Fabioloi', date(2008, 10, 29)),
    )
    for row in initial_rows:
        orderz.insert(*row)
    # Look up the rows for one specific date.
    orderz.where(ROW.OrderDate == date(2008, 11, 11)).print()
    # Switch the column to datetime and give every order a time of day.
    orderz.alter_column('OrderDate', datetime)
    timestamps = (
        (1, datetime(2008, 11, 11, 13, 23, 44)),
        (2, datetime(2008, 11, 9, 15, 45, 21)),
        (3, datetime(2008, 11, 11, 11, 12, 1)),
        (4, datetime(2008, 10, 29, 14, 56, 59)),
    )
    for order_id, stamp in timestamps:
        orderz.where(ROW.OrderId == order_id).update(OrderDate=stamp)
    # Repeat the query, this time comparing against a datetime object.
    orderz.where(ROW.OrderDate == datetime(2008, 11, 11)).print()
def add_one_month(t):
    """Return a `datetime.date` or `datetime.datetime` (as given) that is
    one month later.

    The day of the month is kept when possible; if the following month has
    fewer days, the last day of that month is returned instead:

        >>> add_one_month(datetime.date(2010, 1, 31))
        datetime.date(2010, 2, 28)
    """
    import datetime
    step = datetime.timedelta(days=1)

    # Walk to the first day of the following month.
    cursor = t + step
    while cursor.month == t.month:
        cursor += step
    next_month = cursor.month

    # Walk forward to the original day number, but never leave that month.
    while cursor.day < t.day:
        following = cursor + step
        if following.month != next_month:
            break
        cursor = following
    return cursor
def subtract_one_month(t):
    """Return a `datetime.date` or `datetime.datetime` (as given) that is
    one month earlier.

    The day of the month is kept when possible; if the preceding month has
    fewer days, the last day of that month is returned instead:

        >>> subtract_one_month(datetime.date(2010, 3, 31))
        datetime.date(2010, 2, 28)
    """
    import datetime
    step = datetime.timedelta(days=1)
    cursor = t - step
    while True:
        in_previous_month = cursor.month != t.month
        # Done once we are in the earlier month and not past the target day.
        if in_previous_month and cursor.day <= t.day:
            return cursor
        cursor -= step
def importData(fileName):
    """
    Main entry point - opens import data Excel sheet (assumed to be first
    sheet) and processes each record, one at a time.

    Works through the module globals ``importSheet``, ``importRow`` and
    ``headers``: the sheet and the current row index are stored globally,
    and ``headers`` is appended to (it is not reset here). Each data row is
    turned into a dict keyed by header text and passed to
    ``handleAccountContact``.
    """
    global importSheet, importRow, headers
    book = xlrd.open_workbook(fileName)
    importSheet = book.sheet_by_index(0)
    print "importing from %s" % importSheet.name
    # get cell value types from first-non header data row (2nd), since headers are all of type text
    # Each headers entry is a (header text, cell type of row 1) tuple.
    for colnum in range(0, importSheet.ncols):
        headers.append((importSheet.cell(0, colnum).value, importSheet.cell(1, colnum).ctype))
    configErrorReporting(headers)
    # importRow is the global loop variable, so the current row index is
    # visible outside this function while a row is being processed
    # (presumably for error reporting -- confirm).
    for importRow in range(1, importSheet.nrows):
        record = {}
        for colnum in range(0, importSheet.ncols):
            # The column's type comes from the first data row, not from the
            # cell currently being read.
            if headers[colnum][1] == xlrd.XL_CELL_DATE:
                dateTuple = xlrd.xldate_as_tuple(importSheet.cell(rowx=importRow, colx=colnum).value, book.datemode)
                date = datetime.date(dateTuple[0], dateTuple[1], dateTuple[2])
                # required format for xsd:Date type, for web service call
                record[headers[colnum][0]] = date.isoformat()
            else:
                value = importSheet.cell_value(rowx=importRow, colx=colnum)
                # Text values are stripped; other values are stored as read.
                if isinstance(value, basestring):
                    record[headers[colnum][0]] = value.strip()
                else:
                    record[headers[colnum][0]] = value
            #print "%s: %s" % (type(record[headers[colnum]]), record[headers[colnum]])
        # The return value is rebound to record but not used afterwards.
        record = handleAccountContact(record)
    book.unload_sheet(importSheet.name)
def _o(self, obj):
    """ Translate object to json.
    Dict in python is not json, so don't be confused.
    When return object from rpc, should always use _o.

    ``None`` and int/str/bool/float values are returned as-is, ``Decimal``
    becomes its string form, ``datetime`` and ``date`` become formatted
    strings, sequences/sets go through ``self._ol``, dicts through
    ``self._od`` and every other object through ``self._oo``.
    """
    # Identity test: `== None` would call a custom __eq__, which may answer
    # True (or something non-boolean) for objects that are not None.
    if obj is None:
        return obj
    elif isinstance(obj, Decimal):
        return str(obj)
    # datetime must be tested before date because it is a subclass of date.
    elif isinstance(obj, datetime):
        return obj.strftime("%Y-%m-%d %H:%M:%S")
    elif isinstance(obj, date):
        return obj.strftime("%Y-%m-%d")
    elif isinstance(obj, (list, set, tuple)):
        return self._ol(obj)
    elif isinstance(obj, dict):
        return self._od(obj)
    elif isinstance(obj, (int, str, bool, float)):
        return obj
    else:
        return self._oo(obj)
def itermonthdates(self, year, month):
    """
    Return an iterator for one month. The iterator will yield datetime.date
    values and will always iterate through complete weeks, so it will yield
    dates outside the specified month.

    Iteration stops cleanly at ``datetime.date.max`` instead of raising
    ``OverflowError`` when the final week of December 9999 would extend
    past the largest representable date.
    """
    date = datetime.date(year, month, 1)
    # Go back to the beginning of the week
    days = (date.weekday() - self.firstweekday) % 7
    date -= datetime.timedelta(days=days)
    oneday = datetime.timedelta(days=1)
    while True:
        yield date
        try:
            date += oneday
        except OverflowError:
            # Adding one day fails once date.max has been yielded.
            break
        # Stop at the first week boundary that lies outside the month.
        if date.month != month and date.weekday() == self.firstweekday:
            break
def _datetime(d):
    """Return ``d`` as a ``datetime.datetime``.

    datetimes pass through, dates become midnight of that day, and strings
    are parsed by trying, in order, ``%Y-%m-%d %H:%M:%S``,
    ``%Y-%m-%d %H:%M:%S.%f``, ``%Y-%m-%d`` and ``%Y-%m-%d %H:%M``. A string
    matching none of them raises the ``ValueError`` of the last attempt.
    """
    if isinstance(d, datetime.datetime):
        return d
    if isinstance(d, datetime.date):
        return datetime.datetime(d.year, d.month, d.day)

    parse = datetime.datetime.strptime
    for layout in ('%Y-%m-%d %H:%M:%S', "%Y-%m-%d %H:%M:%S.%f", "%Y-%m-%d"):
        try:
            return parse(d, layout)
        except ValueError:
            continue
    # Last candidate: its ValueError is allowed to propagate.
    return parse(d, "%Y-%m-%d %H:%M")
def test_only_update_with_existent_fields(self):
    """An update carrying only first_name must leave other fields as-is."""
    update = PatientUpdateMessage(first_name="Mary")
    container = MessageContainer(
        message_type=PatientUpdateMessage,
        messages=[update],
        hospital_number="50092915",
        issuing_source="uclh"
    )
    UclhPatientUpdateSubscription().notify(container)

    patient = self.session.query(Patient).one()
    # Only first_name was in the message; the rest must keep their values.
    expectations = (
        ("Mary", "first_name"),
        ("Smith", "surname"),
        (None, "middle_name"),
        ("Ms", "title"),
        (date(1983, 12, 12), "date_of_birth"),
    )
    for expected, field in expectations:
        self.assertEqual(expected, getattr(patient, field))
def format_date_as_string(val, fmt='%m-%d-%Y'):
    """
    Render a date, or a value convertible by ``format_date``, as a string.

    :param val: a ``date`` instance, or a value handed to ``format_date``
    :param fmt: format passed to ``format_date`` for non-date input; also
        the output format when ``val`` is already a ``date``
    :return: ``val`` formatted with ``fmt`` when it is a ``date``; otherwise
        the ``format_date`` result formatted with ``FORMAT_DATABASE_DATE``,
        or ``''`` when ``format_date`` returns a falsy value
    :rtype: str
    """
    if not isinstance(val, date):
        converted = format_date(val, fmt)
        return converted.strftime(FORMAT_DATABASE_DATE) if converted else ''
    # NOTE(review): date instances are rendered with `fmt`, while converted
    # values use FORMAT_DATABASE_DATE -- confirm this asymmetry is intended.
    return val.strftime(fmt)