python类IO的实例源码

shellscript.py 文件源码 项目:charm-glusterfs 作者: openstack 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def parse(f: IO[Any]) -> Result:
    """
    Split a shell script into its interpreter line, comments and commands.

    Every line is stripped of surrounding whitespace first. A line starting
    with "#!" becomes the interpreter (a later one replaces an earlier one),
    any other line starting with "#" is collected as a comment, blank lines
    are dropped and everything else is collected as a command.

    :param f: handle to the shellscript file; all lines are read from it
    :return: Ok wrapping the resulting ShellScript
    """
    interpreter = ""
    comments = []
    commands = []

    for raw_line in f.readlines():
        text = raw_line.strip()
        if not text:
            # Blank lines carry no information.
            continue
        if text.startswith("#!"):
            interpreter = text
        elif text.startswith("#"):
            comments.append(str(text))
        else:
            commands.append(str(text))

    script = ShellScript(interpreter=interpreter,
                         comments=comments,
                         commands=commands)
    return Ok(script)
fstab.py 文件源码 项目:charm-glusterfs 作者: openstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def parse_entries(self, file: IO[Any]) -> Result:
        """
        Parse fstab entries

        Comment lines (first non-blank character is "#") and lines that do
        not consist of exactly six whitespace-separated fields are skipped.

        :param file: TextIOWrapper file handle to the fstab
        :return: Ok wrapping the list of FsEntry objects, in file order
        :raises ValueError: if the sixth field of an entry is not an integer
        """
        entries = []

        for line in file.readlines():
            # Comments may be indented, so ignore leading whitespace when
            # looking for the "#" marker; otherwise an indented comment with
            # six words would be treated as an entry and fail in int().
            if line.lstrip().startswith("#"):
                continue
            parts = line.split()
            if len(parts) != 6:
                continue
            fsck_order = int(parts[5])
            entries.append(FsEntry(
                fs_spec=parts[0],
                mountpoint=parts[1],
                vfs_type=parts[2],
                mount_options=parts[3].split(","),
                # Any value other than the literal "0" enables dump.
                dump=parts[4] != "0",
                fsck_order=fsck_order))
        return Ok(entries)
dlvhex2.py 文件源码 项目:py-aspio 作者: hexhex 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __init__(self, *, process: subprocess.Popen, encoding: str, tmp_input: FilesystemIPC) -> None:
        """Store the process, start capturing its stderr and register cleanup.

        All arguments are keyword-only.

        :param process: subprocess whose ``stderr`` stream is captured in a background thread
        :param encoding: stored as ``stdout_encoding`` and also handed to the finalizer
        :param tmp_input: not used here directly; only forwarded to the finalizer callback
        """
        self.process = process
        self.stdout_encoding = encoding
        self.iterating = False
        #
        # We need to capture stderr in a background thread to avoid deadlocks.
        # (The problem would occur when dlvhex2 is blocked because the OS buffers on the stderr pipe are full... so we have to constantly read from *both* stdout and stderr)
        self.stderr_capture_thread = StreamCaptureThread(self.process.stderr)
        self.stderr_capture_thread.start()
        #
        # Set up finalization. Using weakref.finalize seems to work more robustly than using __del__.
        # (One problem that occurred with __del__: It seemed like python was calling __del__ for self.process and its IO streams first,
        # which resulted in ResourceWarnings even though we were closing the streams properly in our __del__ function.)
        # The callback receives everything it needs as arguments rather than through self.
        # NOTE(review): `DlvhexLineReader.__close` presumably relies on private-name mangling inside the class body - confirm.
        self._finalize = weakref.finalize(self, DlvhexLineReader.__close, process, self.stderr_capture_thread, encoding, tmp_input)  # type: ignore
        # Make sure the subprocess will be terminated if it's still running when the python process exits
        self._finalize.atexit = True
