python类IO的实例源码

interface.py 文件源码 项目:sk-torch 作者: mattHawthorn 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def load(cls, path: Union[str, IO]) -> 'TorchModel':
    """Rebuild a model from a pickled state object.

    The state is read through ``open_file`` in binary mode and applied to
    a fresh instance created with ``cls.__new__``, so ``__init__`` is not
    run; ``__setstate__`` alone is responsible for restoring the object.

    NOTE(review): ``pickle.load`` can execute arbitrary code while
    unpickling -- only use this on files from a trusted source.

    :param path: handed to ``open_file`` unchanged (annotated as a path
        string or an already open stream).
    :return: the restored instance of ``cls``.
    """
    with open_file(path, 'rb') as source:
        saved_state = pickle.load(source)

    instance = cls.__new__(cls)
    instance.__setstate__(saved_state)
    return instance

    # for using pickle.dump/load directly
stream_capture_thread.py 文件源码 项目:py-aspio 作者: hexhex 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__(self, stream: IO[S]) -> None:
    """Remember the stream to read from and start with no data.

    The base-class initialiser is invoked with ``daemon=True``.

    :param stream: stream object stored on the instance as ``stream``.
    """
    super().__init__(daemon=True)
    # Nothing has been captured yet; ``data`` stays ``None`` until set.
    self.data = None  # type: S
    self.stream = stream
parsing.py 文件源码 项目:neuralmonkey 作者: ufal 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def write_file(config_dict: Dict[str, Any], config_file: IO[str]) -> None:
    """Write ``config_dict`` to ``config_file`` in INI syntax.

    The dictionary is loaded into a ``configparser.ConfigParser`` and
    written out without spaces around the key/value delimiter.

    :param config_dict: mapping of section names to option mappings.
    :param config_file: writable text stream receiving the output.
    """
    parser = configparser.ConfigParser()
    parser.read_dict(config_dict)
    parser.write(config_file, space_around_delimiters=False)
parsing.py 文件源码 项目:neuralmonkey 作者: ufal 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def write_file(config_dict: Dict[str, Any], config_file: IO[str]) -> None:
    """Dump a nested configuration dictionary as an INI document.

    A ``configparser.ConfigParser`` is populated from ``config_dict`` and
    then written to ``config_file`` with ``key=value`` lines (no spaces
    around the delimiter).

    :param config_dict: mapping of section names to option mappings.
    :param config_file: writable text stream receiving the output.
    """
    ini_writer = configparser.ConfigParser()
    ini_writer.read_dict(config_dict)
    ini_writer.write(config_file, space_around_delimiters=False)
parsing.py 文件源码 项目:neuralmonkey 作者: ufal 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def write_file(config_dict: Dict[str, Any], config_file: IO[str]) -> None:
    """Serialise ``config_dict`` into ``config_file`` using INI format.

    The work is delegated to ``configparser.ConfigParser``: the dictionary
    is read in with ``read_dict`` and emitted with ``write``, with spaces
    around the delimiter switched off.

    :param config_dict: mapping of section names to option mappings.
    :param config_file: writable text stream receiving the output.
    """
    serializer = configparser.ConfigParser()
    serializer.read_dict(config_dict)
    serializer.write(config_file, space_around_delimiters=False)
test_file_storage_backends.py 文件源码 项目:pillar 作者: armadillica 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def create_test_file(self) -> typing.Tuple[typing.IO[bytes], bytes]:
    """Create an in-memory binary file filled with random bytes.

    The return annotation is spelled ``typing.Tuple[...]``; a bare
    ``(typing.IO, bytes)`` tuple is not a valid type for type checkers.

    :return: a ``(file object, contents)`` pair, where the file object is
        an ``io.BytesIO`` positioned at the start and ``contents`` are the
        512 random bytes it holds.
    """
    import io
    import secrets

    file_contents = secrets.token_bytes(512)
    test_file: typing.IO[bytes] = io.BytesIO(file_contents)

    return test_file, file_contents
