python类linesep()的实例源码

mailbox.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def _install_message(self, message):
        """Format a message and blindly write to self._file."""
        from_line = None
        if isinstance(message, str) and message.startswith('From '):
            newline = message.find('\n')
            if newline != -1:
                from_line = message[:newline]
                message = message[newline + 1:]
            else:
                from_line = message
                message = ''
        elif isinstance(message, _mboxMMDFMessage):
            from_line = 'From ' + message.get_from()
        elif isinstance(message, email.message.Message):
            from_line = message.get_unixfrom()  # May be None.
        if from_line is None:
            from_line = 'From MAILER-DAEMON %s' % time.asctime(time.gmtime())
        start = self._file.tell()
        self._file.write(from_line + os.linesep)
        self._dump_message(message, self._file, self._mangle_from_)
        stop = self._file.tell()
        return (start, stop)
mailbox.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def _generate_toc(self):
        """Generate key-to-(start, stop) table of contents."""
        starts, stops = [], []
        self._file.seek(0)
        while True:
            line_pos = self._file.tell()
            line = self._file.readline()
            if line.startswith('From '):
                if len(stops) < len(starts):
                    stops.append(line_pos - len(os.linesep))
                starts.append(line_pos)
            elif line == '':
                stops.append(line_pos)
                break
        self._toc = dict(enumerate(zip(starts, stops)))
        self._next_key = len(self._toc)
        self._file_length = self._file.tell()
mailbox.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def get_message(self, key):
        """Return a Message representation or raise a KeyError."""
        start, stop = self._lookup(key)
        self._file.seek(start)
        self._file.readline()   # Skip '1,' line specifying labels.
        original_headers = StringIO.StringIO()
        while True:
            line = self._file.readline()
            if line == '*** EOOH ***' + os.linesep or line == '':
                break
            original_headers.write(line.replace(os.linesep, '\n'))
        visible_headers = StringIO.StringIO()
        while True:
            line = self._file.readline()
            if line == os.linesep or line == '':
                break
            visible_headers.write(line.replace(os.linesep, '\n'))
        body = self._file.read(stop - self._file.tell()).replace(os.linesep,
                                                                 '\n')
        msg = BabylMessage(original_headers.getvalue() + body)
        msg.set_visible(visible_headers.getvalue())
        if key in self._labels:
            msg.set_labels(self._labels[key])
        return msg
mailbox.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def get_string(self, key):
        """Return a string representation or raise a KeyError."""
        start, stop = self._lookup(key)
        self._file.seek(start)
        self._file.readline()   # Skip '1,' line specifying labels.
        original_headers = StringIO.StringIO()
        while True:
            line = self._file.readline()
            if line == '*** EOOH ***' + os.linesep or line == '':
                break
            original_headers.write(line.replace(os.linesep, '\n'))
        while True:
            line = self._file.readline()
            if line == os.linesep or line == '':
                break
        return original_headers.getvalue() + \
               self._file.read(stop - self._file.tell()).replace(os.linesep,
                                                                 '\n')
wheel.py 文件源码 项目:Flask_Blog 作者: sugarguo 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def fix_script(path):
    """Replace #!python with #!/path/to/python
    Return True if file was changed."""
    # XXX RECORD hashes will need to be updated
    if os.path.isfile(path):
        script = open(path, 'rb')
        try:
            firstline = script.readline()
            if not firstline.startswith(binary('#!python')):
                return False
            exename = sys.executable.encode(sys.getfilesystemencoding())
            firstline = binary('#!') + exename + binary(os.linesep)
            rest = script.read()
        finally:
            script.close()
        script = open(path, 'wb')
        try:
            script.write(firstline)
            script.write(rest)
        finally:
            script.close()
        return True
__init__.py 文件源码 项目:Flask_Blog 作者: sugarguo 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def main(initial_args=None):
    if initial_args is None:
        initial_args = sys.argv[1:]

    autocomplete()

    try:
        cmd_name, cmd_args = parseopts(initial_args)
    except PipError:
        e = sys.exc_info()[1]
        sys.stderr.write("ERROR: %s" % e)
        sys.stderr.write(os.linesep)
        sys.exit(1)

    command = commands[cmd_name]()
    return command.main(cmd_args)
