python类TextIOWrapper()的实例源码

registrations.py 文件源码 项目:quizbot-2017 作者: pycontw 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def generate_info():
    """Yield one ``Registration`` per row of every CSV member in tickets.zip.

    ``ensure_data_file`` is called with the archive path and the configured
    ``TICKETS_URL`` before the archive is opened.  Archive members whose
    extension is not exactly ``.csv`` are skipped.
    """
    archive_path = ROOT_DIR_PATH.joinpath('tickets.zip')
    ensure_data_file(archive_path, DATA_FILE_INFO['TICKETS_URL'])

    with zipfile.ZipFile(str(archive_path)) as archive:
        for member in archive.namelist():
            if os.path.splitext(member)[1] != '.csv':
                continue
            with archive.open(member) as raw:
                # ZipFile.open() hands back a binary stream while the csv
                # module wants text, so decode through a TextIOWrapper.
                # See <https://stackoverflow.com/questions/5627954>.
                decoded = io.TextIOWrapper(raw, encoding='utf8', newline='')
                rows = csv.DictReader(decoded)
                for row in rows:
                    yield Registration(row)
runserver.py 文件源码 项目:django-webpack 作者: csinchok 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def run(self, *args, **options):
    """Run the server, starting webpack-dev-server alongside it when needed.

    The parent process' command line is read from ``/proc``; when it does not
    contain a ``runserver`` argument, ``webpack_dev_server()`` is started and
    the last whitespace-separated token of its first output line is printed.
    The child process is always killed and reaped before this method
    returns or propagates an exception.
    """
    p_path = os.path.join('/proc', str(os.getppid()), 'cmdline')
    with open(p_path, 'rb') as f:
        # /proc/<pid>/cmdline separates the arguments with NUL bytes.
        p_cmdline = f.read().split(b'\x00')

    p = None
    if b'runserver' not in p_cmdline:
        self.stdout.write("Starting webpack-dev-server...")
        p = webpack_dev_server()

    try:
        if p is not None:
            wrapper = io.TextIOWrapper(p.stdout, line_buffering=True)
            first_line = next(wrapper)
            webpack_host = first_line.split()[-1]

            print(webpack_host)

        super().run(**options)
    finally:
        # Clean up in ``finally`` so the dev server is not orphaned when the
        # server exits through an exception (e.g. SystemExit on Ctrl-C).
        if p is not None:
            p.kill()
            p.wait()
filesys.py 文件源码 项目:srctools 作者: TeamSpen210 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def open_str(self, name: str, encoding='utf8'):
    """Open a file in unicode mode or raise FileNotFoundError.

    ``name`` may also be a VPKFile instance, which is then used directly.
    The file's bytes are read fully into memory and returned wrapped in a
    text stream that decodes them with ``encoding``.
    """
    with self:
        if isinstance(name, VPKFile):
            # File() passes the VPK object itself, no lookup required.
            vpk_entry = name
        else:
            try:
                vpk_entry = self._ref[name]
            except KeyError:
                raise FileNotFoundError(name)
        # First expose the raw data as a byte stream, then layer the
        # decoding (and universal newline handling) on top of it.
        raw_stream = io.BytesIO(vpk_entry.read())
        return io.TextIOWrapper(raw_stream, encoding)
os.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def popen(cmd, mode="r", buffering=-1):
    """Run ``cmd`` through the shell and return a text stream piped to it.

    With mode "r" the stream reads the command's stdout, with mode "w" it
    writes to the command's stdin.  The stream is handed to ``_wrap_close``
    together with the process object.

    Raises TypeError if ``cmd`` is not a str, and ValueError for any other
    mode or for an unbuffered request (``buffering`` of 0 or None).
    """
    if not isinstance(cmd, str):
        raise TypeError("invalid cmd type (%s, expected string)" % type(cmd))
    if mode not in ("r", "w"):
        raise ValueError("invalid mode %r" % mode)
    if buffering == 0 or buffering is None:
        raise ValueError("popen() does not support unbuffered streams")
    import subprocess
    import io
    reading = mode == "r"
    # Only the end of the child we talk to is turned into a pipe.
    pipe_option = {"stdout" if reading else "stdin": subprocess.PIPE}
    proc = subprocess.Popen(cmd, shell=True, bufsize=buffering, **pipe_option)
    pipe = proc.stdout if reading else proc.stdin
    return _wrap_close(io.TextIOWrapper(pipe), proc)

