def __compile_policy_value(self, str_value, value_dict):
value = str_value
    if '$' not in value:
return value
self.__logger.debug("Compiling value %s", value)
for key in value_dict:
self.__logger.debug("Searching for key %s", key)
val = value_dict[key]
        if type(val) in (datetime, date):
self.__logger.debug("Value in dictionary: %s -> %s", key, val)
val = time.mktime(self.__get_localtime(val).timetuple())
self.__logger.debug("Timestamp converted: %s -> %s", key, val)
value = value.replace(self.__get_compiled_key(key), str(val))
self.__logger.debug("Value after convertsion: %s", value)
int_value = int(eval(value))
compiled_value = self.__get_localtime(int_value)
self.__logger.debug("Compiled value: %s", compiled_value)
return compiled_value
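A minimal standalone sketch of the same substitute-then-eval pattern, covering only the substitution step and assuming placeholders take the form `$key` (the `compile_value` name is hypothetical):

import time
from datetime import datetime

def compile_value(str_value, value_dict):
    # Replace each '$key' placeholder, converting datetimes to Unix
    # timestamps, then evaluate the resulting arithmetic expression.
    value = str_value
    for key, val in value_dict.items():
        if isinstance(val, datetime):
            val = time.mktime(val.timetuple())
        value = value.replace('$' + key, str(val))
    return int(eval(value))  # eval is only safe with trusted input

print(compile_value('$start + 3600', {'start': datetime(2016, 1, 1)}))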
def kill_invalid_connection():
unfinished_logs = Log.objects.filter(is_finished=False)
now = datetime.datetime.now()
now_timestamp = int(time.mktime(now.timetuple()))
for log in unfinished_logs:
try:
log_file_mtime = int(os.stat('%s.log' % log.log_path).st_mtime)
except OSError:
log_file_mtime = 0
if (now_timestamp - log_file_mtime) > 3600:
if log.login_type == 'ssh':
try:
os.kill(int(log.pid), 9)
except OSError:
pass
elif (now - log.start_time).days < 1:
continue
log.is_finished = True
log.end_time = now
log.save()
            logger.warning('kill log %s', log.log_path)
def _totimestamp(dt_obj):
"""
Args:
dt_obj (:class:`datetime.datetime`):
Returns:
int:
"""
if not dt_obj:
return None
try:
# Python 3.3+
return int(dt_obj.timestamp())
except AttributeError:
# Python 3 (< 3.3) and Python 2
return int(mktime(dt_obj.timetuple()))
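A quick usage check, assuming `from time import mktime` is in scope for the fallback branch; on Python 3.3+ both branches agree for naive local datetimes:

from datetime import datetime

dt = datetime(2016, 6, 1, 12, 30)
assert _totimestamp(dt) == int(dt.timestamp())  # Python 3.3+ branch
assert _totimestamp(None) is None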
def history_get(self,item_ID,date_from,date_till):
'''
    Return the history of an item.
[eg1]#zabbix_api history_get 23296 "2016-08-01 00:00:00" "2016-09-01 00:00:00"
[note]The date_till time must be within the historical data retention time
'''
dateFormat = "%Y-%m-%d %H:%M:%S"
try:
startTime = time.strptime(date_from,dateFormat)
endTime = time.strptime(date_till,dateFormat)
    except ValueError:
        err_msg("Invalid time format, expected e.g. ['2016-05-01 00:00:00'] ['2016-06-01 00:00:00']")
time_from = int(time.mktime(startTime))
time_till = int(time.mktime(endTime))
history_type=self.__item_search(item_ID)
self.__history_get(history_type,item_ID,time_from,time_till)
def timestamp_normalized(self):
"""
    Expects self.timestamp to be a long/int, a datetime, or a timedelta.
    :return: integer microseconds since the epoch
"""
if not self.timestamp:
return None
if isinstance(self.timestamp, six.integer_types):
return self.timestamp
if isinstance(self.timestamp, timedelta):
tmp = datetime.now() + self.timestamp
else:
tmp = self.timestamp
return int(time.mktime(tmp.timetuple()) * 1e+6 + tmp.microsecond)
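A standalone sketch of the same normalization (the `normalize` name is hypothetical): integers pass through, a timedelta is taken relative to now, and datetimes become microseconds since the epoch:

import time
from datetime import datetime, timedelta

def normalize(ts):
    if not ts:
        return None
    if isinstance(ts, int):
        return ts
    tmp = datetime.now() + ts if isinstance(ts, timedelta) else ts
    return int(time.mktime(tmp.timetuple()) * 1e6 + tmp.microsecond)

print(normalize(datetime(2016, 1, 1, microsecond=42)))  # ends in ...000042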
def next_time(time_string):
try:
parsed = list(time.strptime(time_string, "%H:%M"))
except (TypeError, ValueError):
return float(time_string)
now = time.localtime()
current = list(now)
current[3:6] = parsed[3:6]
current_time = time.time()
delta = time.mktime(current) - current_time
if delta <= 0.0:
current[2] += 1
return time.mktime(current) - current_time
return delta
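Example calls: an 'HH:MM' string counts to that time's next occurrence, while a bare number falls through to the float conversion:

print(next_time('03:00'))  # seconds until the next 03:00 local time
print(next_time('90'))     # 90.0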
def get_creation_date(user):
client_id = 'jzkbprff40iqj646a697cyrvl0zt2m6'
headers = { 'Client-ID' : client_id }
# Loop ends when a value is returned
    while True:
# Uses try in case of request timeout
try:
r = requests.get('https://api.twitch.tv/kraken/users/{}'.format(user), headers = headers)
        except requests.exceptions.RequestException:
time.sleep(1)
continue
if r.status_code == 200:
# Captures only YYYY-MM-DD
            date = re.match(
                r'(\d{4}-\d{2}-\d{2})',
                json.loads(r.text)['created_at']
            )
epoch = datetime.datetime.strptime("{}".format(date.group(1)) , "%Y-%m-%d")
epoch = int(time.mktime(epoch.timetuple()) / 3600)
# except:
# print('Failed to get time')
# return
return epoch
def getDateSent(self):
"""Get the time of sending from the Date header
Returns a time object using time.mktime. Not very reliable, because
the Date header can be missing or spoofed (and often is, by spammers).
Throws a MessageDateError if the Date header is missing or invalid.
"""
dh = self.getheader('Date')
    if dh is None:
return None
    try:
        return time.mktime(rfc822.parsedate(dh))
    except (ValueError, TypeError, OverflowError):
        # TypeError is raised by mktime when parsedate returns None
        raise MessageDateError("message has missing or bad Date")
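`rfc822` is Python 2 only; a sketch of the same conversion with the modern stdlib, where `email.utils.parsedate` honors the same 9-tuple contract:

import time
from email.utils import parsedate

sent = time.mktime(parsedate('Tue, 01 Nov 2016 08:49:37 GMT'))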
def db_value(self, value):
if value is None:
return
if isinstance(value, datetime.datetime):
pass
elif isinstance(value, datetime.date):
value = datetime.datetime(value.year, value.month, value.day)
else:
return int(round(value * self.resolution))
if self.utc:
timestamp = calendar.timegm(value.utctimetuple())
else:
timestamp = time.mktime(value.timetuple())
timestamp += (value.microsecond * .000001)
if self.resolution > 1:
timestamp *= self.resolution
return int(round(timestamp))
def morsel_to_cookie(morsel):
"""Convert a Morsel object into a Cookie containing the one k/v pair."""
expires = None
if morsel['max-age']:
expires = time.time() + morsel['max-age']
elif morsel['expires']:
time_template = '%a, %d-%b-%Y %H:%M:%S GMT'
expires = time.mktime(
time.strptime(morsel['expires'], time_template)) - time.timezone
return create_cookie(
comment=morsel['comment'],
comment_url=bool(morsel['comment']),
discard=False,
domain=morsel['domain'],
expires=expires,
name=morsel.key,
path=morsel['path'],
port=None,
rest={'HttpOnly': morsel['httponly']},
rfc2109=False,
secure=bool(morsel['secure']),
value=morsel.value,
version=morsel['version'] or 0,
)
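Example usage with a stdlib Morsel; note that `create_cookie` itself comes from the surrounding requests.cookies module, so this assumes that context:

from http.cookies import SimpleCookie

jar = SimpleCookie()
jar['session'] = 'abc123'
jar['session']['max-age'] = 3600  # the converter derives expires from this
cookie = morsel_to_cookie(jar['session'])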
def getYYYYMMDD(minLen=1, returnTimeStamp=False, returnDateTime=False, alternateValue=None):
if Cmd.ArgumentsRemaining():
argstr = Cmd.Current().strip()
if argstr:
if alternateValue is not None and argstr.lower() == alternateValue.lower():
Cmd.Advance()
return None
if argstr[0] in [u'+', u'-']:
argstr = getDeltaDate(argstr).strftime(YYYYMMDD_FORMAT)
try:
dateTime = datetime.datetime.strptime(argstr, YYYYMMDD_FORMAT)
Cmd.Advance()
if returnTimeStamp:
return time.mktime(dateTime.timetuple())*1000
if returnDateTime:
return dateTime
return argstr
except ValueError:
invalidArgumentExit(YYYYMMDD_FORMAT_REQUIRED)
elif minLen == 0:
Cmd.Advance()
return u''
missingArgumentExit(YYYYMMDD_FORMAT_REQUIRED)
def time_parse(self, s):
try:
epoch = int(s)
return epoch
except ValueError:
pass
try:
epoch = int(time.mktime(time.strptime(s, '%Y-%m-%d')))
return epoch
except ValueError:
pass
try:
epoch = int(time.mktime(time.strptime(s, '%Y-%m-%d %H:%M:%S')))
return epoch
except ValueError:
pass
raise ValueError('Invalid time: "%s"' % s)
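Example inputs, assuming `parser` is an instance of the enclosing class:

parser.time_parse('1451606400')           # already an epoch, passed through
parser.time_parse('2016-01-01')           # midnight local time as an epoch
parser.time_parse('2016-01-01 12:00:00')  # full datetime as an epoch
# anything else raises ValueError: Invalid time: "..."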
def seconds_until(until=9):
""" Counts the seconds until it is a certain hour again.
Keyword Arguments:
until (int): the hour we want to count to (default: {9})
Returns:
(float): how many seconds until the specified time.
"""
now = time.localtime()
now_sec = time.mktime(now)
if now.tm_hour >= until:
delta = (until * 60 * 60) \
+ (60 * 60 * (24 - now.tm_hour)) \
- (60 * now.tm_min) \
- (now.tm_sec)
else:
delta = (until * 60 * 60) \
- (60 * 60 * now.tm_hour) \
- (60 * now.tm_min) \
- (now.tm_sec)
then = time.localtime(now_sec + delta)
return time.mktime(then) - time.time()
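For example, called at 14:00 local time, `seconds_until(9)` is the gap to 09:00 tomorrow; that the delta lands exactly on the target hour can be checked like this:

import time

remaining = seconds_until(9)
print(time.strftime('%H:%M', time.localtime(time.time() + remaining)))  # 09:00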
def test_crontab():
c = Crontab()
c.add('boo')
c.add('foo', 0)
c.add('bar', [1, 3], -5, -1, -1, 0)
assert c.actions(0, 1, 1, 1, 1) == {'boo', 'foo'}
assert c.actions(1, 1, 1, 1, 1) == {'boo'}
assert c.actions(1, 5, 1, 1, 7) == {'boo', 'bar'}
assert c.actions(3, 5, 1, 1, 7) == {'boo', 'bar'}
ts = mktime(datetime(2016, 1, 17, 5, 1).timetuple())
assert c.actions_ts(ts) == {'boo', 'bar'}
def _get_date_and_size(zip_stat):
size = zip_stat.file_size
# ymdhms+wday, yday, dst
date_time = zip_stat.date_time + (0, 0, -1)
# 1980 offset already done
timestamp = time.mktime(date_time)
return timestamp, size
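The padding matters: `date_time` on a ZipInfo is a 6-tuple (year, month, day, hour, minute, second), and appending `(0, 0, -1)` for wday/yday/isdst makes it a valid 9-tuple for mktime, which ignores wday/yday and infers DST from the -1:

import zipfile

zi = zipfile.ZipInfo('example.txt', date_time=(2016, 6, 1, 12, 0, 0))
timestamp, size = _get_date_and_size(zi)  # size is 0 until data is written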
def scroll(query, begin, until, prefix=None):
diff = timedelta(minutes=4)
while begin < until:
to = min(begin + diff, until)
res = DB.query(query % (pad(begin), pad(to)))
for batch in res:
for row in batch:
# truncate longer ids to match with shorter host names
if "container_id" in row:
row["container_id"] = row["container_id"][0:11]
time_col = row["time"][0:min(26, len(row["time"]) - 1)]
if len(time_col) == 19:
t = time.strptime(time_col, "%Y-%m-%dT%H:%M:%S")
else:
t = time.strptime(time_col, "%Y-%m-%dT%H:%M:%S.%f")
if prefix is not None:
                    for key in list(row.keys()):  # snapshot keys; the loop mutates row
if (key not in SKIP_PREFIX) and ((prefix + "|") not in key):
row[APP_METRIC_DELIMITER.join((prefix, key))] = row.pop(key)
yield (time.mktime(t), row)
begin = to
def parse_retry_after(self, retry_after):
# Whitespace: https://tools.ietf.org/html/rfc7230#section-3.2.4
if re.match(r"^\s*[0-9]+\s*$", retry_after):
seconds = int(retry_after)
else:
retry_date_tuple = email.utils.parsedate(retry_after)
if retry_date_tuple is None:
raise InvalidHeader("Invalid Retry-After header: %s" % retry_after)
retry_date = time.mktime(retry_date_tuple)
seconds = retry_date - time.time()
if seconds < 0:
seconds = 0
return seconds
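Both header forms defined by RFC 7231 are accepted, assuming `retry` is an instance of the enclosing class:

retry.parse_retry_after('120')                            # -> 120 seconds
retry.parse_retry_after('Fri, 31 Dec 1999 23:59:59 GMT')  # past date -> 0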
def uptime(args):
with os.popen("docker inspect -f '{{json .State}}' " + args.container + " 2>&1") as pipe:
status = pipe.read().strip()
if "No such image or container" in status:
print "0"
else:
statusjs = json.loads(status)
if statusjs["Running"]:
uptime = statusjs["StartedAt"]
start = time.strptime(uptime[:19], "%Y-%m-%dT%H:%M:%S")
print int(time.time() - time.mktime(start))
else:
print "0"
# get the approximate disk usage
# alt docker inspect -s -f {{.SizeRootFs}} 49219085bdaa
# alt docker exec " + args.container + " du -s -b / 2> /dev/null
def convert(timeString):
if timeString == 'now':
return now()
_sets = timeString.split(':')
if len(_sets) != 7:
return CF.UTCTime(0,0,0)
_year, _month, _day, _blank, _hours, _minutes, _seconds = timeString.split(':')
_full_seconds = float(_seconds)
_time = time.mktime((int(_year),int(_month),int(_day),int(_hours),int(_minutes),int(_full_seconds),0,0,0))-time.timezone
return CF.UTCTime(1, _time, _full_seconds - int(_full_seconds))
# Break out the whole seconds into a GMT time
# Insert the arithmetic functions as operators on the PrecisionUTCTime class
def purge_expired_peers():
"""
    Removes peers who haven't announced in the last interval.
Should be set as a recurring event source in your Zappa config.
"""
if DATASTORE == "DynamoDB":
# This is a costly operation, but I think it has to be done.
# Optimizations (pagination? queries? batching?) welcomed.
all_torrents = table.scan()
for torrent in all_torrents['Items']:
for peer_id in torrent['peers']:
peer_last_announce = int(torrent['peers'][peer_id][0]['last_announce'])
window = datetime.now() - timedelta(seconds=ANNOUNCE_INTERVAL)
window_unix = int(time.mktime(window.timetuple()))
if peer_last_announce < window_unix:
remove_peer_from_info_hash(torrent['info_hash'], peer_id)
else:
# There must be a better way to do this.
# Also, it should probably be done as a recurring function and cache,
# not dynamically every time.
for key in s3_client.list_objects(Bucket=BUCKET_NAME)['Contents']:
if 'peers.json' in key['Key']:
remote_object = s3.Object(BUCKET_NAME, key['Key']).get()
content = remote_object['Body'].read().decode('utf-8')
torrent = json.loads(content)
for peer_id in torrent['peers']:
peer_last_announce = int(torrent['peers'][peer_id]['last_announce'])
window = datetime.now() - timedelta(seconds=ANNOUNCE_INTERVAL)
window_unix = int(time.mktime(window.timetuple()))
if peer_last_announce < window_unix:
remove_peer_from_info_hash(torrent['info_hash'], peer_id)
return
##
# Database
##
def expireat(self, name, when):
"""
Set an expire flag on key ``name``. ``when`` can be represented
as an integer indicating unix time or a Python datetime object.
"""
if isinstance(when, datetime.datetime):
when = int(mod_time.mktime(when.timetuple()))
return self.execute_command('EXPIREAT', name, when)
def pexpireat(self, name, when):
"""
Set an expire flag on key ``name``. ``when`` can be represented
as an integer representing unix time in milliseconds (unix time * 1000)
or a Python datetime object.
"""
if isinstance(when, datetime.datetime):
ms = int(when.microsecond / 1000)
when = int(mod_time.mktime(when.timetuple())) * 1000 + ms
return self.execute_command('PEXPIREAT', name, when)
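The conversion both methods apply before issuing the command, shown on its own (using the module's `mod_time` alias for `time`):

import datetime
import time as mod_time

when = datetime.datetime(2016, 6, 1, 12, 0, 0, 500000)
seconds = int(mod_time.mktime(when.timetuple()))        # EXPIREAT argument
millis = seconds * 1000 + int(when.microsecond / 1000)  # PEXPIREAT argument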
def _handle_header_line(self, line):
m = re.search("\\#(\\d+)\\s+(\\d+:\\d+:\\d+)\\s+server\\s+id\\s+\\d+", line)
datetime_str = "%s %s" % (m.group(1), m.group(2))
dt = datetime.datetime.strptime(datetime_str, '%y%m%d %H:%M:%S')
new_header_timestamp = int(time.mktime(dt.timetuple()))
self.header_timestamp = new_header_timestamp
def createTimeStamp(datestr, format="%Y-%m-%d %H:%M:%S"):
return time.mktime(time.strptime(datestr, format))
def getdate(self, name):
"""Retrieve a date field from a header.
Retrieves a date field from the named header, returning a tuple
compatible with time.mktime().
"""
try:
data = self[name]
except KeyError:
return None
return parsedate(data)
def getdate_tz(self, name):
"""Retrieve a date field from a header as a 10-tuple.
The first 9 elements make up a tuple compatible with time.mktime(),
and the 10th is the offset of the poster's time zone from GMT/UTC.
"""
try:
data = self[name]
except KeyError:
return None
return parsedate_tz(data)
# Access as a dictionary (only finds *last* header of each type):
def mktime_tz(data):
"""Turn a 10-tuple as returned by parsedate_tz() into a UTC timestamp."""
if data[9] is None:
# No zone info, so localtime is better assumption than GMT
return time.mktime(data[:8] + (-1,))
else:
t = time.mktime(data[:8] + (0,))
return t - data[9] - time.timezone
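`email.utils.parsedate_tz` produces exactly the 10-tuple this function expects; the numeric offset is folded out so the result is a true UTC timestamp:

from email.utils import parsedate_tz

ts = mktime_tz(parsedate_tz('Tue, 01 Nov 2016 08:49:37 +0200'))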