def bz2open(cls, name, mode="r", fileobj=None, compresslevel=9, **kwargs):
    """Open bzip2 compressed tar archive name for reading or writing.

    Appending is not allowed.

    name          -- archive path; opened with bz2.BZ2File when no fileobj
                     is given, and always forwarded to cls.taropen().
    mode          -- "r" or "w".
    fileobj       -- optional already-open file object; when given it is
                     wrapped in _BZ2Proxy instead of opening name.
    compresslevel -- passed to bz2.BZ2File (unused when fileobj is given).
    kwargs        -- forwarded unchanged to cls.taropen().

    Raises ValueError for an unsupported mode, CompressionError when the
    bz2 module cannot be imported, and ReadError when cls.taropen() fails
    with IOError or EOFError.  Any other exception from cls.taropen()
    propagates unchanged.  In every failure case the compressed stream
    created here is closed first.
    """
    if len(mode) > 1 or mode not in "rw":
        raise ValueError("mode must be 'r' or 'w'.")

    try:
        import bz2
    except ImportError:
        raise CompressionError("bz2 module is not available")

    if fileobj is not None:
        fileobj = _BZ2Proxy(fileobj, mode)
    else:
        fileobj = bz2.BZ2File(name, mode, compresslevel=compresslevel)

    try:
        t = cls.taropen(name, mode, fileobj, **kwargs)
    except (IOError, EOFError):
        # The stream was created by this function, so nobody else can
        # close it once we bail out.
        fileobj.close()
        raise ReadError("not a bzip2 file")
    except BaseException:
        # Cleanup only; the original exception is re-raised untouched.
        fileobj.close()
        raise
    # NOTE(review): presumably marks fileobj as owned by the archive object
    # so that closing the archive also closes it -- confirm in taropen().
    t._extfileobj = False
    return t
# All *open() methods are registered here.
# Example source code for the Python class BZ2File()
def saveIDL(filename, annotations):
    """Write annotations to filename, one IDL record per annotation.

    The container is picked from the extension of filename: ".idl" is
    written through the built-in open(), ".gz" through gzip.GzipFile and
    ".bz2" through bz2.BZ2File.  Every annotation writes itself through
    its writeIDL(file) method; each record is followed by ";" and a
    newline, except the last one, which is followed by "." and a newline.

    Raises ValueError for any other extension (previously this surfaced
    as an UnboundLocalError after the fact).
    """
    ext = os.path.splitext(filename)[1]
    if ext == ".idl":
        out = open(filename, 'w')
    elif ext == ".gz":
        out = gzip.GzipFile(filename, 'w')
    elif ext == ".bz2":
        out = bz2.BZ2File(filename, 'w')
    else:
        raise ValueError(
            "unsupported extension %r: expected .idl, .gz or .bz2" % ext)

    # try/finally guarantees the handle is released even if an annotation
    # fails to serialise half-way through.
    try:
        for index, annotation in enumerate(annotations):
            annotation.writeIDL(out)
            if index + 1 < len(annotations):
                out.write(";\n")
            else:
                out.write(".\n")
    finally:
        out.close()
def bz2open(cls, name, mode="r", fileobj=None, compresslevel=9):
    """Open bzip2 compressed cpio archive name for reading or writing.

    Appending is not allowed.

    name          -- archive path; opened with bz2.BZ2File when no fileobj
                     is given, and always forwarded to cls.cpioopen().
    mode          -- "r" or "w".
    fileobj       -- optional already-open file object; when given it is
                     wrapped in _BZ2Proxy instead of opening name.
    compresslevel -- passed to bz2.BZ2File (unused when fileobj is given).

    Raises ValueError for an unsupported mode, CompressionError when the
    bz2 module cannot be imported, and ReadError when cls.cpioopen() fails
    with IOError.  Any other exception from cls.cpioopen() propagates
    unchanged.  In every failure case the compressed stream created here
    is closed first.
    """
    if len(mode) > 1 or mode not in "rw":
        raise ValueError("mode must be 'r' or 'w'.")

    try:
        import bz2
    except ImportError:
        raise CompressionError("bz2 module is not available")

    if fileobj is not None:
        fileobj = _BZ2Proxy(fileobj, mode)
    else:
        fileobj = bz2.BZ2File(name, mode, compresslevel=compresslevel)

    try:
        t = cls.cpioopen(name, mode, fileobj)
    except IOError:
        # The stream was created by this function, so nobody else can
        # close it once we bail out.
        fileobj.close()
        raise ReadError("not a bzip2 file")
    except BaseException:
        # Cleanup only; the original exception is re-raised untouched.
        fileobj.close()
        raise
    # NOTE(review): presumably marks fileobj as owned by the archive object
    # so that closing the archive also closes it -- confirm in cpioopen().
    t._extfileobj = False
    return t
