def parse_user_config(config, source_code=None):
    """Execute the strategy source and fold its ``__config__`` into ``config``.

    The strategy source is compiled and executed in a fresh namespace. The
    ``__config__`` dict it defines (an empty dict when absent) is passed to
    ``deep_update`` together with ``config``, and then every section of
    ``__config__`` whose key is listed in ``config["whitelist"]`` is passed
    to ``deep_update`` together with the matching section of ``config``.

    Any ``Exception`` raised along the way is logged through ``system_log``
    and otherwise ignored (best effort). Interpreter-level exceptions such
    as ``KeyboardInterrupt`` and ``SystemExit`` propagate to the caller.

    NOTE: this executes the strategy file; only use it on trusted code.

    :param config: nested configuration dict; reads
        ``config["base"]["strategy_file"]`` and ``config["whitelist"]``.
    :param source_code: strategy source text; read from the strategy file
        (UTF-8) when ``None``.
    :returns: the ``config`` object that was passed in.
    """
    try:
        if source_code is None:
            with codecs.open(config["base"]["strategy_file"], encoding="utf-8") as f:
                source_code = f.read()

        scope = {}
        code = compile(source_code, config["base"]["strategy_file"], 'exec')
        six.exec_(code, scope)

        __config__ = scope.get("__config__", {})
        deep_update(__config__, config)

        for sub_key, sub_dict in six.iteritems(__config__):
            if sub_key not in config["whitelist"]:
                continue
            deep_update(sub_dict, config[sub_key])
    except Exception as e:
        system_log.error(_('in parse_user_config, exception: {e}').format(e=e))
    # Returning here rather than from a ``finally`` clause keeps the
    # best-effort behaviour for ordinary errors without also discarding
    # KeyboardInterrupt / SystemExit.
    return config
python类exec_()的实例源码
def parse_user_config_from_code(config, source_code=None):
    """Execute the strategy source and fold whitelisted sections into ``config``.

    The strategy source is compiled and executed in a fresh namespace. Every
    section of the resulting ``__config__`` dict (empty when absent) whose
    key is listed in ``config["whitelist"]`` is passed to ``deep_update``
    together with the matching section of ``config``.

    Any ``Exception`` raised along the way is logged through ``system_log``
    and otherwise ignored (best effort). Interpreter-level exceptions such
    as ``KeyboardInterrupt`` and ``SystemExit`` propagate to the caller.

    NOTE: this executes the strategy file; only use it on trusted code.

    :param config: nested configuration dict; reads
        ``config["base"]["strategy_file"]`` and ``config["whitelist"]``.
    :param source_code: strategy source text; read from the strategy file
        (UTF-8) when ``None``.
    :returns: the ``config`` object that was passed in.
    """
    try:
        if source_code is None:
            with codecs.open(config["base"]["strategy_file"], encoding="utf-8") as f:
                source_code = f.read()

        scope = {}
        code = compile(source_code, config["base"]["strategy_file"], 'exec')
        six.exec_(code, scope)

        __config__ = scope.get("__config__", {})

        for sub_key, sub_dict in six.iteritems(__config__):
            if sub_key not in config["whitelist"]:
                continue
            deep_update(sub_dict, config[sub_key])
    except Exception as e:
        system_log.error(_(u"in parse_user_config, exception: {e}").format(e=e))
    # Returning here rather than from a ``finally`` clause keeps the
    # best-effort behaviour for ordinary errors without also discarding
    # KeyboardInterrupt / SystemExit.
    return config
def get_wrapped(func, wrapper_template, evaldict):
    """Build a wrapper for ``func`` from source text, keeping its argspec.

    ``wrapper_template`` is %-formatted with ``signature`` (the full
    argument list of ``func``) and ``funcargs`` (the argument list used to
    forward the call), executed inside ``evaldict``, and the object it
    binds to the name ``wrapper`` is returned. Keeping the argspec lets
    testing tools such as pytest continue to use their fixture injection.

    :param func: function or bound method to wrap.
    :param wrapper_template: source template that defines ``wrapper``.
    :param evaldict: namespace the generated source is executed in.
    :returns: the generated wrapper, re-bound to ``func.__self__`` when
        ``func`` has a ``__self__`` attribute.
    """
    arg_names, varargs, varkw, defaults = inspect.getargspec(func)
    bound = hasattr(func, '__self__')

    # The forwarding call omits 'self' for bound methods and never carries
    # default values.
    forwarded_names = arg_names[1:] if bound else arg_names
    template_values = {
        'signature': inspect.formatargspec(arg_names, varargs, varkw, defaults),
        'funcargs': inspect.formatargspec(forwarded_names, varargs, varkw, None),
    }

    six.exec_(wrapper_template % template_values, evaldict)
    generated = evaldict['wrapper']
    update_wrapper(generated, func)

    if not bound:
        return generated
    return generated.__get__(func.__self__, type(func.__self__))
