python类write()的实例源码

tests.py 文件源码 项目:SynthDB 作者: shawnrushefsky 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def try_it(qu):
    stdout.write("\r{} ---> ".format(qu))
    stdout.flush()
    passed = 0
    req = None
    try:
        req = qu.run(c)
        if basic_test(req):
            passed = 1
            stdout.write("PASS\n")
        else:
            fails.append(err_format(q, req))
            stdout.write("FAIL\n")
            print err_format(q, req)
            exit()
    except (preqlerrors.TopologyError, preqlerrors.ValueTypeError, preqlerrors.NonexistenceError) as e:
        errors.append(err_format(q, str(e.msg)))
        stdout.write("ERROR\n")
    stdout.flush()
    return passed, 1, req
blocklists_simple.py 文件源码 项目:exabgp-edgerouter 作者: infowolfe 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def fetch():
    a = IPSet([])
    for blocklist in blocklists:
        r = requests.get(blocklist)
        for line in r.iter_lines():
            if linefilter(line):
                a.add(makeprefix(linefilter(line)))

    for prefix in b:
        if b.len() > 0 and b.__contains__(prefix) and not a.__contains__(prefix):
            a.discard(prefix)
            stdout.write('withdraw route ' + str(prefix) + nexthop)
            stdout.flush()

    for prefix in a:
        if a.__contains__(prefix) and not b.__contains__(prefix):
            stdout.write('announce route ' + str(prefix) + nexthop)
            stdout.flush()

    b.add(a)
fballprofilepic.py 文件源码 项目:Facebook-Scripts 作者: Sunil02324 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def process_photos(photos):
  if 'error' in photos:
    print "Error = ", error
    raise Exception("Error in Response")

  no_of_photos = 0
  if 'data' not in photos:
    return
  while len(photos['data']) > 0:
    for photo in photos['data']:
      if 'tags' in photo:
        process_photo_tags(photo['tags'])
      if 'comments' in photo:
        process_photo_comments(photo['comments'])
      no_of_photos += 1
      stdout.write("\rNumber of Photos Processed = %d" % no_of_photos)
      stdout.flush()
    if 'paging' in photos and 'next' in photos['paging']:
      request_str = photos['paging']['next'].replace('https://graph.facebook.com/', '')
      request_str = request_str.replace('limit=25', 'limit=200')
      photos = graph.get(request_str)
    else:
      photos['data'] = []
FenrirTail.py 文件源码 项目:fenrir-ocd 作者: Orange-Cyberdefense 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def mangleException(self, pkt, reason=''):
        self.notifyBad('\nFENRIR PANIC : Process failed during MANGLING', 1, 1)
        if reason != '':
            self.notifyBad('Reason : ' + reason, 1)
        self.notify('Packet was logged to errorLogFile : FENRIR.err', 1)
        logfd = open('FENRIR.err', 'a')
        logfd.write(
            '---DUMP BEGINS--------------------------------------------------------------------------------------\n')
        logfd.write(
            '[*] Packet header SRC : ' + pkt[IP].src + ' (' + pkt[Ether].src + ') DST : ' + pkt[IP].dst + ' (' + pkt[
                Ether].dst + ')\n')
        logfd.write('Packet dump :\n')
        logfd.write(str(ls(pkt)) + '\n')
        logfd.write(
            '---DUMP ENDS----------------------------------------------------------------------------------------\n')
        logfd.close()


    ## fenrirPanic : unrecoverable exception handling ##
terminal.py 文件源码 项目:connect4 作者: guglielmilo 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def init():
    height_term, width_term = get_terminal_size()
    height_min = COL_HEIGHT * HEIGHT + 2 + 9
    width_min = COL_WIDTH * WIDTH + 2 + 5
    if height_term < height_min or width_term < width_min:
        # resize the terminal to fit the minimum size to display the connect4 before exit
        stdout.write("\x1b[8;{h};{w}t".format(h=max(height_min, height_term), w=max(width_min, width_term)))
        exit('\033[91m' + 'The terminal was too small, you can now restart ' + '\033[1m' + 'Connect4' + '\033[0m')
    stdscr = curses.initscr()
    height,width = stdscr.getmaxyx()
    if height < height_min or width < width_min:
        # abort the program if the terminal can't be resized
        curses.endwin()
        exit('Please resize your terminal [%d%s%d] (minimum required %d%s%d)' %(width, 'x', height, width_min, 'x', height_min))
    curses.noecho()
    curses.cbreak()
    curses.curs_set(0)
    stdscr.keypad(1)
    #define the different colors
    if curses.can_change_color():
        defineColors()
    #return stdscr, width
    stdscr.clear()
    stdscr.border(0)
    return stdscr, width, height
