def load(cls, path: Union[str, IO]) -> 'TorchModel':
    """Rebuild a TorchModel from a pickled state file.

    The instance is created without running ``__init__``; its state is
    restored via ``__setstate__``, mirroring the pickle protocol.
    """
    with open_file(path, 'rb') as handle:
        payload = pickle.load(handle)
    instance = cls.__new__(cls)
    instance.__setstate__(payload)
    return instance
# for using pickle.dump/load directly
# Example snippets demonstrating typing.IO usage in Python classes
def __init__(self, stream: IO[S]) -> None:
    """Keep a reference to *stream* for later reading.

    The thread is created with ``daemon=True`` so it will not block
    interpreter shutdown.
    """
    super().__init__(daemon=True)
    self.data = None  # type: S
    self.stream = stream
def write_file(config_dict: Dict[str, Any], config_file: IO[str]) -> None:
    """Serialize *config_dict* as INI text into the open *config_file*.

    Delimiters are emitted without surrounding spaces (``key=value``).
    """
    parser = configparser.ConfigParser()
    parser.read_dict(config_dict)
    parser.write(config_file, space_around_delimiters=False)
def write_file(config_dict: Dict[str, Any], config_file: IO[str]) -> None:
    """Write *config_dict* to *config_file* in INI format.

    Options are written as ``key=value`` (no spaces around the delimiter).
    """
    parser = configparser.ConfigParser()
    parser.read_dict(config_dict)
    parser.write(config_file, space_around_delimiters=False)
def write_file(config_dict: Dict[str, Any], config_file: IO[str]) -> None:
    """Dump a nested config dict into *config_file* as an INI document,
    using compact ``key=value`` delimiters."""
    parser = configparser.ConfigParser()
    parser.read_dict(config_dict)
    parser.write(config_file, space_around_delimiters=False)
def create_test_file(self) -> typing.Tuple[typing.IO, bytes]:
    """Create an in-memory test file filled with 512 random bytes.

    Returns:
        A pair ``(file_obj, contents)`` where *file_obj* is a ``BytesIO``
        positioned at the start and *contents* are the raw bytes it holds.

    Note: the original annotation was the tuple literal ``(typing.IO, bytes)``,
    which is not a valid type expression; ``typing.Tuple[...]`` is.
    """
    import io
    import secrets
    file_contents = secrets.token_bytes(512)
    test_file: typing.IO = io.BytesIO(file_contents)
    return test_file, file_contents
def _export(deck: db.Deck, handle: IO[Any]) -> None:
    """Serialize *deck* (name, description, tags, cards, user answers)
    as compact JSON into *handle*."""
    tag_objs = [{'name': tag.name, 'color': tag.color} for tag in deck.tags]

    card_objs = []
    for card in deck.cards:
        answer_objs = [
            {'date': answer.date, 'correct': answer.is_correct}
            for answer in card.user_answers
        ]
        card_objs.append({
            'id': card.num,
            'question': card.question,
            'answers': card.answers,
            'active': card.is_active,
            'activation_date': card.activation_date,
            'tags': [tag.name for tag in card.tags],
            'user_answers': answer_objs,
        })

    payload = {
        'name': deck.name,
        'description': deck.description,
        'tags': tag_objs,
        'cards': card_objs,
    }
    # Compact separators + no circularity check keep the dump small and fast;
    # _json_serializer handles non-JSON-native values such as dates.
    json.dump(
        payload,
        handle,
        default=_json_serializer,
        separators=(',', ':'),
        check_circular=False)
def configure(sources: Iterable[Path],
              cmd: str,
              blddir: Path,
              out: IO[str] = sys.stdout) -> None:
    """Emit a JSON task map with one Fortran compile task per source file.

    Each task records the source path and the compile command's arguments
    extended with the object-file path under *blddir*.
    """
    namer = ObjectFileNamer()
    base_args = cmd.split()
    tasks = {}
    for src in sources:
        key = str(src)
        tasks[key] = {
            'source': key,
            'args': base_args + [str(blddir / namer(src))],
        }
    json.dump(tasks, out)
