def instrument_name(data: Union[FileIO, BufferedReader]) -> Tuple[int, str, bytearray]:
    length = VariableLengthValue(data).value
    raw_data = bytearray(data.read(length))
    try:
        text = raw_data.decode("ASCII")
    except UnicodeDecodeError as exc:
        raise EventTextError("Unparsable text in instrument name") from exc
    return length, text, raw_data
def marker(data: Union[FileIO, BufferedReader]) -> Tuple[int, str, bytearray]:
    length = VariableLengthValue(data).value
    raw_data = bytearray(data.read(length))
    try:
        text = raw_data.decode("ASCII")
    except UnicodeDecodeError as exc:
        raise EventTextError("Unparseable text in marker text") from exc
    return length, text, raw_data
def cue_point(data: Union[FileIO, BufferedReader]) -> Tuple[int, str, bytearray]:
    length = VariableLengthValue(data).value
    raw_data = bytearray(data.read(length))
    try:
        text = raw_data.decode("ASCII")
    except UnicodeDecodeError as exc:
        raise EventTextError("Unparseable text in Cue Point text") from exc
    return length, text, raw_data
def channel_prefix(data: Union[FileIO, BufferedReader]) -> Tuple[int, int, bytearray]:
    length_bytes = data.read(4)
    length = int.from_bytes(length_bytes, "big")
    if length != 0x01:
        raise EventLengthError("Channel Prefix length invalid. It should be 1, but it's {}".format(length))
    prefix_raw = bytearray(data.read(1))
    prefix = int.from_bytes(prefix_raw, "big")
    return length, prefix, prefix_raw
def end_of_track(data: Union[FileIO, BufferedReader]) -> Tuple[int, None, None]:
    length_bytes = data.read(4)
    length = int.from_bytes(length_bytes, "big")
    if length != 0:
        raise EventLengthError("End of Track event with non-zero length")
    return length, None, None
def set_tempo(data: Union[FileIO, BufferedReader]) -> Tuple[int, int, bytearray]:
    length_bytes = data.read(4)
    length = int.from_bytes(length_bytes, "big")
    if length != 3:
        raise EventLengthError("Set Tempo event with length other than 3. Given length was {}".format(length))
    raw_data = bytearray(data.read(3))
    tpqm = int.from_bytes(raw_data, "big")
    return length, tpqm, raw_data
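# A small self-check sketch for set_tempo() (the byte values are made up, and
# the meta status/type bytes are assumed to have been consumed already). In a
# standard MIDI Set Tempo event the three data bytes are microseconds per
# quarter note, so 0x07A120 = 500000 corresponds to 120 beats per minute.
import io

tempo_payload = io.BufferedReader(io.BytesIO(b"\x00\x00\x00\x03\x07\xa1\x20"))
length, tempo_us, raw = set_tempo(tempo_payload)
print(length, tempo_us, 60_000_000 // tempo_us)  # 3 500000 120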
def time_signature(data: Union[FileIO, BufferedReader]) -> Tuple[int, Tuple[int, int, int, int], bytearray]:
    length_bytes = bytearray(data.read(1))
    length = int.from_bytes(length_bytes, "big")
    if length != 0x04:
        raise EventLengthError("Time Signature event has invalid length. Should be 4, value was {}".format(length))
    data_bytes = bytearray(data.read(4))  # type: bytearray
    numerator = data_bytes[0]  # type: int
    denominator = data_bytes[1]  # type: int
    clock_num = data_bytes[2]
    ts_number = data_bytes[3]
    return length, (numerator, denominator, clock_num, ts_number), data_bytes
def key_signature(data: Union[FileIO, BufferedReader]) -> Tuple[int, Tuple[int, int], bytearray]:
    length_bytes = bytearray(data.read(1))
    length = int.from_bytes(length_bytes, "big")
    if length != 0x02:
        raise EventLengthError("Key Signature event has invalid length. Should be 2, value was {}".format(length))
    data_bytes = bytearray(data.read(2))
    signature_index = data_bytes[0]
    minor_major = data_bytes[1]
    return length, (signature_index, minor_major), data_bytes
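# These meta-event helpers only need a buffered binary stream positioned at
# the event's length byte, so they can be exercised in isolation by wrapping
# an in-memory payload. A minimal sketch for key_signature() (the payload is
# illustrative; in standard MIDI, 2 and 0 would mean two sharps, major key):
import io

key_payload = io.BufferedReader(io.BytesIO(b"\x02\x02\x00"))
length, (signature_index, minor_major), raw = key_signature(key_payload)
print(length, signature_index, minor_major)  # 2 2 0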
def __init__(self, data: Union[FileIO, BufferedReader]) -> None:
    chunk_name = data.read(4)
    if chunk_name != b'MTrk':
        raise ValueError("Track Chunk header invalid")
    self.length = int.from_bytes(data.read(4), 'big')
def _parse(self, data: Union[FileIO, BufferedReader]):
    delta_time = VariableLengthValue(data)
def patch_open():
    '''Patch open() to allow mocking both open() itself and the file that is
    yielded.
    Yields the mock for "open" and "file", respectively.'''
    mock_open = MagicMock(spec=open)
    mock_file = MagicMock(spec=io.FileIO)

    @contextmanager
    def stub_open(*args, **kwargs):
        mock_open(*args, **kwargs)
        yield mock_file

    with patch('builtins.open', stub_open):
        yield mock_open, mock_file
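# A hedged usage sketch for patch_open(): the snippet above omits its imports,
# so this assumes io, contextlib.contextmanager, and unittest.mock's
# MagicMock/patch are available. Since no decorator is shown here, the
# generator is wrapped explicitly; the filename and assertions are
# illustrative only.
from contextlib import contextmanager
from unittest.mock import MagicMock, patch
import io

def test_reads_config():
    with contextmanager(patch_open)() as (mock_open, mock_file):
        mock_file.read.return_value = b"key = value"
        with open("settings.conf", "rb") as conf:   # hits the stub, not the filesystem
            data = conf.read()
        mock_open.assert_called_once_with("settings.conf", "rb")
        assert data == b"key = value"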
def _format_files(cls, *fobjs: FileIO) -> List[HttpFile]:
    files = []
    for fobj in fobjs:
        filename = basename(fobj.name)
        extension = splitext(filename)[1]
        content_type = 'application/{}'.format(extension)
        files.append(('file', (filename, fobj, content_type)))
    return files
def clone():
    print(''+T+'Remember to put https:// in front of the website!')
    hey = raw_input(''+T+'' + color.UNDERLINE + 'Website>' + color.END)
    response = urllib2.urlopen(hey)
    page_source = response.read()
    with io.FileIO("websitesource.html", "w") as file:
        file.write(page_source)
    print(''+G+'[*] Finished!')
def isfileobj(f):
    return isinstance(f, (io.FileIO, io.BufferedReader, io.BufferedWriter))
def isfileobj(f):
    return isinstance(f, io.FileIO)
def isfileobj(f):
    return isinstance(f, (io.FileIO, io.BufferedReader))
def isfile(f):
    if isinstance(f, io.FileIO):
        return True
    elif hasattr(f, 'buffer'):
        return isfile(f.buffer)
    elif hasattr(f, 'raw'):
        return isfile(f.raw)
    return False
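# An illustrative check for isfile() (the file name is hypothetical): it walks
# .buffer/.raw until it reaches an io.FileIO, so handles backed by a real file
# pass while purely in-memory streams do not.
import io

io.FileIO("example.bin", "w").close()      # create a scratch file
buffered = open("example.bin", "rb")       # BufferedReader over a FileIO
print(isfile(buffered))                    # True  (buffered.raw is a FileIO)
print(isfile(io.BytesIO(b"in memory")))    # False (no FileIO underneath)
buffered.close()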
def download_revisions(httpauth, service, fileID, title, path, counter, log_file):
    if not os.path.exists(path + "/" + title):
        os.makedirs(path + "/" + title)
    url = "https://www.googleapis.com/drive/v3/files/" + fileID + "/revisions"
    resp, content = httpauth.request(url, 'GET')
    revisions = json.loads(content.decode('utf-8'))
    revision_info = []
    rev_num = 1
    for revision in revisions["revisions"]:
        revision_info.append([str(rev_num), revision["id"], revision["modifiedTime"]])
        file_path = path + "/" + title + "/" + title + ".rev" + str(rev_num)
        orig_title = str(title)
        # to prevent duplicate file names being saved
        if os.path.exists(file_path):
            file_path, title = get_new_file_name(file_path)
            log_and_print(log_file, counter + " File named '" + orig_title + "' already exists. Saving as '" + title + "' instead.")
        log_and_print(log_file, counter + " Downloading '" + title + ".rev" + str(rev_num) + "'...")
        request = service.revisions().get_media(fileId=fileID, revisionId=revision["id"])
        fh = io.FileIO(file_path, mode='wb')
        downloader = MediaIoBaseDownload(fh, request)
        done = False
        while done is False:
            status, done = downloader.next_chunk()
            # Print status of download (mainly for larger files)
            print("%d%%\r" % int(status.progress() * 100), end="", flush=True)
        fh.close()
        log_and_print(log_file, counter + " Hashing '" + title + ".rev" + str(rev_num) + "'...")
        with open(path + "/_hashes.txt", "a") as hashes_file:
            hashes_file.write(title + ".rev" + str(rev_num) + "\n")
            hashes_file.write("--MD5: " + hash_file(file_path, "md5") + "\n")
            hashes_file.write("--SHA1: " + hash_file(file_path, "sha1") + "\n")
            hashes_file.write("--SHA256: " + hash_file(file_path, "sha256") + "\n")
        rev_num += 1
    log_and_print(log_file, counter + " Writing revision info for '" + title + "'...")
    with open(path + "/" + title + "/" + title + "_revisions.txt", "w") as saved_file:
        for item in revision_info:
            saved_file.write("Revision Number: " + item[0] + "\n")
            saved_file.write("--Revision ID: " + item[1] + "\n")
            saved_file.write("--Revision Last Modifed: " + item[2] + "\n")
# Check if there are revisions for a given fileID