__init__.py 文件源码 项目:pip-update-requirements 作者: alanhamlett 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def main(args=None):
    if args is None:
        args = sys.argv[1:]

    # Configure our deprecation warnings to be sent through loggers
    deprecation.install_warning_logger()

    autocomplete()

    try:
        cmd_name, cmd_args = parseopts(args)
    except PipError as exc:
        sys.stderr.write("ERROR: %s" % exc)
        sys.stderr.write(os.linesep)
        sys.exit(1)

    # Needed for locale.getpreferredencoding(False) to work
    # in pip._internal.utils.encoding.auto_decode
    try:
        locale.setlocale(locale.LC_ALL, '')
    except locale.Error as e:
        # setlocale can apparently crash if locale are uninitialized
        logger.debug("Ignoring error %s when setting locale", e)
    command = commands_dict[cmd_name](isolated=check_isolated(cmd_args))
    return command.main(cmd_args)
recipe-578043.py 文件源码 项目:code 作者: ActiveState 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def remove(path, pth_file=None):
    pth_file = pth_file or PATH
    try:
        with open(pth_file, "r") as f:
            lines = f.read().splitlines()[2:]
    except IOError:
        raise ValueError("not found: {}".format(path))

    try:
        lines.remove(path)
    except ValueError:
        raise ValueError("not found: {}".format(path))

    lines.insert(0, HEADER)
    with open(pth_file, "w") as f:
        f.write(os.linesep.join(lines))
recipe-546530.py 文件源码 项目:code 作者: ActiveState 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def _printf(fmt, *args, **print3opts):
    '''Formatted print to sys.stdout or given stream.

        *print3opts* -- print keyword arguments, like Python 3.+
    '''
    if print3opts:  # like Python 3.0
        f = print3opts.get('file', None) or sys.stdout
        if args:
            f.write(fmt % args)
        else:
            f.write(fmt)
        f.write(print3opts.get('end', linesep))
        if print3opts.get('flush', False):
            f.flush()
    elif args:
        print(fmt % args)
    else:
        print(fmt)
waf_unit_test.py 文件源码 项目:SoCFoundationFlow 作者: mattaw 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def set_exit_code(bld):
    """
    If any of the tests fail waf will exit with that exit code.
    This is useful if you have an automated build system which need
    to report on errors from the tests.
    You may use it like this:

        def build(bld):
            bld(features='cxx cxxprogram test', source='main.c', target='app')
            from waflib.Tools import waf_unit_test
            bld.add_post_fun(waf_unit_test.set_exit_code)
    """
    lst = getattr(bld, 'utest_results', [])
    for (f, code, out, err) in lst:
        if code:
            msg = []
            if out:
                msg.append('stdout:%s%s' % (os.linesep, out.decode('utf-8')))
            if err:
                msg.append('stderr:%s%s' % (os.linesep, err.decode('utf-8')))
            bld.fatal(os.linesep.join(msg))
Node.py 文件源码 项目:SoCFoundationFlow 作者: mattaw 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def write_json(self, data, pretty=True):
        """
        Writes a python object as JSON to disk. Files are always written as UTF8 as per the JSON standard::

            def build(bld):
                bld.path.find_node('xyz.json').write_json(199)

        :type  data: object
        :param data: The data to write to disk
        :type  pretty: boolean
        :param pretty: Determines if the JSON will be nicely space separated
        """
        import json # Python 2.6 and up
        indent = 2
        separators = (',', ': ')
        sort_keys = pretty
        newline = os.linesep
        if not pretty:
            indent = None
            separators = (',', ':')
            newline = ''
        output = json.dumps(data, indent=indent, separators=separators, sort_keys=sort_keys) + newline
        self.write(output, encoding='utf-8')
waf_unit_test.py 文件源码 项目:SoCFoundationFlow 作者: mattaw 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def set_exit_code(bld):
    """
    If any of the tests fail waf will exit with that exit code.
    This is useful if you have an automated build system which need
    to report on errors from the tests.
    You may use it like this:

        def build(bld):
            bld(features='cxx cxxprogram test', source='main.c', target='app')
            from waflib.Tools import waf_unit_test
            bld.add_post_fun(waf_unit_test.set_exit_code)
    """
    lst = getattr(bld, 'utest_results', [])
    for (f, code, out, err) in lst:
        if code:
            msg = []
            if out:
                msg.append('stdout:%s%s' % (os.linesep, out.decode('utf-8')))
            if err:
                msg.append('stderr:%s%s' % (os.linesep, err.decode('utf-8')))
            bld.fatal(os.linesep.join(msg))