def test_exec_():
    """Exercise six.exec_ with no namespace, one namespace and two namespaces."""
    def check_caller_namespace():
        # The executed snippet refers to ``l`` by name, so this local must
        # keep exactly that name.
        l = []
        six.exec_("l.append(1)")
        assert l == [1]
    check_caller_namespace()

    # One explicit namespace: the assignment lands in it.
    namespace = {}
    six.exec_("x = 42", namespace)
    assert namespace["x"] == 42

    # Separate globals and locals: ``global y`` goes to the former, the
    # plain assignment to the latter.
    global_ns, local_ns = {}, {}
    six.exec_("global y; y = 42; x = 12", global_ns, local_ns)
    assert global_ns["y"] == 42
    assert "x" not in global_ns
    assert local_ns["x"] == 12
    assert "y" not in local_ns
def test_exec_():
    """Exercise six.exec_ with no namespace, one namespace and two namespaces."""
    def check_caller_namespace():
        # The executed snippet refers to ``l`` by name, so this local must
        # keep exactly that name.
        l = []
        six.exec_("l.append(1)")
        assert l == [1]
    check_caller_namespace()

    # One explicit namespace: the assignment lands in it.
    namespace = {}
    six.exec_("x = 42", namespace)
    assert namespace["x"] == 42

    # Separate globals and locals: ``global y`` goes to the former, the
    # plain assignment to the latter.
    global_ns, local_ns = {}, {}
    six.exec_("global y; y = 42; x = 12", global_ns, local_ns)
    assert global_ns["y"] == 42
    assert "x" not in global_ns
    assert local_ns["x"] == 12
    assert "y" not in local_ns
def test_big_linenos(self):
    """Disassemble functions whose body sits many blank lines below the ``def``."""
    def build_foo(count):
        # ``count`` padding lines separate the ``def`` line from its body.
        source = "def foo():\n " + "".join(["\n "] * count + ["spam\n"])
        namespace = {}
        six.exec_(source, namespace)
        return namespace['foo']

    if PY36:
        big_lineno_format = _BIG_LINENO_FORMAT_36
    else:
        big_lineno_format = _BIG_LINENO_FORMAT

    # Every small padding count first, then a sparser sample of larger ones.
    padding_counts = list(range(1, 300)) + list(range(300, 5000, 10))
    for padding in padding_counts:
        expected = big_lineno_format % (padding + 2)
        self.do_disassembly(build_foo(padding), expected)
def code_config(config, source_code=None):
    """Execute the strategy source and return the ``__config__`` it defines.

    :param config: nested configuration dict; reads
        ``config["base"]["strategy_file"]``.
    :param source_code: strategy source text; read from the strategy file
        (UTF-8) when ``None``.
    :returns: the ``__config__`` object from the executed namespace, or an
        empty dict when it is missing or when any ``Exception`` was raised
        (the exception is logged through ``system_log``).
    """
    try:
        strategy_path = config["base"]["strategy_file"]
        if source_code is None:
            with codecs.open(strategy_path, encoding="utf-8") as handle:
                source_code = handle.read()

        # FIXME: hardcode for parametric mod
        def noop(*args, **kwargs):
            pass

        namespace = {'define_parameter': noop}
        six.exec_(compile(source_code, strategy_path, 'exec'), namespace)
    except Exception as e:
        system_log.error(_(u"in parse_user_config, exception: {e}").format(e=e))
        return {}
    return namespace.get('__config__', {})