def download_course(course):
    """Fetch one drumeo course: write a details file, download the optional
    resources archive, then each video at its listed quality."""
    folder_name = os.path.join("drumeo", course.number)
    if not os.path.isdir(folder_name):
        os.makedirs(folder_name)

    details = os.path.join(folder_name, "details.txt")
    if not os.path.isfile(details):
        # Only write metadata once; re-runs skip straight to downloads.
        with open(details, "wt") as file_handle:  # type: IO[str]
            lines = ("course_number: {}".format(course.number),
                     "course_name: {}".format(course.name),
                     "course_difficulty: {}".format(course.diff),
                     "instructor: {}".format(course.instructor))
            for line in lines:
                print(line, file=file_handle)

    if course.resources is not None:
        download_url(course.resources, os.path.join(folder_name, "resources.zip"))

    for index, (video, quality) in enumerate(course.videos):
        target = os.path.join(folder_name, "{}.mp4".format(index))
        download_video_if_wider(video, target, width=int(quality))
def open(self, filepath):
    # type: (str) -> IO[str]
    """Open *filepath* for reading, restricted to the bot's local directory.

    Raises PermissionError when the resolved path escapes ``self._root_dir``.

    Fixes two traversal bugs in the original check:
    1. ``normpath`` was applied to *filepath* before joining, so an input
       like ``"sub/../../evil"`` normalized to ``"../evil"`` and the joined
       path still passed ``startswith(root)`` while escaping the root.
       Normalizing *after* the join resolves ".." against the real root.
    2. A bare ``startswith(root)`` accepted sibling directories such as
       ``/rootdir_evil`` for root ``/rootdir``; comparing against
       ``root + os.sep`` (or exact equality) closes that hole.
    """
    abs_filepath = os.path.normpath(os.path.join(self._root_dir, filepath))
    root = os.path.normpath(self._root_dir)
    if abs_filepath == root or abs_filepath.startswith(root + os.sep):
        return open(abs_filepath)
    else:
        raise PermissionError("Cannot open file \"{}\". Bots may only access "
                              "files in their local directory.".format(abs_filepath))
def generate_and_write(filepaths, file_obj):
    # type: (Iterator[str], IO[str]) -> None
    """Write one ``include <path>`` line per entry of *filepaths* into
    *file_obj*, followed by a trailing blank line."""
    template = 'include {line}\n'
    for path in filepaths:
        file_obj.write(template.format(line=path))
    file_obj.write('\n')
def process_loop(log):
    # type: (IO[Any]) -> None
    """Main relay loop: drain incoming zephyr notices and forward each one.

    Runs forever.  Wakes when the zephyr fd becomes readable (or every 15s),
    drains the notice queue, and roughly every 15s checks whether the
    mirroring script must be restarted and refreshes subscriptions.
    """
    restart_check_count = 0
    last_check_time = time.time()
    while True:
        # Block until the zephyr socket is readable, at most 15 seconds.
        select.select([zephyr._z.getFD()], [], [], 15)
        try:
            # Fetch notices from the queue until its empty
            while True:
                notice = zephyr.receive(block=False)
                if notice is None:
                    break
                try:
                    process_notice(notice, log)
                except Exception:
                    # Best-effort relay: log the failure, pause briefly,
                    # then keep draining the rest of the queue.
                    logger.exception("Error relaying zephyr:")
                    time.sleep(2)
        except Exception:
            # Receiving itself failed; back off and restart the outer loop.
            logger.exception("Error checking for new zephyrs:")
            time.sleep(1)
            continue
        if time.time() - last_check_time > 15:
            last_check_time = time.time()
            try:
                maybe_restart_mirroring_script()
                if restart_check_count > 0:
                    logger.info("Stopped getting errors checking whether restart is required.")
                restart_check_count = 0
            except Exception:
                # Log only the first few consecutive failures to avoid spam.
                if restart_check_count < 5:
                    logger.exception("Error checking whether restart is required:")
                    restart_check_count += 1
            if options.forward_class_messages:
                try:
                    update_subscriptions()
                except Exception:
                    logger.exception("Error updating subscriptions from Zulip:")
def call_endpoint(self, url=None, method="POST", request=None, longpolling=False, files=None):
    # type: (str, str, Dict[str, Any], bool, List[IO[Any]]) -> Dict[str, Any]
    """Dispatch an API call to *url* via do_api_query, defaulting the
    request payload to an empty dict."""
    payload = dict() if request is None else request
    return self.do_api_query(payload, API_VERSTRING + url,
                             method=method,
                             longpolling=longpolling,
                             files=files)
def upload_file(self, file):
    # type: (IO[Any]) -> Dict[str, Any]
    '''
    Upload an open file handle to the 'user_uploads' endpoint.

    See examples/upload-file for example usage.
    '''
    return self.call_endpoint(url='user_uploads', files=[file])
