def convert_timestamp(obj, default_tz='America/Chicago'):
    """Build a Bark timestamp string from a time-like object.

    The object is first normalized with :meth:`timestamp_to_datetime`. When
    the resulting datetime carries no timezone, `default_tz` is attached to
    it; otherwise its own timezone is kept.

    Args:
        obj: time object; see :meth:`timestamp_to_datetime` for supported types
        default_tz (str): timezone to use if `obj` is timezone-naive; must be a
            string from the `tz database
            <https://en.wikipedia.org/wiki/List_of_tz_database_time_zones>`_.

    Returns:
        str: the result of ``isoformat()`` on the Arrow object built from `obj`
    """
    moment = timestamp_to_datetime(obj)
    if not moment.tzinfo:
        # Naive value: interpret its wall-clock time in the fallback zone.
        stamped = arrow.get(moment, default_tz)
    else:
        stamped = arrow.get(moment)
    return stamped.isoformat()
python类Arrow()的实例源码
def test_process_basic_information_sanity_checks(device):
    """Check that a processed device record has the expected basic fields.

    The record is pretty-printed first so it shows up in the test output,
    then every required key is checked, followed by the type of
    ``last_sync`` and the value of ``source``.
    """
    pprint.pprint(device)

    required_keys = (
        'model',
        'name',
        'platform',
        'serial',
        'source',
        'last_sync',
        'software',
    )
    for required in required_keys:
        assert required in device

    last_sync = device['last_sync']
    assert isinstance(last_sync, arrow.Arrow)
    assert device['source'] == 'jamf'
def local_datetime(utcdatetime, format=None, timezone=None):
    """
    Return local datetime based on the timezone
    It will automatically format the date.
    To not format the date, set format=False

    :param utcdatetime: Arrow or string (anything else is handed to
        ``arrow.get`` as a single argument)
    :param format: name of an entry in the ``DATETIME_FORMAT`` config, a
        literal format string, or False to get the Arrow object back
    :param timezone: string, ie: US/Eastern; defaults to the
        ``DATETIME_TIMEZONE`` config value, then to "US/Eastern"
    :return: None when `utcdatetime` is None, an Arrow object when
        `format` is False, otherwise the formatted string
    """
    if utcdatetime is None:
        return None

    timezone = timezone or config("DATETIME_TIMEZONE", "US/Eastern")

    # Parse first, convert second.  Passing the timezone name as a second
    # argument to ``arrow.get`` would make Arrow treat it as a *parse format*
    # when the first argument is a string, so string input could never work.
    if isinstance(utcdatetime, arrow.Arrow):
        moment = utcdatetime
    else:
        moment = arrow.get(utcdatetime)
    dt = moment.to(timezone)

    if format is False:
        return dt

    # Tolerate a missing DATETIME_FORMAT config entry.
    formats = config("DATETIME_FORMAT") or {}
    if not format:
        pattern = formats.get("default") or "MM/DD/YYYY"
    else:
        # Named format if configured, otherwise use the value as the format
        # string itself instead of passing None to ``dt.format``.
        pattern = formats.get(format) or format
    return dt.format(pattern)
def test_is_weekday(self):
    """Check that ``weekday() < 5`` separates Mon-Fri from Sat/Sun.

    Uses the seven consecutive days 2017-08-14 .. 2017-08-20, which the
    test treats as Monday through Sunday.
    """
    weekday_numbers = [
        arrow.Arrow(2017, 8, day_of_month).weekday()
        for day_of_month in range(14, 21)
    ]
    # Five working days followed by the two weekend days.
    expected_flags = [True] * 5 + [False] * 2

    for weekday_number, is_working_day in zip(weekday_numbers, expected_flags):
        self.assertEqual(weekday_number < 5, is_working_day)
def test_create_fails_if_pending_account_has_same_email():
    """
    A pending row in ``logincreate`` that already uses an email address must
    block a second registration for that address: ``login.create()`` is
    expected to raise ``WeasylError`` with the value ``'emailExists'``.
    """
    # Seed a pending account-creation entry that owns the email address.
    insert_pending = d.meta.tables["logincreate"].insert()
    pending_row = {
        "token": 40 * "a",
        "username": "existing",
        "login_name": "existing",
        "hashpass": login.passhash(raw_password),
        "email": email_addr,
        "birthday": arrow.Arrow(2000, 1, 1),
        "unixtime": arrow.now(),
    }
    d.engine.execute(insert_pending, pending_row)

    # A different username, but the same email address as the pending entry.
    form = Bag(
        username="test",
        password='0123456789',
        passcheck='0123456789',
        email=email_addr,
        emailcheck=email_addr,
        day='12',
        month='12',
        year=arrow.now().year - 19,
    )

    with pytest.raises(WeasylError) as err:
        login.create(form)
    assert 'emailExists' == err.value.value
