def getDateSent(self):
"""Get the time of sending from the Date header
Returns a time object using time.mktime. Not very reliable, because
the Date header can be missing or spoofed (and often is, by spammers).
Throws a MessageDateError if the Date header is missing or invalid.
"""
dh = self.getheader('Date')
if dh == None:
return None
try:
return time.mktime(rfc822.parsedate(dh))
except ValueError:
raise MessageDateError("message has missing or bad Date")
except TypeError: # gets thrown by mktime if parsedate returns None
raise MessageDateError("message has missing or bad Date")
except OverflowError:
raise MessageDateError("message has missing or bad Date")
python类parsedate()的实例源码
def parseRFC822Time(t):
return mktime(parsedate(t))
def GetCreatedAtInSeconds(self):
'''Get the time this status message was posted, in seconds since the epoch.
Returns:
The time this status message was posted, in seconds since the epoch.
'''
return calendar.timegm(rfc822.parsedate(self.created_at))
def GetCreatedAtInSeconds(self):
'''Get the time this direct message was posted, in seconds since the epoch.
Returns:
The time this direct message was posted, in seconds since the epoch.
'''
return calendar.timegm(rfc822.parsedate(self.created_at))
def search_BEFORE(self, query, id, msg):
date = parseTime(query.pop(0))
return rfc822.parsedate(msg.getInternalDate()) < date
def search_ON(self, query, id, msg):
date = parseTime(query.pop(0))
return rfc822.parsedate(msg.getInternalDate()) == date
def search_SENTBEFORE(self, query, id, msg):
date = msg.getHeader(False, 'date').get('date', '')
date = rfc822.parsedate(date)
return date < parseTime(query.pop(0))
def search_SENTON(self, query, id, msg):
date = msg.getHeader(False, 'date').get('date', '')
date = rfc822.parsedate(date)
return date[:3] == parseTime(query.pop(0))[:3]
def search_SENTSINCE(self, query, id, msg):
date = msg.getHeader(False, 'date').get('date', '')
date = rfc822.parsedate(date)
return date > parseTime(query.pop(0))
def extract_headers_metadata(self, headers):
headers = [(stringify(k), stringify(v)) for k, v in headers]
self.result.headers = dict(headers)
for field, value in headers:
field = field.lower()
if field is None or value is None:
continue
if field == 'subject':
self.update('title', value)
if field == 'message-id':
self.update('id', value)
if field == 'date':
try:
date = rfc822.parsedate(value)
date = datetime.fromtimestamp(mktime(date))
self.update('created_at', date)
except Exception as ex:
log.warning("Failed to parse [%s]: %s", date, ex)
if field == 'from':
addr = address.parse(value)
if addr is not None:
author = stringify(addr.display_name)
email = stringify(addr.address)
self.result.emails.append(email)
if author is not None and author != email:
self.update('author', author)
self.result.entities.append(author)
if field in ['to', 'cc', 'bcc']:
for addr in address.parse_list(value):
name = stringify(addr.display_name)
email = stringify(addr.address)
self.result.emails.append(email)
if name is not None and name != email:
self.result.entities.append(name)
def search_BEFORE(self, query, id, msg):
date = parseTime(query.pop(0))
return rfc822.parsedate(msg.getInternalDate()) < date
def search_ON(self, query, id, msg):
date = parseTime(query.pop(0))
return rfc822.parsedate(msg.getInternalDate()) == date
def search_SENTBEFORE(self, query, id, msg):
date = msg.getHeader(False, 'date').get('date', '')
date = rfc822.parsedate(date)
return date < parseTime(query.pop(0))
def search_SENTON(self, query, id, msg):
date = msg.getHeader(False, 'date').get('date', '')
date = rfc822.parsedate(date)
return date[:3] == parseTime(query.pop(0))[:3]
def search_SENTSINCE(self, query, id, msg):
date = msg.getHeader(False, 'date').get('date', '')
date = rfc822.parsedate(date)
return date > parseTime(query.pop(0))
def GetCreatedAtInSeconds(self):
'''Get the time this status message was posted, in seconds since the epoch.
Returns:
The time this status message was posted, in seconds since the epoch.
'''
return calendar.timegm(rfc822.parsedate(self.created_at))
def GetCreatedAtInSeconds(self):
'''Get the time this direct message was posted, in seconds since the epoch.
Returns:
The time this direct message was posted, in seconds since the epoch.
'''
return calendar.timegm(rfc822.parsedate(self.created_at))
def MaximumHitFrequency(self):
'''Determines the minimum number of seconds that a program must wait
before hitting the server again without exceeding the rate_limit
imposed for the currently authenticated user.
Returns:
The minimum second interval that a program must use so as to not
exceed the rate_limit imposed for the user.
'''
rate_status = self.GetRateLimitStatus()
reset_time = rate_status.get('reset_time', None)
limit = rate_status.get('remaining_hits', None)
if reset_time:
# put the reset time into a datetime object
reset = datetime.datetime(*rfc822.parsedate(reset_time)[:7])
# find the difference in time between now and the reset time + 1 hour
delta = reset + datetime.timedelta(hours=1) - datetime.datetime.utcnow()
if not limit:
return int(delta.seconds)
# determine the minimum number of seconds allowed as a regular interval
max_frequency = int(delta.seconds / limit) + 1
# return the number of seconds
return max_frequency
return 60
def GetCreatedAtInSeconds(self):
'''Get the time this status message was posted, in seconds since the epoch.
Returns:
The time this status message was posted, in seconds since the epoch.
'''
return calendar.timegm(rfc822.parsedate(self.created_at))
def GetCreatedAtInSeconds(self):
'''Get the time this direct message was posted, in seconds since the epoch.
Returns:
The time this direct message was posted, in seconds since the epoch.
'''
return calendar.timegm(rfc822.parsedate(self.created_at))
def MaximumHitFrequency(self):
'''Determines the minimum number of seconds that a program must wait
before hitting the server again without exceeding the rate_limit
imposed for the currently authenticated user.
Returns:
The minimum second interval that a program must use so as to not
exceed the rate_limit imposed for the user.
'''
rate_status = self.GetRateLimitStatus()
reset_time = rate_status.get('reset_time', None)
limit = rate_status.get('remaining_hits', None)
if reset_time:
# put the reset time into a datetime object
reset = datetime.datetime(*rfc822.parsedate(reset_time)[:7])
# find the difference in time between now and the reset time + 1 hour
delta = reset + datetime.timedelta(hours=1) - datetime.datetime.utcnow()
if not limit:
return int(delta.seconds)
# determine the minimum number of seconds allowed as a regular interval
max_frequency = int(delta.seconds / limit) + 1
# return the number of seconds
return max_frequency
return 60
def created_at_in_seconds(self):
""" Get the time this status message was posted, in seconds since
the epoch (1 Jan 1970).
Returns:
int: The time this status message was posted, in seconds since
the epoch.
"""
return timegm(parsedate(self.created_at))
def GetCreatedAtInSeconds(self):
'''Get the time this status message was posted, in seconds since the epoch.
Returns:
The time this status message was posted, in seconds since the epoch.
'''
return calendar.timegm(rfc822.parsedate(self.created_at))
def GetCreatedAtInSeconds(self):
'''Get the time this direct message was posted, in seconds since the epoch.
Returns:
The time this direct message was posted, in seconds since the epoch.
'''
return calendar.timegm(rfc822.parsedate(self.created_at))
def MaximumHitFrequency(self):
'''Determines the minimum number of seconds that a program must wait
before hitting the server again without exceeding the rate_limit
imposed for the currently authenticated user.
Returns:
The minimum second interval that a program must use so as to not
exceed the rate_limit imposed for the user.
'''
rate_status = self.GetRateLimitStatus()
reset_time = rate_status.get('reset_time', None)
limit = rate_status.get('remaining_hits', None)
if reset_time:
# put the reset time into a datetime object
reset = datetime.datetime(*rfc822.parsedate(reset_time)[:7])
# find the difference in time between now and the reset time + 1 hour
delta = reset + datetime.timedelta(hours=1) - datetime.datetime.utcnow()
if not limit:
return int(delta.seconds)
# determine the minimum number of seconds allowed as a regular interval
max_frequency = int(delta.seconds / limit) + 1
# return the number of seconds
return max_frequency
return 60
def GetCreatedAtInSeconds(self):
'''Get the time this status message was posted, in seconds since the epoch.
Returns:
The time this status message was posted, in seconds since the epoch.
'''
return calendar.timegm(rfc822.parsedate(self.created_at))
def GetCreatedAtInSeconds(self):
'''Get the time this direct message was posted, in seconds since the epoch.
Returns:
The time this direct message was posted, in seconds since the epoch.
'''
return calendar.timegm(rfc822.parsedate(self.created_at))
def MaximumHitFrequency(self):
'''Determines the minimum number of seconds that a program must wait
before hitting the server again without exceeding the rate_limit
imposed for the currently authenticated user.
Returns:
The minimum second interval that a program must use so as to not
exceed the rate_limit imposed for the user.
'''
rate_status = self.GetRateLimitStatus()
reset_time = rate_status.get('reset_time', None)
limit = rate_status.get('remaining_hits', None)
if reset_time:
# put the reset time into a datetime object
reset = datetime.datetime(*rfc822.parsedate(reset_time)[:7])
# find the difference in time between now and the reset time + 1 hour
delta = reset + datetime.timedelta(hours=1) - datetime.datetime.utcnow()
if not limit:
return int(delta.seconds)
# determine the minimum number of seconds allowed as a regular interval
max_frequency = int(delta.seconds / limit) + 1
# return the number of seconds
return max_frequency
return 60
def stat(self, **kwargs):
"""Gets information about an object."""
url_endpoint, params, headers, _ = self._get_parameters(**kwargs)
resp = self.get_requests_session().head(
url_endpoint, params=params, headers=headers)
if resp.ok:
return location.LocationStat.from_keywords(
session=self._session,
location=self,
size=resp.headers["x-goog-stored-content-length"],
generation=resp.headers["x-goog-generation"],
created=arrow.Arrow(*(rfc822.parsedate(
resp.headers["Last-Modified"])[:7])).timestamp,
)