# Helper for popen() -- a proxy for a file whose close waits for the process
target_csv.py 文件源码 项目:target-csv 作者: singer-io 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def main():
    """Command line entry point: persist lines read from stdin.

    An optional JSON config file (``-c`` / ``--config``) supplies the
    ``delimiter``, ``quotechar`` and ``disable_collection`` settings.
    Unless collection is disabled, ``send_usage_stats`` is started on a
    separate thread.  The state returned by ``persist_lines`` is emitted
    before exiting.
    """
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument('-c', '--config', help='Config file')
    cli_args = arg_parser.parse_args()

    config = {}
    if cli_args.config:
        with open(cli_args.config) as config_file:
            config = json.load(config_file)

    collection_disabled = config.get('disable_collection', False)
    if not collection_disabled:
        logger.info('Sending version information to stitchdata.com. '
                    'To disable sending anonymous usage data, set '
                    'the config parameter "disable_collection" to true')
        threading.Thread(target=send_usage_stats).start()

    # Decode stdin explicitly as UTF-8 instead of relying on the locale.
    stdin_text = io.TextIOWrapper(sys.stdin.buffer, encoding='utf-8')
    delimiter = config.get('delimiter', ',')
    quotechar = config.get('quotechar', '"')
    state = persist_lines(delimiter, quotechar, stdin_text)

    emit_state(state)
    logger.debug("Exiting normally")
makefile.py 文件源码 项目:jira_worklog_scanner 作者: pgarneau 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def backport_makefile(self, mode="r", buffering=None, encoding=None,
                      errors=None, newline=None):
    """
    Backport of ``socket.makefile`` from Python 3.5.

    Returns the raw ``SocketIO`` when ``buffering`` is 0 (binary modes only),
    a buffered binary stream for other binary modes, and a ``TextIOWrapper``
    around that buffered stream for text modes.  ``self._makefile_refs`` is
    incremented once the raw stream has been created.
    """
    if not set(mode) <= set(["r", "w", "b"]):
        raise ValueError("invalid mode %r (only r, w, b allowed)" % (mode,))
    wants_write = "w" in mode
    # Reading is the default when neither direction is requested.
    wants_read = "r" in mode or not wants_write
    assert wants_read or wants_write
    is_binary = "b" in mode
    raw_mode = ("r" if wants_read else "") + ("w" if wants_write else "")
    raw = SocketIO(self, raw_mode)
    self._makefile_refs += 1
    if buffering is None or buffering < 0:
        buffering = io.DEFAULT_BUFFER_SIZE
    if buffering == 0:
        if is_binary:
            return raw
        raise ValueError("unbuffered streams must be binary")
    if wants_read and wants_write:
        buffered = io.BufferedRWPair(raw, raw, buffering)
    elif wants_read:
        buffered = io.BufferedReader(raw, buffering)
    else:
        assert wants_write
        buffered = io.BufferedWriter(raw, buffering)
    if is_binary:
        return buffered
    wrapper = io.TextIOWrapper(buffered, encoding, errors, newline)
    wrapper.mode = mode
    return wrapper
dist.py 文件源码 项目:my-first-blog 作者: AnkurBegining 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def handle_display_options(self, option_order):
    """If there were any non-global "display-only" options
    (--help-commands or the metadata display options) on the command
    line, display the requested info and return true; else return
    false.

    On Python 3, when stdout is a TextIOWrapper that is not already UTF-8,
    it is temporarily re-wrapped as UTF-8 while the base implementation
    runs and re-wrapped with its original encoding afterwards.
    """
    import sys

    if six.PY2 or self.help_commands:
        return _Distribution.handle_display_options(self, option_order)

    import io
    stdout = sys.stdout
    # Stdout may be StringIO (e.g. in tests), and there is nothing to do
    # when utf-8 is already the encoding (workaround for #334).
    needs_rewrap = (
        isinstance(stdout, io.TextIOWrapper)
        and stdout.encoding.lower() not in ('utf-8', 'utf8')
    )
    if not needs_rewrap:
        return _Distribution.handle_display_options(self, option_order)

    # Print metadata in UTF-8 no matter the platform
    original_encoding = stdout.encoding
    errors = stdout.errors
    newline = '\n' if sys.platform != 'win32' else None
    line_buffering = stdout.line_buffering

    sys.stdout = io.TextIOWrapper(
        stdout.detach(), 'utf-8', errors, newline, line_buffering)
    try:
        return _Distribution.handle_display_options(self, option_order)
    finally:
        sys.stdout = io.TextIOWrapper(
            sys.stdout.detach(), original_encoding, errors, newline,
            line_buffering)
