def test_datetime(self):
# only millis, not micros
self.round_trip({"date": datetime.datetime(2009, 12, 9, 15,
49, 45, 191000, utc)})
jsn = '{"dt": { "$date" : "1970-01-01T00:00:00.000+0000"}}'
self.assertEqual(EPOCH_AWARE, bsonjs_loads(jsn)["dt"])
jsn = '{"dt": { "$date" : "1970-01-01T00:00:00.000Z"}}'
self.assertEqual(EPOCH_AWARE, bsonjs_loads(jsn)["dt"])
# No explicit offset or timezone is not supported by libbson
jsn = '{"dt": { "$date" : "1970-01-01T00:00:00.000"}}'
self.assertRaises(ValueError, bsonjs_loads, jsn)
# Localtime behind UTC
jsn = '{"dt": { "$date" : "1969-12-31T16:00:00.000-0800"}}'
self.assertEqual(EPOCH_AWARE, bsonjs_loads(jsn)["dt"])
# Localtime ahead of UTC
jsn = '{"dt": { "$date" : "1970-01-01T01:00:00.000+0100"}}'
self.assertEqual(EPOCH_AWARE, bsonjs_loads(jsn)["dt"])
dtm = datetime.datetime(1, 1, 1, 1, 1, 1, 0, utc)
jsn = '{"dt": {"$date": -62135593139000}}'
self.assertEqual(dtm, bsonjs_loads(jsn)["dt"])
jsn = '{"dt": {"$date": {"$numberLong": "-62135593139000"}}}'
self.assertEqual(dtm, bsonjs_loads(jsn)["dt"])
python类EPOCH_AWARE的实例源码
def object_hook(dct, compile_re=True):
if "$oid" in dct:
return ObjectId(str(dct["$oid"]))
if "$ref" in dct:
return DBRef(dct["$ref"], dct["$id"], dct.get("$db", None))
if "$date" in dct:
secs = float(dct["$date"]) / 1000.0
return EPOCH_AWARE + datetime.timedelta(seconds=secs)
if "$regex" in dct:
flags = 0
# PyMongo always adds $options but some other tools may not.
for opt in dct.get("$options", ""):
flags |= _RE_OPT_TABLE.get(opt, 0)
if compile_re:
return re.compile(dct["$regex"], flags)
else:
return Regex(dct["$regex"], flags)
if "$minKey" in dct:
return MinKey()
if "$maxKey" in dct:
return MaxKey()
if "$binary" in dct:
if isinstance(dct["$type"], int):
dct["$type"] = "%02x" % dct["$type"]
subtype = int(dct["$type"], 16)
if subtype >= 0xffffff80: # Handle mongoexport values
subtype = int(dct["$type"][6:], 16)
return Binary(base64.b64decode(dct["$binary"].encode()), subtype)
if "$code" in dct:
return Code(dct["$code"], dct.get("$scope"))
if bson.has_uuid() and "$uuid" in dct:
return bson.uuid.UUID(dct["$uuid"])
return dct
def object_hook(dct, compile_re=True):
if "$oid" in dct:
return ObjectId(str(dct["$oid"]))
if "$ref" in dct:
return DBRef(dct["$ref"], dct["$id"], dct.get("$db", None))
if "$date" in dct:
secs = float(dct["$date"]) / 1000.0
return EPOCH_AWARE + datetime.timedelta(seconds=secs)
if "$regex" in dct:
flags = 0
# PyMongo always adds $options but some other tools may not.
for opt in dct.get("$options", ""):
flags |= _RE_OPT_TABLE.get(opt, 0)
if compile_re:
return re.compile(dct["$regex"], flags)
else:
return Regex(dct["$regex"], flags)
if "$minKey" in dct:
return MinKey()
if "$maxKey" in dct:
return MaxKey()
if "$binary" in dct:
if isinstance(dct["$type"], int):
dct["$type"] = "%02x" % dct["$type"]
subtype = int(dct["$type"], 16)
if subtype >= 0xffffff80: # Handle mongoexport values
subtype = int(dct["$type"][6:], 16)
return Binary(base64.b64decode(dct["$binary"].encode()), subtype)
if "$code" in dct:
return Code(dct["$code"], dct.get("$scope"))
if bson.has_uuid() and "$uuid" in dct:
return bson.uuid.UUID(dct["$uuid"])
return dct
def object_hook(dct, compile_re=True):
if "$oid" in dct:
return ObjectId(str(dct["$oid"]))
if "$ref" in dct:
return DBRef(dct["$ref"], dct["$id"], dct.get("$db", None))
if "$date" in dct:
secs = float(dct["$date"]) / 1000.0
return EPOCH_AWARE + datetime.timedelta(seconds=secs)
if "$regex" in dct:
flags = 0
# PyMongo always adds $options but some other tools may not.
for opt in dct.get("$options", ""):
flags |= _RE_OPT_TABLE.get(opt, 0)
if compile_re:
return re.compile(dct["$regex"], flags)
else:
return Regex(dct["$regex"], flags)
if "$minKey" in dct:
return MinKey()
if "$maxKey" in dct:
return MaxKey()
if "$binary" in dct:
if isinstance(dct["$type"], int):
dct["$type"] = "%02x" % dct["$type"]
subtype = int(dct["$type"], 16)
if subtype >= 0xffffff80: # Handle mongoexport values
subtype = int(dct["$type"][6:], 16)
return Binary(base64.b64decode(dct["$binary"].encode()), subtype)
if "$code" in dct:
return Code(dct["$code"], dct.get("$scope"))
if bson.has_uuid() and "$uuid" in dct:
return bson.uuid.UUID(dct["$uuid"])
return dct
def object_hook(dct, compile_re=True):
if "$oid" in dct:
return ObjectId(str(dct["$oid"]))
if "$ref" in dct:
return DBRef(dct["$ref"], dct["$id"], dct.get("$db", None))
if "$date" in dct:
secs = float(dct["$date"]) / 1000.0
return EPOCH_AWARE + datetime.timedelta(seconds=secs)
if "$regex" in dct:
flags = 0
# PyMongo always adds $options but some other tools may not.
for opt in dct.get("$options", ""):
flags |= _RE_OPT_TABLE.get(opt, 0)
if compile_re:
return re.compile(dct["$regex"], flags)
else:
return Regex(dct["$regex"], flags)
if "$minKey" in dct:
return MinKey()
if "$maxKey" in dct:
return MaxKey()
if "$binary" in dct:
if isinstance(dct["$type"], int):
dct["$type"] = "%02x" % dct["$type"]
subtype = int(dct["$type"], 16)
if subtype >= 0xffffff80: # Handle mongoexport values
subtype = int(dct["$type"][6:], 16)
return Binary(base64.b64decode(dct["$binary"].encode()), subtype)
if "$code" in dct:
return Code(dct["$code"], dct.get("$scope"))
if bson.has_uuid() and "$uuid" in dct:
return bson.uuid.UUID(dct["$uuid"])
return dct
def object_hook(dct, compile_re=True):
if "$oid" in dct:
return ObjectId(str(dct["$oid"]))
if "$ref" in dct:
return DBRef(dct["$ref"], dct["$id"], dct.get("$db", None))
if "$date" in dct:
secs = float(dct["$date"]) / 1000.0
return EPOCH_AWARE + datetime.timedelta(seconds=secs)
if "$regex" in dct:
flags = 0
# PyMongo always adds $options but some other tools may not.
for opt in dct.get("$options", ""):
flags |= _RE_OPT_TABLE.get(opt, 0)
if compile_re:
return re.compile(dct["$regex"], flags)
else:
return Regex(dct["$regex"], flags)
if "$minKey" in dct:
return MinKey()
if "$maxKey" in dct:
return MaxKey()
if "$binary" in dct:
if isinstance(dct["$type"], int):
dct["$type"] = "%02x" % dct["$type"]
subtype = int(dct["$type"], 16)
if subtype >= 0xffffff80: # Handle mongoexport values
subtype = int(dct["$type"][6:], 16)
return Binary(base64.b64decode(dct["$binary"].encode()), subtype)
if "$code" in dct:
return Code(dct["$code"], dct.get("$scope"))
if bson.has_uuid() and "$uuid" in dct:
return bson.uuid.UUID(dct["$uuid"])
return dct