def test_exec_():
    """Exercise six.exec_ with no namespace, one namespace and two namespaces."""
    def check_caller_namespace():
        # The executed snippet refers to ``l`` by name, so this local must
        # keep exactly that name.
        l = []
        six.exec_("l.append(1)")
        assert l == [1]
    check_caller_namespace()

    # One explicit namespace: the assignment lands in it.
    namespace = {}
    six.exec_("x = 42", namespace)
    assert namespace["x"] == 42

    # Separate globals and locals: ``global y`` goes to the former, the
    # plain assignment to the latter.
    global_ns, local_ns = {}, {}
    six.exec_("global y; y = 42; x = 12", global_ns, local_ns)
    assert global_ns["y"] == 42
    assert "x" not in global_ns
    assert local_ns["x"] == 12
    assert "y" not in local_ns
def load_module(self, full_name):
    """Create (or reuse) the module ``full_name`` and run the shim code in it.

    :param full_name: dotted module name; its last component must be listed
        in ``sklearn.__all__``.
    :returns: the module object registered in ``sys.modules``.
    :raises ImportError: when the last component is not in ``sklearn.__all__``.
    """
    name_parts = full_name.split('.')
    orig = name_parts[-1]
    if orig not in sklearn.__all__:
        raise ImportError('%s not in sklearn' % orig)

    # Reuse an already registered module, otherwise register a fresh one.
    mod = sys.modules.setdefault(full_name, imp.new_module(full_name))
    module_attrs = (
        ('__file__', ''),
        ('__name__', full_name),
        ('__path__', ''),
        ('__loader__', self),
        ('__package__', '.'.join(name_parts[:-1])),
    )
    for attr_name, attr_value in module_attrs:
        setattr(mod, attr_name, attr_value)

    six.exec_(_code.substitute({'mod_name': orig}), mod.__dict__)
    return mod
def _set_signature(mock, original, instance=False):
    """Create a function that checks call signatures, then delegates to ``mock``.

    The returned function has the signature ``(*args, **kwargs)``. It still
    does signature checking by binding the arguments against the signature
    obtained for ``original`` before calling ``mock``.

    :param mock: callable the generated function delegates to.
    :param original: object whose signature is enforced.
    :param instance: forwarded to ``_get_signature_object``.
    :returns: the generated function, or ``None`` when ``original`` is not
        callable or no signature object could be obtained for it.
    """
    if not _callable(original):
        return

    skipfirst = isinstance(original, ClassTypes)
    result = _get_signature_object(original, instance, skipfirst)
    if result is None:
        return
    func, sig = result

    def checksig(*args, **kwargs):
        sig.bind(*args, **kwargs)
    _copy_func_details(func, checksig)

    name = original.__name__
    if not _isidentifier(name):
        # The name is spliced into generated source, so it has to be a
        # valid identifier.
        name = 'funcopy'
    context = {'_checksig_': checksig, 'mock': mock}
    # The body lines of the generated function must be indented, otherwise
    # executing the source raises IndentationError.
    src = """def %s(*args, **kwargs):
    _checksig_(*args, **kwargs)
    return mock(*args, **kwargs)""" % name
    six.exec_(src, context)
    funcopy = context[name]
    _setup_func(funcopy, mock)
    return funcopy
def _set_signature(mock, original, instance=False):
    """Create a function that checks call signatures, then delegates to ``mock``.

    The returned function has the signature ``(*args, **kwargs)``. It still
    does signature checking by binding the arguments against the signature
    obtained for ``original`` before calling ``mock``.

    :param mock: callable the generated function delegates to.
    :param original: object whose signature is enforced.
    :param instance: forwarded to ``_get_signature_object``.
    :returns: the generated function, or ``None`` when ``original`` is not
        callable or no signature object could be obtained for it.
    """
    if not _callable(original):
        return

    skipfirst = isinstance(original, ClassTypes)
    result = _get_signature_object(original, instance, skipfirst)
    if result is None:
        return
    func, sig = result

    def checksig(*args, **kwargs):
        sig.bind(*args, **kwargs)
    _copy_func_details(func, checksig)

    name = original.__name__
    if not _isidentifier(name):
        # The name is spliced into generated source, so it has to be a
        # valid identifier.
        name = 'funcopy'
    context = {'_checksig_': checksig, 'mock': mock}
    # The body lines of the generated function must be indented, otherwise
    # executing the source raises IndentationError.
    src = """def %s(*args, **kwargs):
    _checksig_(*args, **kwargs)
    return mock(*args, **kwargs)""" % name
    six.exec_(src, context)
    funcopy = context[name]
    _setup_func(funcopy, mock)
    return funcopy
