def create_media(media):
"""Download media link"""
if is_valid_url(media.data_value):
filename = media.data_value.split('/')[-1]
data_file = NamedTemporaryFile()
content_type = mimetypes.guess_type(filename)
with closing(requests.get(media.data_value, stream=True)) as r:
for chunk in r.iter_content(chunk_size=CHUNK_SIZE):
if chunk:
data_file.write(chunk)
data_file.seek(os.SEEK_SET, os.SEEK_END)
size = os.path.getsize(data_file.name)
data_file.seek(os.SEEK_SET)
media.data_value = filename
media.data_file = InMemoryUploadedFile(
data_file, 'data_file', filename, content_type,
size, charset=None)
return media
return None
python类SEEK_END的实例源码
def seek(self, pos, whence=os.SEEK_SET):
"""Seek to a position in the file.
"""
if self.closed:
raise ValueError("I/O operation on closed file")
if whence == os.SEEK_SET:
self.position = min(max(pos, 0), self.size)
elif whence == os.SEEK_CUR:
if pos < 0:
self.position = max(self.position + pos, 0)
else:
self.position = min(self.position + pos, self.size)
elif whence == os.SEEK_END:
self.position = max(min(self.size + pos, self.size), 0)
else:
raise ValueError("Invalid argument")
self.buffer = b""
self.fileobj.seek(self.position)
def seek(self, pos, whence=os.SEEK_SET):
"""Seek to a position in the file.
"""
if self.closed:
raise ValueError("I/O operation on closed file")
if whence == os.SEEK_SET:
self.position = min(max(pos, 0), self.size)
elif whence == os.SEEK_CUR:
if pos < 0:
self.position = max(self.position + pos, 0)
else:
self.position = min(self.position + pos, self.size)
elif whence == os.SEEK_END:
self.position = max(min(self.size + pos, self.size), 0)
else:
raise ValueError("Invalid argument")
self.buffer = b""
self.fileobj.seek(self.position)
def seek(self, pos, whence=os.SEEK_SET):
"""Seek to a position in the file.
"""
if self.closed:
raise ValueError("I/O operation on closed file")
if whence == os.SEEK_SET:
self.position = min(max(pos, 0), self.size)
elif whence == os.SEEK_CUR:
if pos < 0:
self.position = max(self.position + pos, 0)
else:
self.position = min(self.position + pos, self.size)
elif whence == os.SEEK_END:
self.position = max(min(self.size + pos, self.size), 0)
else:
raise ValueError("Invalid argument")
self.buffer = ""
self.fileobj.seek(self.position)
def map_file(cls, fileobj, offset = 0, size = None):
# If no size is given, it's the whole file by default
if size is None:
fileobj.seek(0, os.SEEK_END)
size = fileobj.tell() - offset
# Read the footer
fileobj.seek(offset + size - cls._Footer.size)
values_pos, = cls._Footer.unpack(fileobj.read(cls._Footer.size))
fileobj.seek(offset)
# Map everything
id_mapper = cls.IdMapper.map_file(fileobj, offset, size = values_pos)
value_array = cls.ValueArray.map_file(fileobj, offset + values_pos,
size = size - cls._Footer.size - values_pos)
return cls(value_array, id_mapper)
def seek(self, pos, whence=os.SEEK_SET):
"""Seek to a position in the file.
"""
if self.closed:
raise ValueError("I/O operation on closed file")
if whence == os.SEEK_SET:
self.position = min(max(pos, 0), self.size)
elif whence == os.SEEK_CUR:
if pos < 0:
self.position = max(self.position + pos, 0)
else:
self.position = min(self.position + pos, self.size)
elif whence == os.SEEK_END:
self.position = max(min(self.size + pos, self.size), 0)
else:
raise ValueError("Invalid argument")
self.buffer = b""
self.fileobj.seek(self.position)
def seek(self, pos, whence=os.SEEK_SET):
"""Seek to a position in the file.
"""
if self.closed:
raise ValueError("I/O operation on closed file")
if whence == os.SEEK_SET:
self.position = min(max(pos, 0), self.size)
elif whence == os.SEEK_CUR:
if pos < 0:
self.position = max(self.position + pos, 0)
else:
self.position = min(self.position + pos, self.size)
elif whence == os.SEEK_END:
self.position = max(min(self.size + pos, self.size), 0)
else:
raise ValueError("Invalid argument")
self.buffer = b""
self.fileobj.seek(self.position)
def seek(self, pos, whence=os.SEEK_SET):
"""Seek to a position in the file.
"""
if self.closed:
raise ValueError("I/O operation on closed file")
if whence == os.SEEK_SET:
self.position = min(max(pos, 0), self.size)
elif whence == os.SEEK_CUR:
if pos < 0:
self.position = max(self.position + pos, 0)
else:
self.position = min(self.position + pos, self.size)
elif whence == os.SEEK_END:
self.position = max(min(self.size + pos, self.size), 0)
else:
raise ValueError("Invalid argument")
self.buffer = b""
self.fileobj.seek(self.position)
def seek(self, pos, whence=os.SEEK_SET):
"""Seek to a position in the file.
"""
if self.closed:
raise ValueError("I/O operation on closed file")
if whence == os.SEEK_SET:
self.position = min(max(pos, 0), self.size)
elif whence == os.SEEK_CUR:
if pos < 0:
self.position = max(self.position + pos, 0)
else:
self.position = min(self.position + pos, self.size)
elif whence == os.SEEK_END:
self.position = max(min(self.size + pos, self.size), 0)
else:
raise ValueError("Invalid argument")
self.buffer = b""
self.fileobj.seek(self.position)
def seek(self, pos, whence=os.SEEK_SET):
"""Seek to a position in the file.
"""
if self.closed:
raise ValueError("I/O operation on closed file")
if whence == os.SEEK_SET:
self.position = min(max(pos, 0), self.size)
elif whence == os.SEEK_CUR:
if pos < 0:
self.position = max(self.position + pos, 0)
else:
self.position = min(self.position + pos, self.size)
elif whence == os.SEEK_END:
self.position = max(min(self.size + pos, self.size), 0)
else:
raise ValueError("Invalid argument")
self.buffer = b""
self.fileobj.seek(self.position)
def seek(self, pos, whence=os.SEEK_SET):
"""Seek to a position in the file.
"""
if self.closed:
raise ValueError("I/O operation on closed file")
if whence == os.SEEK_SET:
self.position = min(max(pos, 0), self.size)
elif whence == os.SEEK_CUR:
if pos < 0:
self.position = max(self.position + pos, 0)
else:
self.position = min(self.position + pos, self.size)
elif whence == os.SEEK_END:
self.position = max(min(self.size + pos, self.size), 0)
else:
raise ValueError("Invalid argument")
self.buffer = b""
self.fileobj.seek(self.position)
def seek(self, pos, whence=os.SEEK_SET):
"""Seek to a position in the file.
"""
if self.closed:
raise ValueError("I/O operation on closed file")
if whence == os.SEEK_SET:
self.position = min(max(pos, 0), self.size)
elif whence == os.SEEK_CUR:
if pos < 0:
self.position = max(self.position + pos, 0)
else:
self.position = min(self.position + pos, self.size)
elif whence == os.SEEK_END:
self.position = max(min(self.size + pos, self.size), 0)
else:
raise ValueError("Invalid argument")
self.buffer = b""
self.fileobj.seek(self.position)
def md5sum2(filename, offset=0, partsize=0):
m = get_md5()
fp = open(filename, 'rb')
if offset > os.path.getsize(filename):
fp.seek(os.SEEK_SET, os.SEEK_END)
else:
fp.seek(offset)
left_len = partsize
BufferSize = BUFFER_SIZE
while True:
if left_len <= 0:
break
elif left_len < BufferSize:
buffer_content = fp.read(left_len)
else:
buffer_content = fp.read(BufferSize)
m.update(buffer_content)
left_len = left_len - len(buffer_content)
md5sum = m.hexdigest()
return md5sum
def seek(self, pos, whence=os.SEEK_SET):
"""Seek to a position in the file.
"""
if self.closed:
raise ValueError("I/O operation on closed file")
if whence == os.SEEK_SET:
self.position = min(max(pos, 0), self.size)
elif whence == os.SEEK_CUR:
if pos < 0:
self.position = max(self.position + pos, 0)
else:
self.position = min(self.position + pos, self.size)
elif whence == os.SEEK_END:
self.position = max(min(self.size + pos, self.size), 0)
else:
raise ValueError("Invalid argument")
self.buffer = b""
self.fileobj.seek(self.position)
def seek(self, pos, whence=os.SEEK_SET):
"""Seek to a position in the file.
"""
if self.closed:
raise ValueError("I/O operation on closed file")
if whence == os.SEEK_SET:
self.position = min(max(pos, 0), self.size)
elif whence == os.SEEK_CUR:
if pos < 0:
self.position = max(self.position + pos, 0)
else:
self.position = min(self.position + pos, self.size)
elif whence == os.SEEK_END:
self.position = max(min(self.size + pos, self.size), 0)
else:
raise ValueError("Invalid argument")
self.buffer = b""
self.fileobj.seek(self.position)
def try_enc():
import os
inFile = open('x.wav', 'rb')
inFile.seek(0, os.SEEK_END)
wavFileSize = inFile.tell()
inFile.seek(44) # skip wav header
outFile = open('x.mp3', 'wb')
lame = LameEncoder(44100,1,128)
while(1):
inBytes = inFile.read(512)
if inBytes == '':
break
#inBuf = ctypes.create_string_buffer(inBytes, 512)
sample_count = len(inBytes) /2
output_buff_len = int(1.25 * sample_count + 7200)
output_buff = (ctypes.c_char*output_buff_len)()
lame.dll.lame_encode_buffer.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_int, ctypes.c_int, ctypes.POINTER(ctypes.c_char), ctypes.c_int];
output_size = lame.dll.lame_encode_buffer(lame.lame, inBytes, 0, len(inBytes)/2, output_buff, output_buff_len);
outFile.write(output_buff[0:output_size])
outFile.close()
def seek(self, pos, whence=os.SEEK_SET):
"""Seek to a position in the file.
"""
if self.closed:
raise ValueError("I/O operation on closed file")
if whence == os.SEEK_SET:
self.position = min(max(pos, 0), self.size)
elif whence == os.SEEK_CUR:
if pos < 0:
self.position = max(self.position + pos, 0)
else:
self.position = min(self.position + pos, self.size)
elif whence == os.SEEK_END:
self.position = max(min(self.size + pos, self.size), 0)
else:
raise ValueError("Invalid argument")
self.buffer = b""
self.fileobj.seek(self.position)
def seek(self, pos, whence=os.SEEK_SET):
"""Seek to a position in the file.
"""
if self.closed:
raise ValueError("I/O operation on closed file")
if whence == os.SEEK_SET:
self.position = min(max(pos, 0), self.size)
elif whence == os.SEEK_CUR:
if pos < 0:
self.position = max(self.position + pos, 0)
else:
self.position = min(self.position + pos, self.size)
elif whence == os.SEEK_END:
self.position = max(min(self.size + pos, self.size), 0)
else:
raise ValueError("Invalid argument")
self.buffer = b""
self.fileobj.seek(self.position)
def seek(self, pos, whence=os.SEEK_SET):
"""Seek to a position in the file.
"""
if self.closed:
raise ValueError("I/O operation on closed file")
if whence == os.SEEK_SET:
self.position = min(max(pos, 0), self.size)
elif whence == os.SEEK_CUR:
if pos < 0:
self.position = max(self.position + pos, 0)
else:
self.position = min(self.position + pos, self.size)
elif whence == os.SEEK_END:
self.position = max(min(self.size + pos, self.size), 0)
else:
raise ValueError("Invalid argument")
self.buffer = ""
self.fileobj.seek(self.position)
def test_stream(self):
# guess content types from extension
descriptor = AttachableDescriptor(io.BytesIO(b'Simple text'), extension='.txt')
self.assertIsInstance(descriptor, StreamDescriptor)
self.assertEqual(descriptor.content_type, 'text/plain')
descriptor.seek(2)
self.assertEqual(descriptor.tell(), 2)
descriptor.seek(0, os.SEEK_END)
self.assertEqual(descriptor.tell(), 11)
# guess extension from original filename
descriptor = AttachableDescriptor(io.BytesIO(b'Simple text'), original_filename='letter.pdf')
self.assertEqual(descriptor.extension, '.pdf')
# guess extension from content type
descriptor = AttachableDescriptor(io.BytesIO(b'Simple text'), content_type='application/json')
self.assertEqual(descriptor.extension, '.json')
self.assertRaises(DescriptorOperationError, lambda: descriptor.filename)
def seek(self, pos, whence=os.SEEK_SET):
"""Seek to a position in the file.
"""
if self.closed:
raise ValueError("I/O operation on closed file")
if whence == os.SEEK_SET:
self.position = min(max(pos, 0), self.size)
elif whence == os.SEEK_CUR:
if pos < 0:
self.position = max(self.position + pos, 0)
else:
self.position = min(self.position + pos, self.size)
elif whence == os.SEEK_END:
self.position = max(min(self.size + pos, self.size), 0)
else:
raise ValueError("Invalid argument")
self.buffer = b""
self.fileobj.seek(self.position)
def seek(self, pos, whence=os.SEEK_SET):
"""Seek to a position in the file.
"""
if self.closed:
raise ValueError("I/O operation on closed file")
if whence == os.SEEK_SET:
self.position = min(max(pos, 0), self.size)
elif whence == os.SEEK_CUR:
if pos < 0:
self.position = max(self.position + pos, 0)
else:
self.position = min(self.position + pos, self.size)
elif whence == os.SEEK_END:
self.position = max(min(self.size + pos, self.size), 0)
else:
raise ValueError("Invalid argument")
self.buffer = b""
self.fileobj.seek(self.position)
def reopen(self):
if self.inner_file is not None:
self.inner_file.close()
open(self.filename, 'a').close()
f = open(self.filename, 'rb')
f.seek(0, os.SEEK_END)
length = f.tell()
if length > 100*1000*1000:
f.seek(-1000*1000, os.SEEK_END)
while True:
if f.read(1) in ('', '\n'):
break
data = f.read()
f.close()
f = open(self.filename, 'wb')
f.write(data)
f.close()
self.inner_file = codecs.open(self.filename, 'a', 'utf-8')
def seek(self, pos, whence=os.SEEK_SET):
"""Seek to a position in the file.
"""
if self.closed:
raise ValueError("I/O operation on closed file")
if whence == os.SEEK_SET:
self.position = min(max(pos, 0), self.size)
elif whence == os.SEEK_CUR:
if pos < 0:
self.position = max(self.position + pos, 0)
else:
self.position = min(self.position + pos, self.size)
elif whence == os.SEEK_END:
self.position = max(min(self.size + pos, self.size), 0)
else:
raise ValueError("Invalid argument")
self.buffer = b""
self.fileobj.seek(self.position)
def xor_file(input_file, output_file, xorkey):
number_added = 0
while True:
some_bytes = input_file.read(4)
if len(some_bytes) == 0:
break
if len(some_bytes) % 4 != 0:
number_added = 4 - len(some_bytes)
some_bytes = some_bytes + "\x00" * (number_added)
writable_bytes = struct.pack("<I", (struct.unpack("<I", some_bytes)[0]) ^ xorkey)
output_file.write(writable_bytes)
if number_added != 0:
number_added = 0 - number_added
output_file.seek(number_added, os.SEEK_END)
output_file.truncate()
def seek(self, pos, whence=os.SEEK_SET):
"""Seek to a position in the file.
"""
if self.closed:
raise ValueError("I/O operation on closed file")
if whence == os.SEEK_SET:
self.position = min(max(pos, 0), self.size)
elif whence == os.SEEK_CUR:
if pos < 0:
self.position = max(self.position + pos, 0)
else:
self.position = min(self.position + pos, self.size)
elif whence == os.SEEK_END:
self.position = max(min(self.size + pos, self.size), 0)
else:
raise ValueError("Invalid argument")
self.buffer = b""
self.fileobj.seek(self.position)
def read_random_line(f):
import os
chunk_size = 16
with open(f, 'rb') as f_handle:
f_handle.seek(0, os.SEEK_END)
size = f_handle.tell()
# i = random.randint(0, size)
i = u_randint(0, size)
while True:
i -= chunk_size
if i < 0:
chunk_size += i
i = 0
f_handle.seek(i, os.SEEK_SET)
d = f_handle.read(chunk_size)
i_newline = d.rfind(b'\n')
if i_newline != -1:
i += i_newline + 1
break
if i == 0:
break
f_handle.seek(i, os.SEEK_SET)
return f_handle.readline()
def _is_ascii_file(data):
"""
This function returns True if the data represents an ASCII file.
Please note that a False value does not necessary means that the data
represents a binary file. It can be a (very *RARE* in real life, but
can easily be forged) ascii file.
"""
# Skip header...
data.seek(BINARY_HEADER)
size = struct.unpack('<I', data.read(4))[0]
# Use seek() method to get size of the file.
data.seek(0, os.SEEK_END)
file_size = data.tell()
# Reset to the start of the file.
data.seek(0)
if size == 0: # Odds to get that result from an ASCII file are null...
print("WARNING! Reported size (facet number) is 0, assuming invalid binary STL file.")
return False # Assume binary in this case.
return (file_size != BINARY_HEADER + 4 + BINARY_STRIDE * size)
def uuid_from_file(fn, block_size=1 << 20):
"""
Returns an arbitrary sized unique ASCII string based on the file contents.
(exact hashing method may change).
"""
with open(fn, 'rb') as f:
# first get the size
import os
f.seek(0, os.SEEK_END)
size = f.tell()
f.seek(0, os.SEEK_SET)
del os
# done!
import hashlib
sha1 = hashlib.new('sha512')
while True:
data = f.read(block_size)
if not data:
break
sha1.update(data)
# skip the '0x'
return hex(size)[2:] + sha1.hexdigest()
def _uuid_from_file(fn, block_size=1 << 20):
with open(fn, 'rb') as f:
# first get the size
f.seek(0, os.SEEK_END)
size = f.tell()
f.seek(0, os.SEEK_SET)
# done!
import hashlib
sha1 = hashlib.new('sha512')
while True:
data = f.read(block_size)
if not data:
break
sha1.update(data)
return (hex(size)[2:] + sha1.hexdigest()).encode()