def open_regular_or_compressed(filename):
    """Return a readable file object for filename.

    None yields sys.stdin, and anything that already has a read attribute
    is returned as-is.  Otherwise filename is treated as a path and the
    opener is chosen from its lower-cased last extension: "gz" uses
    gzip.GzipFile, "bz2" uses bz2.BZ2File, "xz" uses lzma.open, and
    everything else falls back to the built-in open().
    """
    if filename is None:
        return sys.stdin
    if hasattr(filename, 'read'):
        return filename

    # Only the extension is lower-cased; the path itself is opened verbatim.
    suffix = filename.lower().rsplit('.', 1)[-1]
    if suffix == 'gz':
        import gzip
        return gzip.GzipFile(filename)
    if suffix == 'bz2':
        import bz2
        return bz2.BZ2File(filename)
    if suffix == 'xz':
        import lzma
        return lzma.open(filename)
    return open(filename)
def test_ignore_zeros(self):
    # Exercise TarFile's ignore_zeros option.
    # The junk prefix has to be written through the same compression
    # layer that self.mode names, so pick the matching opener first.
    mode = self.mode
    if mode.endswith(":bz2"):
        _open = bz2.BZ2File
    elif mode.endswith(":gz"):
        _open = gzip.GzipFile
    else:
        _open = open

    # A block of '\0' triggers EOFHeaderError and a block of 'a' triggers
    # InvalidHeaderError; both must be skipped.
    for filler in (b'\0', b'a'):
        with _open(tmpname, "wb") as fobj:
            fobj.write(filler * 1024)
            fobj.write(tarfile.TarInfo("foo").tobuf())

        tar = tarfile.open(tmpname, mode="r", ignore_zeros=True)
        try:
            names = tar.getnames()
            self.assertListEqual(
                names, ["foo"],
                "ignore_zeros=True should have skipped the %r-blocks" % filler)
        finally:
            tar.close()
def test_detect_stream_bz2(self):
    # tarfile's stream detection used to look for the literal "BZh91" at
    # the start of the file.  That is wrong: the '9' encodes the block
    # size (900kB), so archives compressed with any other block size were
    # not auto-detected.
    if not bz2:
        return

    with open(tarname, "rb") as source:
        payload = source.read()

    # compresslevel=1 selects a 100kB block size, so the file starts with
    # "BZh11" instead.
    with bz2.BZ2File(tmpname, "wb", compresslevel=1) as target:
        target.write(payload)

    self._testfunc_file(tmpname, "r|*")
def _delete_top_row_and_compress(
        self,
        input_file_name,
        output_file_ext,
        dest_dir):
    """Copy input_file_name without its first line into a new temp file.

    The output is created with tempfile.mkstemp() inside dest_dir, using
    output_file_ext as suffix.  A ".gz" or ".bz2" extension (compared
    case-insensitively) makes the output gzip- or bzip2-compressed; any
    other extension leaves it uncompressed.  An empty input produces an
    empty output.

    Returns the path of the file that was written.
    """
    import os

    # Any extension other than .gz/.bz2 means the file is not compressed.
    extension = output_file_ext.lower()
    if extension == '.gz':
        open_fn = gzip.GzipFile
    elif extension == '.bz2':
        open_fn = bz2.BZ2File
    else:
        open_fn = open

    os_fh_output, fn_output = \
        tempfile.mkstemp(suffix=output_file_ext, dir=dest_dir)
    # mkstemp hands back an open OS-level descriptor.  The file is
    # re-opened by name below, so release the descriptor to avoid a leak.
    os.close(os_fh_output)

    with open(input_file_name, 'rb') as f_in,\
            open_fn(fn_output, 'wb') as f_out:
        # Drop the top row; the default keeps an empty input from raising
        # StopIteration.
        next(f_in, None)
        for line in f_in:
            f_out.write(line)
    return fn_output