export.py 文件源码 项目:drill 作者: rr- 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _export(deck: db.Deck, handle: IO[Any]) -> None:
    """Write ``deck`` to ``handle`` as a compact JSON document.

    The document carries the deck name and description, its tags (name and
    color) and its cards; each card includes its tag names and the list of
    recorded user answers. Values the JSON encoder cannot handle natively
    are passed to ``_json_serializer``.

    :param deck: deck whose contents are exported.
    :param handle: writable stream that receives the JSON text.
    """
    payload = {
        'name': deck.name,
        'description': deck.description,
    }

    payload['tags'] = [
        {'name': tag.name, 'color': tag.color}
        for tag in deck.tags
    ]

    exported_cards = []
    for card in deck.cards:
        entry = {
            'id': card.num,
            'question': card.question,
            'answers': card.answers,
            'active': card.is_active,
            'activation_date': card.activation_date,
            'tags': [tag.name for tag in card.tags],
        }
        entry['user_answers'] = [
            {'date': answer.date, 'correct': answer.is_correct}
            for answer in card.user_answers
        ]
        exported_cards.append(entry)
    payload['cards'] = exported_cards

    json.dump(
        payload,
        handle,
        default=_json_serializer,
        separators=(',', ':'),
        check_circular=False)
fconfigure.py 文件源码 项目:fcompile 作者: azag0 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def configure(sources: Iterable[Path],
              cmd: str,
              blddir: Path,
              out: IO[str] = sys.stdout) -> None:
    """Emit a JSON task table for the given source files.

    Every source path becomes one entry keyed by its string form. The
    entry records the source and the argument list: ``cmd`` split on
    whitespace, followed by a path inside ``blddir`` whose file name is
    produced by an ``ObjectFileNamer``.

    :param sources: source files to create tasks for.
    :param cmd: command line, split on whitespace.
    :param blddir: directory under which the generated names are placed.
    :param out: text stream the JSON is written to.
    """
    namer = ObjectFileNamer()
    base_args = cmd.split()

    fortran_tasks = {}
    for src in sources:
        src_name = str(src)
        target = str(blddir/namer(src))
        fortran_tasks[src_name] = {
            'source': src_name,
            'args': base_args + [target],
        }

    json.dump(fortran_tasks, out)
drumeo.py 文件源码 项目:pyscrapers 作者: veltzer 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def download_course(course):
    """Download one course into ``drumeo/<course.number>``.

    Steps:

    * make sure the course folder exists;
    * write ``details.txt`` (number, name, difficulty, instructor) unless
      that file is already present;
    * fetch the resources archive to ``resources.zip`` when the course has
      one;
    * fetch every video to ``<index>.mp4``, passing the video's quality
      as the ``width`` argument.

    :param course: object providing ``number``, ``name``, ``diff``,
        ``instructor``, ``resources`` and ``videos`` (an iterable of
        ``(video, quality)`` pairs).
    """
    folder_name = os.path.join("drumeo", course.number)
    # exist_ok=True replaces the isdir()/makedirs() pair, which could fail
    # if the folder appeared between the check and the creation.
    os.makedirs(folder_name, exist_ok=True)

    details = os.path.join(folder_name, "details.txt")
    if not os.path.isfile(details):
        with open(details, "wt") as file_handle:  # type: IO[str]
            print("course_number: {}".format(course.number), file=file_handle)
            print("course_name: {}".format(course.name), file=file_handle)
            print("course_difficulty: {}".format(course.diff), file=file_handle)
            print("instructor: {}".format(course.instructor), file=file_handle)

    if course.resources is not None:
        download_url(course.resources, os.path.join(folder_name, "resources.zip"))

    for i, (video, quality) in enumerate(course.videos):
        download_video_if_wider(video, os.path.join(folder_name, "{}.mp4".format(i)), width=int(quality))
lib.py 文件源码 项目:python-zulip-api 作者: zulip 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def open(self, filepath):
    # type: (str) -> IO[str]
    """Open a file located inside the bot's root directory.

    ``filepath`` is resolved relative to ``self._root_dir``. The joined
    path is normalised *after* joining so that ``..`` components cannot
    step outside the root, and the containment test compares against the
    root followed by a path separator so that a sibling directory sharing
    the same name prefix is rejected as well.

    :param filepath: path of the file, relative to the root directory.
    :return: the file opened with the default (text, read) mode.
    :raises PermissionError: if the resolved path is outside the root
        directory.
    """
    root_dir = os.path.abspath(self._root_dir)
    abs_filepath = os.path.abspath(os.path.join(root_dir, filepath))

    # Compare against "<root>/" rather than "<root>" to avoid accepting
    # e.g. "<root>_other/file".
    root_prefix = root_dir if root_dir.endswith(os.sep) else root_dir + os.sep
    if abs_filepath == root_dir or abs_filepath.startswith(root_prefix):
        return open(abs_filepath)

    raise PermissionError("Cannot open file \"{}\". Bots may only access "
                          "files in their local directory.".format(abs_filepath))