makemessages.py 文件源码 项目:CodingDojo 作者: ComputerSocietyUNB 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def gettext_popen_wrapper(args, os_err_exc_type=CommandError, stdout_encoding="utf-8"):
    """
    Run a gettext utility and make sure the text from its stdout is Unicode.

    Returns the ``(stdout, stderr, status_code)`` triple from
    ``popen_wrapper`` with ``stdout`` decoded.
    """
    # Decoding by hand both decodes utf-8 and cleans line endings. Simply
    # using popen_wrapper(universal_newlines=True) doesn't properly handle
    # the encoding. This goes back to popen's flaky support for encoding:
    # https://bugs.python.org/issue6135. This is a solution for #23271, #21928.
    # Python 2 gets a byte-string back, which is decoded at the end instead.
    decode_manually = six.PY3 and stdout_encoding != DEFAULT_LOCALE_ENCODING

    result = popen_wrapper(
        args,
        os_err_exc_type=os_err_exc_type,
        universal_newlines=not decode_manually,
    )
    stdout, stderr, status_code = result
    if decode_manually:
        raw_output = io.BytesIO(stdout)
        stdout = io.TextIOWrapper(raw_output, encoding=stdout_encoding).read()
    if six.PY2:
        stdout = stdout.decode(stdout_encoding)
    return stdout, stderr, status_code
dataload.py 文件源码 项目:mygene.info 作者: biothings 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def anyfile(infile, mode='r', encoding="utf8"):
    '''
    Return a file handle, with support for gzip/zip compressed files.

    If infile is a two value tuple, the first value is the compressed file
    and the second one is the actual filename inside the compressed file,
    e.g., ('a.zip', 'aa.txt').  Otherwise the member name defaults to
    infile without its final extension.

    The compression type is chosen from the (case-insensitive) extension of
    infile.  ``mode`` is only used for uncompressed files; '.gz' and '.zip'
    inputs are always opened for reading as text decoded with ``encoding``.
    '''
    if isinstance(infile, tuple):
        infile, rawfile = infile[:2]
    else:
        rawfile = os.path.splitext(infile)[0]
    filetype = os.path.splitext(infile)[1].lower()
    if filetype == '.gz':
        import gzip
        in_f = io.TextIOWrapper(gzip.GzipFile(infile, 'r'), encoding=encoding)
    elif filetype == '.zip':
        import zipfile
        # Close the archive handle explicitly instead of leaving it to the
        # garbage collector; the opened member stays readable afterwards.
        with zipfile.ZipFile(infile, 'r') as archive:
            member = archive.open(rawfile, 'r')
        in_f = io.TextIOWrapper(member, encoding=encoding)
    else:
        in_f = open(infile, mode, encoding=encoding)
    return in_f
filesys.py 文件源码 项目:srctools 作者: TeamSpen210 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def open_str(self, name: str, encoding='utf8'):
    """Return a text buffer for a 'file' stored in the mapping.

    Universal newlines conversion is applied either way.  ``encoding`` is
    only used when the stored data is bytes; data that is already text is
    returned without any decoding.  Raises FileNotFoundError when the
    cleaned path is not in the mapping.
    """
    # Not needed here, but keeps behaviour in line with other filesystems.
    self._check_open()

    try:
        entry = self._mapping[self._clean_path(name)]
    except KeyError:
        raise FileNotFoundError(name)
    _stored_name, contents = entry
    if not isinstance(contents, bytes):
        # Already text: newline=None applies universal newlines directly.
        return io.StringIO(contents, newline=None)
    # Bytes are decoded on the fly, with universal newlines.
    byte_stream = io.BytesIO(contents)
    return io.TextIOWrapper(byte_stream, encoding=encoding)