ocad.py 文件源码 项目:pysport 作者: sportorg 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def parse(self, file):
        """
        Load the lines of a file into ``self._data`` and reset the parser.

        :param file: either a path (``str``), which is opened with the
            ``windows-1251`` encoding, or an already opened file-like object
            providing ``readlines()``
        :return: ``self``, so calls can be chained
        :raises TypeError: if ``file`` is neither a ``str`` nor file-like
        :raises FileNotFoundError: if the path does not exist
        """
        if isinstance(file, str):
            try:
                with open(file, encoding='windows-1251') as f:
                    content = f.readlines()
            except FileNotFoundError as err:
                # Keep the original error as the cause for easier debugging.
                raise FileNotFoundError("Not found " + file) from err
        elif callable(getattr(file, 'readlines', None)):
            # Real file objects are not instances of typing.IO, so an
            # isinstance() check against it rejects every open stream;
            # detect file-like objects by the method that is actually used.
            content = file.readlines()
        else:
            raise TypeError("file is not str or IO")

        self._data = [x.strip() for x in content]
        self.clear()

        return self
file_hash.py 文件源码 项目:binaryalert 作者: airbnb 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _read_in_chunks(file_object: IO[bytes], chunk_size: int = 2*MB) -> Generator[bytes, None, None]:
    """Lazily yield the contents of a file piece by piece.

    Only one chunk is requested from the file at a time, which keeps memory
    usage low for large files.

    Args:
        file_object: An opened file-like object supporting read().
        chunk_size: Size passed to each read() call, i.e. the maximum chunk size.

    Yields:
        Each non-empty result of read(), in order; iteration stops at the
        first empty result (end of file).
    """
    piece = file_object.read(chunk_size)
    while piece:
        yield piece
        piece = file_object.read(chunk_size)
model.py 文件源码 项目:yatta_reader 作者: sound88 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __init__(
        self,
        _=None,  # type: Optional[Union[AnyStr, typing.Mapping, typing.Sequence, typing.IO]]
    ):
        """Initialise the object, optionally filling it from ``_``.

        When ``_`` is given it is passed through ``deserialize`` and every
        key/value pair of the result is assigned with ``self[key] = value``.
        If ``_`` is an ``HTTPResponse``, its ``url`` is first recorded on
        ``meta.get(self)``.

        A ``KeyError`` carrying exactly one argument is re-raised with that
        argument rewritten to include the class name, the key and a JSON dump
        of the deserialized data.
        """
        self._meta = None
        if _ is None:
            return
        if isinstance(_, HTTPResponse):
            meta.get(self).url = _.url
        data = deserialize(_)
        for key, value in data.items():
            try:
                self[key] = value
            except KeyError as error:
                if len(error.args) == 1:
                    detail = r'%s.%s: %s' % (type(self).__name__, error.args[0], json.dumps(data))
                    error.args = (detail,)
                raise error
whatstyle.py 文件源码 项目:whatstyle 作者: mikr 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def rawstream(fp):
    # type: (IO[Any]) -> IO[bytes]
    """Return the ``buffer`` attribute of fp when available, else fp itself.

    When ``PY3`` is falsy, fp is always returned unchanged.
    """
    if not PY3:
        return fp  # type: Optional[IO[bytes]]
    # Objects without a ``buffer`` attribute (there might be a BytesIO
    # behind fp) are used as they are.
    return getattr(fp, 'buffer', fp)  # type: ignore
whatstyle.py 文件源码 项目:whatstyle 作者: mikr 项目源码 文件源码 阅读 49 收藏 0 点赞 0 评论 0
def write(s, fp=None):
    # type: (Union[str, bytes], Optional[IO[Any]]) -> None
    """Convert s with bytestr() and write it to the raw stream behind fp.

    sys.stdout is used when fp is None.
    """
    if fp is None:
        fp = sys.stdout
    sink = rawstream(fp)
    sink.write(bytestr(s))
whatstyle.py 文件源码 项目:whatstyle 作者: mikr 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def outline(s=b'', end=b'\n', fp=None):
    # type: (Union[str, bytes], Union[str, bytes], Optional[IO]) -> None
    """Write s followed by end (both converted with bytestr()) via write()."""
    line = bytestr(s) + bytestr(end)
    write(line, fp=fp)