def timestamp_to_datetime(obj):
    """Converts an object to a `datetime.datetime` object.

    Note that because floating point values are approximate, some conversions
    may not be reversible.

    Args:
        obj: may be any of the following:

            1. `datetime.datetime` object
            2. Arrow object
            3. ISO 8601 string
            4. `time.struct_time` object
            5. integer (epoch time)
            6. float (epoch time)
            7. 2-tuple of integers (seconds and microseconds, epoch time)

    Returns:
        datetime.datetime: a `datetime.datetime` input is returned unchanged;
        Arrow objects and strings yield Arrow's ``.datetime`` value; the
        epoch-based inputs (4-7) go through ``datetime.fromtimestamp`` with
        no timezone argument.

    Raises:
        TypeError: if `obj` cannot be interpreted according to the list above
    """
    import numbers
    from time import mktime, struct_time

    # The type checks are mutually exclusive, so the standard-library types
    # are tested first.  ``str`` must still be tested before the generic
    # ``__getitem__`` fallback because strings are indexable too.
    if isinstance(obj, datetime):
        return obj
    if isinstance(obj, struct_time):
        # mktime() only yields epoch seconds (a float); it has to be turned
        # into a datetime like every other epoch-based input.
        return datetime.fromtimestamp(mktime(obj))
    if isinstance(obj, numbers.Number):
        return datetime.fromtimestamp(obj)
    if isinstance(obj, arrow.Arrow):
        return obj.datetime
    if isinstance(obj, str):
        return arrow.get(obj).datetime
    if hasattr(obj, '__getitem__'):
        # (seconds, microseconds) pair
        seconds, microseconds = obj[0], obj[1]
        return datetime.fromtimestamp(seconds) + timedelta(
            microseconds=int(microseconds))

    # Wrap in a 1-tuple so %-formatting can never misread `obj` as the
    # argument tuple itself.
    raise TypeError("unable to convert %s to timestamp" % (obj,))
gremlin_formatting.py 文件源码
项目:graphql-compiler
作者: kensho-technologies
项目源码
文件源码
阅读 19
收藏 0
点赞 0
评论 0
def _safe_gremlin_argument(expected_type, argument_value):
    """Return a Gremlin string representing the given argument value.

    Dispatches on `expected_type` to the matching representation helper.
    Raises GraphQLInvalidArgumentError when a bool is supplied for an Int,
    and AssertionError when the GraphQL type is not one handled here.
    """
    if GraphQLString.is_same_type(expected_type):
        return _safe_gremlin_string(argument_value)

    if GraphQLID.is_same_type(expected_type):
        # IDs can be strings or numbers, but the GraphQL library coerces them
        # to strings. Follow suit and always hand a string to the escaper.
        if isinstance(argument_value, six.string_types):
            id_text = argument_value
        elif isinstance(argument_value, bytes):  # should only happen in py3
            id_text = argument_value.decode('utf-8')
        else:
            id_text = six.text_type(argument_value)
        return _safe_gremlin_string(id_text)

    if GraphQLFloat.is_same_type(expected_type):
        return represent_float_as_str(argument_value)

    if GraphQLInt.is_same_type(expected_type):
        # isinstance(True, int) is True in Python, so bools have to be
        # rejected explicitly before the int type check.
        if isinstance(argument_value, bool):
            raise GraphQLInvalidArgumentError(
                u'Attempting to represent a non-int as an int: '
                u'{}'.format(argument_value))
        return type_check_and_str(int, argument_value)

    if GraphQLBoolean.is_same_type(expected_type):
        return type_check_and_str(bool, argument_value)

    if GraphQLDate.is_same_type(expected_type):
        date_types = (datetime.date,)
        return _safe_gremlin_date_and_datetime(expected_type, date_types, argument_value)

    if GraphQLDateTime.is_same_type(expected_type):
        datetime_types = (datetime.datetime, arrow.Arrow)
        return _safe_gremlin_date_and_datetime(expected_type, datetime_types, argument_value)

    if isinstance(expected_type, GraphQLList):
        return _safe_gremlin_list(expected_type.of_type, argument_value)

    raise AssertionError(u'Could not safely represent the requested GraphQL type: '
                         u'{} {}'.format(expected_type, argument_value))
