def buildtime():
    '''
    Return the build date and time.

    The value is the earliest ``%{INSTALLTID:date}`` stamp printed by
    ``rpm -qa``, re-formatted with ``time.asctime``.

    CLI Example:

    .. code-block:: bash

        salt '*' rpmpck.buildtime
    '''
    query = ['rpm', '-qa', '--queryformat', r'%{INSTALLTID:date}\n']
    listing = __salt__['cmd.run'](query, output_loglevel='trace',
                                  python_shell=False)
    # Each output line is parsed with the locale's "%c" format; the resulting
    # struct_time tuples order chronologically, so min() picks the oldest.
    stamps = [time.strptime(line, "%c") for line in listing.splitlines()]
    return time.asctime(min(stamps))
# Example source code using Python's asctime()
def lastupdate():
    '''
    Return the date of the last rpm package update/installation.

    Every header in the rpm database is asked for its
    ``%{INSTALLTID:date}`` value; the most recent one is returned in
    ``time.asctime`` format.

    CLI Example:

    .. code-block:: bash

        salt '*' rpmlibpkg.lastupdate
    '''
    def _install_stamp(header):
        # Render the header's install date as text, then parse it with the
        # locale's "%c" format so that stamps can be compared.
        return time.strptime(header.sprintf("%{INSTALLTID:date}"), "%c")

    headers = rpm.TransactionSet().dbMatch()
    newest = max(_install_stamp(header) for header in headers)
    return time.asctime(newest)
def buildtime():
    '''
    Return the build date and time.

    The value is the oldest ``%{INSTALLTID:date}`` stamp found among the
    headers of the rpm database, returned in ``time.asctime`` format.

    CLI Example:

    .. code-block:: bash

        salt '*' rpmlibpkg.buildtime
    '''
    def _install_stamp(header):
        # Text form of the header's install date, parsed with "%c".
        rendered = header.sprintf("%{INSTALLTID:date}")
        return time.strptime(rendered, "%c")

    transaction_set = rpm.TransactionSet()
    oldest = min(map(_install_stamp, transaction_set.dbMatch()))
    return time.asctime(oldest)
def error_log(self, error, station_name=None):
    """Append one tab-separated error record to ``error_log.txt``.

    The record consists of the station name, the current local time in
    ``time.asctime`` format and ``str(error)``, terminated by a newline.
    The file is looked up relative to the current working directory.

    :param error: the error to record; converted with ``str()``.
    :param station_name: label for the record; any falsy value is replaced
        by ``'NO-STATION-NAME'``.
    """
    if not station_name:
        station_name = 'NO-STATION-NAME'
    # Create time stamp
    time_stamp = time.asctime(time.localtime(time.time()))
    # Form error message 'Station == Time == Error'
    error_msg = str(station_name) + '\t' + time_stamp + '\t' + str(error) + '\n'
    # Mode 'a' already creates the file when it is missing, so no existence
    # check is needed, and the with-block closes the handle even if the
    # write fails.
    with open('error_log.txt', 'a') as log_file:
        log_file.write(error_msg)
def _install_message(self, message):
    """Write a "From " line followed by the message to self._file.

    Returns the (start, stop) offsets reported by self._file.tell() before
    and after the write.
    """
    from_line = None
    if isinstance(message, str) and message.startswith('From '):
        # Raw text that carries its own "From " line: split it off at the
        # first newline (the whole text is the line when there is none).
        from_line, _, message = message.partition('\n')
    elif isinstance(message, _mboxMMDFMessage):
        from_line = 'From ' + message.get_from()
    elif isinstance(message, email.message.Message):
        from_line = message.get_unixfrom()  # May be None.
    if from_line is None:
        # Nothing usable was supplied, so synthesize a line stamped with
        # the current UTC time.
        from_line = 'From MAILER-DAEMON %s' % time.asctime(time.gmtime())
    start = self._file.tell()
    self._file.write(from_line + os.linesep)
    self._dump_message(message, self._file, self._mangle_from_)
    return (start, self._file.tell())
def add_acl(ip):
    '''
    Add a ``permit ip any host <ip>`` entry to the access list named by
    ``ACLNAME`` and log whether every configuration command succeeded.

    The trailing ``deny ip any any`` is removed first and re-added last so
    that the new permit entry ends up in front of it. A remark holding the
    address and the current local time is added before the permit entry.

    :param ip: host address to permit
    :return: None
    '''
    UPDATE_ACL_COMMANDS = """
ip access-list extended %s
no deny ip any any
remark %s
permit ip any host %s
deny ip any any
"""
    localtime = time.asctime(time.localtime(time.time()))
    remark = "Added %s @%s" % (ip, localtime)
    responses = configure(UPDATE_ACL_COMMANDS % (ACLNAME, remark, ip))
    # all() replaces reduce(): reduce is not a builtin on Python 3 and it
    # raises TypeError on an empty list. Getting no responses at all is
    # reported as a failure instead of crashing.
    outcomes = [r.success for r in responses]
    success = bool(outcomes) and all(outcomes)
    status = "Success" if success else "Fail"
    # NOTE(review): the second argument to log() looks like a level or
    # priority -- confirm against log()'s definition.
    log("adding IP: %s to ACL: status: %s" % (ip, status), 5)