whatstyle.py 文件源码 项目:whatstyle 作者: mikr 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def category_print(categories, categorytype, category, s, prefix='', end='\n', fp=None):
    # type: (Set[str], str, str, Union[str, bytes], str, str, Optional[IO]) -> None
    """Write a message for category, but only if category is in categories.

    The message is preceded by prefix alone for the 'info' category type and
    by ``<prefix><categorytype>_<category>: `` otherwise; end is appended
    when it is non-empty.

    The destination is chosen in this order:
    1. a per-category log file in the temp directory, when
       MESSAGE_CATEGORY_FILES is not None (this replaces any fp passed in);
    2. the fp argument;
    3. the file named by LOGFILE, opened once and kept in LOGFILEFP;
    4. the raw stream of stderr or stdout, depending on STDERR_OUTPUT.
    """
    global LOGFILEFP

    if category not in categories:
        return

    msg = prefix if categorytype == 'info' else '%s%s_%s: ' % (prefix, categorytype, category)

    if MESSAGE_CATEGORY_FILES is not None:
        logfilename = 'whatstyle_%s_%s.log' % (categorytype, category)
        fp = MESSAGE_CATEGORY_FILES.get(logfilename)
        if fp is None:
            # First message for this category: create and remember its file.
            fp = open(os.path.join(tempfile.gettempdir(), logfilename), 'wb')
            MESSAGE_CATEGORY_FILES[logfilename] = fp

    if fp is None and LOGFILE:
        if not LOGFILEFP:
            LOGFILEFP = open(LOGFILE, 'wb')
        fp = LOGFILEFP

    if fp is None:
        fp = rawstream(sys.stderr if STDERR_OUTPUT else sys.stdout)

    for piece in (msg, s):
        write(piece, fp=fp)
    if end:
        write(end, fp=fp)
whatstyle.py 文件源码 项目:whatstyle 作者: mikr 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def iprint(category, s, prefix='', end='\n', fp=None):
    # type: (str, AnyStr, str, str, Optional[IO[AnyStr]]) -> None
    """Emit s as an 'info' message for category, filtered by args_info."""
    category_print(args_info, 'info', category, s,
                   prefix=prefix, end=end, fp=fp)
whatstyle.py 文件源码 项目:whatstyle 作者: mikr 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def reporterror(s, fp=None):
    # type: (str, Optional[IO[AnyStr]]) -> None
    """Pass s to reportmessage(), defaulting to the raw stream of sys.stderr."""
    target = rawstream(sys.stderr) if fp is None else fp  # type: ignore
    reportmessage(s, fp=target)
whatstyle.py 文件源码 项目:whatstyle 作者: mikr 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def soutline(s='', enc='utf-8', fp=None):
    # type: (str, str, Optional[IO[Any]]) -> None
    """Convert s with unescape_ill_surrencode() and write it plus a newline."""
    encoded = unescape_ill_surrencode(s, enc=enc)
    line = encoded + b'\n'
    write(line, fp=fp)
app.py 文件源码 项目:puppeter 作者: coi-gov-pl 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def __init__(self, namespace):
        """Copy the ``answers``, ``verbose`` and ``execute`` attributes of namespace.

        The values are stored on double-underscore (private) attributes.
        """
        self.__answers = namespace.answers  # type: IO[Any]
        self.__verbose = namespace.verbose  # type: int
        self.__execute = namespace.execute  # type: bool
app.py 文件源码 项目:puppeter 作者: coi-gov-pl 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def answers(self):
        # type: () -> IO[Any]
        """Return the value held in the private ``__answers`` attribute."""
        return self.__answers
unattendedapp.py 文件源码 项目:puppeter 作者: coi-gov-pl 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __load_answers(gateway, target):
        # type: (AnswersGateway, IO) -> Answers
        """Return whatever ``gateway.read_answers_from_file(target)`` returns."""
        return gateway.read_answers_from_file(target)
answers.py 文件源码 项目:puppeter 作者: coi-gov-pl 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def write_answers_to_file(self, answers, target_file):
        # type: (Answers, IO) -> None
        """Serialize the configured sections of answers as YAML into target_file.

        A section is written only when its accessor returns something other
        than None. Keys appear in the order 'installer', 'fqdn',
        'csr-attributes', and each value is the section's ``raw_options()``.
        """
        sections = (
            ('installer', answers.installer),
            ('fqdn', answers.fqdn_configuration),
            ('csr-attributes', answers.csrattrs_configuration),
        )
        raw_answers = {}
        for key, accessor in sections:
            # Call each accessor once so that the None check and the
            # raw_options() call are guaranteed to see the same object.
            section = accessor()
            if section is not None:
                raw_answers[key] = section.raw_options()
        document = ruamel.yaml.dump(raw_answers, Dumper=ruamel.yaml.RoundTripDumper)
        target_file.write(document)
