import base64

import iso8601
# force_text/force_bytes are assumed to come from django.utils.encoding
# (or an equivalent compat helper in the original project)
from django.utils.encoding import force_bytes, force_text


def cast_primitive_value(spec, value):
    format = spec.get('format')
    type = spec.get('type')
    if type == 'boolean':
        return force_text(value).lower() in ('1', 'yes', 'true')
    if type == 'integer' or format in ('integer', 'long'):
        return int(value)
    if type == 'number' or format in ('float', 'double'):
        return float(value)
    if format == 'byte':  # base64-encoded characters
        return base64.b64decode(value)
    if format == 'binary':  # any sequence of octets
        return force_bytes(value)
    if format == 'date':  # ISO 8601 date
        return iso8601.parse_date(value).date()
    if format == 'dateTime':  # ISO 8601 datetime
        return iso8601.parse_date(value)
    if type == 'string':
        return force_text(value)
    return value
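A minimal usage sketch (the spec dicts below are hypothetical, in the Swagger/OpenAPI parameter style the function expects):

import datetime

assert cast_primitive_value({'type': 'boolean'}, 'Yes') is True
assert cast_primitive_value({'type': 'integer'}, '42') == 42
assert cast_primitive_value({'format': 'date'}, '2016-05-01') == datetime.date(2016, 5, 1)
# Specs with no matching type/format return the value unchanged
assert cast_primitive_value({}, [1, 2]) == [1, 2]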
def _process_event(self, repo, event):
    """Process potentially new event for repository

    :param repo: Repository related to event
    :type repo: ``repocribro.models.Repository``
    :param event: GitHub event data
    :type event: dict

    :return: If the event was new or already registered before
    :rtype: bool
    """
    last = pytz.utc.localize(repo.last_event)
    if iso8601.parse_date(event['created_at']) <= last:
        return False
    hook_type = self.event2webhook.get(event['type'], 'unknown')
    for event_processor in self.hooks.get(hook_type, []):
        try:
            event_processor(db=self.db, repo=repo,
                            payload=event['payload'],
                            actor=event['actor'])
            print('Processed {} from {} event for {}'.format(
                event['type'], event['created_at'], repo.full_name
            ))
        except HTTPException:
            print('Error while processing #{}'.format(event['id']))
    return True
def _is_valid_type(self, t, value):
    try:
        if t == 'number':
            float(value)
        elif t == 'integer':
            int(value)
        elif t == 'boolean':
            assert type(value) == bool
        elif t == 'timestamp':
            iso8601.parse_date(value)
        elif t == 'date':
            iso8601.parse_date(value + 'T00:00:00Z')
        elif t == 'string':
            # Allow coercing ints/floats, but nothing else
            assert type(value) in [str, int, float]
    except Exception:
        return False
    return True
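The timestamp branch, rewritten as a standalone sketch (the helper name is hypothetical; iso8601.ParseError is what parse_date raises on malformed input):

import iso8601

def is_valid_timestamp(value):
    # parse_date raises iso8601.ParseError on anything that is not ISO 8601
    try:
        iso8601.parse_date(value)
    except iso8601.ParseError:
        return False
    return True

assert is_valid_timestamp('2013-12-05T15:03:25Z')
assert not is_valid_timestamp('not-a-date')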
def linear_check(self):
    """Action to wait until all checks are done

    Each check has its own timeout of 300 seconds."""
    creation_dates = []
    for item in self.__linear_order:
        resource_repr = getattr(item, '__resource_repr')
        helpers.wait(
            lambda: self._linear_check(item), timeout=300, interval=2,
            timeout_msg="{} creation timeout reached".format(resource_repr)
        )
        k8s_obj = self.get_k8s_object(resource_repr)
        creation_date = iso8601.parse_date(
            k8s_obj.metadata.creation_timestamp)
        creation_dates.append(creation_date)
        if len(creation_dates) > 1:
            assert creation_dates[-2] <= creation_dates[-1], (
                "The order of linear objects is broken!")
    LOG.info("Linear check passed!")
def parse_operators(args):
    """Work around a mongoengine quirk: for some reason the operators
    gte, gt, lt and lte don't work with dates in ISO format, so parse
    date-like string values up front.
    """
    args = {k: v for k, v in args.items() if k not in ['skip', 'limit']}
    for k, v in args.items():
        try:
            parsed_date = iso8601.parse_date(v)
        except Exception:
            parsed_date = None
        if parsed_date:
            args[k] = parsed_date
    return args
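A sketch of the effect on a hypothetical query dict (keys and values invented for illustration):

args = {'created_at__gte': '2017-01-01T00:00:00Z', 'name': 'foo', 'limit': '10'}
parsed = parse_operators(args)
# 'created_at__gte' is now a datetime, 'name' stays 'foo',
# and 'limit'/'skip' are stripped before parsing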
def make_node(cls, data):
    try:
        key = data['key']
    except KeyError:
        key, kwargs = u'/', {}
    else:
        kwargs = {'modified_index': int(data['modifiedIndex']),
                  'created_index': int(data['createdIndex'])}
    ttl = data.get('ttl')
    if ttl is not None:
        expiration = iso8601.parse_date(data['expiration'])
        kwargs.update(ttl=ttl, expiration=expiration)
    if 'value' in data:
        node_cls = Value
        args = (data['value'],)
    elif data.get('dir', False):
        node_cls = Directory
        args = ([cls.make_node(n) for n in data.get('nodes', ())],)
    else:
        node_cls, args = Node, ()
    return node_cls(key, *args, **kwargs)
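A usage sketch under stated assumptions: make_node is taken to be a classmethod, NodeFactory is a hypothetical name for its host class, and the payload imitates an etcd v2 JSON node:

sample = {
    'key': '/config/feature',
    'value': 'on',
    'modifiedIndex': 7,
    'createdIndex': 7,
    'ttl': 30,
    'expiration': '2016-01-01T00:00:30Z',
}
node = NodeFactory.make_node(sample)  # hypothetical host class; yields a Value node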
def run(self, query, args):
    self.start = args.get(constants.PARAM_START_DATE)
    self.end = args.get(constants.PARAM_END_DATE)
    self.validate_start_end_dates()
    if self.errors:
        return
    start_date, end_date = None, None
    if self.start:
        start_date = str(iso8601.parse_date(self.start).date())
    if self.end:
        end_date = iso8601.parse_date(self.end).date()
        if '-' not in self.end:  # Only a year is specified
            end_date = datetime.date(end_date.year, 12, 31)
        if self.end.count('-') == 1:  # Year and month are specified
            # Get the last day of the month; monthrange returns a
            # (first_weekday, last_day) tuple
            days = monthrange(end_date.year, end_date.month)[1]
            end_date = datetime.date(end_date.year, end_date.month, days)
    query.add_filter(start_date, end_date)
def validate_date(self, _date, param):
    """Validate and parse the given date.

    Args:
        _date (str): date string, ISO 8601
        param (str): name of the parameter being parsed

    Returns:
        the parsed date

    Raises:
        ValueError: if the format is not valid
    """
    try:
        parsed_date = iso8601.parse_date(_date)
    except iso8601.ParseError:
        self._append_error(strings.INVALID_DATE.format(param, _date))
        raise ValueError
    return parsed_date
def test_add_collapse(self):
    """Tests that after adding a default collapse the results are
    annual, i.e. each timestamp is one year apart from the previous
    one."""
    self.query.add_series(self.single_series, self.rep_mode, self.series_periodicity)
    self.query.add_collapse()
    data = self.query.run()
    prev_timestamp = None
    for row in data:
        timestamp = row[0]
        parsed_timestamp = iso8601.parse_date(timestamp)
        if not prev_timestamp:
            prev_timestamp = parsed_timestamp
            continue
        delta = relativedelta(parsed_timestamp, prev_timestamp)
        self.assertTrue(delta.years == 1, timestamp)
        prev_timestamp = parsed_timestamp
def test_query_fills_nulls(self):
    self.query.add_series(self.single_series, self.rep_mode, self.series_periodicity)
    self.query.add_series(self.delayed_series,
                          self.rep_mode,
                          self.series_periodicity)
    query = ESQuery(index=settings.TEST_INDEX)
    query.add_series(self.single_series, self.rep_mode, self.series_periodicity)
    query.sort('asc')
    delayed_first_date = iso8601.parse_date(query.run()[0][0])
    data = self.query.run()
    delayed_series_index = 1  # First series added
    for row in data:
        current_date = iso8601.parse_date(row[0])
        if current_date < delayed_first_date:
            self.assertEqual(row[delayed_series_index], None)
        else:
            break
def test_query_fills_nulls_second_series(self):
    self.query.add_series(self.delayed_series,
                          self.rep_mode,
                          self.series_periodicity)
    self.query.add_series(self.single_series, self.rep_mode, self.series_periodicity)
    query = ESQuery(index=settings.TEST_INDEX)
    query.add_series(self.single_series, self.rep_mode, self.series_periodicity)
    query.sort('asc')
    delayed_first_date = iso8601.parse_date(query.run()[0][0])
    data = self.query.run()
    delayed_series_index = 2  # Second series added
    for row in data:
        current_date = iso8601.parse_date(row[0])
        if current_date < delayed_first_date:
            self.assertEqual(row[delayed_series_index], None)
        else:
            break
def test_end_of_period(self):
    query = ESQuery(index=settings.TEST_INDEX)
    query.add_series(self.single_series, self.rep_mode, self.series_periodicity)
    query.add_pagination(start=0, limit=1000)
    query.sort('asc')
    query.add_filter(start="1970")
    orig_data = query.run()
    self.query.add_series(self.single_series,
                          self.rep_mode,
                          self.series_periodicity,
                          'end_of_period')
    self.query.add_filter(start="1970")
    self.query.add_collapse('year')
    eop_data = self.query.run()
    for eop_row in eop_data:
        eop_value = eop_row[1]
        year = iso8601.parse_date(eop_row[0]).year
        for row in orig_data:
            row_date = iso8601.parse_date(row[0])
            if row_date.year == year and row_date.month == 12:
                self.assertAlmostEqual(eop_value, row[1], 5)  # EOP introduces precision loss
                break
def test_query_fills_nulls(self):
    self.query.add_series(self.single_series, self.rep_mode, self.series_periodicity)
    self.query.add_series(self.delayed_series,
                          self.rep_mode,
                          self.series_periodicity)
    query = ESQuery(index=settings.TEST_INDEX)
    query.add_series(self.single_series, self.rep_mode, self.series_periodicity)
    query.sort('asc')
    delayed_first_date = iso8601.parse_date(query.run()[0][0])
    self.query.sort('asc')
    data = self.query.run()
    delayed_series_index = 1  # First series added
    for row in data:
        current_date = iso8601.parse_date(row[0])
        if current_date < delayed_first_date:
            self.assertEqual(row[delayed_series_index], None)
        else:
            break
def test_query_fills_nulls_second_series(self):
    self.query.add_series(self.delayed_series,
                          self.rep_mode,
                          self.series_periodicity)
    self.query.add_series(self.single_series, self.rep_mode, self.series_periodicity)
    query = ESQuery(index=settings.TEST_INDEX)
    query.add_series(self.single_series, self.rep_mode, self.series_periodicity)
    query.sort('asc')
    delayed_first_date = iso8601.parse_date(query.run()[0][0])
    self.query.sort('asc')
    data = self.query.run()
    delayed_series_index = 2  # Second series added
    for row in data:
        current_date = iso8601.parse_date(row[0])
        if current_date < delayed_first_date:
            self.assertEqual(row[delayed_series_index], None)
        else:
            break
def test_index_continuity(self):
    self.query.add_series(self.delayed_series,
                          self.rep_mode,
                          self.series_periodicity)
    self.query.add_series(self.single_series, self.rep_mode, self.series_periodicity)
    self.query.add_filter(start="1910", end="1920")
    self.query.sort('asc')
    query = ESQuery(index=settings.TEST_INDEX)
    query.add_series(self.single_series, self.rep_mode, self.series_periodicity)
    query.add_filter(start="1921")  # Guarantees empty data between 1920 and 1921
    query.add_pagination(start=0, limit=1000)
    query.sort('asc')
    data = self.query.run()
    current_date = iso8601.parse_date(data[0][0])
    for row in data[1:]:
        row_date = iso8601.parse_date(row[0])
        self.assertEqual(current_date + relativedelta(months=1), row_date)
        current_date = row_date
def get_times(request):
    """Gets start and end time from the request

    As we use no timezone in NAV, remove it from the parsed timestamps

    :param request: django.http.HttpRequest
    """
    starttime = request.GET.get('starttime')
    endtime = request.GET.get('endtime')
    try:
        if starttime:
            starttime = iso8601.parse_date(starttime).replace(tzinfo=None)
        if endtime:
            endtime = iso8601.parse_date(endtime).replace(tzinfo=None)
    except iso8601.ParseError:
        raise Iso8601ParseError
    return starttime, endtime
def get_timestamp(filename):
    class GMLHandler(xml.sax.ContentHandler):
        timestamp = None

        def startElement(self, name, attrs):
            if name == "wfs:FeatureCollection":
                self.timestamp = attrs['timeStamp']

    handler = GMLHandler()
    parser = xml.sax.make_parser()
    parser.setContentHandler(handler)
    parser.parse(filename)
    timestamp = iso8601.parse_date(handler.timestamp, default_timezone=None)
    return pytz.timezone(settings.TIME_ZONE).localize(timestamp)
def _test_get_job_destruction(self, username):
    '''
    GET /{jobs}/{job-id}/destruction returns the destruction instant for
    {job-id} as [std:iso8601].
    '''
    for job in self.jobs:
        url = reverse(self.url_names['destruction'], kwargs={'pk': job.pk})
        response = self.client.get(url)
        if username == 'user':
            self.assertEqual(response.status_code, 200)
            if job.destruction_time:
                destruction_time = iso8601.parse_date(response.content.decode())
                self.assertEqual(destruction_time, job.destruction_time)
            else:
                self.assertEqual(response.content.decode(), '')
        else:
            self.assertEqual(response.status_code, 404)
def _test_post_job_destruction(self, username):
    '''
    POST /{jobs}/{job-id}/destruction with DESTRUCTION={std:iso8601}
    (application/x-www-form-urlencoded) sets the destruction instant for
    {job-id} and redirects to /{jobs}/{job-id} as 303.
    '''
    destruction_time = '2016-01-01T00:00:00'
    for job in self.jobs:
        url = reverse(self.url_names['destruction'], kwargs={'pk': job.pk})
        response = self.client.post(url, urlencode({'DESTRUCTION': destruction_time}),
                                    content_type='application/x-www-form-urlencoded')
        if username == 'user':
            redirect_url = 'http://testserver' + reverse(self.url_names['detail'],
                                                         kwargs={'pk': job.pk})
            self.assertRedirects(response, redirect_url, status_code=303)
            self.assertEqual(
                self.jobs.get(pk=job.pk).destruction_time,
                iso8601.parse_date('2016-01-01T00:00:00')
            )
        else:
            self.assertEqual(response.status_code, 404)
def filter_queryset(self, request, queryset, view):
    query_dict = make_query_dict_upper_case(request.GET)
    # apply only for list
    if view.action == 'list_jobs':
        phases = query_dict.getlist('PHASE')
        if phases:
            queryset = queryset.filter(phase__in=phases)
        else:
            queryset = queryset.exclude(phase__exact=Job.PHASE_ARCHIVED)
        after = query_dict.get('AFTER')
        if after:
            queryset = queryset.filter(creation_time__gt=iso8601.parse_date(after))
        last = query_dict.get('LAST')
        if last:
            queryset = queryset.filter(start_time__isnull=False) \
                               .order_by('-start_time')[:int(last)]
    return queryset
def _timestamp_parse(ts: ConvertableTimestamp) -> datetime:
    """
    Takes something representing a timestamp and
    returns a timestamp in the representation we want.
    """
    if isinstance(ts, str):
        ts = iso8601.parse_date(ts)
    # Set resolution to milliseconds instead of microseconds
    # (fixes incompatibility with software based on unix time, for example mongodb)
    ts = ts.replace(microsecond=int(ts.microsecond / 1000) * 1000)
    # Add timezone if not set
    if not ts.tzinfo:
        # Needed? All timestamps should be ISO 8601, so they ought to always
        # contain a timezone. Yes, because the timezone is optional in ISO 8601.
        logger.warning("timestamp without timezone found, using UTC: {}".format(ts))
        ts = ts.replace(tzinfo=timezone.utc)
    return ts
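A usage sketch of the two normalizations (note that string inputs without an offset already get UTC from parse_date's default, so the fallback branch only fires for naive datetime objects):

from datetime import datetime, timezone

ts = _timestamp_parse("2017-06-01T12:00:00.123456+02:00")
assert ts.microsecond == 123000  # truncated to millisecond resolution

naive = _timestamp_parse(datetime(2017, 6, 1, 12, 0))
assert naive.tzinfo == timezone.utc  # naive datetimes fall back to UTC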
def test_create_auction_auctionPeriod(self):
    data = self.initial_data.copy()
    #tenderPeriod = data.pop('tenderPeriod')
    #data['auctionPeriod'] = {'startDate': tenderPeriod['endDate']}
    response = self.app.post_json('/auctions', {'data': data})
    self.assertEqual(response.status, '201 Created')
    self.assertEqual(response.content_type, 'application/json')
    auction = response.json['data']
    self.assertIn('tenderPeriod', auction)
    self.assertIn('auctionPeriod', auction)
    self.assertNotIn('startDate', auction['auctionPeriod'])
    self.assertEqual(parse_date(data['auctionPeriod']['startDate']).date(),
                     parse_date(auction['auctionPeriod']['shouldStartAfter'], TZ).date())
    if SANDBOX_MODE:
        auction_startDate = parse_date(data['auctionPeriod']['startDate'], None)
        if not auction_startDate.tzinfo:
            auction_startDate = TZ.localize(auction_startDate)
        tender_endDate = parse_date(auction['tenderPeriod']['endDate'], None)
        if not tender_endDate.tzinfo:
            tender_endDate = TZ.localize(tender_endDate)
        self.assertLessEqual((auction_startDate - tender_endDate).total_seconds(), 70)
    else:
        self.assertEqual(parse_date(auction['tenderPeriod']['endDate']).date(),
                         parse_date(data['auctionPeriod']['startDate'], TZ).date() - timedelta(days=1))
        self.assertEqual(parse_date(auction['tenderPeriod']['endDate']).time(), time(20, 0))
From test_db_api.py in project Trusted-Platform-Module-nova (author: BU-NU-CLOUD-SP16):
def test_instance_get_all_by_filters_changes_since(self):
    i1 = self.create_instance_with_args(
        updated_at='2013-12-05T15:03:25.000000')
    i2 = self.create_instance_with_args(
        updated_at='2013-12-05T15:03:26.000000')
    changes_since = iso8601.parse_date('2013-12-05T15:03:25.000000')
    result = db.instance_get_all_by_filters(
        self.ctxt, {'changes-since': changes_since})
    self._assertEqualListsOfInstances([i1, i2], result)
    changes_since = iso8601.parse_date('2013-12-05T15:03:26.000000')
    result = db.instance_get_all_by_filters(
        self.ctxt, {'changes-since': changes_since})
    self._assertEqualListsOfInstances([i2], result)
    db.instance_destroy(self.ctxt, i1['uuid'])
    filters = {}
    filters['changes-since'] = changes_since
    filters['marker'] = i1['uuid']
    result = db.instance_get_all_by_filters(self.ctxt, filters)
    self._assertEqualListsOfInstances([i2], result)
def test_parse_no_timezone():
    """issue 4 - Handle datetime string without timezone

    This tests what happens when you parse a date with no timezone. While not
    strictly correct, this is quite common. I'll assume UTC for the time zone
    in this case.
    """
    d = iso8601.parse_date("2007-01-01T08:00:00")
    assert d.year == 2007
    assert d.month == 1
    assert d.day == 1
    assert d.hour == 8
    assert d.minute == 0
    assert d.second == 0
    assert d.microsecond == 0
    assert d.tzinfo == iso8601.UTC
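The UTC fallback can be disabled via parse_date's default_timezone parameter, as the get_timestamp snippet above does; a quick sketch:

import iso8601

# With no offset in the string and default_timezone=None,
# parse_date returns a naive datetime instead of assuming UTC
d = iso8601.parse_date("2007-01-01T08:00:00", default_timezone=None)
assert d.tzinfo is None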
def load_logs(self, start, end, log_filter):
    try:
        response = urllib.request.urlopen(self.get_url(start, end, log_filter))
    except urllib.error.URLError as err:
        print("Error reading from network: %s" % err)
        return []
    body = response.read().decode('utf-8')
    events = []
    for line in body.split('\n'):
        match = re.match(r'.*?(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}'
                         r'.\d{6}[-+]\d{2}:\d{2}).*path="([^"]+)".*', line)
        if match:
            path = self.filter_path(match.groups()[1])
            when = iso8601.parse_date(match.groups()[0])
            events.append((when, path))
    return events
def as_dict(self):
    details = json.loads(self.details)
    name = "Anonymous Donor"
    if 'donorName' in details and details['donorName']:
        name = details['donorName']
    # renamed from `datetime` so it no longer shadows the datetime module
    created_on = iso8601.parse_date(details['createdOn'])
    info = {
        # general
        'name': name,
        'comment': details.get('message', "") or '',
        'donation_amount': float(details['donationAmount']),
        'currency': 'USD',
        # Display-friendly
        'amount': "$%.2f" % details['donationAmount'],
        'timestamp': created_on,
    }
    return info
def as_dict(self):
    details = json.loads(self.details)
    name = "Anonymous"
    amount = " ".join([str(details['amount']), details['currencyCode']])
    if 'user' in details:
        name = details['user']['displayName']
    elif 'username' in details:
        name = details['username']
    timestamp = iso8601.parse_date(details['date'])
    info = {
        'name': name,
        'amount': amount,
        'comment': details['note'],
        'donation_amount': float(details['amount']),
        'currency': details['currencyCode'],
        'timestamp': timestamp,
    }
    return info
def parse_datetime(date):
    """
    Validates that date is in ISO 8601 format. Returns the parsed datetime
    in UTC as a naive datetime (tzinfo=None).
    """
    if not isinstance(date, basestring):
        raise Invalid('date is not a string')
    try:
        return iso8601.parse_date(date).astimezone(iso8601.UTC).replace(
            tzinfo=None)
    except Exception:
        raise Invalid('date is not in iso8601 format')
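A sketch of the UTC round-trip this performs (assuming Invalid is the validation error class imported in the source module):

import datetime

# An offset-bearing input is converted to UTC, then the tzinfo is stripped
assert parse_datetime('2016-01-01T02:00:00+02:00') == datetime.datetime(2016, 1, 1, 0, 0)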
def test_sync_all_active_plans():
    all_plans = get_fixture("rest.billingplan.all.active.json")
    models.BillingPlan.objects.sync_data(all_plans["plans"])
    assert models.BillingPlan.objects.count() == len(all_plans["plans"])
    for plan in all_plans["plans"]:
        plan_obj = get_fixture(
            "GET/v1/payments/billing-plans/{id}.json".format(id=plan["id"])
        )
        plan = models.BillingPlan.objects.get(id=plan_obj["id"])
        assert plan.id == plan_obj["id"]
        assert plan.state == getattr(enums.BillingPlanState, plan_obj["state"])
        assert plan.type == getattr(enums.BillingPlanType, plan_obj["type"])
        assert plan.name == plan_obj["name"]
        assert plan.description == plan_obj["description"]
        assert plan.merchant_preferences == plan_obj["merchant_preferences"]
        assert plan.create_time == parse_date(plan_obj["create_time"])
        assert plan.update_time == parse_date(plan_obj["update_time"])
        for definition in plan_obj["payment_definitions"]:
            pd = models.PaymentDefinition.objects.get(id=definition["id"])
            assert pd.id == definition["id"]
            assert pd.name == definition["name"]
            assert pd.type == getattr(enums.PaymentDefinitionType, definition["type"])
            assert plan.payment_definitions.filter(id=pd.id).count() == 1