def uncompress_file(input_file_name, file_extension, dest_dir):
    """
    Decompress a gz or bz2 file into a new temporary file in dest_dir.

    file_extension selects the decompressor and is compared
    case-insensitively: '.gz' uses gzip.GzipFile, '.bz2' uses bz2.BZ2File.
    Any other value raises NotImplementedError.  The temporary file is
    created with delete=False, so it remains after this call; its path is
    returned.
    """
    extension = file_extension.lower()
    if extension == '.gz':
        opener = gzip.GzipFile
    elif extension == '.bz2':
        opener = bz2.BZ2File
    else:
        raise NotImplementedError("Received {} format. Only gz and bz2 "
                                  "files can currently be uncompressed."
                                  .format(file_extension))

    with opener(input_file_name, mode='rb') as packed:
        with NamedTemporaryFile(dir=dest_dir,
                                mode='wb',
                                delete=False) as unpacked:
            shutil.copyfileobj(packed, unpacked)
    return unpacked.name
def raw_data_generator(path):
    """Yield the lines of one bz2 file, or of every matching file in a tree.

    When path is a directory it is walked with os.walk(); files whose
    path ends with FILE_SUFFIX are opened with BZ2File and their lines
    yielded, other files are reported and skipped.  An IOError raised
    while iterating one file of the tree is reported and the walk moves
    on to the next file.  When path is a regular file it is read
    directly, with no suffix check and no IOError handling.  Any other
    path yields nothing.
    """
    if os.path.isdir(path):
        for folder, _subdirs, names in os.walk(path):
            for name in names:
                file_path = os.path.join(folder, name)
                if not file_path.endswith(FILE_SUFFIX):
                    print("Skipping file {} (doesn't end with {})".format(file_path, FILE_SUFFIX))
                    continue
                print("\nReading from {}".format(file_path))
                with BZ2File(file_path, "r") as stream:
                    try:
                        for record in stream:
                            yield record
                    except IOError:
                        # Best effort: give up on this file, keep walking.
                        print("IOError from file {}".format(file_path))
    elif os.path.isfile(path):
        print("Reading from {}".format(path))
        with BZ2File(path, "r") as stream:
            for record in stream:
                yield record
def raw_data_generator(path):
    """Yield the lines of one bz2 file, or of every matching file in a tree.

    When path is a directory it is walked with os.walk(); files whose
    path ends with FILE_SUFFIX are opened with BZ2File and their lines
    yielded, other files are reported and skipped.  An IOError raised
    while iterating one file of the tree is reported and the walk moves
    on to the next file.  When path is a regular file it is read
    directly, with no suffix check and no IOError handling.  Any other
    path yields nothing.
    """
    if os.path.isdir(path):
        for folder, _subdirs, names in os.walk(path):
            for name in names:
                file_path = os.path.join(folder, name)
                if not file_path.endswith(FILE_SUFFIX):
                    print("Skipping file {} (doesn't end with {})".format(file_path, FILE_SUFFIX))
                    continue
                print("\nReading from {}".format(file_path))
                with BZ2File(file_path, "r") as stream:
                    try:
                        for record in stream:
                            yield record
                    except IOError:
                        # Best effort: give up on this file, keep walking.
                        print("IOError from file {}".format(file_path))
    elif os.path.isfile(path):
        print("Reading from {}".format(path))
        with BZ2File(path, "r") as stream:
            for record in stream:
                yield record