list_versions.py 文件源码 项目:grafanalib 作者: weaveworks 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _usage(error_message=None):
    if error_message:
        stderr.write('ERROR: ' + error_message + linesep)
    stdout.write(
        linesep.join([
            'Usage:', '    list_versions.py [OPTION]... [DEPENDENCY]',
            'Examples:', '    list_versions.py go',
            '    list_versions.py -r docker',
            '    list_versions.py --rc docker',
            '    list_versions.py -l kubernetes',
            '    list_versions.py --latest kubernetes', 'Options:',
            '-l/--latest Include only the latest version of each major and'
            ' minor versions sub-tree.',
            '-r/--rc     Include release candidate versions.',
            '-h/--help   Prints this!', ''
        ]))
terminalwriter.py 文件源码 项目:py 作者: pytest-dev 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __init__(self, file=None, stringio=False, encoding=None):
        if file is None:
            if stringio:
                self.stringio = file = py.io.TextIO()
            else:
                from sys import stdout as file
        elif py.builtin.callable(file) and not (
             hasattr(file, "write") and hasattr(file, "flush")):
            file = WriteFile(file, encoding=encoding)
        if hasattr(file, "isatty") and file.isatty() and colorama:
            file = colorama.AnsiToWin32(file).stream
        self.encoding = encoding or getattr(file, 'encoding', "utf-8")
        self._file = file
        self.hasmarkup = should_do_markup(file)
        self._lastlen = 0
        self._chars_on_current_line = 0
terminalwriter.py 文件源码 项目:py 作者: pytest-dev 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def write_out(fil, msg):
    # XXX sometimes "msg" is of type bytes, sometimes text which
    # complicates the situation.  Should we try to enforce unicode?
    try:
        # on py27 and above writing out to sys.stdout with an encoding
        # should usually work for unicode messages (if the encoding is
        # capable of it)
        fil.write(msg)
    except UnicodeEncodeError:
        # on py26 it might not work because stdout expects bytes
        if fil.encoding:
            try:
                fil.write(msg.encode(fil.encoding))
            except UnicodeEncodeError:
                # it might still fail if the encoding is not capable
                pass
            else:
                fil.flush()
                return
        # fallback: escape all unicode characters
        msg = msg.encode("unicode-escape").decode("ascii")
        fil.write(msg)
    fil.flush()
client.py 文件源码 项目:cliFBChat 作者: blcc 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def _get(self, url, query=None, timeout=30):
        payload=self._generatePayload(query)
        a = 0
        while 1:
            try:
                a = self._session.get(url, headers=self._header, params=payload, timeout=timeout)
            except :
                #print(exc_info())
                a = a+1
                if self.listening:
                    stdout.write("_get "+url+" failed, retrying..."+str(a)+"\r")
                    stdout.flush()
                    continue
            break
        stdout.write("                                                                 \r")
        stdout.flush()
        return a
__main__.py 文件源码 项目:vscode-azurecli 作者: Microsoft 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def main():
    timings = False
    start = time.time()
    initialize()
    if timings: print('initialize {} s'.format(time.time() - start), file=stderr)

    start = time.time()
    command_table = load_command_table()
    if timings: print('load_command_table {} s'.format(time.time() - start), file=stderr)

    start = time.time()
    group_index = get_group_index(command_table)
    if timings: print('get_group_index {} s'.format(time.time() - start), file=stderr)

    start = time.time()
    snippets = get_snippets(command_table) if AUTOMATIC_SNIPPETS_ENABLED else []
    if timings: print('get_snippets {} s'.format(time.time() - start), file=stderr)

    while True:
        line = stdin.readline()
        start = time.time()
        request = json.loads(line)
        response_data = None
        if request['data'].get('request') == 'status':
            response_data = get_status()
            if timings: print('get_status {} s'.format(time.time() - start), file=stderr)
        elif request['data'].get('request') == 'hover':
            response_data = get_hover_text(group_index, command_table, request['data']['command'])
            if timings: print('get_hover_text {} s'.format(time.time() - start), file=stderr)
        else:
            response_data = get_completions(group_index, command_table, snippets, request['data'], True)
            if timings: print('get_completions {} s'.format(time.time() - start), file=stderr)
        response = {
            'sequence': request['sequence'],
            'data': response_data
        }
        output = json.dumps(response)
        stdout.write(output + '\n')
        stdout.flush()
        stderr.flush()
progressbar.py 文件源码 项目:progressbar 作者: zaktimson 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def __next__(self):
        """
        next overload. If display is true the latest stetistics are displayed
        :return: The next number in iterator
        """
        if self.display:
            self.__restart_line()
            stdout.write(str(self))
            stdout.flush()
        if self.current >= self.end:
            raise StopIteration
        self.current += self.step
        return self.current - self.step
progressbar.py 文件源码 项目:progressbar 作者: zaktimson 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def __restart_line():
        """
        Writes return carriage to stdout and flushes. This allows writing to the same line.
        :return: None
        """
        stdout.write('\r')
        stdout.flush()