Node.py 文件源码 项目:SoCFoundationFlow 作者: mattaw 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def write_json(self, data, pretty=True):
        """
        Writes a python object as JSON to disk. Files are always written as UTF8 as per the JSON standard::

            def build(bld):
                bld.path.find_node('xyz.json').write_json(199)

        :type  data: object
        :param data: The data to write to disk
        :type  pretty: boolean
        :param pretty: Determines if the JSON will be nicely space separated
        """
        import json # Python 2.6 and up
        indent = 2
        separators = (',', ': ')
        sort_keys = pretty
        newline = os.linesep
        if not pretty:
            indent = None
            separators = (',', ':')
            newline = ''
        output = json.dumps(data, indent=indent, separators=separators, sort_keys=sort_keys) + newline
        self.write(output, encoding='utf-8')
waf_unit_test.py 文件源码 项目:SoCFoundationFlow 作者: mattaw 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def set_exit_code(bld):
    """
    If any of the tests fail waf will exit with that exit code.
    This is useful if you have an automated build system which need
    to report on errors from the tests.
    You may use it like this:

        def build(bld):
            bld(features='cxx cxxprogram test', source='main.c', target='app')
            from waflib.Tools import waf_unit_test
            bld.add_post_fun(waf_unit_test.set_exit_code)
    """
    lst = getattr(bld, 'utest_results', [])
    for (f, code, out, err) in lst:
        if code:
            msg = []
            if out:
                msg.append('stdout:%s%s' % (os.linesep, out.decode('utf-8')))
            if err:
                msg.append('stderr:%s%s' % (os.linesep, err.decode('utf-8')))
            bld.fatal(os.linesep.join(msg))
streams.py 文件源码 项目:TerminalView 作者: Wramberg 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def __init__(self, to=sys.stdout, only=(), *args, **kwargs):
        super(DebugStream, self).__init__(*args, **kwargs)

        def safe_str(chunk):
            if isinstance(chunk, bytes):
                chunk = chunk.decode("utf-8")
            elif not isinstance(chunk, str):
                chunk = str(chunk)

            return chunk

        class Bugger(object):
            __before__ = __after__ = lambda *args: None

            def __getattr__(self, event):
                def inner(*args, **kwargs):
                    to.write(event.upper() + " ")
                    to.write("; ".join(map(safe_str, args)))
                    to.write(" ")
                    to.write(", ".join("{0}: {1}".format(k, safe_str(v))
                                       for k, v in kwargs.items()))
                    to.write(os.linesep)
                return inner

        self.attach(Bugger(), only=only)
merac-o-matic-gui.py 文件源码 项目:merac-o-matic 作者: RogueAI42 项目源码 文件源码 阅读 44 收藏 0 点赞 0 评论 0
def generate():
    global sound
    statement = coremeraco.constructRegularStatement() + os.linesep
    textArea.insert(Tkinter.END, statement)
    textArea.yview(Tkinter.END)
    if readOption and readOutputState.get():
        if sound:
            sound.stop()
        sound = playsnd.playsnd()
        threading.Thread(target=sound.play, args=(procfest.text2wave(statement),)).start()
    if saveOutputState.get():
        try:
            outputFileHandler = open(saveOutputDestination.get(), 'a') # 'a' means append mode
            outputFileHandler.write(statement)
            outputFileHandler.close()
        except IOError as Argument:
            tkMessageBox.showerror(productName, meracolocale.getLocalisedString(localisationFile, "ioErrorMessage") + str(Argument))
        except Exception as Argument:
            tkMessageBox.showerror(productName, meracolocale.getLocalisedString(localisationFile, "genericErrorMessage") + str(Argument))
test_metadata.py 文件源码 项目:aws-encryption-sdk-cli 作者: awslabs 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_append_metadata_file(tmpdir):
    initial_data = 'some data'
    my_metadata = {
        'some': 'data',
        'for': 'this metadata'
    }
    output_file = tmpdir.join('metadata')
    output_file.write_binary((initial_data + os.linesep).encode('utf-8'))

    with metadata.MetadataWriter(suppress_output=False)(str(output_file)) as writer:
        writer.write_metadata(**my_metadata)

    lines = output_file.readlines()
    assert len(lines) == 2
    assert lines[0].strip() == initial_data
    assert json.loads(lines[1].strip()) == my_metadata
test_metadata.py 文件源码 项目:aws-encryption-sdk-cli 作者: awslabs 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_overwrite_metdata_file(tmpdir):
    initial_data = 'some data'
    my_metadata = {
        'some': 'data',
        'for': 'this metadata'
    }
    output_file = tmpdir.join('metadata')
    output_file.write(initial_data + os.linesep)

    overwrite_writer = metadata.MetadataWriter(suppress_output=False)(str(output_file))
    overwrite_writer.force_overwrite()

    with overwrite_writer as writer:
        writer.write_metadata(**my_metadata)

    lines = output_file.readlines()
    assert len(lines) == 1
    assert json.loads(lines[0].strip()) == my_metadata