def expand_bz2(file_path):
    """Decompress file_path next to itself and return the output path.

    The output path is file_path with its last four characters removed
    (i.e. without a ".bz2" suffix).  If that file already exists nothing
    is decompressed.  Progress is written to sys.stdout while data is
    copied in 100 KiB chunks.

    If decompression fails, a message is printed and any partially
    written output is removed, so that a later call retries instead of
    reporting the broken file as already expanded.  The output path is
    returned in every case.
    """
    target_path = file_path[:-4]
    sys.stdout.write("\tExpanding bz2... ")
    if os.path.isfile(target_path):
        print("\t\tFile already expanded.")
        return target_path

    file_size = os.path.getsize(file_path)
    # The estimate and the progress bar both assume a 5x expansion ratio.
    # NOTE(review): bytes / 1000 is labelled "MB" here -- confirm the
    # intended unit before changing the message.
    estimated_file_size = (float(5)*float(file_size))/1000.0
    sys.stdout.write("Estimated "+str(estimated_file_size)+" MB\n")
    try:
        with open(target_path, 'wb') as new_file, bz2.BZ2File(file_path, 'rb') as source:
            for data in iter(lambda: source.read(100 * 1024), b''):
                new_file.write(data)
                num_items = int(float(source.tell())/float(file_size)*float(5))
                # 25-slot bar: slots up to and including num_items are filled.
                progress_string = "".join(
                    "-" if prog_index <= num_items else " "
                    for prog_index in range(25))
                sys.stdout.write("\r\t\t["+progress_string+"] "+str(100.0*source.tell()/file_size)[:5]+"% done")
                sys.stdout.flush()
        sys.stdout.write("\n")
    except Exception:
        # Exception rather than a bare except, so Ctrl-C and SystemExit
        # still propagate.
        print("\t\tCould not expand file.")
        try:
            if os.path.isfile(target_path):
                os.remove(target_path)
        except OSError:
            # Cleanup is best effort; the failure was already reported.
            pass
    return target_path
def file_opener(cls, file_name, mode='rb'):
    """Open file_name with an opener chosen from its suffix.

    Names ending in '.gz' are logged at debug level and opened with
    gzip.open().  Names ending in '.bz2' are opened with bz2.BZ2File, or
    raise NotImplementedError when the module-level bz2 name is falsy.
    Every other name goes to the built-in open().  mode is passed through
    to whichever opener is used.
    """
    if file_name.endswith('.gz'):
        log.debug("gzip file: %s", file_name)
        return gzip.open(file_name, mode)

    if not file_name.endswith('.bz2'):
        return open(file_name, mode)

    if not bz2:
        raise NotImplementedError()
    return bz2.BZ2File(file_name, mode)
# def copy(self, src, dst):
# log.debug("copying: %s to %s", src, dst)
# if src.endswith('.gz'):
# log.debug("gzip file: %s", src)
# with gzip.open(src, 'rb') as src_fh:
# with open(dst, 'wb') as dst_fh:
# return shutil.copyfileobj(src_fh, dst_fh)
# elif src.endswith('.gz'):
# with bz2.BZ2File(src, 'rb') as src_fh:
# with open(dst, 'wb') as dst_fh:
# return shutil.copyfileobj(src_fh, dst_fh)
# return shutil.copy2(src, dst)
def open_(filename, mode='r'):
    """Wrapper over normal python open, that opens compressed
    files in format such as bz2, gz, etc.

    For mode 'w' the format is taken from the text after the last '.' in
    filename; for every other mode it is whatever file_type(filename)
    returns.  "bz2" opens a bz2.BZ2File, "tar" goes to tarfile.open(),
    "gz" opens a gzip.GzipFile, and anything else uses the built-in
    open().  mode is passed through to the chosen opener.
    """
    # The original printed the undefined name __func__, which raised
    # NameError on every call; print the function's own name instead.
    print("open_", filename)

    if mode == 'w':
        type_ = filename.split('.')[-1]
    else:
        type_ = file_type(filename)

    if type_ == "bz2":
        f = bz2.BZ2File(filename, mode)
    elif type_ == "tar":
        f = tarfile.open(filename, mode)
    elif type_ == "gz":
        f = gzip.GzipFile(filename, mode)
    else:
        f = open(filename, mode)
    return f
def decompress_bz2(bz2_path, new_path):
    """
    Decompress a .bz2 file into a new file.

    :param bz2_path: path of the bzip2-compressed input
    :param new_path: path of the decompressed output; opened in 'wb' mode
    :return: None
    """
    chunk_size = 100 * 1024
    # The output is opened first, then the compressed input is streamed
    # into it chunk by chunk until read() returns no more data.
    with open(new_path, 'wb') as target:
        with bz2.BZ2File(bz2_path, 'rb') as source:
            while True:
                chunk = source.read(chunk_size)
                if chunk == b'':
                    break
                target.write(chunk)