def process(re_table, apply_change):
    """Collect shutdown commands for idle Ethernet interfaces and apply them.

    The output of ``show int | inc Last in|line pro`` is parsed with
    ``re_table``. For every parsed row that ``is_idle`` accepts, whose name
    contains "Ethernet" and whose second field is not "administratively":

    * with ``apply_change`` true, an interface / description / shutdown
      command triple is queued;
    * otherwise a "(testmode)" line is printed and nothing is queued.

    The queued list is printed and always handed to ``apply_commands``,
    even when it is empty.
    """
    pending = []
    re_table.Reset()
    raw_output = cli("show int | inc Last in|line pro")
    stamp = time.asctime(time.localtime(time.time()))
    description = "description PCIShutdown: %s" % stamp
    for row in re_table.ParseText(raw_output):
        # skip non Ethernet and interfaces that are not idle
        if not (is_idle(row[2], row[3]) and ("Ethernet" in row[0])):
            continue
        if row[1] == "administratively":
            continue
        if apply_change:
            pending.extend(['interface ' + row[0], description, 'shutdown'])
        else:
            print("(testmode) would have SHUT %s (%s,%s)" % (row[0], row[2], row[3]))
    print('Commands to run:')
    print(pending)
    apply_commands(pending)
def compute(i, n, task=None):
    """Generator that waits via ``task.sleep(n)`` and then finishes with a tuple.

    Yields the value of ``task.sleep(n)`` once (presumably suspending the
    task in the scheduler that drives this generator -- confirm against the
    framework). The generator's result is ``(i, task.location, <current
    time as time.asctime() text>)``.

    :param i: value echoed back as the first element of the result.
    :param n: argument passed to ``task.sleep``.
    :param task: object providing ``sleep`` and ``location``; must not be
        left as ``None`` when the generator is actually run.
    """
    # Imported inside the function as in the original; presumably because
    # the function is run away from this module's globals -- confirm.
    import time
    yield task.sleep(n)
    # 'return value' ends the generator with StopIteration(value). Raising
    # StopIteration explicitly inside a generator is turned into
    # RuntimeError by PEP 479 (Python 3.7+). Needs Python 3.3 or newer.
    return (i, task.location, time.asctime())  # result of 'compute' is current time
# client (local) task submits computations
def compute(n, task=None):
    """Generator that waits via ``task.sleep(n)`` and then finishes with the time.

    Yields the value of ``task.sleep(n)`` once (presumably suspending the
    task in the scheduler that drives this generator -- confirm against the
    framework). The generator's result is the current time as
    ``time.asctime()`` text.

    :param n: argument passed to ``task.sleep``.
    :param task: object providing ``sleep``; must not be left as ``None``
        when the generator is actually run.
    """
    # Imported inside the function as in the original; presumably because
    # the function is run away from this module's globals -- confirm.
    import time
    yield task.sleep(n)
    # 'return value' ends the generator with StopIteration(value). Raising
    # StopIteration explicitly inside a generator is turned into
    # RuntimeError by PEP 479 (Python 3.7+). Needs Python 3.3 or newer.
    return time.asctime()  # result of 'compute' is current time
def compute(i, n, task=None):
    """Generator that waits via ``task.sleep(n)`` and then finishes with a tuple.

    Yields the value of ``task.sleep(n)`` once (presumably suspending the
    task in the scheduler that drives this generator -- confirm against the
    framework). The generator's result is ``(i, task.location, <current
    time as time.asctime() text>)``.

    :param i: value echoed back as the first element of the result.
    :param n: argument passed to ``task.sleep``.
    :param task: object providing ``sleep`` and ``location``; must not be
        left as ``None`` when the generator is actually run.
    """
    # Imported inside the function as in the original; presumably because
    # the function is run away from this module's globals -- confirm.
    import time
    yield task.sleep(n)
    # 'return value' ends the generator with StopIteration(value). Raising
    # StopIteration explicitly inside a generator is turned into
    # RuntimeError by PEP 479 (Python 3.7+). Needs Python 3.3 or newer.
    return (i, task.location, time.asctime())  # result of 'compute' is current time
