def test_sync_executed_billing_agreement():
    ba = get_fixture("rest.billingagreement.execute.json")
    inst, created = models.BillingAgreement.get_or_update_from_api_data(ba, always_sync=True)
    assert created
    assert inst.id == ba["id"]
    assert inst.last_payment_date == parse_date("2017-08-24T11:47:17Z")
    assert inst.calculate_end_of_period() == parse_date("2017-09-24T11:47:17Z")
Python parse_date() example source code
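For reference, a minimal sketch of what iso8601.parse_date() returns for the kind of timestamps used throughout these examples (assuming the iso8601 package is installed):

import iso8601

# parse_date returns a timezone-aware datetime; a trailing 'Z' means UTC.
dt = iso8601.parse_date("2017-08-24T11:47:17Z")
assert dt.tzinfo is not None
assert dt.utcoffset().total_seconds() == 0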
def to_python(self, value, timezone_in_use):
    if isinstance(value, datetime.datetime):
        return value.astimezone(pytz.utc) if value.tzinfo else value.replace(tzinfo=pytz.utc)
    if isinstance(value, datetime.date):
        return datetime.datetime(value.year, value.month, value.day, tzinfo=pytz.utc)
    if isinstance(value, int):
        return datetime.datetime.utcfromtimestamp(value).replace(tzinfo=pytz.utc)
    if isinstance(value, string_types):
        if value == '0000-00-00 00:00:00':
            return self.class_default
        if len(value) == 10:
            try:
                value = int(value)
                return datetime.datetime.utcfromtimestamp(value).replace(tzinfo=pytz.utc)
            except ValueError:
                pass
        try:
            # leave the date naive in case no tzinfo is set
            dt = iso8601.parse_date(value, default_timezone=None)
        except iso8601.ParseError as e:
            raise ValueError(text_type(e))
        # convert naive to aware
        if dt.tzinfo is None or dt.tzinfo.utcoffset(dt) is None:
            dt = timezone_in_use.localize(dt)
        return dt.astimezone(pytz.utc)
    raise ValueError('Invalid value for %s - %r' % (self.__class__.__name__, value))
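The branch for naive strings above parses with default_timezone=None, localizes the result to the server timezone, and only then converts to UTC. A standalone sketch of that step, with Europe/Madrid standing in for timezone_in_use (an assumption for illustration):

import iso8601
import pytz

server_tz = pytz.timezone("Europe/Madrid")  # stands in for timezone_in_use
dt = iso8601.parse_date("2017-08-24T11:47:17", default_timezone=None)
if dt.tzinfo is None or dt.tzinfo.utcoffset(dt) is None:
    dt = server_tz.localize(dt)
print(dt.astimezone(pytz.utc))  # 2017-08-24 09:47:17+00:00 (CEST is UTC+2)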
def parse_isotime(timestr, default=None):
    """This duplicates oslo timeutils parse_isotime but with a
    @register.filter annotation and a silent fallback on error.
    """
    try:
        return iso8601.parse_date(timestr)
    except (iso8601.ParseError, TypeError):
        return default or ''
def load_keys(args):
    """
    Get the Facebook API keys. Order of precedence is command line,
    environment, config file.
    """
    config = {}
    input_app_id = None
    input_app_secret = None
    input_short_access_token = None
    if args.config:
        config = load_config(args)
    if not config:
        input_app_id, input_app_secret, input_short_access_token = input_keys(args)
        if not input_short_access_token:
            save_config(args, input_app_id, input_app_secret)
    app_id = args.app_id or os.environ.get('APP_ID') or config.get('app_id') or input_app_id
    app_secret = args.app_secret or os.environ.get('APP_SECRET') or config.get('app_secret') or input_app_secret
    short_access_token = args.access_token or os.environ.get('ACCESS_TOKEN') or input_short_access_token
    long_access_token = config.get('access_token')
    expires_at = None
    if 'expires_at' in config:
        expires_at = iso8601.parse_date(config['expires_at'])
    if not (app_id and app_secret):
        sys.exit('App id and secret are required.')
    return app_id, app_secret, short_access_token, long_access_token, expires_at
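The app_id and app_secret selection above relies on Python's or-chaining to implement the stated precedence: command line, then environment, then config file, then interactive input. A hypothetical minimal illustration of that pattern (pick_app_id is not part of the original code):

import os

def pick_app_id(cli_value, config):
    # first truthy source wins
    return cli_value or os.environ.get('APP_ID') or config.get('app_id')

assert pick_app_id('123', {'app_id': '999'}) == '123'  # CLI wins
# falls back to the config file (assuming APP_ID is unset in the environment)
assert pick_app_id(None, {'app_id': '999'}) == '999'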
def str_to_datetime(s):
    """
    :param str s:
    :return datetime.datetime:
    """
    return parse_date(s)
def parse_isotime(timestr):
    """Parse time from ISO 8601 format"""
    try:
        return iso8601.parse_date(timestr)
    except iso8601.ParseError as e:
        raise ValueError(str(e))
    except TypeError as e:
        raise ValueError(str(e))
def datetime_u(s):
    fmt = "%Y-%m-%dT%H:%M:%S"
    try:
        return _strptime(s, fmt)
    except ValueError:
        try:
            # strip zulu timezone suffix or utc offset
            if s[-1] == "Z" or (s[-3] == ":" and s[-6] in (' ', '-', '+')):
                try:
                    import iso8601
                    return iso8601.parse_date(s)
                except ImportError:
                    pass
                try:
                    import isodate
                    return isodate.parse_datetime(s)
                except ImportError:
                    pass
                try:
                    import dateutil.parser
                    return dateutil.parser.parse(s)
                except ImportError:
                    pass
                warnings.warn('removing unsupported "Z" suffix or UTC offset. Install `iso8601`, `isodate` or `python-dateutil` package to support it', RuntimeWarning)
                s = s[:-1] if s[-1] == "Z" else s[:-6]
            # parse microseconds
            try:
                return _strptime(s, fmt + ".%f")
            except:
                return _strptime(s, fmt)
        except ValueError:
            # strip microseconds (not supported on this platform)
            if "." in s:
                warnings.warn('removing unsupported microseconds', RuntimeWarning)
                s = s[:s.index(".")]
            return _strptime(s, fmt)
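A few inputs datetime_u() is meant to accept, based on the branches above (illustrative values; the offset-aware cases assume iso8601, isodate or python-dateutil is importable):

datetime_u("2017-08-24T11:47:17")         # plain strptime path
datetime_u("2017-08-24T11:47:17.123456")  # microseconds via the "%f" fallback
datetime_u("2017-08-24T11:47:17Z")        # zulu suffix -> iso8601.parse_date
datetime_u("2017-08-24T11:47:17+02:00")   # explicit UTC offset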
def parse_isotime(timestr):
    """Parse time from ISO 8601 format."""
    try:
        return iso8601.parse_date(timestr)
    except iso8601.ParseError as e:
        raise ValueError(six.text_type(e))
    except TypeError as e:
        raise ValueError(six.text_type(e))
def parse_wit_datime(dt):
    value = dt['value']
    return iso8601.parse_date(value)
def timestamp_from_string(str):
    print(str)
    if str.startswith('last ', 0, 5):
        # sample queries: 1m; 1m,2s; 1d,2h,3m,4s
        query = Timeseries_query(str.split('last ')[1])
        diff = datetime.timedelta(seconds=query.s, minutes=query.m, hours=query.h, days=query.d)
        return utcnow() - diff
    return iso8601.parse_date(str)
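Illustrative calls, assuming Timeseries_query parses the comma-separated unit string exactly as the inline comment describes:

timestamp_from_string("last 1d,2h,3m,4s")      # relative window: utcnow() minus the timedelta
timestamp_from_string("2017-08-24T11:47:17Z")  # absolute: handed to iso8601.parse_date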
def get_context_data(self, **kwargs):
    api_kwargs = self.get_wp_api_kwargs(**kwargs)
    page = api_kwargs.get('page_number', 1)
    search = api_kwargs.get('search', '')
    blogs = WPApiConnector().get_posts(**api_kwargs)
    tags = WPApiConnector().get_tags(lang=self.blog_language)
    categories = WPApiConnector().get_categories(lang=self.blog_language)
    if 'server_error' in blogs or 'server_error' in tags:
        messages.add_message(self.request, messages.ERROR,
                             blogs['server_error'])
        raise Http404
    if not blogs['body']:
        raise Http404
    for blog in blogs['body']:
        if blog['excerpt'] is not None:
            position = blog['excerpt'].find('Continue reading')
            if position != -1:
                blog['excerpt'] = blog['excerpt'][:position]
        blog['slug'] = str(blog['slug'])
        blog['bdate'] = iso8601.parse_date(blog['date']).date()
    context = {
        'blogs': blogs['body'],
        'tags': tags,
        'categories': categories,
        'search': search,
        'total_posts': int(blogs['headers']['X-WP-Total']),
        'total_pages': int(blogs['headers']['X-WP-TotalPages']),
        'current_page': page,
        'previous_page': page - 1,
        'next_page': page + 1,
    }
    return context
def get_context_data(self, **kwargs):
    api_kwargs = self.get_wp_api_kwargs(**kwargs)
    page = api_kwargs.get('page_number', 1)
    search = api_kwargs.get('search', '')
    if not isinstance(page, int):
        page = 1
    blogs = WPApiConnector().get_posts(**api_kwargs)
    tags = WPApiConnector().get_tags(lang=self.blog_language)
    categories = WPApiConnector().get_categories(lang=self.blog_language)
    if 'server_error' in blogs or 'server_error' in tags:
        raise Http404
    if not blogs['body']:
        raise Http404
    for blog in blogs['body']:
        if blog['excerpt'] is not None:
            position = blog['excerpt'].find('Continue reading')
            if position != -1:
                blog['excerpt'] = blog['excerpt'][:position]
        blog['slug'] = str(blog['slug'])
        blog['bdate'] = iso8601.parse_date(blog['date']).date()
    context = {
        'blogs': blogs['body'],
        'tags': tags,
        'categories': categories,
        'search': search,
        'total_posts': int(blogs['headers']['X-WP-Total']),
        'total_pages': int(blogs['headers']['X-WP-TotalPages']),
        'current_page': page,
        'previous_page': page - 1,
        'next_page': page + 1,
    }
    return context
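In both views the WordPress REST API's date field arrives as an ISO 8601 string, and only the calendar date is kept for display. A minimal sketch of that conversion (the post dict here is illustrative):

import iso8601

post = {"date": "2017-08-24T11:47:17"}
post["bdate"] = iso8601.parse_date(post["date"]).date()
print(post["bdate"])  # 2017-08-24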
def parse_date_utc(date, milliseconds=True):
    """Parses dates from ISO8601 or Epoch formats to a standard datetime object.

    This is particularly useful since Habitica returns dates in two formats:
        - iso8601 encoded strings
        - Long integer Epoch times

    Args:
        date (str): A date string in either iso8601 or Epoch format.
        milliseconds (bool): If True, then epoch times are treated as
            millisecond values, otherwise they are evaluated as seconds.

    Returns:
        datetime: The parsed date time in UTC.
    """
    parsed_date = None
    try:
        parsed_date = iso8601.parse_date(date)
    except iso8601.ParseError:
        value = int(date)
        # utcfromtimestamp expects values in seconds
        if milliseconds:
            value /= 1000
        parsed_date = datetime.utcfromtimestamp(value)
    return parsed_date.replace(tzinfo=pytz.utc)
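Illustrative calls covering both accepted formats (the epoch values below denote the same instant as the ISO string):

parse_date_utc("2017-08-24T11:47:17Z")            # ISO 8601 string
parse_date_utc("1503575237000")                   # epoch in milliseconds (the default)
parse_date_utc("1503575237", milliseconds=False)  # epoch in seconds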
def get(self, bucket_id):
    args = request.args
    limit = int(args["limit"]) if "limit" in args else 100
    start = iso8601.parse_date(args["start"]) if "start" in args else None
    end = iso8601.parse_date(args["end"]) if "end" in args else None
    events = app.api.get_events(bucket_id, limit=limit, start=start, end=end)
    return events, 200

# TODO: How to tell expect that it could be a list of events? Until then we can't use validate.
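The start and end query parameters are ISO 8601 strings parsed server-side by iso8601.parse_date(). A hedged client-side sketch (host, port and route are assumptions, not taken from the code above):

import requests

params = {
    "start": "2017-08-24T00:00:00Z",
    "end": "2017-08-25T00:00:00Z",
    "limit": 50,
}
resp = requests.get("http://localhost:5600/api/0/buckets/my-bucket/events", params=params)
events = resp.json()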
def get(self, viewname):
    args = request.args
    start = iso8601.parse_date(args["start"]) if "start" in args else None
    end = iso8601.parse_date(args["end"]) if "end" in args else None
    result = app.api.query_view(viewname, start, end)
    return result, 200
def to_native(self, value, context=None):
    if isinstance(value, datetime):
        return value
    try:
        date = parse_date(value, None)
        if not date.tzinfo:
            date = TZ.localize(date)
        return date
    except ParseError:
        raise ConversionError(self.messages['parse'].format(value))
    except OverflowError as e:
        raise ConversionError(str(e))
def get(self, request):
    return Response({
        "datetime": iso8601.parse_date(datetime.utcnow().isoformat())
    })
def deserialize_primitive(cls, data):
    if cls is datetime.datetime:
        try:
            d = parse_date(data)
        except iso8601.ParseError:
            raise ValueError("'{}' is not a datetime.".format(data))
    elif cls is datetime.date:
        try:
            d = parse_date(data).date()
        except iso8601.ParseError:
            raise ValueError("'{}' is not a date.".format(data))
    elif cls in {int, float, uuid.UUID, bool}:
        d = cls(data)
    elif cls is numbers.Integral:
        d = data
    elif cls is decimal.Decimal:
        try:
            d = cls(data)
        except decimal.InvalidOperation:
            raise ValueError("'{}' is not a decimal.".format(data))
    elif cls is text_type:
        if not isinstance(data, text_type):
            raise ValueError("'{}' is not a string.".format(data))
        d = cls(data)
    else:
        raise TypeError(
            "'{0}' is not a primitive type.".format(typing._type_repr(cls))
        )
    return d
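Hypothetical calls showing how the datetime and date branches route through parse_date (assuming the function is importable as shown, with cls being the target type):

deserialize_primitive(datetime.datetime, "2017-08-24T11:47:17Z")  # aware datetime
deserialize_primitive(datetime.date, "2017-08-24T11:47:17Z")      # datetime.date(2017, 8, 24)
deserialize_primitive(int, "42")                                  # 42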
def _convert_dates(cls, data):
    # NOTE(sileht): walk the aggregates measures dict tree and convert
    # dates when we find timeseries; the dict can look like
    # {"aggregated": ...}, {"metric_id": {"agg": ...}} or
    # {"resource_id": {"metric_name": {"agg": ...}}}
    for key in data:
        if isinstance(data[key], list):
            data[key] = [(iso8601.parse_date(ts), g, value)
                         for ts, g, value in data[key]]
        elif isinstance(data[key], dict):
            cls._convert_dates(data[key])
        else:
            raise RuntimeError("Unexpected aggregates API output %s" %
                               data[key])
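A hedged sketch of the tree shape the recursion walks: leaf lists hold (timestamp, granularity, value) tuples, and only the timestamps are rewritten. The resource and metric names below are made up:

import iso8601

measures = {
    "some-resource-id": {
        "cpu_util": {"mean": [("2017-08-24T11:00:00+00:00", 3600.0, 13.37)]},
    },
}
leaf = measures["some-resource-id"]["cpu_util"]["mean"]
converted = [(iso8601.parse_date(ts), g, value) for ts, g, value in leaf]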