# -----------------------------------------------------------------
## This function opens a text file in read-only mode. If a file exists at the specified path, it is simply opened.
# Otherwise, the function looks for a ZIP archive with the same name as the directory in which the file would have
# resided, but with the ".zip" extension added, and it attempts to open a file with the same name from the archive.
# In both cases, the function returns a read-only file-like object that offers sequential access, i.e. it provides
# only the following methods: read(), readline(), readlines(), \_\_iter\_\_(), next().
#
def decompress_bz2(bz2_path, new_path):
    """
    Decompress a .bz2 file into a new file.

    :param bz2_path: path of the bzip2-compressed input
    :param new_path: path of the decompressed output; opened in 'wb' mode
    :return: None
    """
    chunk_size = 100 * 1024
    # The output is opened first, then the compressed input is streamed
    # into it chunk by chunk until read() returns no more data.
    with open(new_path, 'wb') as target:
        with bz2.BZ2File(bz2_path, 'rb') as source:
            while True:
                chunk = source.read(chunk_size)
                if chunk == b'':
                    break
                target.write(chunk)
# -----------------------------------------------------------------
## This function opens a text file in read-only mode. If a file exists at the specified path, it is simply opened.
# Otherwise, the function looks for a ZIP archive with the same name as the directory in which the file would have
# resided, but with the ".zip" extension added, and it attempts to open a file with the same name from the archive.
# In both cases, the function returns a read-only file-like object that offers sequential access, i.e. it provides
# only the following methods: read(), readline(), readlines(), \_\_iter\_\_(), next().
#
def seekable(self):
    """Return whether the wrapped file object reports itself seekable.

    Delegates to self.fileobj.seekable() when that method exists.  File
    objects without it are assumed to be seekable.
    """
    if hasattr(self.fileobj, "seekable"):
        return self.fileobj.seekable()
    # XXX gzip.GzipFile and bz2.BZ2File
    return True
def bz2open(cls, name, mode="r", fileobj=None, compresslevel=9, **kwargs):
    """Open bzip2 compressed tar archive name for reading or writing.

    Appending is not allowed.

    name          -- archive path; opened with bz2.BZ2File when no fileobj
                     is given, and always forwarded to cls.taropen().
    mode          -- "r" or "w".
    fileobj       -- optional already-open file object; when given it is
                     wrapped in _BZ2Proxy instead of opening name.
    compresslevel -- passed to bz2.BZ2File (unused when fileobj is given).
    kwargs        -- forwarded unchanged to cls.taropen().

    Raises ValueError for an unsupported mode, CompressionError when the
    bz2 module cannot be imported, and ReadError when cls.taropen() fails
    with IOError or EOFError.  Any other exception from cls.taropen()
    propagates unchanged.  In every failure case the compressed stream
    created here is closed first.
    """
    if len(mode) > 1 or mode not in "rw":
        raise ValueError("mode must be 'r' or 'w'.")

    try:
        import bz2
    except ImportError:
        raise CompressionError("bz2 module is not available")

    if fileobj is not None:
        fileobj = _BZ2Proxy(fileobj, mode)
    else:
        fileobj = bz2.BZ2File(name, mode, compresslevel=compresslevel)

    try:
        t = cls.taropen(name, mode, fileobj, **kwargs)
    except (IOError, EOFError):
        fileobj.close()
        raise ReadError("not a bzip2 file")
    except BaseException:
        # Previously only IOError/EOFError closed the stream; any other
        # failure leaked it.  Cleanup only -- the exception is re-raised.
        fileobj.close()
        raise
    # NOTE(review): presumably marks fileobj as owned by the archive object
    # so that closing the archive also closes it -- confirm in taropen().
    t._extfileobj = False
    return t