generate_manifest.py 文件源码 项目:python-zulip-api 作者: zulip 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def generate_and_write(filepaths, file_obj):
    # type: (Iterator[str], IO[str]) -> None
    """Write one ``include <path>`` line per file path, then a blank line.

    :param filepaths: paths to list, in output order.
    :param file_obj: writable text stream receiving the lines.
    """
    file_obj.writelines(
        'include {line}\n'.format(line=path) for path in filepaths
    )
    # Trailing empty line terminates the generated block.
    file_obj.write('\n')
zephyr_mirror_backend.py 文件源码 项目:python-zulip-api 作者: zulip 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def process_loop(log):
    # type: (IO[Any]) -> None
    """Run the zephyr receive loop forever.

    Each iteration waits (up to 15 seconds) for the zephyr file descriptor
    to become readable, drains all queued notices through
    ``process_notice`` and, at most once every 15 seconds, runs the
    periodic maintenance: the restart check and, when
    ``options.forward_class_messages`` is set, a subscription update.
    Exceptions are logged and the loop keeps going; the function does not
    return.

    :param log: passed through unchanged to ``process_notice``.
    """
    # Number of consecutive failed restart checks; used to cap logging.
    restart_check_count = 0
    last_check_time = time.time()
    while True:
        # Block until the zephyr descriptor is readable or 15 s elapse.
        select.select([zephyr._z.getFD()], [], [], 15)
        try:
            # Fetch notices from the queue until it's empty
            while True:
                notice = zephyr.receive(block=False)
                if notice is None:
                    break
                try:
                    process_notice(notice, log)
                except Exception:
                    # A single bad notice must not stop the drain loop.
                    logger.exception("Error relaying zephyr:")
                    time.sleep(2)
        except Exception:
            # Receiving itself failed: skip the maintenance step below and
            # retry from the top after a short pause.
            logger.exception("Error checking for new zephyrs:")
            time.sleep(1)
            continue

        # Periodic maintenance, at most once every 15 seconds.
        if time.time() - last_check_time > 15:
            last_check_time = time.time()
            try:
                maybe_restart_mirroring_script()
                if restart_check_count > 0:
                    logger.info("Stopped getting errors checking whether restart is required.")
                    restart_check_count = 0
            except Exception:
                # Only the first five consecutive failures are logged.
                if restart_check_count < 5:
                    logger.exception("Error checking whether restart is required:")
                    restart_check_count += 1

            if options.forward_class_messages:
                try:
                    update_subscriptions()
                except Exception:
                    logger.exception("Error updating subscriptions from Zulip:")
__init__.py 文件源码 项目:python-zulip-api 作者: zulip 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def call_endpoint(self, url=None, method="POST", request=None, longpolling=False, files=None):
    # type: (str, str, Dict[str, Any], bool, List[IO[Any]]) -> Dict[str, Any]
    """Send a request to an endpoint below ``API_VERSTRING``.

    ``url`` is appended to ``API_VERSTRING`` and the call is forwarded to
    ``do_api_query``; a missing ``request`` is replaced by an empty dict.

    :param url: endpoint path appended to ``API_VERSTRING``.
    :param method: HTTP method name forwarded to ``do_api_query``.
    :param request: request data; ``None`` means no data.
    :param longpolling: forwarded to ``do_api_query``.
    :param files: forwarded to ``do_api_query``.
    :return: whatever ``do_api_query`` returns.
    """
    payload = {} if request is None else request
    return self.do_api_query(
        payload,
        API_VERSTRING + url,
        method=method,
        longpolling=longpolling,
        files=files,
    )
__init__.py 文件源码 项目:python-zulip-api 作者: zulip 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def upload_file(self, file):
    # type: (IO[Any]) -> Dict[str, Any]
    """Upload ``file`` through the ``user_uploads`` endpoint.

    See examples/upload-file for example usage.

    :param file: file object, sent as the only entry of ``files``.
    :return: the result of ``call_endpoint``.
    """
    attachments = [file]
    return self.call_endpoint(url='user_uploads', files=attachments)
