def _run_test(self, fmt_key):
    conv = conversions[fmt_key]
    config.strConfig(logcfg + conv[0])
    self.logger = logging.getLogger('')
    self.console = self.logger.handlers[0]
    self.console.stream = self.tfile
    pval = time.strftime(conv[1])
    self.logger.info('test1')
    self.tfile.seek(0)
    logline = self.tfile.read().strip()
    # An optional third element is a separator for trimming the variable
    # (e.g. sub-second) tail from both strings before comparing.
    if len(conv) > 2:
        logline = logline.split(conv[2])[0]
        pval = pval.split(conv[2])[0]
    self.assertEqual(pval, logline)
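For context, the helper above assumes a module-level `conversions` table mapping a test key to a logging-config fragment, a matching time.strftime format, and an optional trim separator. A hedged sketch of one such entry (hypothetical values, not from the original suite):

conversions = {
    # key: (format fragment appended to logcfg, equivalent strftime format,
    #       optional split character to drop sub-second differences)
    'asctime': ('%(asctime)s', '%Y-%m-%d %H:%M:%S', ','),
}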
Python time.strftime() examples
libmilter.py (project: sipxecs-voicemail-transcription, author: andrewsauder)
def debug(msg, level=1, protId=0):
    if not DEBUG:
        return
    if level <= DEBUG:
        out = '[%s] DEBUG: ' % time.strftime('%H:%M:%S')
        if protId:
            out += 'ID: %d ; ' % protId
        out += msg
        print(out, file=sys.stderr)
# }}}
# Response Constants {{{
#
# Constants for responses back to the MTA. You should use these actions
# at the end of each callback. If none of these are specified,
# CONTINUE is used as the default.
#
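To illustrate the convention the comment describes, here is a hedged sketch of a callback ending in CONTINUE (the callback shape and the debug() helper are assumed from this libmilter project; it is not verbatim source):

def connect(self, hostname, family, ip, port, cmd_dict):
    # Log the connection, then tell the MTA to proceed to the next callback.
    debug('Connect from %s:%d' % (ip, port), level=1)
    return CONTINUE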
def log(log_subsys, log_message, log_type='info', log_data=None):
    current_time = time.time()
    # form the log entry dictionary
    log_entry = {
        'time': current_time,
        'subsys': log_subsys,
        'type': log_type,
        'message': log_message,
    }
    if log_data is not None:
        log_dict = dict(log_entry, **log_data)
    else:
        log_dict = log_entry
    if Logger.debug:
        print("LOG {:s} | {:s}".format(time.strftime("%H:%M:%S", time.localtime(current_time)), log_message))
    # attempt to place the entry in the queue
    try:
        Logger.log_queue.put(log_dict)
    except Queue.Full:
        sys.stderr.write('Warning: log queue full, discarding message: "{:s}"\n'.format(log_message))
def construct_csr_matrix_from_data_and_nodes(f, nodes, blacklisted_nodes, remove_diag=True):
    print "GenomeDISCO | " + strftime("%c") + " | processing: Loading interaction data from " + f
    total_nodes = len(nodes.keys())
    i = []
    j = []
    v = []
    c = 0
    for line in gzip.open(f):
        items = line.strip().split('\t')
        n1, n2, val = nodes[items[0]]['idx'], nodes[items[1]]['idx'], float(items[2])
        i.append(n1)
        j.append(n2)
        v.append(val)
        c += 1
    csr_m = csr_matrix((v, (i, j)), shape=(total_nodes, total_nodes), dtype=float)
    if remove_diag:
        csr_m.setdiag(0)
    return filter_nodes(csr_m, blacklisted_nodes)
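The loader expects a gzipped, tab-separated file of node1/node2/value triplets, with nodes mapped to matrix indices via the nodes dict. A hedged usage sketch (hypothetical file name and node index):

nodes = {'chr21_0': {'idx': 0}, 'chr21_50000': {'idx': 1}}
m = construct_csr_matrix_from_data_and_nodes('contacts.tsv.gz', nodes, blacklisted_nodes=[], remove_diag=True)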
def random_walks_by_chunk_get_score_sparse_matrix(mym1, mym2, tmin, tmax, nonzero_total, chunksize):
    scores = []
    n = mym1.shape[0]
    m1_t = mym1.transpose()
    m2_t = mym2.transpose()
    mat_names = {}  # names of the intermediate matrices, keyed by walk length t
    mat_names[1] = 'mats'
    for t in range(1, (tmax + 1)):
        if t != 1:
            compute_current_matrices(t, mat_names)
        if t >= tmin:
            pass
            # scores.append(1.0 * abs_diff_by_chunk_sparse_matrix(t) / nonzero_total)
        print 'done ' + str(t) + ' ' + strftime("%c")
    return scores
def random_walks_by_chunk_get_score(mym1, mym2, tmin, tmax, nonzero_total, chunksize):
    scores = []
    hdf5_names = {}
    n = mym1.shape[0]
    m1_t = mym1.transpose()
    m2_t = mym2.transpose()
    # write the matrices into hdf5 files
    # todo: make name more specific
    hdf5_names[1] = 'hdf5s'
    fill_hdf5_with_sparse_by_chunk(mym1, mym2, hdf5_names[1], chunksize)
    for t in range(1, (tmax + 1)):
        if t != 1:
            hdf5_names[t] = 'hdf5s_' + str(t)
            # multiply t=1 by t=(t-1) to get the new t=t that we want to compute
            multiply_by_chunk(hdf5_names[1], hdf5_names[t - 1], hdf5_names[t], chunksize)
        if t >= tmin:
            scores.append(1.0 * abs_diff_by_chunk(hdf5_names[t], 'm1', 'm2', chunksize) / nonzero_total)
        print 'done ' + str(t) + ' ' + strftime("%c")
    return scores
def disconnect(self):
    flag = self.get_conn()
    if len(flag) == 1:
        handle = flag[0][0]
        dialname = str(flag[0][1])
        try:
            win32ras.HangUp(handle)
            self.saveData(False, time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()))
            logger.info("Disconnected " + dialname + " successfully")
            return True
        except Exception as e:
            logger.info(dialname + " failed to disconnect: " + str(e.message))
            # disconnect()
    else:
        logger.info("No active dial-up connection found")
def __init__(self, configuration):
    self.client_queue = multiprocessing.Queue(0)
    self.apply_patch()
    self.logger = self.init_logger()
    if ["debug", "html", "content_type", "notify", "ports"] not in configuration:
        raise PJFMissingArgument()
    if configuration.debug:
        print("[\033[92mINFO\033[0m] Starting HTTP ({0}) and HTTPS ({1}) built-in server...".format(
            configuration.ports["servers"]["HTTP_PORT"],
            configuration.ports["servers"]["HTTPS_PORT"]
        ))
    if not configuration.content_type:
        configuration.content_type = "application/json"
    self.config = configuration
    self.json = PJFFactory(configuration)
    self.https = SSLWSGIRefServer(host="0.0.0.0", port=self.config.ports["servers"]["HTTPS_PORT"])
    self.http = WSGIRefServer(host="0.0.0.0", port=self.config.ports["servers"]["HTTP_PORT"])
    self.httpsd = multiprocessing.Process(target=run, kwargs={"server": self.https, "quiet": True})
    self.httpd = multiprocessing.Process(target=run, kwargs={"server": self.http, "quiet": True})
    if self.config.fuzz_web:
        self.request_checker = Thread(target=self.request_pool, args=())
    self.logger.debug("[{0}] - PJFServer successfully initialized".format(time.strftime("%H:%M:%S")))
def __init__(self, configuration):
    """
    Init the ProcessMonitor server
    """
    self.logger = self.init_logger()
    if ["debug", "ports", "process_to_monitor"] not in configuration:
        raise PJFMissingArgument()
    self.config = configuration
    self.process = None
    self.finished = False
    self.testcase_count = 0
    if self.config.debug:
        print("[\033[92mINFO\033[0m] Starting process monitoring...")
        print("[\033[92mINFO\033[0m] Starting Testcase Server ({0})...".format(
            self.config.ports["servers"]["TCASE_PORT"]
        ))
    super(PJFProcessMonitor, self).__init__(configuration)
    self.logger.debug("[{0}] - PJFProcessMonitor successfully completed".format(time.strftime("%H:%M:%S")))
def __init__(self, configuration):
    """
    Class that represents a JSON object
    """
    self.logger = self.init_logger()
    if ["json", "json_file", "strong_fuzz", "parameters", "exclude_parameters", "url_encode", "indent",
            "utf8"] not in configuration:
        raise PJFMissingArgument("Some arguments are missing from PJFFactory object")
    self.config = configuration
    self.mutator = PJFMutation(self.config)
    other = self.config.json
    if not self.config.strong_fuzz:
        if type(other) == dict:
            self.json = other
        elif type(other) == list:
            self.json = {"array": other}
        else:
            raise PJFInvalidType(other, dict)
    else:
        if self.config.json_file:
            self.json = other
        else:
            self.json = json.dumps(other)
    self.logger.debug("[{0}] - PJFFactory successfully initialized".format(time.strftime("%H:%M:%S")))
def mkdir_file(self):
    """
    Create the per-run result directory tree and return its path.
    """
    ini = U.ConfigIni()
    result_file = str(ini.get_ini('test_case', 'log_file'))
    result_file_every = result_file + '/' + \
        time.strftime("%Y-%m-%d_%H_%M_%S{}".format(random.randint(10, 99)),
                      time.localtime(time.time()))
    file_list = [
        result_file,
        result_file_every,
        result_file_every + '/log',
        result_file_every + '/per',
        result_file_every + '/img',
        result_file_every + '/status']
    if not os.path.exists(result_file):
        os.mkdir(result_file)
    for file_path in file_list:
        if not os.path.exists(file_path):
            os.mkdir(file_path)
    return result_file_every
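A hedged usage sketch (assuming an ini file with a test_case/log_file entry, as read above):

run_dir = helper.mkdir_file()
# run_dir now contains log/, per/, img/ and status/ subdirectories,
# e.g. <log_file>/2017-03-01_12_00_0542/log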
def imshow_cv(label, im, block=False, text=None, wait=2):
    vis = im.copy()
    print_status(vis, text=text)
    window_manager.imshow(label, vis)
    ch = cv2.waitKey(0 if block else wait) & 0xFF
    if ch == ord(' '):
        cv2.waitKey(0)
    if ch == ord('v'):
        print('Entering debug mode, image callbacks active')
        while True:
            ch = cv2.waitKey(10) & 0xFF
            if ch == ord('q'):
                print('Exiting debug mode!')
                break
            if ch == ord('s'):
                fn = 'img-%s.png' % time.strftime("%Y-%m-%d-%H-%M-%S")
                print('Saving %s' % fn)
                cv2.imwrite(fn, vis)
    elif ch == 27 or ch == ord('q'):
        sys.exit(1)
def main(_):
    config = flags.FLAGS.__flags.copy()
    config.update(json.loads(config['config']))
    del config['config']
    if config['results_dir'] == '':
        del config['results_dir']
    if config['task'] == 'search':
        # Hyperparameter search cannot be continued, so a new results dir is created.
        config['results_dir'] = os.path.join(results_dir, 'hs', config['model_name']
                                             + time.strftime('_%Y-%m-%d_%H-%M-%S', time.gmtime()))
        hb = Hyperband(config)
        results = hb.run()
    else:
        model = make_model(config)
        if config['task'] == 'train':
            model.train()
        elif config['task'] == 'test':
            model.test()
        else:
            print('Invalid argument: --task=%s. '
                  'It should be one of {train, test, search}.' % config['task'])
def log(scan_type, host, port, info=''):
    mutex.acquire()
    time_str = time.strftime('%X', time.localtime(time.time()))
    if scan_type == 'portscan':
        print "[%s] %s:%d open" % (time_str, host, int(port))
    elif scan_type == 'discern':
        print "[%s] %s:%d is %s" % (time_str, host, int(port), info)
    elif scan_type == 'active':
        print "[%s] %s active" % (time_str, host)
    elif info:
        log_line = "[*%s] %s:%d %s %s" % (time_str, host, int(port), scan_type, info)
        print log_line
        log_file = open('result.log', 'a')
        log_file.write(log_line + "\r\n")
        log_file.close()
    mutex.release()
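Example calls (hedged; the output format follows the branches above, and the timestamp depends on locale):

log('portscan', '192.0.2.10', 22)           # e.g. [12:00:00] 192.0.2.10:22 open
log('discern', '192.0.2.10', 80, 'nginx')   # e.g. [12:00:00] 192.0.2.10:80 is nginx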
def asctime(t=None):
    """
    Convert a tuple or struct_time representing a time as returned by gmtime()
    or localtime() to a 24-character string of the following form:
    >>> asctime(time.gmtime(0))
    'Thu Jan  1 00:00:00 1970'
    If t is not provided, the current time as returned by localtime() is used.
    Locale information is not used by asctime().
    This is meant to normalise the output of the built-in time.asctime() across
    different platforms and Python versions.
    In Python 3.x, the day of the month is right-justified, whereas on Windows
    Python 2.7 it is padded with zeros.
    See https://github.com/behdad/fonttools/issues/455
    """
    if t is None:
        t = time.localtime()
    s = "%s %s %2s %s" % (
        DAYNAMES[t.tm_wday], MONTHNAMES[t.tm_mon], t.tm_mday,
        time.strftime("%H:%M:%S %Y", t))
    return s
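A quick check of the padding behaviour the docstring describes (the %2s field space-pads the day of the month to width 2):

assert asctime(time.gmtime(0)) == 'Thu Jan  1 00:00:00 1970'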
def write_parameter_log(options, args, output_dir):
    """
    Write parameter values to a log file, named by current time.
    """
    with open(output_dir + '/CLAM_Aligner.Log.' + strftime("%Y%m%d_%H%M") + '.txt', 'w') as log:
        log.write('CLAM Re-aligner ' + __version__ + '\n')
        log.write('Args:\n' + '\n'.join(args) + '\n')
        log.write('resume: ' + str(options.resume) + '\n')
        log.write('verbose: ' + str(options.verbose) + '\n')
        log.write('output_dir: ' + str(options.output_dir) + '\n')
        log.write('tmp_dir: ' + str(options.tmp_dir) + '\n')
        log.write('window_size: ' + str(options.window_size) + '\n')
        log.write('max_multihits: ' + str(options.max_multihits) + '\n')
        log.write('is_stranded: ' + str(options.is_stranded) + '\n')
        log.write('max-gap: ' + str(options.max_gaps) + '\n')
        #log.write('gtf: ' + str(options.gtf) + '\n')
        #if len(args) > 1:
        #    log.write('cov_site_min: ' + str(options.cov_site_min) + '\n')
        #    log.write('cov_gene_min: ' + str(options.cov_gene_min) + '\n')
    return
def write_parameter_log(options, output_dir):
    """
    Write parameter values to a log file, named by current time.
    """
    merge_method_dict = {1: 'narrowPeak', 2: 'broadPeak'}
    correction_method_dict = {1: 'Bonferroni', 2: 'BH_FDR'}
    with open(output_dir + '/CLAM_Peaker.Parameters.' + strftime("%Y%m%d_%H%M") + '.txt', 'w') as log:
        log.write('CLAM Peaker ' + __version__ + '\n')
        log.write('resume: ' + str(options.resume) + '\n')
        log.write('verbose: ' + str(options.verbose) + '\n')
        log.write('output_dir: ' + str(options.output_dir) + '\n')
        log.write('tmp_dir: ' + str(options.tmp_dir) + '\n')
        log.write('peak_file: ' + str(options.peak_file) + '\n')
        log.write('is_stranded: ' + str(options.is_stranded) + '\n')
        log.write('extend: ' + str(options.extend) + '\n')
        log.write('pval_cutoff: ' + str(options.pval_cutoff) + '\n')
        log.write('merge_size: ' + str(options.merge_size) + '\n')
        log.write('max_iter: ' + str(options.max_iter) + '\n')
        log.write('gtf: ' + str(options.gtf) + '\n')
        log.write('seed: ' + str(options.seed) + '\n')
        log.write('merge_method: ' + merge_method_dict[options.merge_method] + '\n')
        log.write('correction_method: ' + correction_method_dict[options.correction_method] + '\n')
        log.write('thread: ' + str(options.nb_proc) + '\n')
def request(self, method, request_uri, headers, content):
    """Modify the request headers"""
    keys = _get_end2end_headers(headers)
    keylist = "".join(["%s " % k for k in keys])
    headers_val = "".join([headers[k] for k in keys])
    created = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
    cnonce = _cnonce()
    request_digest = "%s:%s:%s:%s:%s" % (method, request_uri, cnonce, self.challenge['snonce'], headers_val)
    request_digest = hmac.new(self.key, request_digest, self.hashmod).hexdigest().lower()
    headers['authorization'] = 'HMACDigest username="%s", realm="%s", snonce="%s", cnonce="%s", uri="%s", created="%s", response="%s", headers="%s"' % (
        self.credentials[0],
        self.challenge['realm'],
        self.challenge['snonce'],
        cnonce,
        request_uri,
        created,
        request_digest,
        keylist)
def gethtml(zurl, str_fname):
    mobileEmulation = {'deviceName': 'Apple iPhone 6'}
    options = webdriver.ChromeOptions()
    options.add_experimental_option('mobileEmulation', mobileEmulation)
    driver = webdriver.Chrome(executable_path='chromedriver.exe', chrome_options=options)
    driver.get(zurl)
    time.sleep(5)
    result = []
    # for i in range(0, 300):  # scroll up to 300 passes for a full crawl
    for i in range(0, 1):  # a single scroll pass is enough here
        print('scroll pass ' + str(i))
        myscroll(driver)
        time.sleep(2)
    st = time.strftime("%Y%m%d", time.localtime())
    # print(driver.page_source, file=open('itg201703.html', 'w', encoding='utf-8'))
    print(driver.page_source, file=open(str_fname + "-" + st + ".html", 'w', encoding='utf-8'))
    print("page source saved")
    print(driver.title)
    driver.quit()
def strftime(config, context, arg, now=time.gmtime()):
    """
    strftime returns the current time (in UTC) converted to the format
    specified by the first argument. The format is specified using
    Python's time.strftime format
    (https://docs.python.org/2/library/time.html#time.strftime).
    Example:
    {"CFPP::Strftime": "%Y%m%d_%H%M%S"} ==> 20060102_220405
    Note: use special care when using this function with CloudFormation's
    "update" functionality. The output of this function will change each
    time cfpp is run.
    """
    _raise_unless_string(context, arg)
    return time.strftime(arg, now)
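A hedged direct call matching the docstring's template example (the config and context arguments here are placeholders; only arg is validated above):

print(strftime({}, 'template.json', '%Y%m%d_%H%M%S'))  # e.g. '20060102_220405'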
def make_layout(self, frame, label, labelloc, labelwidth):
    """
    Generate chip with dimensions xdim, ydim
    """
    box = cad.shapes.Box((-self.xdim / 2, -self.ydim / 2), (self.xdim / 2, self.ydim / 2),
                         width=self.boxwidth, layer=self.layer_box)
    date = time.strftime("%d/%m/%Y")
    # The label is added 100 um on top of the main cell
    label_grid_chip = cad.shapes.LineLabel(self.name + " " + date,
                                           self.boxwidth,
                                           position=labelloc,
                                           line_width=labelwidth,
                                           layer=self.layer_label)
    if frame == True:
        self.add(box)
    if label == True:
        self.add(label_grid_chip)
def make_wafer(self, wafer_r, frame, label, labelloc, labelwidth):
    """
    Generate a wafer with the primary flat on the left. From
    https://coresix.com/products/wafers/ the angle defining the wafer flat
    is estimated as arctan((flat/2) / radius).
    """
    angled = 18
    angle = angled * np.pi / 180
    circ = cad.shapes.Circle((0, 0), wafer_r, width=self.boxwidth, initial_angle=180 + angled,
                             final_angle=360 + 180 - angled, layer=self.layer_box)
    flat = cad.core.Path([(-wafer_r * np.cos(angle), wafer_r * np.sin(angle)),
                          (-wafer_r * np.cos(angle), -wafer_r * np.sin(angle))],
                         width=self.boxwidth, layer=self.layer_box)
    date = time.strftime("%d/%m/%Y")
    if labelloc == (0, 0):
        labelloc = (-2e3, wafer_r - 1e3)
    # The label is added 100 um on top of the main cell
    label_grid_chip = cad.shapes.LineLabel(self.name + " " + date, 500,
                                           position=labelloc,
                                           line_width=labelwidth,
                                           layer=self.layer_label)
    if frame == True:
        self.add(circ)
        self.add(flat)
    if label == True:
        self.add(label_grid_chip)
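As a sanity check on the hard-coded 18 degrees (hedged arithmetic, assuming a 100 mm wafer with the standard 32.5 mm primary flat): arctan((32.5 / 2) / 50) = arctan(0.325) ≈ 18°, consistent with the estimate in the docstring.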
def timeheader(timestamp=time.gmtime()):
    """Timestamp header string
    timestamp - timestamp
    return - timestamp string for the file header
    """
    assert isinstance(timestamp, time.struct_time), 'Unexpected type of timestamp'
    # ATTENTION: MPE pool timestamp [prefix] intentionally differs a bit from the
    # benchmark timestamp to easily find/filter each of them
    return time.strftime('# ----- %Y-%m-%d %H:%M:%S ' + '-' * 30, timestamp)
# Limit the amount of memory consumption by worker processes.
# NOTE:
# - requires import of psutil
# - automatically reduced to the RAM size if the specified limit is larger
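A minimal sketch of the clamping the note describes (assuming psutil is available; the helper name is hypothetical):

import psutil

def clamp_worker_memory(limit_bytes):
    # Reduce the requested limit to the physical RAM size if it is larger.
    return min(limit_bytes, psutil.virtual_memory().total)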
def outputjson():
    site_info = site_get()
    tempdict = {}
    tempjson = "["
    info_list = Article.query.filter_by().all()
    for item in info_list:
        tempdict = item.__dict__
        del tempdict["_sa_instance_state"]
        value = json.dumps(tempdict, cls=CJsonEncoder)
        tempjson += value + ",\n"
    tempjson = tempjson[:-2] + "]"
    filename = 'page_list_' + str(time.strftime("%Y%m%d")) + '.txt'
    output = open(filename, 'w')
    output.write(tempjson)
    output.close()
    flash(u'Article list exported successfully')
    return render_template('admin/output.html', **locals())
def get_merged_nodes():
    merged = {}
    mergeddmp = open(args.infile_mergeddmp_path, 'r')
    for curr_line in mergeddmp:
        curr_line_old_taxid = curr_line.split('|')[0].strip()
        curr_line_new_taxid = curr_line.split('|')[1].strip()
        merged[curr_line_old_taxid] = curr_line_new_taxid
    mergeddmp.close()
    log_file = open(args.logfile_path, 'a')
    log_file.write('get_merged_nodes() finished ' + strftime("%H:%M:%S on %d-%m-%Y", localtime()) + '\n')
    log_file.close()
    return merged
#################################################
def get_deleted_nodes():
    deleted = {}
    delnodesdmp = open(args.infile_delnodesdmp_path, 'r')
    for curr_line in delnodesdmp:
        curr_line_old_taxid = curr_line.split('|')[0].strip()
        deleted[curr_line_old_taxid] = True
    delnodesdmp.close()
    log_file = open(args.logfile_path, 'a')
    log_file.write('get_deleted_nodes() finished ' + strftime("%H:%M:%S on %d-%m-%Y", localtime()) + '\n')
    log_file.close()
    return deleted
#################################################
wmo_file.py (project: Blender-WMO-import-export-scripts, author: WowDevTools)
def save_liquids(self):
    start_time = time.time()
    for liquid_obj in self.bl_scene_objects.liquids:
        print("\nSaving liquid: <<{}>>".format(liquid_obj.name))
        if not liquid_obj.WowLiquid.WMOGroup:
            print("WARNING: Failed saving liquid: <<{}>>".format(liquid_obj.name))
            continue
        group_obj = bpy.context.scene.objects[liquid_obj.WowLiquid.WMOGroup]
        group_index = group_obj.WowWMOGroup.GroupID
        group = self.groups[group_index]
        group.save_liquid(liquid_obj)
        print("Done saving liquid: <<{}>>".format(liquid_obj.name))
    print("\nDone saving liquids. "
          "\nTotal saving time: ", time.strftime("%M minutes %S seconds", time.gmtime(time.time() - start_time)))