answers.py 文件源码 项目:puppeter 作者: coi-gov-pl 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def read_answers_from_file(self, target):
        # type: (IO) -> Answers
        """Placeholder with an empty body; as written it returns None.

        NOTE(review): presumably meant to be overridden by a concrete
        implementation that reads Answers from target - confirm.
        """
        pass
predict.py 文件源码 项目:allennlp 作者: allenai 项目源码 文件源码 阅读 50 收藏 0 点赞 0 评论 0
def _run(predictor: Predictor,
         input_file: IO,
         output_file: Optional[IO],
         batch_size: int,
         print_to_console: bool,
         cuda_device: int) -> None:
    """Feed JSON lines from input_file to predictor in batches of batch_size.

    Whitespace-only lines are skipped. Each prediction is serialized with
    json.dumps; it is printed together with its input when print_to_console
    is set, and written as one line to output_file when that is truthy.
    """

    def _predict_and_emit(batch):
        if len(batch) == 1:
            # predict_json yields a single result; wrap it so the loop below
            # can treat both code paths alike.
            outputs = [predictor.predict_json(batch[0], cuda_device)]
        else:
            outputs = predictor.predict_batch_json(batch, cuda_device)

        for model_input, output in zip(batch, outputs):
            serialized = json.dumps(output)
            if print_to_console:
                print("input: ", model_input)
                print("prediction: ", serialized)
            if output_file:
                output_file.write(serialized + "\n")

    pending = []
    for line in input_file:
        if line.isspace():
            continue
        pending.append(json.loads(line))
        # Flush as soon as a full batch has been collected.
        if len(pending) == batch_size:
            _predict_and_emit(pending)
            pending = []

    # The input may not divide evenly into batches; handle the remainder.
    if pending:
        _predict_and_emit(pending)
scratchdir.py 文件源码 项目:scratchdir 作者: ahawker 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def file(self, mode: str = 'w+b', buffering: int = -1, encoding: typing.Optional[str] = None,
             newline: typing.Optional[str] = None, suffix: typing.Optional[str] = DEFAULT_SUFFIX,
             prefix: typing.Optional[str] = DEFAULT_PREFIX, dir: typing.Optional[str] = None) -> typing.IO:
        """
        Open an anonymous temporary file located inside the scratch dir.

        All arguments are handed to :func:`~tempfile.TemporaryFile`; ``dir`` is first passed through
        :meth:`join`.

        :param mode: (Optional) mode to open the file with
        :type mode: :class:`~str`
        :param buffering: (Optional) size of the file buffer
        :type buffering: :class:`~int`
        :param encoding: (Optional) encoding to open the file with
        :type encoding: :class:`~str` or :class:`~NoneType`
        :param newline: (Optional) newline argument to open the file with
        :type newline: :class:`~str` or :class:`~NoneType`
        :param suffix: (Optional) filename suffix
        :type suffix: :class:`~str` or :class:`~NoneType`
        :param prefix: (Optional) filename prefix
        :type prefix: :class:`~str` or :class:`~NoneType`
        :param dir: (Optional) relative path to directory within the scratch dir where the file should exist
        :type dir: :class:`~str` or :class:`~NoneType`
        :return: file-like object as returned by :func:`~tempfile.TemporaryFile`
        """
        return tempfile.TemporaryFile(mode=mode, buffering=buffering, encoding=encoding,
                                      newline=newline, suffix=suffix, prefix=prefix,
                                      dir=self.join(dir))