# client (local) task submits computations
def compute(n, task=None):
    """Generator that waits via ``task.sleep(n)`` and then finishes with the time.

    Yields the value of ``task.sleep(n)`` once (presumably suspending the
    task in the scheduler that drives this generator -- confirm against the
    framework). The generator's result is the current time as
    ``time.asctime()`` text.

    :param n: argument passed to ``task.sleep``.
    :param task: object providing ``sleep``; must not be left as ``None``
        when the generator is actually run.
    """
    # Imported inside the function as in the original; presumably because
    # the function is run away from this module's globals -- confirm.
    import time
    yield task.sleep(n)
    # 'return value' ends the generator with StopIteration(value). Raising
    # StopIteration explicitly inside a generator is turned into
    # RuntimeError by PEP 479 (Python 3.7+). Needs Python 3.3 or newer.
    return time.asctime()  # result of 'compute' is current time
def compute(i, n, task=None):
    """Generator that waits via ``task.sleep(n)`` and then finishes with a tuple.

    Yields the value of ``task.sleep(n)`` once (presumably suspending the
    task in the scheduler that drives this generator -- confirm against the
    framework). The generator's result is ``(i, task.location, <current
    time as time.asctime() text>)``.

    :param i: value echoed back as the first element of the result.
    :param n: argument passed to ``task.sleep``.
    :param task: object providing ``sleep`` and ``location``; must not be
        left as ``None`` when the generator is actually run.
    """
    # Imported inside the function as in the original; presumably because
    # the function is run away from this module's globals -- confirm.
    import time
    yield task.sleep(n)
    # 'return value' ends the generator with StopIteration(value). Raising
    # StopIteration explicitly inside a generator is turned into
    # RuntimeError by PEP 479 (Python 3.7+). Needs Python 3.3 or newer.
    return (i, task.location, time.asctime())  # result of 'compute' is current time
# client (local) task submits computations
def _worker_thread_loop(self):
    """Drain ``self.queue`` while ``self.go`` is true, printing one CSV row per item.

    Each queue item is an ``(x, regmod_path)`` pair. For every item the
    registry value at ``regmod_path`` is fetched through the session returned
    by ``self._create_lr_session_if_necessary`` and a line of the form
    ``time,hostname,sensor_id,process_path,regmod_path,value`` is printed.
    Any exception raised while handling an item is printed as a traceback
    and the loop carries on with the next item.
    """
    while self.go:
        try:
            try:
                # The short timeout lets the loop re-check self.go regularly.
                (x, regmod_path) = self.queue.get(timeout=0.5)
            except Queue.Empty:
                continue
            # TODO -- could comment this out if you want CSV data to feed into something
            # Single-argument print(...) behaves the same on Python 2 and 3.
            print("--> Attempting for %s" % regmod_path)
            # Go Grab it if we think we have something!
            sensor_id = x.env.endpoint.SensorId
            hostname = x.env.endpoint.SensorHostName
            # TODO -- this could use some concurrency and work queues because we could wait a while for
            # each of these to get established and retrieve the value
            # Establish our CBLR session if necessary!
            lrh = self._create_lr_session_if_necessary(sensor_id)
            data = lrh.get_registry_value(regmod_path)
            print("%s,%s,%d,%s,%s,%s" % (time.asctime(),
                                         hostname,
                                         sensor_id,
                                         x.header.process_path,
                                         regmod_path,
                                         data.get('value_data', "") if data else "<UNKNOWN>"))
            # TODO -- could *do something* here, like if it is for autoruns keys then go check the signature status
            # of the binary at the path pointed to, and see who wrote it out, etc
        except Exception:
            # Keep the worker alive on ordinary errors, but let SystemExit
            # and KeyboardInterrupt propagate instead of swallowing them.
            traceback.print_exc()
def generate_cookie(name, securekey):
    """Build a ``<base64 payload>.<md5 hex digest>`` cookie string for ``name``.

    The payload is the JSON text of ``{'name': ..., 'login-time': ...}``
    with the current ``time.asctime()`` value. The digest is the MD5 of
    that JSON text concatenated with ``securekey``. Both are encoded as
    ASCII, so a non-ASCII ``securekey`` raises ``UnicodeEncodeError``.

    NOTE(review): a plain MD5 over text + key is a weak signature; an HMAC
    would be stronger, but switching would invalidate existing cookies and
    has to be coordinated with whatever verifies them.
    """
    payload = json.dumps({'name': name, 'login-time': time.asctime()})
    # b64encode returns bytes; decode so both halves are str before joining.
    encoded = base64.b64encode(payload.encode('ascii')).decode('utf-8')
    digest = hashlib.md5((payload + securekey).encode('ascii')).hexdigest()
    return encoded + "." + digest