makefile.py 文件源码 项目:atc_alexa 作者: ckuzma 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def backport_makefile(self, mode="r", buffering=None, encoding=None,
                      errors=None, newline=None):
    """
    Backport of ``socket.makefile`` from Python 3.5.

    Depending on ``mode`` and ``buffering`` the result is the raw
    ``SocketIO`` (unbuffered, binary only), a buffered binary stream, or a
    ``TextIOWrapper`` whose ``mode`` attribute is set to ``mode``.
    """
    allowed_flags = set(["r", "w", "b"])
    if not set(mode) <= allowed_flags:
        raise ValueError(
            "invalid mode %r (only r, w, b allowed)" % (mode,)
        )
    writing = "w" in mode
    reading = "r" in mode or not writing
    assert reading or writing
    binary = "b" in mode
    rawmode = "".join(
        flag for flag, wanted in (("r", reading), ("w", writing)) if wanted
    )
    raw = SocketIO(self, rawmode)
    self._makefile_refs += 1
    # None and negative values both select the default buffer size.
    if buffering is None or buffering < 0:
        buffering = io.DEFAULT_BUFFER_SIZE
    if buffering == 0:
        if not binary:
            raise ValueError("unbuffered streams must be binary")
        return raw
    if reading and writing:
        stream = io.BufferedRWPair(raw, raw, buffering)
    else:
        buffered_cls = io.BufferedReader if reading else io.BufferedWriter
        stream = buffered_cls(raw, buffering)
    if not binary:
        stream = io.TextIOWrapper(stream, encoding, errors, newline)
        stream.mode = mode
    return stream
os.py 文件源码 项目:ivaochdoc 作者: ivaoch 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def popen(cmd, mode="r", buffering=-1):
    """Open a pipe to or from the shell command ``cmd``.

    Mode "r" gives a text stream over the command's stdout, mode "w" a text
    stream over its stdin; either is passed to ``_wrap_close`` along with
    the process object.  A non-str ``cmd`` raises TypeError; an unknown
    mode or an unbuffered request (0 or None) raises ValueError.
    """
    if not isinstance(cmd, str):
        raise TypeError("invalid cmd type (%s, expected string)" % type(cmd))
    if mode not in ("r", "w"):
        raise ValueError("invalid mode %r" % mode)
    if buffering == 0 or buffering is None:
        raise ValueError("popen() does not support unbuffered streams")
    import subprocess
    import io
    popen_options = dict(shell=True, bufsize=buffering)
    if mode == "r":
        child = subprocess.Popen(cmd, stdout=subprocess.PIPE, **popen_options)
        return _wrap_close(io.TextIOWrapper(child.stdout), child)
    child = subprocess.Popen(cmd, stdin=subprocess.PIPE, **popen_options)
    return _wrap_close(io.TextIOWrapper(child.stdin), child)

# Helper for popen() -- a proxy for a file whose close waits for the process
case.py 文件源码 项目:jd4 作者: vijos 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def read_cases(file):
    """Read test cases from a zip or tar archive.

    The archive is tried as a zip file first and, after rewinding, as a tar
    file.  A ``config.ini`` member is handed to ``read_legacy_cases``;
    failing that, a ``config.yaml`` member is handed to ``read_yaml_cases``.
    Raises FormatError when the file is neither archive type or when no
    config file is found.
    """
    open_member = try_open_zip(file)
    if not open_member:
        file.seek(0)
        open_member = try_open_tar(file)
    if not open_member:
        raise FormatError(file, 'not a zip file or tar file')
    try:
        legacy_config = TextIOWrapper(open_member('config.ini'),
                                      encoding='utf-8', errors='replace')
        return read_legacy_cases(legacy_config, open_member)
    except FileNotFoundError:
        pass
    try:
        yaml_config = open_member('config.yaml')
        return read_yaml_cases(yaml_config, open_member)
    except FileNotFoundError:
        pass
    raise FormatError('config file not found')
flake8_rst_docstrings.py 文件源码 项目:flake8-rst-docstrings 作者: peterjc 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def load_source(self):
    """Load the source for the specified file.

    When ``self.filename`` is one of ``STDIN_NAMES`` it is renamed to
    'stdin' and the source is read from standard input; otherwise the named
    file is read through ``tokenize_open``.  The text is stored on
    ``self.source``.
    """
    if self.filename in self.STDIN_NAMES:
        self.filename = 'stdin'
        if sys.version_info[0] < 3:
            self.source = sys.stdin.read()
        else:
            self.source = TextIOWrapper(sys.stdin.buffer,
                                        errors='ignore').read()
    else:
        # Could be a Python 2.7 StringIO with no context manager, so close
        # it by hand; try/finally guarantees that even if read() fails.
        handle = tokenize_open(self.filename)
        try:
            self.source = handle.read()
        finally:
            handle.close()