def visitCode(self, code):
    """Handle a code node: emit its evaluated value, visit its block, or run it.

    * Buffered code (``code.buffer``) is stripped of leading whitespace,
      passed through ``self.var_processor`` and ``self._do_eval``, optionally
      HTML-escaped (``code.escape``) and appended to ``self.buf``.
    * A nested ``code.block`` is visited.
    * Code that is neither buffered nor has a block is executed as a
      statement with ``self.global_context`` / ``self.local_context``.

    :param code: node exposing ``buffer``, ``escape``, ``block`` and ``val``.
    """
    if code.buffer:
        val = code.val.lstrip()
        val = self.var_processor(val)
        val = self._do_eval(val)
        if code.escape:
            # '&' must be replaced first so the entities produced for '<'
            # and '>' are not escaped a second time.
            val = str(val).replace('&', '&amp;').replace('<', '&lt;').replace('>', '&gt;')
        self.buf.append(val)
    if code.block:
        self.visit(code.block)
    if not code.buffer and not code.block:
        six.exec_(code.val.lstrip(), self.global_context, self.local_context)
def load_module(self, fullname):
    """Execute this loader's code for ``fullname`` in its (cached) module.

    The module object is created on first use and kept on ``self.mod``;
    later calls re-execute the code in that same module.

    :param fullname: name of the module being loaded.
    :returns: the module object.
    """
    mod = self.mod
    if mod is None:
        mod = self.mod = imp.new_module(fullname)

    mod.__file__ = '<%s>' % self.name
    mod.__loader__ = self
    mod.__project__ = self.project
    mod.__package__ = ''

    six.exec_(self.get_code(fullname), mod.__dict__)
    linecache.clearcache()

    # Only Python 3.3 gets the module registered in sys.modules here.
    running_py33 = sys.version_info[:2] == (3, 3)
    if running_py33:
        sys.modules[fullname] = mod
    return mod
def remove_coding(text):
    """
    Remove the coding comment, which six.exec_ doesn't like.

    Every line consisting solely of a ``# -*- coding: ... -*-`` comment is
    replaced by an empty line, so line numbers are preserved.

    :param text: source text.
    :returns: ``text`` with the coding comment lines blanked out.
    """
    # Raw string: ``\s`` and ``\*`` are regex escapes, not string escapes.
    sub_re = re.compile(r"^#\s*-\*-\s*coding:\s*.*-\*-$", flags=re.MULTILINE)
    return sub_re.sub("", text)
# ------------------------------------------------------------------------------
# Template
# ------------------------------------------------------------------------------
def byte_EXEC_STMT(self):
    """Pop a statement plus its globals and locals, then execute the statement."""
    source, global_ns, local_ns = self.popn(3)
    six.exec_(source, global_ns, local_ns)
def load_module(self, fullname):
    """Execute this loader's code for ``fullname`` in its (cached) module.

    The module object is created on first use and kept on ``self.mod``;
    later calls re-execute the code in that same module.

    :param fullname: name of the module being loaded.
    :returns: the module object.
    """
    mod = self.mod
    if mod is None:
        mod = self.mod = imp.new_module(fullname)

    mod.__file__ = '<%s>' % self.name
    mod.__loader__ = self
    mod.__project__ = self.project
    mod.__package__ = ''

    six.exec_(self.get_code(fullname), mod.__dict__)
    linecache.clearcache()
    return mod
def execwrap(content, from_file=None):
    """Execute ``content`` while capturing what it writes and any error report.

    :param content: source text (compiled through ``compile_text``) or an
        object that ``six.exec_`` can execute directly.
    :param from_file: name used when compiling and when reporting an
        exception; defaults to ``'<stdin>'``.
    :returns: ``(output, globals_)`` where ``output`` is the captured text,
        followed by the exception report when execution failed, and
        ``globals_`` is the namespace the code ran in (an empty dict when
        execution failed).
    """
    if not from_file:
        from_file = '<stdin>'
    if isinstance(content, six.string_types):
        content = compile_text(content, from_file=from_file)

    def _inner():
        # Globals and locals are the same mapping.
        env = exec_globals()
        six.exec_(content, env, env)
        return env

    globals_ = {}
    output_handler = None
    try:
        with reopen_stdout_stderr() as output_handler:
            globals_ = _inner()
    except Exception:
        if output_handler is None:
            # Entering the redirection itself failed: nothing was captured.
            output = _exception(from_file)
        else:
            captured = output_handler.read()
            output = "%s%s" % (captured, _exception(from_file))
    else:
        output = output_handler.read()

    return strutil.ensure_text(output), globals_