metadata.py 文件源码 项目:aws-encryption-sdk-cli 作者: awslabs 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def write_metadata(self, **metadata):
        # type: (**Any) -> Optional[int]
        """Writes metadata to the output stream if output is not suppressed.

        :param **metadata: JSON-serializeable metadata kwargs to write
        """
        if self.suppress_output:
            return 0  # wrote 0 bytes

        metadata_line = json.dumps(metadata, sort_keys=True) + os.linesep
        metadata_output = ''  # type: Union[str, bytes]
        if 'b' in self._output_mode:
            metadata_output = metadata_line.encode('utf-8')
        else:
            metadata_output = metadata_line
        return self._output_stream.write(metadata_output)
archivebot.py 文件源码 项目:abusehelper 作者: Exploit-install 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def _collect(self, room_name, compress):
        event = yield idiokit.next()

        while True:
            current = datetime.utcnow().day

            with _open_archive(self.archive_dir, time.time(), room_name) as archive:
                self.log.info("Opened archive {0!r}".format(archive.name))

                while current == datetime.utcnow().day:
                    json_dict = dict((key, event.values(key)) for key in event.keys())
                    archive.write(json.dumps(json_dict) + os.linesep)

                    event = yield idiokit.next()

            yield compress.queue(0.0, _rename(archive.name))
transcoders.py 文件源码 项目:robograph 作者: csparpa 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def output(self):
        if self._params['delimiter'] is None:
            delim = ','
        else:
            delim = self._params['delimiter']

        if self._params['linesep'] is None:
            eol = os.linesep
        else:
            eol = self._params['linesep']

        lines = eol.join([delim.join(map(str, row)) for row in self._params['data_matrix']])
        if self._params['header_list']:
            header = delim.join(self._params['header_list'])
        else:
            header = ''
        return header + eol + lines
__init__.py 文件源码 项目:zanph 作者: zanph 项目源码 文件源码 阅读 95 收藏 0 点赞 0 评论 0
def main(args=None):
    if args is None:
        args = sys.argv[1:]

    # Configure our deprecation warnings to be sent through loggers
    deprecation.install_warning_logger()

    autocomplete()

    try:
        cmd_name, cmd_args = parseopts(args)
    except PipError as exc:
        sys.stderr.write("ERROR: %s" % exc)
        sys.stderr.write(os.linesep)
        sys.exit(1)

    # Needed for locale.getpreferredencoding(False) to work
    # in pip.utils.encoding.auto_decode
    locale.setlocale(locale.LC_ALL, '')
    command = commands_dict[cmd_name](isolated=check_isolated(cmd_args))
    return command.main(cmd_args)


# ###########################################################
# # Writing freeze files
__init__.py 文件源码 项目:gui_tool 作者: UAVCAN 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def keyPressEvent(self, qkeyevent):
        if qkeyevent.matches(QKeySequence.Copy):
            selected_rows = [x.row() for x in self.selectionModel().selectedRows()]
            logger.info('Copy to clipboard requested [%r rows]' % len(selected_rows))

            out_string = ''
            for row in selected_rows:
                out_string += self.get_row_as_string(row) + os.linesep

            if out_string:
                QApplication.clipboard().setText(out_string)
        else:
            super(BasicTable, self).keyPressEvent(qkeyevent)

        if qkeyevent.matches(QKeySequence.InsertParagraphSeparator):
            if self.hasFocus():
                self.on_enter_pressed([(x.row(), x.column()) for x in self.selectedIndexes()])
colorize.py 文件源码 项目:colorize 作者: BruXy 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def print_help():
    """Print help encoded as initial script's comment between #< and #>."""
    show_line = False
    fp = open(sys.argv[0], 'r')

    for line in fp:
        if line[0:2] == '#<':  # start token
            show_line = True
            continue
        elif line[0:2] == '#>':  # end token
            return

        if show_line == True:
            print(line.rstrip(os.linesep)[1:])

    fp.close()