######
# Public API
######
def _safe_match_argument(expected_type, argument_value):
    """Return a MATCH (SQL) string representing the given argument value.

    Dispatches on `expected_type` to the matching representation helper.
    Raises GraphQLInvalidArgumentError when a bool is supplied for an Int,
    and AssertionError when the GraphQL type is not one handled here.
    """
    if GraphQLString.is_same_type(expected_type):
        return _safe_match_string(argument_value)

    if GraphQLID.is_same_type(expected_type):
        # IDs can be strings or numbers, but the GraphQL library coerces them
        # to strings. Follow suit and always hand a string to the escaper.
        if isinstance(argument_value, six.string_types):
            id_text = argument_value
        elif isinstance(argument_value, bytes):  # should only happen in py3
            id_text = argument_value.decode('utf-8')
        else:
            id_text = six.text_type(argument_value)
        return _safe_match_string(id_text)

    if GraphQLFloat.is_same_type(expected_type):
        return represent_float_as_str(argument_value)

    if GraphQLInt.is_same_type(expected_type):
        # isinstance(True, int) is True in Python, so bools have to be
        # rejected explicitly before the int type check.
        if isinstance(argument_value, bool):
            raise GraphQLInvalidArgumentError(
                u'Attempting to represent a non-int as an int: '
                u'{}'.format(argument_value))
        return type_check_and_str(int, argument_value)

    if GraphQLBoolean.is_same_type(expected_type):
        return type_check_and_str(bool, argument_value)

    if GraphQLDate.is_same_type(expected_type):
        date_types = (datetime.date,)
        return _safe_match_date_and_datetime(expected_type, date_types, argument_value)

    if GraphQLDateTime.is_same_type(expected_type):
        datetime_types = (datetime.datetime, arrow.Arrow)
        return _safe_match_date_and_datetime(expected_type, datetime_types, argument_value)

    if isinstance(expected_type, GraphQLList):
        return _safe_match_list(expected_type.of_type, argument_value)

    raise AssertionError(u'Could not safely represent the requested GraphQL type: '
                         u'{} {}'.format(expected_type, argument_value))
######
# Public API
######
def _serialize_datetime(value):
    """Return the ``isoformat()`` text of a datetime or Arrow object.

    Raises ValueError for any other kind of value.
    """
    accepted_types = (datetime, arrow.Arrow)
    if isinstance(value, accepted_types):
        return value.isoformat()

    raise ValueError(u'The received object was not a datetime: '
                     u'{} {}'.format(type(value), value))
def _parse_datetime_value(value):
    """Parse an ISO-8601 DateTime string into a datetime object.

    A trailing "Z" is rewritten to an explicit "+00:00" offset before the
    string is parsed with the 'YYYY-MM-DDTHH:mm:ssZ' Arrow format.
    """
    utc_marker = 'Z'
    if value.endswith(utc_marker):
        # The original author notes Arrow does not accept the "Z" literal for
        # UTC, so it is swapped for an explicit offset.
        value = value[:-len(utc_marker)] + '+00:00'

    parsed = arrow.get(value, 'YYYY-MM-DDTHH:mm:ssZ')
    return parsed.datetime
def fresh_timestamp(self):
    """
    Get a fresh timestamp for the model.

    :return: the ``naive`` value of a newly created Arrow object
        (presumably a naive ``datetime.datetime`` -- confirm against Arrow)
    """
    current = arrow.get()
    return current.naive
def from_datetime(self, value):
    """
    Convert a date-like value to its naive datetime form.

    Arrow objects are used as they are; anything else is first passed
    through ``arrow.get``.

    :rtype: datetime.datetime
    """
    moment = value if isinstance(value, arrow.Arrow) else arrow.get(value)
    return moment.naive
def as_datetime(self, value):
    """
    Wrap a timestamp value in an Arrow object via ``arrow.get``.

    :rtype: arrow.Arrow
    """
    moment = arrow.get(value)
    return moment
def _format_date(self, date):
    """
    Render a date or timestamp using the format from ``get_date_format()``.

    ``None`` is passed through untouched. With the ``'iso'`` format the
    value's ``isoformat()`` is returned (strings are parsed by Arrow first).
    For any other format, Arrow objects and strings are rendered with
    Arrow's ``format`` and everything else with ``strftime``.

    :param date: The date or timestamp
    :type date: datetime.datetime or datetime.date or arrow.Arrow

    :rtype: str
    """
    if date is None:
        return date

    date_format = self.get_date_format()

    if date_format == 'iso':
        parsed = arrow.get(date) if isinstance(date, basestring) else date
        return parsed.isoformat()

    if isinstance(date, arrow.Arrow):
        return date.format(date_format)
    if isinstance(date, basestring):
        return arrow.get(date).format(date_format)
    return date.strftime(date_format)