amt_import_solver.py 文件源码 项目:AutoMergeTool 作者: xgouchet 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __write_merged_imports(self, file: TextIOWrapper, imports: list):
    """
    Write the imports to the file in sorted order, separating consecutive
    import groups with one blank line.
    file -- the file object (needs write permission)
    imports -- the list of imports
    """
    last_group = -1
    for statement in self.sort_imports(imports):
        current_group = self.get_import_group(statement)
        if current_group != last_group:
            # No separator before the very first group.
            if last_group != -1:
                file.write("\n")
            last_group = current_group
        file.write(statement)
openpy.py 文件源码 项目:leetcode 作者: thomasyimgit 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def source_to_unicode(txt, errors='replace', skip_encoding_cookie=True):
    """Convert python source given as bytes into unicode text.

    A str is returned unchanged.  For bytes (or a bytes buffer object) the
    encoding is taken from the python source encoding cookie, falling back
    to ascii when detection raises SyntaxError.  When
    ``skip_encoding_cookie`` is true the decoded lines are passed through
    ``strip_encoding_cookie`` before being joined.
    """
    if isinstance(txt, str):
        return txt
    stream = BytesIO(txt) if isinstance(txt, bytes) else txt
    try:
        encoding, _ = detect_encoding(stream.readline)
    except SyntaxError:
        encoding = "ascii"
    # Detection consumed the first line(s); rewind before decoding.
    stream.seek(0)
    decoded = TextIOWrapper(stream, encoding, errors=errors,
                            line_buffering=True)
    decoded.mode = 'r'
    if not skip_encoding_cookie:
        return decoded.read()
    return u"".join(strip_encoding_cookie(decoded))
os.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def popen(cmd, mode="r", buffering=-1):
    """Open a pipe to or from the shell command ``cmd``.

    With mode "r" the returned text stream reads the command's stdout; with
    mode "w" it writes to the command's stdin.  The stream is passed to
    ``_wrap_close`` together with the process object.

    Raises TypeError if ``cmd`` is not a str, and ValueError for any other
    mode or when an unbuffered stream is requested (``buffering`` of 0 or
    None).
    """
    if not isinstance(cmd, str):
        raise TypeError("invalid cmd type (%s, expected string)" % type(cmd))
    if mode not in ("r", "w"):
        raise ValueError("invalid mode %r" % mode)
    # None is a singleton: test identity rather than equality.
    if buffering == 0 or buffering is None:
        raise ValueError("popen() does not support unbuffered streams")
    import subprocess, io
    if mode == "r":
        proc = subprocess.Popen(cmd,
                                shell=True,
                                stdout=subprocess.PIPE,
                                bufsize=buffering)
        return _wrap_close(io.TextIOWrapper(proc.stdout), proc)
    else:
        proc = subprocess.Popen(cmd,
                                shell=True,
                                stdin=subprocess.PIPE,
                                bufsize=buffering)
        return _wrap_close(io.TextIOWrapper(proc.stdin), proc)

# Helper for popen() -- a proxy for a file whose close waits for the process
test_tarfile.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_fileobj_readlines(self):
    """readlines() on a text-wrapped archive member matches the extracted file."""
    member_name = "ustar/regtype"
    failure_msg = "fileobj.readlines() failed"
    self.tar.extract(member_name, TEMPDIR)
    tarinfo = self.tar.getmember(member_name)
    extracted_path = os.path.join(TEMPDIR, member_name)
    with open(extracted_path, "r") as extracted:
        lines1 = extracted.readlines()

    fobj = self.tar.extractfile(tarinfo)
    try:
        wrapped = io.TextIOWrapper(fobj)
        lines2 = wrapped.readlines()
        self.assertTrue(lines1 == lines2, failure_msg)
        self.assertTrue(len(lines2) == 114, failure_msg)
        expected_line = ("I will gladly admit that Python is not the fastest "
                         "running scripting language.\n")
        self.assertTrue(lines2[83] == expected_line, failure_msg)
    finally:
        fobj.close()
test_mailbox.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_add(self):
    """add() accepts each supported message type and stores every copy."""
    # Add copies of a sample message, one per accepted input type
    keys = []
    accepted_without_warning = [
        self._template % 0,
        mailbox.Message(_sample_message),
        email.message_from_string(_sample_message),
        io.BytesIO(_bytes_sample_message),
        _sample_message,
        _bytes_sample_message,
    ]
    for expected_len, message in enumerate(accepted_without_warning, 1):
        keys.append(self._box.add(message))
        self.assertEqual(len(self._box), expected_len)
    # Adding a text-mode file object must emit a DeprecationWarning.
    with self.assertWarns(DeprecationWarning):
        keys.append(self._box.add(
            io.TextIOWrapper(io.BytesIO(_bytes_sample_message))))
    self.assertEqual(len(self._box), 7)
    self.assertEqual(self._box.get_string(keys[0]), self._template % 0)
    for key in keys[1:7]:
        self._check_sample(self._box[key])
