def try_it(qu):
    stdout.write("\r{} ---> ".format(qu))
    stdout.flush()
    passed = 0
    req = None
    try:
        req = qu.run(c)
        if basic_test(req):
            passed = 1
            stdout.write("PASS\n")
        else:
            fails.append(err_format(qu, req))
            stdout.write("FAIL\n")
            print err_format(qu, req)
            exit()
    except (preqlerrors.TopologyError, preqlerrors.ValueTypeError, preqlerrors.NonexistenceError) as e:
        errors.append(err_format(qu, str(e.msg)))
        stdout.write("ERROR\n")
    stdout.flush()
    return passed, 1, req
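A minimal driver sketch for the function above, hedged: `queries` and the running totals are hypothetical names, and `fails`, `errors`, `basic_test`, and `c` are assumed to be provided by the surrounding test harness; only the call pattern is illustrated.

# hypothetical driver loop, assuming `queries` is an iterable of query objects
total_passed = total_run = 0
for query in queries:
    passed, ran, _response = try_it(query)
    total_passed += passed
    total_run += ran
stdout.write("\n{}/{} queries passed\n".format(total_passed, total_run))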
def fetch():
    a = IPSet([])
    # pull every blocklist and collect the prefixes it contains
    for blocklist in blocklists:
        r = requests.get(blocklist)
        for line in r.iter_lines():
            if linefilter(line):
                a.add(makeprefix(linefilter(line)))
    # withdraw prefixes that were announced before but are no longer listed
    for prefix in b:
        if len(b) > 0 and prefix in b and prefix not in a:
            a.discard(prefix)
            stdout.write('withdraw route ' + str(prefix) + nexthop)
            stdout.flush()
    # announce prefixes that are new since the last run
    for prefix in a:
        if prefix in a and prefix not in b:
            stdout.write('announce route ' + str(prefix) + nexthop)
            stdout.flush()
    b.add(a)
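The write() calls here use ExaBGP's text API ("announce route ..." / "withdraw route ..." written to stdout), so fetch() is presumably run as an ExaBGP process. A rough sketch of the module-level names it relies on follows; the original project defines its own versions of all of these, so treat them purely as illustrative assumptions.

# hedged sketch of the helper names fetch() relies on; IPSet and b come from the
# original project, and the URL, suffix, and helper logic below are assumptions
blocklists = ['https://www.spamhaus.org/drop/drop.txt']   # example blocklist URL only
nexthop = ' next-hop self\n'        # suffix appended to each ExaBGP route line

def linefilter(line):
    # keep non-empty, non-comment lines (iter_lines may yield bytes)
    line = line.decode() if isinstance(line, bytes) else line
    return line if line and not line.startswith((';', '#')) else None

def makeprefix(line):
    # the first whitespace-separated token is expected to be a CIDR prefix
    return line.split()[0]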
def process_photos(photos):
    if 'error' in photos:
        print "Error = ", photos['error']
        raise Exception("Error in Response")
    no_of_photos = 0
    if 'data' not in photos:
        return
    while len(photos['data']) > 0:
        for photo in photos['data']:
            if 'tags' in photo:
                process_photo_tags(photo['tags'])
            if 'comments' in photo:
                process_photo_comments(photo['comments'])
            no_of_photos += 1
            stdout.write("\rNumber of Photos Processed = %d" % no_of_photos)
            stdout.flush()
        if 'paging' in photos and 'next' in photos['paging']:
            # follow the Graph API paging cursor, bumping the page size
            request_str = photos['paging']['next'].replace('https://graph.facebook.com/', '')
            request_str = request_str.replace('limit=25', 'limit=200')
            photos = graph.get(request_str)
        else:
            photos['data'] = []
def mangleException(self, pkt, reason=''):
    self.notifyBad('\nFENRIR PANIC : Process failed during MANGLING', 1, 1)
    if reason != '':
        self.notifyBad('Reason : ' + reason, 1)
    self.notify('Packet was logged to errorLogFile : FENRIR.err', 1)
    logfd = open('FENRIR.err', 'a')
    logfd.write('---DUMP BEGINS--------------------------------------------------------------------------------------\n')
    logfd.write('[*] Packet header SRC : ' + pkt[IP].src + ' (' + pkt[Ether].src + ') DST : '
                + pkt[IP].dst + ' (' + pkt[Ether].dst + ')\n')
    logfd.write('Packet dump :\n')
    # ls() only prints to stdout and returns None, so dump the packet as a string instead
    logfd.write(pkt.show(dump=True) + '\n')
    logfd.write('---DUMP ENDS----------------------------------------------------------------------------------------\n')
    logfd.close()
## fenrirPanic : unrecoverable exception handling ##
def init():
    height_term, width_term = get_terminal_size()
    height_min = COL_HEIGHT * HEIGHT + 2 + 9
    width_min = COL_WIDTH * WIDTH + 2 + 5
    if height_term < height_min or width_term < width_min:
        # resize the terminal to the minimum size needed to display the connect4, then exit
        stdout.write("\x1b[8;{h};{w}t".format(h=max(height_min, height_term), w=max(width_min, width_term)))
        exit('\033[91m' + 'The terminal was too small, you can now restart ' + '\033[1m' + 'Connect4' + '\033[0m')
    stdscr = curses.initscr()
    height, width = stdscr.getmaxyx()
    if height < height_min or width < width_min:
        # abort the program if the terminal can't be resized
        curses.endwin()
        exit('Please resize your terminal [%dx%d] (minimum required %dx%d)'
             % (width, height, width_min, height_min))
    curses.noecho()
    curses.cbreak()
    curses.curs_set(0)
    stdscr.keypad(1)
    # define the different colors
    if curses.can_change_color():
        defineColors()
    # return stdscr, width
    stdscr.clear()
    stdscr.border(0)
    return stdscr, width, height
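Hedged usage sketch: init() leaves the terminal in noecho/cbreak mode, so callers typically pair it with curses.endwin(). The placeholder body below stands in for the real game loop and assumes init() and its module-level constants are importable.

# hedged usage sketch; the addstr/getch calls are placeholders for the game loop
import curses
stdscr, width, height = init()
try:
    stdscr.addstr(1, 2, "Connect4 board would be drawn here")
    stdscr.refresh()
    stdscr.getch()
finally:
    curses.endwin()   # always restore the terminal (echo, cbreak, cursor)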
def _usage(error_message=None):
    if error_message:
        stderr.write('ERROR: ' + error_message + linesep)
    stdout.write(
        linesep.join([
            'Usage:',
            '  list_versions.py [OPTION]... [DEPENDENCY]',
            'Examples:',
            '  list_versions.py go',
            '  list_versions.py -r docker',
            '  list_versions.py --rc docker',
            '  list_versions.py -l kubernetes',
            '  list_versions.py --latest kubernetes',
            'Options:',
            '-l/--latest  Include only the latest version of each major and'
            ' minor versions sub-tree.',
            '-r/--rc      Include release candidate versions.',
            '-h/--help    Prints this!',
            ''
        ]))
def __init__(self, file=None, stringio=False, encoding=None):
    if file is None:
        if stringio:
            self.stringio = file = py.io.TextIO()
        else:
            from sys import stdout as file
    elif py.builtin.callable(file) and not (
            hasattr(file, "write") and hasattr(file, "flush")):
        file = WriteFile(file, encoding=encoding)
    if hasattr(file, "isatty") and file.isatty() and colorama:
        file = colorama.AnsiToWin32(file).stream
    self.encoding = encoding or getattr(file, 'encoding', "utf-8")
    self._file = file
    self.hasmarkup = should_do_markup(file)
    self._lastlen = 0
    self._chars_on_current_line = 0
def write_out(fil, msg):
    # XXX sometimes "msg" is of type bytes, sometimes text which
    # complicates the situation. Should we try to enforce unicode?
    try:
        # on py27 and above writing out to sys.stdout with an encoding
        # should usually work for unicode messages (if the encoding is
        # capable of it)
        fil.write(msg)
    except UnicodeEncodeError:
        # on py26 it might not work because stdout expects bytes
        if fil.encoding:
            try:
                fil.write(msg.encode(fil.encoding))
            except UnicodeEncodeError:
                # it might still fail if the encoding is not capable
                pass
            else:
                fil.flush()
                return
        # fallback: escape all unicode characters
        msg = msg.encode("unicode-escape").decode("ascii")
        fil.write(msg)
    fil.flush()
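A short usage sketch, hedged: write_out() accepts any file-like object with write()/flush(), and the unicode message below is only an example.

# hedged usage sketch: falls back to escaped output if stdout can't encode the check mark
from sys import stdout
write_out(stdout, u"progress: 42% \u2713\n")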
def _get(self, url, query=None, timeout=30):
    payload = self._generatePayload(query)
    retries = 0
    response = None
    while True:
        try:
            response = self._session.get(url, headers=self._header, params=payload, timeout=timeout)
        except Exception:
            # print(exc_info())
            retries += 1
            if self.listening:
                stdout.write("_get " + url + " failed, retrying..." + str(retries) + "\r")
                stdout.flush()
            continue
        break
    # overwrite the retry message with blanks before returning
    stdout.write("                                        \r")
    stdout.flush()
    return response
def main():
    timings = False
    start = time.time()
    initialize()
    if timings: print('initialize {} s'.format(time.time() - start), file=stderr)
    start = time.time()
    command_table = load_command_table()
    if timings: print('load_command_table {} s'.format(time.time() - start), file=stderr)
    start = time.time()
    group_index = get_group_index(command_table)
    if timings: print('get_group_index {} s'.format(time.time() - start), file=stderr)
    start = time.time()
    snippets = get_snippets(command_table) if AUTOMATIC_SNIPPETS_ENABLED else []
    if timings: print('get_snippets {} s'.format(time.time() - start), file=stderr)

    while True:
        line = stdin.readline()
        start = time.time()
        request = json.loads(line)
        response_data = None
        if request['data'].get('request') == 'status':
            response_data = get_status()
            if timings: print('get_status {} s'.format(time.time() - start), file=stderr)
        elif request['data'].get('request') == 'hover':
            response_data = get_hover_text(group_index, command_table, request['data']['command'])
            if timings: print('get_hover_text {} s'.format(time.time() - start), file=stderr)
        else:
            response_data = get_completions(group_index, command_table, snippets, request['data'], True)
            if timings: print('get_completions {} s'.format(time.time() - start), file=stderr)
        response = {
            'sequence': request['sequence'],
            'data': response_data
        }
        output = json.dumps(response)
        stdout.write(output + '\n')
        stdout.flush()
        stderr.flush()
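For context, a hedged sketch of the line protocol the loop above expects: each stdin line is a JSON object with 'sequence' and 'data' keys (field names taken from the code), and each response echoes the sequence number on stdout. Everything else is illustrative.

# hedged sketch of one request/response exchange with the loop above
import json
request_line = json.dumps({'sequence': 1, 'data': {'request': 'status'}})
# writing request_line + '\n' to the process's stdin should produce one line of
# JSON on its stdout of the form {"sequence": 1, "data": ...}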
def __next__(self):
    """
    next overload. If display is true, the latest statistics are displayed.
    :return: The next number in the iterator
    """
    if self.display:
        self.__restart_line()
        stdout.write(str(self))
        stdout.flush()
    if self.current >= self.end:
        raise StopIteration
    self.current += self.step
    return self.current - self.step

def __restart_line(self):
    """
    Writes a carriage return to stdout and flushes, so the next write reuses the same line.
    :return: None
    """
    stdout.write('\r')
    stdout.flush()
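A self-contained illustration of the same carriage-return progress pattern the iterator above uses, hedged: the loop bounds and sleep are arbitrary and only exist to make the example runnable.

# hedged, standalone illustration of the '\r' progress pattern
from sys import stdout
import time

total = 50
for i in range(total + 1):
    stdout.write('\r')                       # return to the start of the line
    stdout.write('progress: %3d%%' % (100 * i // total))
    stdout.flush()
    time.sleep(0.02)
stdout.write('\n')                           # move off the progress line when done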
def update(self, n):
    mem = ""
    if self.show_mem:
        m = mem_usage()
        if m > 0:
            mem = " [ %0.1fMb used ]" % m
    try:
        stdout.write((self.m + mem + "   \r") % (100 * (n - self.start + 1e-50) / (self.range + 1e-50)))
    except:
        # self.m has no format placeholder; fall back to a plain percentage
        stdout.write(("%s%0.1f%% " + mem + "\r") % (self.m, 100 * (n - self.start + 1e-50) / (self.range + 1e-50)))

def __del__(self):
    stdout.write('\n')
jira_confluence_backup.py (project: jira-confluence-backup, author: MyMedsAndMe)
def monitor(s):
    r = s.get(url=progress_url)
    try:
        progress_data = json.loads(r.text)
    except ValueError:
        print """No JSON object could be decoded.
Get progress failed to return expected data.
Return code: %s """ % (r.status_code)
        result = ['No JSON object could be decoded'
                  ' - get progress failed to return expected data.'
                  ' Return code: %s' % (r.status_code), False]
        return result
    # Timeout waiting for remote backup to complete
    # (since it sometimes fails) in 5s multiples
    global timeout
    timeout_count = timeout * 12  # timeout x 12 = number of iterations of 5s
    time_left = timeout
    # Poll until the backup reports a fileName or the timeout expires
    while 'fileName' not in progress_data and timeout_count > 0:
        # Clear the line before re-writing to avoid artifacts
        stdout.write("\r\x1b[2K")
        stdout.write("\r\x1b[2K%s. Timeout remaining: %sm"
                     % (progress_data['alternativePercentage'],
                        str(time_left)))
        stdout.flush()
        r = s.get(url=progress_url)
        progress_data = json.loads(r.text)
        time.sleep(5)
        timeout_count = timeout_count - 5
        if timeout_count % 12 == 0:
            time_left = time_left - 1
    if 'fileName' in progress_data:
        result = [progress_data['fileName'], True]
    else:
        result = ['Backup did not complete before the timeout expired', False]
    return result
jira_confluence_backup.py (project: jira-confluence-backup, author: MyMedsAndMe)
def download(s, l):
    filename = get_filename(s)
    if not filename:
        return False
    print "Filename found: %s" % filename
    print "Checking if url is valid"
    r = s.get(url=download_url + filename, stream=True)
    print "Status code: %s" % str(r.status_code)
    if int(r.status_code) == 200:
        print "Url returned '200', downloading file"
        if not create_backup_location(l):
            result = ['Failed to create backup location', False]
            return result
        date_time = datetime.datetime.now().strftime("%Y%m%d")
        with open(l + '/' + application + '-' + date_time + '.zip', 'wb') as f:
            file_total = 0
            for chunk in r.iter_content(chunk_size=1024):
                if chunk:
                    f.write(chunk)
                    file_total = file_total + 1024
                    file_total_m = float(file_total) / 1048576
                    # Clear the line before re-writing to avoid artifacts
                    stdout.write("\r\x1b[2K")
                    stdout.write("\r\x1b[2K%.2fMB downloaded" % file_total_m)
                    stdout.flush()
        stdout.write("\n")
        result = ['Backup downloaded successfully', True]
        return result
    else:
        print "Download file not found on remote server - response code %s" % \
            str(r.status_code)
        print "Download url: %s" % (download_url + filename)
        result = ['Download file not found on remote server', False]
        return result
def LoadingCallBack(j, k):
    stdout.write("\r [+] Files: [{}] (strings: [{}])".format(j, k))
    stdout.flush()
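Hedged usage sketch: the loop below only exercises the callback with dummy counters; the original project supplies real file and string counts.

# hedged example driver; the file/string counts here are dummies
from sys import stdout
for files_done in range(1, 101):
    LoadingCallBack(files_done, files_done * 37)   # 37 strings per file is arbitrary
stdout.write("\n")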
def create_character(n, c, ifont):
    size = ifont.getsize(c)
    image = Image.new('RGBA', size)
    draw = ImageDraw.Draw(image)
    draw.text((0, 0), c, font=ifont)
    data = list(image.getdata())
    print 'const PROGMEM unsigned char font%d_%02x[] = {' % (n, ord(c))
    for i in range(len(data)):
        stdout.write('0x%02x, ' % data[i][0])
    print '};'
    return size
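A hedged usage sketch: the font path and point size below are placeholders, and it assumes an older PIL/Pillow where ImageFont.getsize() still exists (as the function above requires). Each call emits one C array for the given glyph.

# hedged usage sketch; 'DejaVuSans.ttf' and the size are placeholder values
from PIL import ImageFont
from sys import stdout

ifont = ImageFont.truetype('DejaVuSans.ttf', 12)
for index, ch in enumerate('AB'):
    create_character(index, ch, ifont)   # prints a C array for each glyph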