def unidump(inbytes: IO[bytes], env: Env) -> None:
    """take a list of bytes and print their Unicode codepoints
    >>> import io
    >>> import sys
    >>> from unidump.env import Env
    >>> _env = Env(linelength=4, output=sys.stdout)
    >>> unidump(io.BytesIO(b'\\x01\\xF0\\x9F\\x99\\xB8ABC'), _env)
    0 0001 1F678 0041 0042 .\U0001F678AB
    7 0043 C
    >>> unidump(io.BytesIO(b'\\xD7'), _env)
    0 ?D7? X
    >>> _env.encoding = 'latin1'
    >>> unidump(io.BytesIO(b'\\xD7'), _env)
    0 00D7 \u00D7
    """
    # Incremental decoder: accumulate bytes one at a time until the buffer
    # decodes as a single character in env.encoding, then emit its codepoint.
    byteoffset = 0               # total bytes consumed from the stream
    bytebuffer = b''             # bytes of the (possibly incomplete) char
    current_line = [0, [], '']   # accumulator consumed by fill_and_print
    byte = inbytes.read(1)
    while byte:
        byteoffset += 1
        bytebuffer += byte
        try:
            char = bytebuffer.decode(env.encoding)
        except UnicodeDecodeError:
            # Buffer doesn't decode yet.  If the input is exhausted or the
            # buffer already holds 4 bytes (max UTF-8 sequence length),
            # flush each byte as an undecodable '?XX?' cell; otherwise keep
            # accumulating.
            next_byte = inbytes.read(1)
            if not next_byte or len(bytebuffer) >= 4:
                for i, x in enumerate(bytebuffer):
                    # NOTE(review): 'byteoffset - 4 + i' assumes a full
                    # 4-byte buffer; for a shorter buffer at EOF this
                    # undershoots the true offset — presumably
                    # fill_and_print only consults the offset at line
                    # starts, but verify.
                    current_line = (
                        fill_and_print(current_line, byteoffset - 4 + i,
                                       '?{:02X}?'.format(x), 'X', env)
                    )
                bytebuffer = b''
            byte = next_byte
            continue
        else:
            # Decoded exactly one character: print its codepoint (hex) and
            # a sanitized printable form, anchored at the char's first byte.
            current_line = (
                fill_and_print(current_line, byteoffset - len(bytebuffer),
                               '{:04X}'.format(ord(char)), sanitize_char(char),
                               env)
            )
            bytebuffer = b''
        byte = inbytes.read(1)
    # Flush the final, possibly partial, output line.
    print_line(current_line, env)
def _import(handle: IO[Any]) -> None:
    """Load a deck from a JSON file *handle* and persist it to the database,
    replacing any same-named deck after interactive confirmation.
    """
    with db.session_scope() as session:
        deck_obj = json.load(handle)
        deck = db.Deck()
        deck.name = deck_obj['name']
        deck.description = deck_obj['description']
        # Overwriting an existing deck is destructive: confirm, then delete
        # and commit before inserting the replacement.
        existing_deck = db.try_get_deck_by_name(session, deck.name)
        if existing_deck:
            if not util.confirm(
                    'Are you sure you want to overwrite deck %r?' % deck.name):
                return
            session.delete(existing_deck)
            session.commit()
        # Tags are created once and shared by name between the deck and its
        # cards via this lookup table.
        tag_dict = {}
        for tag_obj in deck_obj['tags']:
            tag = db.Tag()
            tag.name = tag_obj['name']
            tag.color = tag_obj['color']
            deck.tags.append(tag)
            tag_dict[tag.name] = tag
        for card_obj in deck_obj['cards']:
            card = db.Card()
            card.num = card_obj['id']
            card.question = card_obj['question']
            card.answers = card_obj['answers']
            card.is_active = card_obj['active']
            card.tags = [tag_dict[name] for name in card_obj['tags']]
            for user_answer_obj in card_obj['user_answers']:
                user_answer = db.UserAnswer()
                user_answer.date = parse_date(user_answer_obj['date'])
                user_answer.is_correct = user_answer_obj['correct']
                card.user_answers.append(user_answer)
            # Activation date: an explicit (truthy) value in the JSON wins;
            # when the key is absent entirely, fall back to the earliest
            # recorded user answer.
            if 'activation_date' in card_obj:
                if card_obj['activation_date']:
                    card.activation_date = parse_date(
                        card_obj['activation_date'])
            elif card.user_answers:
                card.activation_date = sorted(
                    card.user_answers, key=lambda ua: ua.date)[0].date
            card.due_date = scheduler.next_due_date(card)
            deck.cards.append(card)
        session.add(deck)