bytecontainer.py 文件源码 项目:py-cloud-compute-cannon 作者: Autodesk 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def open(self, mode='r', encoding=None):
    """Return a file-like object over this container's contents

    Args:
        mode (str): access mode (only reading modes are supported)
        encoding (str): text decoding method for text access; when omitted,
            the container's declared encoding (``encoded_with``) is used

    Returns:
        io.BytesIO OR io.TextIOWrapper: buffer accessing the file as bytes or characters

    A warning is issued when text access requests an encoding that differs
    from the declared one.
    """
    access_type = self._get_access_type(mode)

    if encoding is None:
        encoding = self.encoded_with
    elif access_type == 't' and encoding != self.encoded_with:
        warnings.warn('Attempting to decode %s as "%s", but encoding is declared as "%s"'
                      % (self, encoding, self.encoded_with))

    raw = io.BytesIO(self._contents)
    return raw if access_type == 'b' else io.TextIOWrapper(raw, encoding=encoding)
dist.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def handle_display_options(self, option_order):
    """If there were any non-global "display-only" options
    (--help-commands or the metadata display options) on the command
    line, display the requested info and return true; else return
    false.

    Outside of Python 2 and --help-commands, a non-UTF-8 TextIOWrapper
    stdout is swapped for a UTF-8 one while the base implementation runs and
    is re-created with its previous encoding afterwards.
    """
    import sys

    def delegate():
        # The actual work is always done by the base implementation.
        return _Distribution.handle_display_options(self, option_order)

    if six.PY2 or self.help_commands:
        return delegate()

    # Stdout may be StringIO (e.g. in tests)
    import io
    if not isinstance(sys.stdout, io.TextIOWrapper):
        return delegate()

    # Don't wrap stdout if utf-8 is already the encoding. Provides
    #  workaround for #334.
    if sys.stdout.encoding.lower() in ('utf-8', 'utf8'):
        return delegate()

    # Print metadata in UTF-8 no matter the platform
    previous_encoding = sys.stdout.encoding
    shared_args = (
        sys.stdout.errors,
        '\n' if sys.platform != 'win32' else None,
        sys.stdout.line_buffering,
    )

    sys.stdout = io.TextIOWrapper(sys.stdout.detach(), 'utf-8', *shared_args)
    try:
        return delegate()
    finally:
        sys.stdout = io.TextIOWrapper(
            sys.stdout.detach(), previous_encoding, *shared_args)
tokenize.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def open(filename):
    """Open a file in read only text mode, decoded with the encoding
    reported by detect_encoding().

    The underlying binary file is closed again if anything fails before the
    text wrapper is returned.
    """
    raw = _builtin_open(filename, 'rb')
    try:
        detected, _consumed_lines = detect_encoding(raw.readline)
        # Detection read from the file; rewind so decoding starts at byte 0.
        raw.seek(0)
        wrapper = TextIOWrapper(raw, detected, line_buffering=True)
        wrapper.mode = 'r'
    except BaseException:
        raw.close()
        raise
    return wrapper
makefile.py 文件源码 项目:my-first-blog 作者: AnkurBegining 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def backport_makefile(self, mode="r", buffering=None, encoding=None,
                      errors=None, newline=None):
    """
    Backport of ``socket.makefile`` from Python 3.5.

    Builds a ``SocketIO`` for this socket and returns it directly when
    ``buffering`` is 0 (binary only), wrapped in a buffered stream for other
    binary modes, or additionally wrapped in a ``TextIOWrapper`` for text
    modes.  ``self._makefile_refs`` is incremented after the ``SocketIO`` is
    created.
    """
    if not set(mode) <= set(["r", "w", "b"]):
        raise ValueError("invalid mode %r (only r, w, b allowed)" % (mode,))
    for_writing = "w" in mode
    # With no explicit direction the stream is opened for reading.
    for_reading = "r" in mode or not for_writing
    assert for_reading or for_writing
    as_binary = "b" in mode
    raw_mode = ""
    if for_reading:
        raw_mode = raw_mode + "r"
    if for_writing:
        raw_mode = raw_mode + "w"
    raw = SocketIO(self, raw_mode)
    self._makefile_refs += 1
    if buffering is None or buffering < 0:
        buffering = io.DEFAULT_BUFFER_SIZE
    if buffering == 0:
        if as_binary:
            return raw
        raise ValueError("unbuffered streams must be binary")
    if for_reading and for_writing:
        buffered = io.BufferedRWPair(raw, raw, buffering)
    elif for_reading:
        buffered = io.BufferedReader(raw, buffering)
    else:
        assert for_writing
        buffered = io.BufferedWriter(raw, buffering)
    if as_binary:
        return buffered
    text_stream = io.TextIOWrapper(buffered, encoding, errors, newline)
    text_stream.mode = mode
    return text_stream