def getDate():
    """Return today's UTC calendar date as the ``datetime`` of an Arrow object.

    Only year, month and day of the current UTC time are kept; the time of
    day is left at Arrow's constructor defaults.
    """
    now_utc = arrow.utcnow()
    day_start = arrow.Arrow(now_utc.year, now_utc.month, now_utc.day)
    return day_start.datetime
def _json_serial(obj):
    """Fallback JSON serializer for values the default encoder rejects.

    Arrow objects become their ``isoformat()`` string and ``bytes`` go
    through ``to_jsonb64``; any other type raises TypeError.
    """
    if isinstance(obj, Arrow):
        return obj.isoformat()
    if isinstance(obj, bytes):
        return to_jsonb64(obj)
    raise TypeError("Type %s not serializable" % type(obj))
def get_timeframe(data: Dict[str, Dict]) -> Tuple[arrow.Arrow, arrow.Arrow]:
    """
    Get the maximum timeframe for the given backtest data

    Every entry's ``'T'`` value is parsed with ``arrow.get`` and the overall
    earliest and latest parsed values are returned. Series without entries
    are ignored.

    :param data: dictionary with backtesting data; each value is iterated
        and its items are indexed with ``'T'``
    :return: tuple containing min_date, max_date
    """
    min_date, max_date = None, None
    for values in data.values():
        # Compare parsed times, not the raw 'T' values, so the comparison
        # agrees with the chronological ordering.
        moments = [arrow.get(entry['T']) for entry in values]
        if not moments:
            # Nothing recorded for this series; it cannot move the bounds.
            continue
        earliest, latest = min(moments), max(moments)
        if min_date is None or earliest < min_date:
            min_date = earliest
        if max_date is None or latest > max_date:
            max_date = latest
    # Kept as in the original so the "no data at all" case still goes
    # through arrow.get(None) exactly as before.
    return arrow.get(min_date), arrow.get(max_date)
def datetime_to_str(d):
    """
    Render `d` in UTC as ISO text, using a "Z" suffix instead of "+00:00".

    :param arrow.Arrow d:
    :return str:
    """
    utc_offset = '+00:00'
    iso_text = d.to('utc').isoformat()
    if not iso_text.endswith(utc_offset):
        return iso_text
    return iso_text[:-len(utc_offset)] + 'Z'
def str_to_datetime(s):
    """
    Parse `s` with ``arrow.get``.

    :param str s:
    :return arrow.Arrow:
    """
    parsed = arrow.get(s)
    return parsed
def datetime_to_timestamp(d):
    """
    Expose the ``float_timestamp`` attribute of `d`.

    :param arrow.Arrow d: A datetime object.
    :return float: An equivalent UNIX timestamp.
    """
    unix_seconds = d.float_timestamp
    return unix_seconds
def format_time(self, time: Union[str, Arrow]) -> str:
    """Format a moment with ``TIME_FORMAT``.

    An Arrow object is formatted directly. Any other value is handed to
    ``.to()`` on the current UTC time, and the converted "now" is formatted.
    """
    if isinstance(time, Arrow):
        moment = time
    else:
        moment = arrow.utcnow().to(time)
    return moment.format(TIME_FORMAT)
def create_row(song):
    """Build the list of display values for one song.

    Missing title, album artist and album fall back to 'Unknown', a missing
    year to '0000'. The added date is reduced to year/month/day and rendered
    with Arrow's ``humanize()``.
    """
    row = [
        song.Path,
        song.Title or 'Unknown',
        song.AlbumArtist or 'Unknown',
        song.Album or 'Unknown',
        time_helper.seconds_to_string(song.Length),
        song.Year or '0000',
    ]

    added = song.Added
    added_day = arrow.Arrow(added.year, added.month, added.day)
    row.append(added_day.humanize())

    row.extend([
        song.Played,
        song.Genre,
        str(song.Tracknumber),
    ])
    return row
def msgpack_encoder(value):
    """ Encoder used by the msgpack serializer when an object is unknown.

    Arrow objects are wrapped in an ``ExtType`` with code 1 carrying their
    encoded ISO string; every other value is returned unchanged.
    """
    if not isinstance(value, arrow.Arrow):
        return value

    payload = value.isoformat().encode()
    return msgpack.ExtType(1, payload)