# All *open() methods are registered here.
def bz2open(cls, name, mode="r", fileobj=None, compresslevel=9, **kwargs):
    """Open bzip2 compressed tar archive name for reading or writing.

    Appending is not allowed.

    mode must be "r", "w" or "x", otherwise ValueError is raised.  The
    compressed stream is a bz2.BZ2File built on fileobj when that is
    truthy and on name otherwise; name, mode and kwargs are forwarded to
    cls.taropen().  CompressionError is raised when the bz2 module cannot
    be imported.

    If cls.taropen() raises, the stream is closed before the exception
    leaves this function.  An OSError or EOFError in read mode is
    replaced by ReadError; everything else is re-raised unchanged.
    """
    if mode not in ("r", "w", "x"):
        raise ValueError("mode must be 'r', 'w' or 'x'")

    try:
        import bz2
    except ImportError:
        raise CompressionError("bz2 module is not available")

    stream = bz2.BZ2File(fileobj or name, mode,
                         compresslevel=compresslevel)

    try:
        archive = cls.taropen(name, mode, stream, **kwargs)
    except BaseException as exc:
        # Always release the stream first, then decide what to report.
        stream.close()
        if mode == 'r' and isinstance(exc, (OSError, EOFError)):
            raise ReadError("not a bzip2 file")
        raise
    archive._extfileobj = False
    return archive
def seekable(self):
    """Return whether the wrapped file object reports itself seekable.

    Delegates to self.fileobj.seekable() when that method exists.  File
    objects without it are assumed to be seekable.
    """
    if hasattr(self.fileobj, "seekable"):
        return self.fileobj.seekable()
    # XXX gzip.GzipFile and bz2.BZ2File
    return True
def bz2open(cls, name, mode="r", fileobj=None, compresslevel=9, **kwargs):
    """Open bzip2 compressed tar archive name for reading or writing.

    Appending is not allowed.

    name          -- archive path; opened with bz2.BZ2File when no fileobj
                     is given, and always forwarded to cls.taropen().
    mode          -- "r" or "w".
    fileobj       -- optional already-open file object; when given it is
                     wrapped in _BZ2Proxy instead of opening name.
    compresslevel -- passed to bz2.BZ2File (unused when fileobj is given).
    kwargs        -- forwarded unchanged to cls.taropen().

    Raises ValueError for an unsupported mode, CompressionError when the
    bz2 module cannot be imported, and ReadError when cls.taropen() fails
    with IOError or EOFError.  Any other exception from cls.taropen()
    propagates unchanged.  In every failure case the compressed stream
    created here is closed first.
    """
    if len(mode) > 1 or mode not in "rw":
        raise ValueError("mode must be 'r' or 'w'.")

    try:
        import bz2
    except ImportError:
        raise CompressionError("bz2 module is not available")

    if fileobj is not None:
        fileobj = _BZ2Proxy(fileobj, mode)
    else:
        fileobj = bz2.BZ2File(name, mode, compresslevel=compresslevel)

    try:
        t = cls.taropen(name, mode, fileobj, **kwargs)
    except (IOError, EOFError):
        fileobj.close()
        raise ReadError("not a bzip2 file")
    except BaseException:
        # Previously only IOError/EOFError closed the stream; any other
        # failure leaked it.  Cleanup only -- the exception is re-raised.
        fileobj.close()
        raise
    # NOTE(review): presumably marks fileobj as owned by the archive object
    # so that closing the archive also closes it -- confirm in taropen().
    t._extfileobj = False
    return t
# All *open() methods are registered here.
def hook_compressed(filename, mode):
    """Open filename with an opener chosen from its extension.

    '.gz' files are opened with gzip.open(), '.bz2' files with
    bz2.BZ2File, and everything else with the built-in open().  The
    extension comparison is case-sensitive.  mode is passed through to
    whichever opener is used.
    """
    suffix = os.path.splitext(filename)[1]
    if suffix == '.bz2':
        import bz2
        return bz2.BZ2File(filename, mode)
    if suffix == '.gz':
        import gzip
        return gzip.open(filename, mode)
    return open(filename, mode)
def seekable(self):
    """Return whether the wrapped file object reports itself seekable.

    Delegates to self.fileobj.seekable() when that method exists.  File
    objects without it are assumed to be seekable.
    """
    if hasattr(self.fileobj, "seekable"):
        return self.fileobj.seekable()
    # XXX gzip.GzipFile and bz2.BZ2File
    return True