gimp_colorize.py 文件源码 项目:colorize 作者: BruXy 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def gui_ask_for_api():
    """Gtk dialog for API key insert."""
    message = gtk.MessageDialog(type=gtk.MESSAGE_QUESTION, buttons=gtk.BUTTONS_OK_CANCEL)
    message.set_markup(colorize.MSG_ASK_API.replace(colorize.URL,"<u>" + colorize.URL +"</u>"))

    entry = gtk.Entry(max=64)
    entry.set_text("Enter your API key")
    entry.show()
    message.vbox.pack_end(entry)
    entry.connect("activate", lambda _: d.response(gtk.RESPONSE_OK))
    message.set_default_response(gtk.RESPONSE_OK)
    message.run()

    api_key = entry.get_text().decode('utf8')
    fp = open(colorize.HOME + colorize.API_KEY_FILE, 'w')
    fp.write("YOUR_API_KEY={0}{1}".format(api_key, os.linesep))
    fp.close()

    # process buttong click immediately
    message.destroy()
    while gtk.events_pending():
        gtk.main_iteration()
test_iutils.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def testOutputSignal(self):
        # Use SIGKILL here because it's guaranteed to be delivered. Using
        # SIGHUP might not work in, e.g., a buildbot slave run under the
        # 'nohup' command.
        exe = sys.executable
        scriptFile = self.makeSourceFile([
            "import sys, os, signal",
            "sys.stdout.write('stdout bytes\\n')",
            "sys.stderr.write('stderr bytes\\n')",
            "sys.stdout.flush()",
            "sys.stderr.flush()",
            "os.kill(os.getpid(), signal.SIGKILL)"
            ])

        def gotOutputAndValue(err):
            (out, err, sig) = err.value # XXX Sigh wtf
            self.assertEquals(out, "stdout bytes" + os.linesep)
            self.assertEquals(err, "stderr bytes" + os.linesep)
            self.assertEquals(sig, signal.SIGKILL)

        d = utils.getProcessOutputAndValue(exe, ['-u', scriptFile])
        return d.addErrback(gotOutputAndValue)
wheel.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def fix_script(path):
    """Replace #!python with #!/path/to/python
    Return True if file was changed."""
    # XXX RECORD hashes will need to be updated
    if os.path.isfile(path):
        script = open(path, 'rb')
        try:
            firstline = script.readline()
            if not firstline.startswith(binary('#!python')):
                return False
            exename = sys.executable.encode(sys.getfilesystemencoding())
            firstline = binary('#!') + exename + binary(os.linesep)
            rest = script.read()
        finally:
            script.close()
        script = open(path, 'wb')
        try:
            script.write(firstline)
            script.write(rest)
        finally:
            script.close()
        return True
__init__.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def main(initial_args=None):
    if initial_args is None:
        initial_args = sys.argv[1:]

    autocomplete()

    try:
        cmd_name, cmd_args = parseopts(initial_args)
    except PipError:
        e = sys.exc_info()[1]
        sys.stderr.write("ERROR: %s" % e)
        sys.stderr.write(os.linesep)
        sys.exit(1)

    command = commands[cmd_name]()
    return command.main(cmd_args)
mailbox.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _install_message(self, message):
        """Format a message and blindly write to self._file."""
        from_line = None
        if isinstance(message, str) and message.startswith('From '):
            newline = message.find('\n')
            if newline != -1:
                from_line = message[:newline]
                message = message[newline + 1:]
            else:
                from_line = message
                message = ''
        elif isinstance(message, _mboxMMDFMessage):
            from_line = 'From ' + message.get_from()
        elif isinstance(message, email.message.Message):
            from_line = message.get_unixfrom()  # May be None.
        if from_line is None:
            from_line = 'From MAILER-DAEMON %s' % time.asctime(time.gmtime())
        start = self._file.tell()
        self._file.write(from_line + os.linesep)
        self._dump_message(message, self._file, self._mangle_from_)
        stop = self._file.tell()
        return (start, stop)
mailbox.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def get_message(self, key):
        """Return a Message representation or raise a KeyError."""
        start, stop = self._lookup(key)
        self._file.seek(start)
        self._file.readline()   # Skip '1,' line specifying labels.
        original_headers = StringIO.StringIO()
        while True:
            line = self._file.readline()
            if line == '*** EOOH ***' + os.linesep or line == '':
                break
            original_headers.write(line.replace(os.linesep, '\n'))
        visible_headers = StringIO.StringIO()
        while True:
            line = self._file.readline()
            if line == os.linesep or line == '':
                break
            visible_headers.write(line.replace(os.linesep, '\n'))
        body = self._file.read(stop - self._file.tell()).replace(os.linesep,
                                                                 '\n')
        msg = BabylMessage(original_headers.getvalue() + body)
        msg.set_visible(visible_headers.getvalue())
        if key in self._labels:
            msg.set_labels(self._labels[key])
        return msg


问题


面经


文章

微信
公众号

扫码关注公众号