scratchdir.py 文件源码 项目:scratchdir 作者: ahawker 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def named(self, mode: str = 'w+b', buffering: int = -1, encoding: typing.Optional[str] = None,
              newline: typing.Optional[str] = None, suffix: typing.Optional[str] = DEFAULT_SUFFIX,
              prefix: typing.Optional[str] = DEFAULT_PREFIX, dir: typing.Optional[str] = None,
              delete: bool = True) -> typing.IO:
        """
        Open a temporary file with a visible name inside the scratch dir.

        All arguments are handed to :func:`~tempfile.NamedTemporaryFile`; ``dir`` is first passed through
        :meth:`join`.

        :param mode: (Optional) mode to open the file with
        :type mode: :class:`~str`
        :param buffering: (Optional) size of the file buffer
        :type buffering: :class:`~int`
        :param encoding: (Optional) encoding to open the file with
        :type encoding: :class:`~str` or :class:`~NoneType`
        :param newline: (Optional) newline argument to open the file with
        :type newline: :class:`~str` or :class:`~NoneType`
        :param suffix: (Optional) filename suffix
        :type suffix: :class:`~str` or :class:`~NoneType`
        :param prefix: (Optional) filename prefix
        :type prefix: :class:`~str` or :class:`~NoneType`
        :param dir: (Optional) relative path to directory within the scratch dir where the file should exist
        :type dir: :class:`~str` or :class:`~NoneType`
        :param delete: (Optional) flag to indicate if the file should be deleted from disk when it is closed
        :type delete: :class:`~bool`
        :return: file-like object as returned by :func:`~tempfile.NamedTemporaryFile`
        """
        return tempfile.NamedTemporaryFile(mode=mode, buffering=buffering, encoding=encoding,
                                           newline=newline, suffix=suffix, prefix=prefix,
                                           dir=self.join(dir), delete=delete)
scratchdir.py 文件源码 项目:scratchdir 作者: ahawker 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def spooled(self, max_size: int = 0, mode: str = 'w+b', buffering: int = -1,
                encoding: typing.Optional[str] = None, newline: typing.Optional[str] = None,
                suffix: typing.Optional[str] = DEFAULT_SUFFIX, prefix: typing.Optional[str] = DEFAULT_PREFIX,
                dir: typing.Optional[str] = None) -> typing.IO:
        """
        Create an in-memory temporary file that can spill into the scratch dir.

        All arguments are handed to :class:`~tempfile.SpooledTemporaryFile`; ``dir`` is first passed through
        :meth:`join`. With the default ``max_size`` of 0 the data is not rolled over to disk based on its size.

        :param max_size: (Optional) max size before the in-memory buffer rolls over to disk
        :type max_size: :class:`~int`
        :param mode: (Optional) mode to open the file with
        :type mode: :class:`~str`
        :param buffering: (Optional) size of the file buffer
        :type buffering: :class:`~int`
        :param encoding: (Optional) encoding to open the file with
        :type encoding: :class:`~str` or :class:`~NoneType`
        :param newline: (Optional) newline argument to open the file with
        :type newline: :class:`~str` or :class:`~NoneType`
        :param suffix: (Optional) filename suffix
        :type suffix: :class:`~str` or :class:`~NoneType`
        :param prefix: (Optional) filename prefix
        :type prefix: :class:`~str` or :class:`~NoneType`
        :param dir: (Optional) relative path to directory within the scratch dir where the file should exist
        :type dir: :class:`~str` or :class:`~NoneType`
        :return: SpooledTemporaryFile instance
        :rtype: :class:`~tempfile.SpooledTemporaryFile`
        """
        return tempfile.SpooledTemporaryFile(max_size=max_size, mode=mode, buffering=buffering,
                                             encoding=encoding, newline=newline, suffix=suffix,
                                             prefix=prefix, dir=self.join(dir))
utilities.py 文件源码 项目:pandachaika 作者: pandabuilder 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def sha1_from_file_object(file_object: typing.IO[bytes]):
    """Return the hexadecimal SHA-1 digest of a binary stream and close it.

    The stream is consumed from its current position in 64 KiB blocks so the
    whole content never has to be held in memory.

    :param file_object: opened binary file-like object; it is always closed
        before this function returns, even if reading fails
    :return: the SHA-1 digest as a hex string
    """
    block_size = 65536
    hasher = hashlib.sha1()
    try:
        chunk = file_object.read(block_size)
        while chunk:
            hasher.update(chunk)
            chunk = file_object.read(block_size)
    finally:
        # Closing is part of this function's contract; do it on errors too
        # so a failing read does not leak the handle.
        file_object.close()
    return hasher.hexdigest()