def bz2open(cls, name, mode="r", fileobj=None, compresslevel=9, **kwargs):
    """Open bzip2 compressed tar archive name for reading or writing.

    Appending is not allowed.

    name          -- archive path; opened with bz2.BZ2File when no fileobj
                     is given, and always forwarded to cls.taropen().
    mode          -- "r" or "w".
    fileobj       -- optional already-open file object; when given it is
                     wrapped in _BZ2Proxy instead of opening name.
    compresslevel -- passed to bz2.BZ2File (unused when fileobj is given).
    kwargs        -- forwarded unchanged to cls.taropen().

    Raises ValueError for an unsupported mode, CompressionError when the
    bz2 module cannot be imported, and ReadError when cls.taropen() fails
    with IOError or EOFError.  Any other exception from cls.taropen()
    propagates unchanged.  In every failure case the compressed stream
    created here is closed first.
    """
    if len(mode) > 1 or mode not in "rw":
        raise ValueError("mode must be 'r' or 'w'.")

    try:
        import bz2
    except ImportError:
        raise CompressionError("bz2 module is not available")

    if fileobj is not None:
        fileobj = _BZ2Proxy(fileobj, mode)
    else:
        fileobj = bz2.BZ2File(name, mode, compresslevel=compresslevel)

    try:
        t = cls.taropen(name, mode, fileobj, **kwargs)
    except (IOError, EOFError):
        fileobj.close()
        raise ReadError("not a bzip2 file")
    except BaseException:
        # Previously only IOError/EOFError closed the stream; any other
        # failure leaked it.  Cleanup only -- the exception is re-raised.
        fileobj.close()
        raise
    # NOTE(review): presumably marks fileobj as owned by the archive object
    # so that closing the archive also closes it -- confirm in taropen().
    t._extfileobj = False
    return t
# All *open() methods are registered here.
def seekable(self):
    """Return whether the wrapped file object reports itself seekable.

    Delegates to self.fileobj.seekable() when that method exists.  File
    objects without it are assumed to be seekable.
    """
    if hasattr(self.fileobj, "seekable"):
        return self.fileobj.seekable()
    # XXX gzip.GzipFile and bz2.BZ2File
    return True
def bz2open(cls, name, mode="r", fileobj=None, compresslevel=9, **kwargs):
    """Open bzip2 compressed tar archive name for reading or writing.

    Appending is not allowed.

    name          -- archive path; opened with bz2.BZ2File when no fileobj
                     is given, and always forwarded to cls.taropen().
    mode          -- "r" or "w".
    fileobj       -- optional already-open file object; when given it is
                     wrapped in _BZ2Proxy instead of opening name.
    compresslevel -- passed to bz2.BZ2File (unused when fileobj is given).
    kwargs        -- forwarded unchanged to cls.taropen().

    Raises ValueError for an unsupported mode, CompressionError when the
    bz2 module cannot be imported, and ReadError when cls.taropen() fails
    with IOError or EOFError.  Any other exception from cls.taropen()
    propagates unchanged.  In every failure case the compressed stream
    created here is closed first.
    """
    if len(mode) > 1 or mode not in "rw":
        raise ValueError("mode must be 'r' or 'w'.")

    try:
        import bz2
    except ImportError:
        raise CompressionError("bz2 module is not available")

    if fileobj is not None:
        fileobj = _BZ2Proxy(fileobj, mode)
    else:
        fileobj = bz2.BZ2File(name, mode, compresslevel=compresslevel)

    try:
        t = cls.taropen(name, mode, fileobj, **kwargs)
    except (IOError, EOFError):
        fileobj.close()
        raise ReadError("not a bzip2 file")
    except BaseException:
        # Previously only IOError/EOFError closed the stream; any other
        # failure leaked it.  Cleanup only -- the exception is re-raised.
        fileobj.close()
        raise
    # NOTE(review): presumably marks fileobj as owned by the archive object
    # so that closing the archive also closes it -- confirm in taropen().
    t._extfileobj = False
    return t
# All *open() methods are registered here.
def seekable(self):
    """Return whether the wrapped file object reports itself seekable.

    Delegates to self.fileobj.seekable() when that method exists.  File
    objects without it are assumed to be seekable.
    """
    if hasattr(self.fileobj, "seekable"):
        return self.fileobj.seekable()
    # XXX gzip.GzipFile and bz2.BZ2File
    return True