makefile.py 文件源码 项目:googletranslate.popclipext 作者: wizyoung 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def backport_makefile(self, mode="r", buffering=None, encoding=None,
                      errors=None, newline=None):
    """
    Backport of ``socket.makefile`` from Python 3.5.

    The result is the raw ``SocketIO`` for unbuffered binary access, a
    buffered binary stream when ``mode`` contains "b", and otherwise a
    ``TextIOWrapper`` whose ``mode`` attribute is set to ``mode``.
    """
    permitted = set(["r", "w", "b"])
    if not set(mode) <= permitted:
        raise ValueError(
            "invalid mode %r (only r, w, b allowed)" % (mode,)
        )
    writing = "w" in mode
    reading = "r" in mode or not writing
    assert reading or writing
    binary = "b" in mode
    rawmode = ("r" if reading else "") + ("w" if writing else "")
    raw = SocketIO(self, rawmode)
    self._makefile_refs += 1
    # Treat a missing or negative buffer size as "use the default".
    if buffering is None or buffering < 0:
        buffering = io.DEFAULT_BUFFER_SIZE
    if buffering == 0:
        if not binary:
            raise ValueError("unbuffered streams must be binary")
        return raw
    if reading and writing:
        result = io.BufferedRWPair(raw, raw, buffering)
    else:
        one_way_cls = io.BufferedReader if reading else io.BufferedWriter
        result = one_way_cls(raw, buffering)
    if not binary:
        result = io.TextIOWrapper(result, encoding, errors, newline)
        result.mode = mode
    return result
audit.py 文件源码 项目:bob 作者: BobBuildTool 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def load(self, file):
    """Populate this audit record from a binary file holding UTF-8 JSON.

    The decoded tree is validated against ``Audit.SCHEMA`` before the
    artifact and its references are stored.  Schema violations and JSON
    errors are both reported as ParseError.
    """
    try:
        parsed = json.load(io.TextIOWrapper(file, encoding='utf8'))
        tree = Audit.SCHEMA.validate(parsed)
        self.__artifact = Artifact.fromData(tree["artifact"])
        references = {}
        for entry in tree["references"]:
            references[entry["artifact-id"]] = Artifact.fromData(entry)
        self.__references = references
    except schema.SchemaError as e:
        raise ParseError("Invalid audit record: " + str(e))
    except ValueError as e:
        raise ParseError("Invalid json: " + str(e))
    self.__validate()
audit.py 文件源码 项目:bob 作者: BobBuildTool 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def save(self, file):
    """Write this audit record to ``file`` as gzip-compressed UTF-8 JSON.

    The JSON object has an "artifact" entry and a "references" list, both
    produced by the artifacts' ``dump()`` methods.  An OSError while writing
    is reported as BuildError.
    """
    tree = {
        "artifact" : self.__artifact.dump(),
        "references" : [ a.dump() for a in self.__references.values() ]
    }
    try:
        with gzip.open(file, 'wb', 6) as gzf:
            # Close the text layer explicitly so its buffered data is
            # flushed into the gzip stream before that stream is closed,
            # instead of relying on the wrapper being garbage collected.
            with io.TextIOWrapper(gzf, encoding='utf8') as text:
                json.dump(tree, text)
    except OSError as e:
        raise BuildError("Cannot write audit: " + str(e))