def render_expression(expression, bind, stream=None):
    """Generate a SQL expression from the passed python expression.

    The expression is executed against each calling frame in turn, using
    that frame's globals and a copy of its locals, until one execution
    succeeds. The variable `engine` (a mock engine that writes to the
    stream) is added to those locals for use in the expression.

    Note this function is meant for convenience and protected usage. Do NOT
    blindly pass user input to this function as it uses exec.

    :param expression: Python source to execute.
    :param bind: A SQLAlchemy engine or bind URL.
    :param stream: Render all DDL operations to the stream.
    :returns: The stream the operations were rendered to.
    :raises ValueError: When the expression fails in every calling frame.
    """
    # Create a stream if not present.
    if stream is None:
        stream = six.moves.cStringIO()

    engine = create_mock_engine(bind, stream)

    # Navigate the stack and find the calling frame that allows the
    # expression to execute.
    for frame in inspect.stack()[1:]:
        try:
            frame = frame[0]
            local = dict(frame.f_locals)
            local['engine'] = engine
            six.exec_(expression, frame.f_globals, local)
            break
        except Exception:
            # The expression is not valid in this frame; try the next one.
            # KeyboardInterrupt / SystemExit are deliberately not caught.
            pass
    else:
        raise ValueError('Not a valid python expression', engine)

    return stream
def compile_strategy(source_code, strategy, scope):
    """Compile and execute strategy source inside ``scope``.

    :param source_code: source text of the strategy.
    :param strategy: file name passed to ``compile``; also used to pick the
        traceback entries that belong to the strategy.
    :param scope: namespace dict the code is executed in.
    :returns: ``scope`` after successful execution.
    :raises CustomException: wraps any ``Exception`` raised while compiling
        or executing, carrying a ``CustomError`` with message, exception
        info and stack entries.
    """
    try:
        code = compile(source_code, strategy, 'exec')
        six.exec_(code, scope)
        return scope
    except Exception as e:
        exc_type, exc_val, exc_tb = sys.exc_info()
        exc_val = patch_user_exc(exc_val, force=True)
        try:
            msg = str(exc_val)
        except Exception as e1:
            # The exception could not be rendered as text: fall back to an
            # empty message and print the secondary error.
            msg = ""
            six.print_(e1)
        error = CustomError()
        error.set_msg(msg)
        error.set_exc(exc_type, exc_val, exc_tb)
        stackinfos = list(traceback.extract_tb(exc_tb))
        if isinstance(e, (SyntaxError, IndentationError)):
            # Syntax errors describe the offending location on the
            # exception object itself.
            error.add_stack_info(exc_val.filename, exc_val.lineno, "", exc_val.text)
        else:
            # Keep only the traceback entries located in the strategy file.
            for item in stackinfos:
                filename, lineno, func_name, code = item
                if strategy == filename:
                    error.add_stack_info(*item)
            # avoid empty stack
            # NOTE(review): nesting reconstructed; this check is assumed to
            # run once after the loop, reusing the last traceback entry.
            if error.stacks_length == 0:
                error.add_stack_info(*item)
        raise CustomException(error)
def remove_coding(text):
    """
    Remove the coding comment, which six.exec_ doesn't like.

    Every line consisting solely of a ``# -*- coding: ... -*-`` comment is
    replaced by an empty line, so line numbers are preserved.

    :param text: source text.
    :returns: ``text`` with the coding comment lines blanked out.
    """
    # Raw string: ``\s`` and ``\*`` are regex escapes, not string escapes.
    sub_re = re.compile(r"^#\s*-\*-\s*coding:\s*.*-\*-$", flags=re.MULTILINE)
    return sub_re.sub("", text)