utils.py 文件源码 项目:sockeye 作者: awslabs 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def __init__(self, candidates: List[int], lock_dir: str) -> None:
        """Store the candidate ids and lock directory; nothing is acquired here.

        :param candidates: stored as ``self.candidates``
        :param lock_dir: stored as ``self.lock_dir``
        """
        self.candidates = candidates
        self.lock_dir = lock_dir
        # The remaining attributes start out empty.
        # NOTE(review): presumably filled in when a lock is acquired - confirm.
        self.lock_file = None  # type: Optional[IO[Any]]
        self.lock_file_path = None  # type: Optional[str]
        self.gpu_id = None  # type: Optional[int]
        self._acquired_lock = False
changelog.py 文件源码 项目:smartchangelog 作者: ngouzy 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def print_multilines(cls, name: str, value: str, file: IO):
        """Print a named value to file as an indented bullet.

        Nothing is printed for an empty value. A value without newlines is
        printed on the bullet line itself; otherwise the bullet line holds
        only the name and each line of the value follows as a sub-item.
        """
        if not value:
            return
        pieces = value.split('\n')
        if len(pieces) > 1:
            print("    * {name}:".format(name=name), file=file)
            for piece in pieces:
                print("        - {line}".format(line=piece), file=file)
            return
        print("    * {name}: {value}".format(name=name, value=value), file=file)
changelog.py 文件源码 项目:smartchangelog 作者: ngouzy 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def print_leaf(cls, commit: Commit, file: IO) -> None:
        """Print the subject, body, date, author and id of commit to file.

        A falsy subject is printed as an empty string; the body is delegated
        to ``cls.print_multilines``.
        """
        def emit(text):
            print(text, file=file)

        emit("* subject: {subject}".format(subject=commit.subject or ''))
        cls.print_multilines(name='body', value=commit.body, file=file)
        emit("    * date: {date}".format(date=datetools.date2str(commit.date)))
        emit("    * author: {author}".format(author=commit.author))
        emit("    * commit: {id}".format(id=commit.id))
changelog.py 文件源码 项目:smartchangelog 作者: ngouzy 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def print_header(self, node: 'Node', file: IO):
        """Print a heading for node followed by an empty line.

        The heading starts with ``depth_level() + 1`` '#' characters, then the
        property name of the node's criterion and the node's name.
        """
        hashes = "#" * (self.depth_level() + 1)
        criterion = Commit.property_name(node.criterion)
        heading = "{header} {criterion_name}: {name}".format(
            header=hashes,
            criterion_name=criterion,
            name=node.name
        )
        print(heading, file=file)
        print(file=file)
storage.py 文件源码 项目:twlived 作者: tausackhn 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def __init__(self, view: View, info: Dict[str, Any], api: TwitchAPI, quality: str, temp_dir: str = '.') -> None:
        """Load the shared schema if needed, validate info and store the arguments.

        :param view: stored as ``self.view``
        :param info: passed to ``self._validate_info`` and then stored as ``self.info``
        :param api: stored as ``self.api``
        :param quality: stored as ``self.quality``
        :param temp_dir: stored as ``self.temp_dir``; defaults to ``'.'``
        """
        # The schema is loaded once and kept on the class for later instances.
        # NOTE(review): the relative path is resolved against the current working
        # directory and the file is opened with the platform default encoding - confirm both are intended.
        if not TwitchVideo._schema:
            with open('video_info.schema') as json_data:
                TwitchVideo._schema = json.load(json_data)
        # Validation runs before any attribute is assigned.
        self._validate_info(info)

        self.info = info
        self.api = api
        self.quality = quality
        self.temp_dir = temp_dir
        self.view = view

        self.download_done: bool = False
        self.file: Optional[IO[bytes]] = None
util.py 文件源码 项目:sk-torch 作者: mattHawthorn 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def open_file(path: Union[str, IO], mode='rb'):
    """Return an open file for path.

    A ``str`` is opened with the given mode; any other object is assumed to
    be an already opened file and is returned unchanged (mode is ignored).
    """
    return open(path, mode) if isinstance(path, str) else path
interface.py 文件源码 项目:sk-torch 作者: mattHawthorn 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def save(self, path: Union[str, IO]):
        """Pickle the result of ``__getstate__()`` to path.

        The state is captured before the destination is opened. path is
        resolved with ``open_file(path, 'wb')`` and used as a context
        manager, so the file object is closed afterwards.
        """
        snapshot = self.__getstate__()
        with open_file(path, 'wb') as sink:
            pickle.dump(snapshot, sink)


问题


面经


文章

微信
公众号

扫码关注公众号