ZProgressbar.py 文件源码 项目:progressbar 作者: zaktimson 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __next__(self):
        """
        next overload. If display is true the latest stetistics are displayed
        :return: The next number in iterator
        """
        if self.display:
            self.__restart_line()
            stdout.write(str(self))
            stdout.flush()
        if self.current >= self.end:
            raise StopIteration
        self.current += self.step
        return self.current - self.step
ZProgressbar.py 文件源码 项目:progressbar 作者: zaktimson 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __restart_line():
        """
        Writes return carriage to stdout and flushes. This allows writing to the same line.
        :return: None
        """
        stdout.write('\r')
        stdout.flush()
gop_util.py 文件源码 项目:pybot 作者: spillai 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def update( self, n ):
        mem = ""
        if self.show_mem:
            m = mem_usage()
            if m>0:
                mem = "  [ %0.1fMb used ]"%m
        try:
            stdout.write( (self.m+mem+"   \r")%(100*(n-self.start+1e-50)/(self.range+1e-50)) )
        except:
            stdout.write( ("%s%0.1f%%   "+mem+"\r")%(self.m,100*(n-self.start+1e-50)/(self.range+1e-50)) )
gop_util.py 文件源码 项目:pybot 作者: spillai 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __del__( self ):
        stdout.write('\n')
jira_confluence_backup.py 文件源码 项目:jira-confluence-backup 作者: MyMedsAndMe 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def monitor(s):
    r = s.get(url=progress_url)
    try:
        progress_data = json.loads(r.text)
    except ValueError:
        print """No JSON object could be decoded.
        Get progress failed to return expected data.
        Return code: %s """ % (r.status_code)
        result = ['No JSON object could be decoded\
            - get progress failed to return expected data\
        Return code: %s """ % (r.status_code)', False]
    # Timeout waiting for remote backup to complete
    # (since it sometimes fails) in 5s multiples
    global timeout
    timeout_count = timeout*12  # timeout x 12 = number of iterations of 5s
    time_left = timeout
    while 'fileName' not in progress_data or timeout_count > 0:
        # Clears the line before re-writing to avoid artifacts
        stdout.write("\r\x1b[2k")
        stdout.write("\r\x1b[2K%s. Timeout remaining: %sm"
                     % (progress_data['alternativePercentage'],
                        str(time_left)))
        stdout.flush()
        r = s.get(url=progress_url)
        progress_data = json.loads(r.text)
        time.sleep(5)
        timeout_count = timeout_count - 5
        if timeout_count % 12 == 0:
            time_left = time_left - 1
    if 'fileName' in progress_data:
        result = [progress_data['fileName'], True]
        return result
jira_confluence_backup.py 文件源码 项目:jira-confluence-backup 作者: MyMedsAndMe 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def download(s, l):
    filename = get_filename(s)
    if not filename:
        return False
    print "Filename found: %s" % filename
    print "Checking if url is valid"
    r = s.get(url=download_url + filename, stream=True)
    print "Status code: %s" % str(r.status_code)
    if int(r.status_code) == 200:
        print "Url returned '200', downloading file"
        if not create_backup_location(l):
            result = ['Failed to create backup location', False]
            return result
        date_time = datetime.datetime.now().strftime("%Y%m%d")
        with open(l + '/' + application + '-' + date_time + '.zip', 'wb') as f:
            file_total = 0
            for chunk in r.iter_content(chunk_size=1024):
                if chunk:
                    f.write(chunk)
                    file_total = file_total + 1024
                    file_total_m = float(file_total)/1048576
                    # Clears the line before re-writing to avoid artifacts
                    stdout.write("\r\x1b[2k")
                    stdout.write("\r\x1b[2K%.2fMB   downloaded" % file_total_m)
                    stdout.flush()
        stdout.write("\n")
        result = ['Backup downloaded successfully', True]
        return result
    else:
        print "Download file not found on remote server - response code %s" % \
            str(r.status_code)
        print "Download url: %s" % download_url + filename
        result = ['Download file not found on remote server', False]
        return result
vampy.py 文件源码 项目:vampy 作者: m4n3dw0lf 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def LoadingCallBack(j,k):
    stdout.write("\r [+] Files: [{}] (strings: [{}])".format(j,k))
    stdout.flush()
make_charset.py 文件源码 项目:pypilot 作者: pypilot 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def create_character(n, c, ifont):
    size = ifont.getsize(c)
    image = Image.new('RGBA', size)
    draw = ImageDraw.Draw(image)
    draw.text((0, 0), c, font=ifont)
    data = list(image.getdata())
    print 'const PROGMEM unsigned char font%d_%02x[] = {' % (n, ord(c))
    for i in range(len(data)):
        stdout.write('0x%02x, ' % data[i][0])
    print '};';
    return size


问题


面经


文章

微信
公众号

扫码关注公众号