#------------------------------------------------------------------------------
# Template
#------------------------------------------------------------------------------
def mock_engine(engine, stream=None):
    """Mocks out the engine specified in the passed bind expression.

    This is a generator that yields once: the expression is resolved in a
    calling frame, its target is replaced with a mock engine that writes to
    the stream, the stream is yielded, and the original target is restored
    afterwards, even when the caller's block raises.
    NOTE(review): presumably decorated as a context manager outside this
    view; confirm.

    Note this function is meant for convenience and protected usage. Do NOT
    blindly pass user input to this function as it uses exec.

    :param engine: A python expression that represents the engine to mock.
    :param stream: Render all DDL operations to the stream.
    :raises ValueError: When the expression fails in every calling frame.
    """
    # Create a stream if not present.
    if stream is None:
        stream = six.moves.cStringIO()

    # Navigate the stack and find the calling frame that allows the
    # expression to execute.
    for frame in inspect.stack()[1:]:
        try:
            frame = frame[0]
            expression = '__target = %s' % engine
            six.exec_(expression, frame.f_globals, frame.f_locals)
            target = frame.f_locals['__target']
            break
        except Exception:
            # The expression does not resolve in this frame; try the next
            # one. KeyboardInterrupt / SystemExit are deliberately not caught.
            pass
    else:
        raise ValueError('Not a valid python expression', engine)

    # Evaluate the expression and get the target engine.
    frame.f_locals['__mock'] = create_mock_engine(target, stream)

    # Replace the target with our mock.
    six.exec_('%s = __mock' % engine, frame.f_globals, frame.f_locals)

    try:
        # Give control back.
        yield stream
    finally:
        # Put the target engine back, whether or not the caller's block
        # raised, so the mock never outlives this call.
        frame.f_locals['__target'] = target
        six.exec_('%s = __target' % engine, frame.f_globals, frame.f_locals)
        six.exec_('del __target', frame.f_globals, frame.f_locals)
        six.exec_('del __mock', frame.f_globals, frame.f_locals)
def main(argv=None):
    """Command-line entry point: run a requested module, a ``-c`` command or a REPL.

    :param argv: argument list handed to ``parser.parse_args``.
    :returns: ``0`` after printing the version; otherwise nothing is
        returned by the code visible here.
    """
    args = parser.parse_args(argv)
    args.nodepy_path.insert(0, '.')
    if args.version:
        print(VERSION)
        return 0
    args.pmd = check_pmd_envvar() or args.pmd
    # Expose only the arguments that follow the requested module.
    sys.argv = [sys.argv[0]] + args.request[1:]
    maindir = pathlib.Path(args.maindir) if args.maindir else pathlib.Path.cwd()
    ctx = nodepy.context.Context(maindir)
    # Update the module search path.
    args.nodepy_path.insert(0, ctx.modules_directory)
    ctx.resolver.paths.extend(x for x in map(pathlib.Path, args.nodepy_path) if x.is_dir())
    ctx.localimport.path.extend(args.python_path)
    # Create the module in which we run the REPL or the command
    # specified via -c.
    if args.c or not args.request:
        filename = nodepy.utils.path.VoidPath('<repl>')
        directory = pathlib.Path.cwd()
        repl_module = ReplModule(ctx, None, filename, directory)
        repl_module.init()
    with ctx.enter():
        if args.pmd:
            install_pmd(ctx)
        if args.c:
            # Run the -c source inside the REPL module's namespace.
            repl_module.set_exec_handler(lambda: six.exec_(args.c, vars(repl_module.namespace)))
            repl_module.load()
        if args.request:
            # Try the request as a URL path first and fall back to the raw
            # string when that raises ValueError.
            try:
                filename = path.urlpath.make(args.request[0])
            except ValueError:
                filename = args.request[0]
            ctx.main_module = ctx.resolve(filename)
            if not args.keep_arg0:
                sys.argv[0] = str(ctx.main_module.filename)
            ctx.main_module.init()
            if args.pymain:
                ctx.main_module.namespace.__name__ = '__main__'
            ctx.load_module(ctx.main_module, do_init=False)
        elif not args.c:
            # Neither a request nor -c: start an interactive session in the
            # REPL module's namespace.
            ctx.main_module = repl_module
            repl_module.set_exec_handler(lambda: code.interact('', local=vars(repl_module.namespace)))
            repl_module.load()