def bz2open(cls, name, mode="r", fileobj=None, compresslevel=9, **kwargs):
    """Open bzip2 compressed tar archive name for reading or writing.

    Appending is not allowed.

    name          -- archive path; opened with bz2.BZ2File when no fileobj
                     is given, and always forwarded to cls.taropen().
    mode          -- "r" or "w".
    fileobj       -- optional already-open file object; when given it is
                     wrapped in _BZ2Proxy instead of opening name.
    compresslevel -- passed to bz2.BZ2File (unused when fileobj is given).
    kwargs        -- forwarded unchanged to cls.taropen().

    Raises ValueError for an unsupported mode, CompressionError when the
    bz2 module cannot be imported, and ReadError when cls.taropen() fails
    with IOError or EOFError.  Any other exception from cls.taropen()
    propagates unchanged.  In every failure case the compressed stream
    created here is closed first.
    """
    if len(mode) > 1 or mode not in "rw":
        raise ValueError("mode must be 'r' or 'w'.")

    try:
        import bz2
    except ImportError:
        raise CompressionError("bz2 module is not available")

    if fileobj is not None:
        fileobj = _BZ2Proxy(fileobj, mode)
    else:
        fileobj = bz2.BZ2File(name, mode, compresslevel=compresslevel)

    try:
        t = cls.taropen(name, mode, fileobj, **kwargs)
    except (IOError, EOFError):
        fileobj.close()
        raise ReadError("not a bzip2 file")
    except BaseException:
        # Previously only IOError/EOFError closed the stream; any other
        # failure leaked it.  Cleanup only -- the exception is re-raised.
        fileobj.close()
        raise
    # NOTE(review): presumably marks fileobj as owned by the archive object
    # so that closing the archive also closes it -- confirm in taropen().
    t._extfileobj = False
    return t
# All *open() methods are registered here.
def seekable(self):
    """Return whether the wrapped file object reports itself seekable.

    Delegates to self.fileobj.seekable() when that method exists.  File
    objects without it are assumed to be seekable.
    """
    if hasattr(self.fileobj, "seekable"):
        return self.fileobj.seekable()
    # XXX gzip.GzipFile and bz2.BZ2File
    return True
def bz2open(cls, name, mode="r", fileobj=None, compresslevel=9, **kwargs):
    """Open bzip2 compressed tar archive name for reading or writing.

    Appending is not allowed.

    name          -- archive path; opened with bz2.BZ2File when no fileobj
                     is given, and always forwarded to cls.taropen().
    mode          -- "r" or "w".
    fileobj       -- optional already-open file object; when given it is
                     wrapped in _BZ2Proxy instead of opening name.
    compresslevel -- passed to bz2.BZ2File (unused when fileobj is given).
    kwargs        -- forwarded unchanged to cls.taropen().

    Raises ValueError for an unsupported mode, CompressionError when the
    bz2 module cannot be imported, and ReadError when cls.taropen() fails
    with IOError or EOFError.  Any other exception from cls.taropen()
    propagates unchanged.  In every failure case the compressed stream
    created here is closed first.
    """
    if len(mode) > 1 or mode not in "rw":
        raise ValueError("mode must be 'r' or 'w'.")

    try:
        import bz2
    except ImportError:
        raise CompressionError("bz2 module is not available")

    if fileobj is not None:
        fileobj = _BZ2Proxy(fileobj, mode)
    else:
        fileobj = bz2.BZ2File(name, mode, compresslevel=compresslevel)

    try:
        t = cls.taropen(name, mode, fileobj, **kwargs)
    except (IOError, EOFError):
        fileobj.close()
        raise ReadError("not a bzip2 file")
    except BaseException:
        # Previously only IOError/EOFError closed the stream; any other
        # failure leaked it.  Cleanup only -- the exception is re-raised.
        fileobj.close()
        raise
    # NOTE(review): presumably marks fileobj as owned by the archive object
    # so that closing the archive also closes it -- confirm in taropen().
    t._extfileobj = False
    return t
# All *open() methods are registered here.
def seekable(self):
    """Return whether the wrapped file object reports itself seekable.

    Delegates to self.fileobj.seekable() when that method exists.  File
    objects without it are assumed to be seekable.
    """
    if hasattr(self.fileobj, "seekable"):
        return self.fileobj.seekable()
    # XXX gzip.GzipFile and bz2.BZ2File
    return True