__init__.py 文件源码 项目:unidump 作者: Codepoints 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def unidump(inbytes: IO[bytes], env: Env) -> None:
    """take a list of bytes and print their Unicode codepoints

    The stream is read one byte at a time. Bytes are collected until they
    decode as one character in ``env.encoding``; that character is then
    reported with its code point. If up to four collected bytes still do
    not decode, or the input ends first, every collected byte is reported
    individually as ``?XX?`` with ``X`` as its character.

    :param inbytes: binary stream to dump.
    :param env: settings object; ``env.encoding`` is used for decoding and
        the object is passed on to the output helpers.

    >>> import io
    >>> import sys
    >>> from unidump.env import Env
    >>> _env = Env(linelength=4, output=sys.stdout)
    >>> unidump(io.BytesIO(b'\\x01\\xF0\\x9F\\x99\\xB8ABC'), _env)
          0    0001 1F678 0041 0042    .\U0001F678AB
          7    0043                   C
    >>> unidump(io.BytesIO(b'\\xD7'), _env)
          0    ?D7?                   X
    >>> _env.encoding = 'latin1'
    >>> unidump(io.BytesIO(b'\\xD7'), _env)
          0    00D7                   \u00D7
    """

    byteoffset = 0
    bytebuffer = b''
    current_line = [0, [], '']

    byte = inbytes.read(1)
    while byte:
        byteoffset += 1
        bytebuffer += byte

        try:
            char = bytebuffer.decode(env.encoding)
        except UnicodeDecodeError:
            next_byte = inbytes.read(1)
            if not next_byte or len(bytebuffer) >= 4:
                # Give up on this buffer and report its bytes one by one.
                # The buffer may hold fewer than four bytes when the input
                # ends, so its start offset is derived from its real
                # length instead of a fixed 4.
                buffer_start = byteoffset - len(bytebuffer)
                for i, value in enumerate(bytebuffer):
                    current_line = (
                        fill_and_print(current_line, buffer_start + i,
                                       '?{:02X}?'.format(value), 'X', env)
                    )
                bytebuffer = b''
            # Otherwise keep the buffer and retry with one more byte.
            byte = next_byte
            continue
        else:
            current_line = (
                fill_and_print(current_line, byteoffset - len(bytebuffer),
                               '{:04X}'.format(ord(char)), sanitize_char(char),
                               env)
            )

        bytebuffer = b''
        byte = inbytes.read(1)

    # Flush whatever is left in the last, possibly partial, line.
    print_line(current_line, env)
import.py 文件源码 项目:drill 作者: rr- 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _import(handle: IO[Any]) -> None:
    """Load a deck from a JSON stream and add it to the database session.

    If a deck with the same name already exists the user is asked to
    confirm the overwrite; on refusal nothing is imported, otherwise the
    existing deck is deleted and that deletion committed first. Tags,
    cards and the cards' user answers are then rebuilt from the JSON
    data. A card's activation date comes from the ``activation_date``
    field when the key is present (and non-empty); when the key is absent
    it falls back to the date of the earliest user answer. Each card's due
    date is computed by ``scheduler.next_due_date``.

    :param handle: readable stream containing the JSON document.
    """
    with db.session_scope() as session:
        payload = json.load(handle)

        deck = db.Deck()
        deck.name = payload['name']
        deck.description = payload['description']

        previous_deck = db.try_get_deck_by_name(session, deck.name)
        if previous_deck:
            confirmed = util.confirm(
                'Are you sure you want to overwrite deck %r?' % deck.name)
            if not confirmed:
                return
            session.delete(previous_deck)
            session.commit()

        # Tags are looked up by name when the cards are rebuilt below.
        tags_by_name = {}
        for tag_data in payload['tags']:
            tag = db.Tag()
            tag.name = tag_data['name']
            tag.color = tag_data['color']
            deck.tags.append(tag)
            tags_by_name[tag.name] = tag

        for card_data in payload['cards']:
            card = db.Card()
            card.num = card_data['id']
            card.question = card_data['question']
            card.answers = card_data['answers']
            card.is_active = card_data['active']
            card.tags = [tags_by_name[name] for name in card_data['tags']]

            for answer_data in card_data['user_answers']:
                user_answer = db.UserAnswer()
                user_answer.date = parse_date(answer_data['date'])
                user_answer.is_correct = answer_data['correct']
                card.user_answers.append(user_answer)

            if 'activation_date' in card_data:
                raw_activation = card_data['activation_date']
                if raw_activation:
                    card.activation_date = parse_date(raw_activation)
            elif card.user_answers:
                # No explicit field: use the earliest recorded answer.
                earliest = min(card.user_answers, key=lambda ua: ua.date)
                card.activation_date = earliest.date

            card.due_date = scheduler.next_due_date(card)
            deck.cards.append(card)

        session.add(deck)


问题


面经


文章

微信
公众号

扫码关注公众号