def msgpack_ext_decoder(code, data):
    """ Decoder used by the msgpack deserializer when an ext type is found.

    Ext code 1 is turned back into an Arrow object: first with
    ``arrow.get``, and if that raises ``ParserError``, with ``strptime``
    and the '%Y%m%dT%H:%M:%S.%f' layout. Other codes are returned as an
    ``ExtType`` built from the same code and data.
    """
    if code != 1:
        return msgpack.ExtType(code, data)

    text = data.decode()
    try:
        return arrow.get(text)
    except arrow.parser.ParserError:
        return arrow.Arrow.strptime(text, '%Y%m%dT%H:%M:%S.%f')
def get_month_name(cls, month_index):
    """Return the ``%B`` (full month name) rendering for `month_index`.

    The name is taken from the first day of that month in the year 2000.
    """
    first_of_month = arrow.Arrow(year=2000, month=month_index, day=1)
    month_name = first_of_month.strftime("%B")
    return month_name
def get_month_number(cls, month_index):
    """Return the ``%m`` (month number) rendering for `month_index`.

    The value is taken from the first day of that month in the year 2000.
    """
    first_of_month = arrow.Arrow(year=2000, month=month_index, day=1)
    month_number = first_of_month.strftime("%m")
    return month_number
def tweets_for_month(self, year, month_number):
    """Yield the tweets stored for one calendar month, ordered by twitter_id.

    The window runs from the first day of the month to the first day of the
    following month; both bounds are compared strictly, so rows whose
    timestamp equals a bound are not returned.
    """
    month_start = arrow.Arrow(year, month_number, 1)
    next_month_start = month_start.shift(months=+1)

    query = """
SELECT twitter_id, text, timestamp FROM tweets
WHERE (timestamp > ?) AND (timestamp < ?)
ORDER BY twitter_id ASC
"""
    rows = self.query_many(query,
                           month_start.timestamp,
                           next_month_start.timestamp)
    for row in rows:
        yield self.tweet_from_row(row)
def transform(cls, data):
    """
    Build the InfluxDB query expression for a transformation dictionary.

    The date range comes from the "from" and "to" entries of `data`. The
    result is a dict with the query as a string under 'expression' and the
    range bounds under 'time_begin' and 'time_end'.
    """
    measurement = data.measurement

    # Vanilla QL (v1)
    #expression = 'SELECT * FROM {measurement}'.format(measurement=measurement)

    # PyInfluxQL (v2)
    # https://github.com/jjmalina/pyinfluxql

    # Labs
    #time_begin = arrow.utcnow() - arrow.Arrow(hour=1)
    #expression = Query('*').from_(measurement).where(time__gt=datetime.utcnow() - timedelta(hours=1))
    #expression = Query(Mean('*')).from_(measurement).where(time__gt=datetime.now() - timedelta(1)).group_by(time=timedelta(hours=1))

    # Quote the measurement name when it starts with a numeric character.
    # TODO: Fix should go to pyinfluxql
    starts_with_digit = is_number(measurement[0])
    if starts_with_digit:
        measurement = '"{measurement}"'.format(measurement=measurement)

    # TODO: Use ".date_range" API method
    time_begin, time_end = compute_daterange(data.get('from'), data.get('to'))

    # No tag filters are applied at the moment.
    tags = {}
    #tags = InfluxDBAdapter.get_tags(data)

    conditions = dict(time__gte=time_begin, time__lte=time_end)
    conditions.update(tags)
    expression = Query('*').from_(measurement).where(**conditions)

    return {
        'expression': str(expression),
        'time_begin': time_begin,
        'time_end': time_end,
    }
def stat(self, **kwargs):
    """Gets information about an object.

    Issues a HEAD request for the object and, when the response is OK,
    returns a ``LocationStat`` built from the response headers (stored
    content length, generation, and the Last-Modified time as a timestamp).
    Returns None when the response is not OK.
    """
    url_endpoint, params, headers, _ = self._get_parameters(**kwargs)
    http = self.get_requests_session()
    resp = http.head(url_endpoint, params=params, headers=headers)
    if not resp.ok:
        return None

    response_headers = resp.headers
    size = response_headers["x-goog-stored-content-length"]
    generation = response_headers["x-goog-generation"]

    # NOTE(review): the first seven parsed fields are fed positionally to
    # Arrow; confirm the seventh is meant to land in Arrow's 7th argument.
    parsed_modified = rfc822.parsedate(response_headers["Last-Modified"])
    created = arrow.Arrow(*(parsed_modified[:7])).timestamp

    return location.LocationStat.from_keywords(
        session=self._session,
        location=self,
        size=size,
        generation=generation,
        created=created,
    )
def _coerce_timestamp(value):
    """Return `value` as a float timestamp.

    Arrow objects contribute their ``float_timestamp``; anything else is
    converted with ``float()``.
    """
    return value.float_timestamp if isinstance(value, arrow.Arrow) else float(value)