main.py 文件源码 项目:pyecharts-snapshot 作者: chfw 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def make_a_snapshot(file_name, output_name, delay=DEFAULT_DELAY):
    """Render ``file_name`` through phantomjs and save it as ``output_name``.

    The output type is taken from the text after the last '.' in
    ``output_name``.  phantomjs is expected to print two comma-separated
    fields, the second being the base64 encoded image data.

    Args:
        file_name: path of the file handed to the snapshot script.
        output_name: destination file; its extension selects the format.
        delay: delay passed to the script, converted here to milliseconds.

    Raises:
        Exception: when phantomjs does not produce exactly two
            comma-separated fields, or the file type is not supported.
    """
    file_type = output_name.split('.')[-1]
    pixel_ratio = 2
    # shell=True is what makes the phantomjs launch work on Windows.
    shell_flag = sys.platform == 'win32'
    delay_in_ms = int(delay * 1000)

    proc_params = [
        PHANTOMJS_EXE,
        os.path.join(get_resource_dir('phantomjs'), 'snapshot.js'),
        file_name.replace('\\', '/'),
        file_type,
        str(delay_in_ms),
        str(pixel_ratio)
    ]
    proc = subprocess.Popen(
        proc_params, stdout=subprocess.PIPE, shell=shell_flag)
    try:
        if PY2:
            content = proc.stdout.read()
            content = content.decode('utf-8')
        else:
            content = io.TextIOWrapper(proc.stdout, encoding="utf-8").read()
    finally:
        # Release the pipe and reap the child so no zombie process or open
        # file descriptor is left behind, even if reading fails.
        proc.stdout.close()
        proc.wait()
    content_array = content.split(',')
    if len(content_array) != 2:
        raise Exception("No snapshot taken by phantomjs. " +
                        "Please make sure it is installed " +
                        "and available on your path")
    base64_imagedata = content_array[1]
    imagedata = decode_base64(base64_imagedata.encode('utf-8'))
    if file_type in ['pdf', 'gif']:
        save_as(imagedata, output_name, file_type)
    elif file_type in ['png', 'jpeg']:
        save_as_png(imagedata, output_name)
    else:
        raise Exception(NOT_SUPPORTED_FILE_TYPE % file_type)
makefile.py 文件源码 项目:Projects 作者: it2school 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def backport_makefile(self, mode="r", buffering=None, encoding=None,
                      errors=None, newline=None):
    """
    Backport of ``socket.makefile`` from Python 3.5.

    Creates a ``SocketIO`` for this socket, bumps ``self._makefile_refs``
    and returns the raw, buffered or text stream selected by ``mode`` and
    ``buffering``.  Unbuffered access (``buffering`` of 0) is only allowed
    for binary modes.
    """
    if not set(mode) <= set(["r", "w", "b"]):
        raise ValueError("invalid mode %r (only r, w, b allowed)" % (mode,))
    do_write = "w" in mode
    do_read = "r" in mode or not do_write
    assert do_read or do_write
    binary = "b" in mode
    raw = SocketIO(self, ("r" if do_read else "") + ("w" if do_write else ""))
    self._makefile_refs += 1
    buffer_size = buffering
    if buffer_size is None or buffer_size < 0:
        buffer_size = io.DEFAULT_BUFFER_SIZE
    if buffer_size == 0:
        if binary:
            return raw
        raise ValueError("unbuffered streams must be binary")
    if do_read and do_write:
        # A socket has no shared file position, so pair reader and writer.
        buffered = io.BufferedRWPair(raw, raw, buffer_size)
    elif do_read:
        buffered = io.BufferedReader(raw, buffer_size)
    else:
        assert do_write
        buffered = io.BufferedWriter(raw, buffer_size)
    if binary:
        return buffered
    text_layer = io.TextIOWrapper(buffered, encoding, errors, newline)
    text_layer.mode = mode
    return text_layer
json.py 文件源码 项目:Flask_Blog 作者: sugarguo 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _wrap_reader_for_text(fp, encoding):
    """Return ``fp`` as a text reader, decoding with ``encoding`` if it yields bytes."""
    # A zero-length read reveals the stream's data type without consuming it.
    if not isinstance(fp.read(0), bytes):
        return fp
    return io.TextIOWrapper(io.BufferedReader(fp), encoding)
json.py 文件源码 项目:Flask_Blog 作者: sugarguo 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _wrap_writer_for_text(fp, encoding):
    """Return ``fp`` as a text writer, encoding with ``encoding`` if it rejects str."""
    try:
        # Writing an empty str is the probe: it raises TypeError when the
        # stream does not accept text.
        fp.write('')
    except TypeError:
        return io.TextIOWrapper(fp, encoding)
    return fp


问题


面经


文章

微信
公众号

扫码关注公众号