def __init__(self, fs_imp=None):
# we're definitely going to be importing something in the future,
# so let's just load the OS-related facilities.
if not _os_stat:
_os_bootstrap()
# This is the Importer that we use for grabbing stuff from the
# filesystem. It defines one more method (import_from_dir) for our use.
if fs_imp is None:
cls = self.clsFilesystemImporter or _FilesystemImporter
fs_imp = cls()
self.fs_imp = fs_imp
# Initialize the set of suffixes that we recognize and import.
# The default will import dynamic-load modules first, followed by
# .py files (or a .py file's cached bytecode)
for desc in imp.get_suffixes():
if desc[2] == imp.C_EXTENSION:
self.add_suffix(desc[0],
DynLoadSuffixImporter(desc).import_file)
self.add_suffix('.py', py_suffix_importer)
python类load()的实例源码
def _import_one(self, parent, modname, fqname):
"Import a single module."
# has the module already been imported?
try:
return sys.modules[fqname]
except KeyError:
pass
# load the module's code, or fetch the module itself
result = self.get_code(parent, modname, fqname)
if result is None:
return None
module = self._process_result(result, fqname)
# insert the module into its parent
if parent:
setattr(parent, modname, module)
return module
def py_suffix_importer(filename, finfo, fqname):
file = filename[:-3] + _suffix
t_py = long(finfo[8])
t_pyc = _timestamp(file)
code = None
if t_pyc is not None and t_pyc >= t_py:
f = open(file, 'rb')
if f.read(4) == imp.get_magic():
t = struct.unpack('<I', f.read(4))[0]
if t == t_py:
code = marshal.load(f)
f.close()
if code is None:
file = filename
code = _compile(file, t_py)
return 0, code, { '__file__' : file }
def load_stats(self, arg):
if not arg: self.stats = {}
elif isinstance(arg, basestring):
f = open(arg, 'rb')
self.stats = marshal.load(f)
f.close()
try:
file_stats = os.stat(arg)
arg = time.ctime(file_stats.st_mtime) + " " + arg
except: # in case this is not unix
pass
self.files = [ arg ]
elif hasattr(arg, 'create_stats'):
arg.create_stats()
self.stats = arg.stats
arg.stats = {}
if not self.stats:
raise TypeError, "Cannot create or construct a %r object from '%r''" % (
self.__class__, arg)
return
def load_bytecode(self, f):
"""Loads bytecode from a file or file like object."""
# make sure the magic header is correct
magic = f.read(len(bc_magic))
if magic != bc_magic:
self.reset()
return
# the source code of the file changed, we need to reload
checksum = pickle.load(f)
if self.checksum != checksum:
self.reset()
return
# if marshal_load fails then we need to reload
try:
self.code = marshal_load(f)
except (EOFError, ValueError, TypeError):
self.reset()
return
def load_bytecode(self, f):
"""Loads bytecode from a file or file like object."""
# make sure the magic header is correct
magic = f.read(len(bc_magic))
if magic != bc_magic:
self.reset()
return
# the source code of the file changed, we need to reload
checksum = pickle.load(f)
if self.checksum != checksum:
self.reset()
return
# if marshal_load fails then we need to reload
try:
self.code = marshal_load(f)
except (EOFError, ValueError, TypeError):
self.reset()
return
def load_bytecode(self, f):
"""Loads bytecode from a file or file like object."""
# make sure the magic header is correct
magic = f.read(len(bc_magic))
if magic != bc_magic:
self.reset()
return
# the source code of the file changed, we need to reload
checksum = pickle.load(f)
if self.checksum != checksum:
self.reset()
return
# if marshal_load fails then we need to reload
try:
self.code = marshal_load(f)
except (EOFError, ValueError, TypeError):
self.reset()
return
def load_bytecode(self, f):
"""Loads bytecode from a file or file like object."""
# make sure the magic header is correct
magic = f.read(len(bc_magic))
if magic != bc_magic:
self.reset()
return
# the source code of the file changed, we need to reload
checksum = pickle.load(f)
if self.checksum != checksum:
self.reset()
return
# if marshal_load fails then we need to reload
try:
self.code = marshal_load(f)
except (EOFError, ValueError, TypeError):
self.reset()
return
def _write_pyc(state, co, source_stat, pyc):
# Technically, we don't have to have the same pyc format as
# (C)Python, since these "pycs" should never be seen by builtin
# import. However, there's little reason deviate, and I hope
# sometime to be able to use imp.load_compiled to load them. (See
# the comment in load_module above.)
try:
fp = open(pyc, "wb")
except IOError:
err = sys.exc_info()[1].errno
state.trace("error writing pyc file at %s: errno=%s" %(pyc, err))
# we ignore any failure to write the cache file
# there are many reasons, permission-denied, __pycache__ being a
# file etc.
return False
try:
fp.write(imp.get_magic())
mtime = int(source_stat.mtime)
size = source_stat.size & 0xFFFFFFFF
fp.write(struct.pack("<ll", mtime, size))
marshal.dump(co, fp)
finally:
fp.close()
return True
def __init__(self, fs_imp=None):
# we're definitely going to be importing something in the future,
# so let's just load the OS-related facilities.
if not _os_stat:
_os_bootstrap()
# This is the Importer that we use for grabbing stuff from the
# filesystem. It defines one more method (import_from_dir) for our use.
if fs_imp is None:
cls = self.clsFilesystemImporter or _FilesystemImporter
fs_imp = cls()
self.fs_imp = fs_imp
# Initialize the set of suffixes that we recognize and import.
# The default will import dynamic-load modules first, followed by
# .py files (or a .py file's cached bytecode)
for desc in imp.get_suffixes():
if desc[2] == imp.C_EXTENSION:
self.add_suffix(desc[0],
DynLoadSuffixImporter(desc).import_file)
self.add_suffix('.py', py_suffix_importer)
def _import_one(self, parent, modname, fqname):
"Import a single module."
# has the module already been imported?
try:
return sys.modules[fqname]
except KeyError:
pass
# load the module's code, or fetch the module itself
result = self.get_code(parent, modname, fqname)
if result is None:
return None
module = self._process_result(result, fqname)
# insert the module into its parent
if parent:
setattr(parent, modname, module)
return module
def get_code(self, parent, modname, fqname):
if parent:
# these modules definitely do not occur within a package context
return None
# look for the module
if imp.is_builtin(modname):
type = imp.C_BUILTIN
elif imp.is_frozen(modname):
type = imp.PY_FROZEN
else:
# not found
return None
# got it. now load and return it.
module = imp.load_module(modname, None, modname, ('', '', type))
return 0, module, { }
######################################################################
#
# Internal importer used for importing from the filesystem
#
def py_suffix_importer(filename, finfo, fqname):
file = filename[:-3] + _suffix
t_py = long(finfo[8])
t_pyc = _timestamp(file)
code = None
if t_pyc is not None and t_pyc >= t_py:
f = open(file, 'rb')
if f.read(4) == imp.get_magic():
t = struct.unpack('<I', f.read(4))[0]
if t == t_py:
code = marshal.load(f)
f.close()
if code is None:
file = filename
code = _compile(file, t_py)
return 0, code, { '__file__' : file }
def load_stats(self, arg):
if not arg: self.stats = {}
elif isinstance(arg, basestring):
f = open(arg, 'rb')
self.stats = marshal.load(f)
f.close()
try:
file_stats = os.stat(arg)
arg = time.ctime(file_stats.st_mtime) + " " + arg
except: # in case this is not unix
pass
self.files = [ arg ]
elif hasattr(arg, 'create_stats'):
arg.create_stats()
self.stats = arg.stats
arg.stats = {}
if not self.stats:
raise TypeError("Cannot create or construct a %r object from %r"
% (self.__class__, arg))
return
def load_bytecode(self, f):
"""Loads bytecode from a file or file like object."""
# make sure the magic header is correct
magic = f.read(len(bc_magic))
if magic != bc_magic:
self.reset()
return
# the source code of the file changed, we need to reload
checksum = pickle.load(f)
if self.checksum != checksum:
self.reset()
return
# if marshal_load fails then we need to reload
try:
self.code = marshal_load(f)
except (EOFError, ValueError, TypeError):
self.reset()
return
def load_bytecode(self, f):
"""Loads bytecode from a file or file like object."""
# make sure the magic header is correct
magic = f.read(len(bc_magic))
if magic != bc_magic:
self.reset()
return
# the source code of the file changed, we need to reload
checksum = pickle.load(f)
if self.checksum != checksum:
self.reset()
return
# if marshal_load fails then we need to reload
try:
self.code = marshal_load(f)
except (EOFError, ValueError, TypeError):
self.reset()
return
def load_bytecode(self, f):
"""Loads bytecode from a file or file like object."""
# make sure the magic header is correct
magic = f.read(len(bc_magic))
if magic != bc_magic:
self.reset()
return
# the source code of the file changed, we need to reload
checksum = pickle.load(f)
if self.checksum != checksum:
self.reset()
return
# if marshal_load fails then we need to reload
try:
self.code = marshal_load(f)
except (EOFError, ValueError, TypeError):
self.reset()
return
def load_bytecode(self, f):
"""Loads bytecode from a file or file like object."""
# make sure the magic header is correct
magic = f.read(len(bc_magic))
if magic != bc_magic:
self.reset()
return
# the source code of the file changed, we need to reload
checksum = pickle.load(f)
if self.checksum != checksum:
self.reset()
return
# if marshal_load fails then we need to reload
try:
self.code = marshal_load(f)
except (EOFError, ValueError, TypeError):
self.reset()
return
def load_bytecode(self, f):
"""Loads bytecode from a file or file like object."""
# make sure the magic header is correct
magic = f.read(len(bc_magic))
if magic != bc_magic:
self.reset()
return
# the source code of the file changed, we need to reload
checksum = pickle.load(f)
if self.checksum != checksum:
self.reset()
return
# if marshal_load fails then we need to reload
try:
self.code = marshal_load(f)
except (EOFError, ValueError, TypeError):
self.reset()
return
def load_bytecode(self, f):
"""Loads bytecode from a file or file like object."""
# make sure the magic header is correct
magic = f.read(len(bc_magic))
if magic != bc_magic:
self.reset()
return
# the source code of the file changed, we need to reload
checksum = pickle.load(f)
if self.checksum != checksum:
self.reset()
return
# if marshal_load fails then we need to reload
try:
self.code = marshal_load(f)
except (EOFError, ValueError, TypeError):
self.reset()
return
def load_bytecode(self, f):
"""Loads bytecode from a file or file like object."""
# make sure the magic header is correct
magic = f.read(len(bc_magic))
if magic != bc_magic:
self.reset()
return
# the source code of the file changed, we need to reload
checksum = pickle.load(f)
if self.checksum != checksum:
self.reset()
return
# if marshal_load fails then we need to reload
try:
self.code = marshal_load(f)
except (EOFError, ValueError, TypeError):
self.reset()
return
def load_bytecode(self, f):
"""Loads bytecode from a file or file like object."""
# make sure the magic header is correct
magic = f.read(len(bc_magic))
if magic != bc_magic:
self.reset()
return
# the source code of the file changed, we need to reload
checksum = pickle.load(f)
if self.checksum != checksum:
self.reset()
return
# if marshal_load fails then we need to reload
try:
self.code = marshal_load(f)
except (EOFError, ValueError, TypeError):
self.reset()
return
def load_bytecode(self, f):
"""Loads bytecode from a file or file like object."""
# make sure the magic header is correct
magic = f.read(len(bc_magic))
if magic != bc_magic:
self.reset()
return
# the source code of the file changed, we need to reload
checksum = pickle.load(f)
if self.checksum != checksum:
self.reset()
return
# if marshal_load fails then we need to reload
try:
self.code = marshal_load(f)
except (EOFError, ValueError, TypeError):
self.reset()
return
def process(ifile, ofile):
logger.info('Opening file ' + ifile)
ifPtr = open(ifile, 'rb')
header = ifPtr.read(8)
if not header.startswith('\x03\xF3\x0D\x0A'):
raise SystemExit('[!] Header mismatch. The input file is not a valid pyc file.')
logger.info('Input pyc file header matched')
logger.debug('Unmarshalling file')
rootCodeObject = marshal.load(ifPtr)
ifPtr.close()
deob = parse_code_object(rootCodeObject)
logger.info('Writing deobfuscated code object to disk')
ofPtr = open(ofile, 'wb')
ofPtr.write(header)
marshal.dump(deob, ofPtr)
ofPtr.close()
logger.info('Success')
def load_bytecode(self, f):
"""Loads bytecode from a file or file like object."""
# make sure the magic header is correct
magic = f.read(len(bc_magic))
if magic != bc_magic:
self.reset()
return
# the source code of the file changed, we need to reload
checksum = pickle.load(f)
if self.checksum != checksum:
self.reset()
return
# if marshal_load fails then we need to reload
try:
self.code = marshal_load(f)
except (EOFError, ValueError, TypeError):
self.reset()
return
def test_imp_module(self):
# Verify that the imp module can correctly load and find .py files
# XXX (ncoghlan): It would be nice to use support.CleanImport
# here, but that breaks because the os module registers some
# handlers in copy_reg on import. Since CleanImport doesn't
# revert that registration, the module is left in a broken
# state after reversion. Reinitialising the module contents
# and just reverting os.environ to its previous state is an OK
# workaround
orig_path = os.path
orig_getenv = os.getenv
with EnvironmentVarGuard():
x = imp.find_module("os")
self.addCleanup(x[0].close)
new_os = imp.load_module("os", *x)
self.assertIs(os, new_os)
self.assertIs(orig_path, new_os.path)
self.assertIsNot(orig_getenv, new_os.getenv)
def test_foreign_code(self):
py_compile.compile(self.file_name)
with open(self.compiled_name, "rb") as f:
header = f.read(8)
code = marshal.load(f)
constants = list(code.co_consts)
foreign_code = test_main.__code__
pos = constants.index(1)
constants[pos] = foreign_code
code = type(code)(code.co_argcount, code.co_kwonlyargcount,
code.co_nlocals, code.co_stacksize,
code.co_flags, code.co_code, tuple(constants),
code.co_names, code.co_varnames, code.co_filename,
code.co_name, code.co_firstlineno, code.co_lnotab,
code.co_freevars, code.co_cellvars)
with open(self.compiled_name, "wb") as f:
f.write(header)
marshal.dump(code, f)
mod = self.import_module()
self.assertEqual(mod.constant.co_filename, foreign_code.co_filename)
def test_recursion_limit(self):
# Create a deeply nested structure.
head = last = []
# The max stack depth should match the value in Python/marshal.c.
if os.name == 'nt' and hasattr(sys, 'gettotalrefcount'):
MAX_MARSHAL_STACK_DEPTH = 1500
else:
MAX_MARSHAL_STACK_DEPTH = 2000
for i in range(MAX_MARSHAL_STACK_DEPTH - 2):
last.append([0])
last = last[-1]
# Verify we don't blow out the stack with dumps/load.
data = marshal.dumps(head)
new_head = marshal.loads(data)
# Don't use == to compare objects, it can exceed the recursion limit.
self.assertEqual(len(new_head), len(head))
self.assertEqual(len(new_head[0]), len(head[0]))
self.assertEqual(len(new_head[-1]), len(head[-1]))
last.append([0])
self.assertRaises(ValueError, marshal.dumps, head)
def test_multiple_dumps_and_loads(self):
# Issue 12291: marshal.load() should be callable multiple times
# with interleaved data written by non-marshal code
# Adapted from a patch by Engelbert Gruber.
data = (1, 'abc', b'def', 1.0, (2, 'a', ['b', b'c']))
for interleaved in (b'', b'0123'):
ilen = len(interleaved)
positions = []
try:
with open(support.TESTFN, 'wb') as f:
for d in data:
marshal.dump(d, f)
if ilen:
f.write(interleaved)
positions.append(f.tell())
with open(support.TESTFN, 'rb') as f:
for i, d in enumerate(data):
self.assertEqual(d, marshal.load(f))
if ilen:
f.read(ilen)
self.assertEqual(positions[i], f.tell())
finally:
support.unlink(support.TESTFN)
def load_bytecode(self, f):
"""Loads bytecode from a file or file like object."""
# make sure the magic header is correct
magic = f.read(len(bc_magic))
if magic != bc_magic:
self.reset()
return
# the source code of the file changed, we need to reload
checksum = pickle.load(f)
if self.checksum != checksum:
self.reset()
return
# if marshal_load fails then we need to reload
try:
self.code = marshal_load(f)
except (EOFError, ValueError, TypeError):
self.reset()
return