def print_io(inputs, outputs):
    """Print timestamped lines for whichever of ``inputs``/``outputs`` is truthy.

    One line is printed per truthy argument, prefixed with the current
    ``time.asctime()`` value and its label. When both are falsy a single
    "No inputs/outputs?" line is printed instead.
    """
    for label, payload in (("Inputs:", inputs), ("Outputs:", outputs)):
        if payload:
            print(time.asctime(), label, payload)
    if not (inputs or outputs):
        print(time.asctime(), "No inputs/outputs?")
#--------------------------- graph traversal stuff ---------------------------
def set_from(self, from_, time_=None):
    """Store ``from_`` in ``self._from``, with a formatted time appended if given.

    ``time_`` may be ``None`` (nothing is appended), ``True`` (the current
    UTC time from ``time.gmtime()`` is used) or any value accepted by
    ``time.asctime``. The time text is separated from ``from_`` by one space.
    """
    if time_ is True:
        time_ = time.gmtime()
    if time_ is not None:
        from_ = from_ + ' ' + time.asctime(time_)
    self._from = from_
def _worker_thread_loop(self):
    """Drain ``self.queue`` while ``self.go`` is true, printing one CSV row per item.

    Each queue item is an ``(x, regmod_path)`` pair. For every item the
    registry value at ``regmod_path`` is fetched through the session returned
    by ``self._create_lr_session_if_necessary`` and a line of the form
    ``time,hostname,sensor_id,process_path,regmod_path,value`` is printed.
    Any exception raised while handling an item is printed as a traceback
    and the loop carries on with the next item.
    """
    while self.go:
        try:
            try:
                # The short timeout lets the loop re-check self.go regularly.
                (x, regmod_path) = self.queue.get(timeout=0.5)
            except Queue.Empty:
                continue
            # TODO -- could comment this out if you want CSV data to feed into something
            # Single-argument print(...) behaves the same on Python 2 and 3.
            print("--> Attempting for %s" % regmod_path)
            # Go Grab it if we think we have something!
            sensor_id = x.env.endpoint.SensorId
            hostname = x.env.endpoint.SensorHostName
            # TODO -- this could use some concurrency and work queues because we could wait a while for
            # each of these to get established and retrieve the value
            # Establish our CBLR session if necessary!
            lrh = self._create_lr_session_if_necessary(sensor_id)
            data = lrh.get_registry_value(regmod_path)
            print("%s,%s,%d,%s,%s,%s" % (time.asctime(),
                                         hostname,
                                         sensor_id,
                                         x.header.process_path,
                                         regmod_path,
                                         data.get('value_data', "") if data else "<UNKNOWN>"))
            # TODO -- could *do something* here, like if it is for autoruns keys then go check the signature status
            # of the binary at the path pointed to, and see who wrote it out, etc
        except Exception:
            # Keep the worker alive on ordinary errors, but let SystemExit
            # and KeyboardInterrupt propagate instead of swallowing them.
            traceback.print_exc()
def timestampToString(value):
    """Format ``value + epoch_diff`` (clamped to be >= 0) as UTC asctime text.

    ``epoch_diff`` is a module-level offset defined outside this block;
    presumably it converts ``value`` from another epoch to Unix time --
    confirm at its definition.
    """
    shifted = value + epoch_diff
    # Negative results are pinned to 0 before being handed to gmtime().
    clamped = max(0, shifted)
    return asctime(time.gmtime(clamped))
def add(self, eventclass="note", message="Your message here"):
    """Compose a log entry from the arguments and append it to ``self.events``.

    The entry is ``self.datetime()``, ``eventclass`` and ``message`` joined
    by single spaces. When ``self.debug`` equals ``"on"`` a DEBUG line
    naming the entry and ``self.name`` is printed first. ``self.modified``
    is set to the current local time in ``time.asctime`` format.

    :param eventclass: category text placed after the timestamp.
    :param message: free text placed after the category.
    :return: None
    """
    import time
    event = self.datetime() + ' ' + eventclass + ' ' + message
    if self.debug == "on":
        # One pre-formatted argument keeps the output identical on Python 2
        # and 3; the old print statement is a syntax error on Python 3.
        print('DEBUG: Adding %s to log %s' % (event, self.name))
    # asctime() without an argument formats the current local time.
    self.modified = time.asctime()
    self.events.append(event)
def datetime(self):
    """Return the current local time as ``time.asctime`` text for log entries."""
    import time
    # localtime() with no argument converts the current time.
    return time.asctime(time.localtime())