python类getsource()的实例源码

mess_control.py 文件源码 项目:ml-utils 作者: LinxiFan 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def mess_control(f, *, debug=0):
    """Rewrite ``f`` with the ``ControlMess`` AST transformer and return the result.

    The source of ``f`` is parsed, run through ``ControlMess().visit``,
    recompiled and executed. The object that the executed code binds to
    ``f.__name__`` is returned.

    Args:
        f: A plain Python function whose source ``inspect`` can retrieve.
        debug: When truthy, print a dump of the AST before and after the
            transformation.

    Returns:
        Whatever the transformed source defines under ``f.__name__``.
    """
    import textwrap

    assert inspect.isfunction(f)
    # getsource() returns the text with its original indentation, so the
    # source of a method or nested function has to be dedented before it can
    # be parsed as a module.
    f_ast = ast.parse(textwrap.dedent(inspect.getsource(f)))
    if debug:
        print("AST:", ast.dump(f_ast))
    new_ast = ControlMess().visit(f_ast)
    if debug:
        print('NEW AST:', ast.dump(new_ast))
    ast.fix_missing_locations(new_ast)
    co = compile(new_ast, '<ast_demo>', 'exec')
    # exec will define the new function into the fake_locals scope; this
    # avoids conflicts with variables in the real locals().  Passing None for
    # globals makes the code run against this module's globals.
    # https://stackoverflow.com/questions/24733831/using-a-function-defined-in-an-execed-string-in-python-3
    fake_locals = {}
    exec(co, None, fake_locals)
    return fake_locals[f.__name__]
source.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def __init__(self, *parts, **kwargs):
        """Collect the source lines of every part into ``self.lines``.

        Each part may be a ``Source``, a tuple/list of lines, a string, or any
        other object (whose source is then looked up with ``getsource``).
        Falsy parts (``None``, ``''``, empty sequences) contribute no lines.

        Keyword arguments:
            deindent: when true (default), pass each part's lines through
                ``deindent``.
            rstrip: when true (default), drop trailing blank lines from
                string parts.
        """
        self.lines = lines = []
        de = kwargs.get('deindent', True)
        rstrip = kwargs.get('rstrip', True)
        for part in parts:
            if not part:
                partlines = []
            # This must be ``elif``: with a plain ``if`` the empty case above
            # was always overwritten and ``None`` ended up in getsource().
            elif isinstance(part, Source):
                partlines = part.lines
            elif isinstance(part, (tuple, list)):
                partlines = [x.rstrip("\n") for x in part]
            elif isinstance(part, py.builtin._basestring):
                partlines = part.split('\n')
                if rstrip:
                    # Strip trailing lines that are empty or whitespace-only.
                    while partlines:
                        if partlines[-1].strip():
                            break
                        partlines.pop()
            else:
                partlines = getsource(part, deindent=de).lines
            if de:
                partlines = deindent(partlines)
            lines.extend(partlines)
source.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def __init__(self, *parts, **kwargs):
        """Build ``self.lines`` from the given parts.

        A part may be a ``Source``, a tuple/list of lines, a string, or any
        other object (resolved through ``getsource``).  Parts that are falsy,
        such as ``None`` or an empty string, add nothing.

        Keyword arguments:
            deindent: when true (default), run each part's lines through
                ``deindent``.
            rstrip: when true (default), remove trailing blank lines from
                string parts.
        """
        self.lines = lines = []
        de = kwargs.get('deindent', True)
        rstrip = kwargs.get('rstrip', True)
        for part in parts:
            if not part:
                partlines = []
            # Chained with ``elif`` so the empty case is not overwritten by
            # the type dispatch below (``None`` used to reach getsource()).
            elif isinstance(part, Source):
                partlines = part.lines
            elif isinstance(part, (tuple, list)):
                partlines = [x.rstrip("\n") for x in part]
            elif isinstance(part, py.builtin._basestring):
                partlines = part.split('\n')
                if rstrip:
                    # Drop trailing lines that contain only whitespace.
                    while partlines:
                        if partlines[-1].strip():
                            break
                        partlines.pop()
            else:
                partlines = getsource(part, deindent=de).lines
            if de:
                partlines = deindent(partlines)
            lines.extend(partlines)
comment_eater.py 文件源码 项目:icing 作者: slipguru 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get_class_traits(klass):
    """ Yield ``(name, rhs, doc)`` for every assignment in a class body.

    ``name`` is the first assignment target, ``rhs`` the unparsed right-hand
    side and ``doc`` the comment found for the assignment's line (empty when
    there is none), with its comment marker stripped.
    """
    # FIXME: gracefully handle errors here or in the caller?
    class_source = inspect.getsource(klass)
    blocker = CommentBlocker()
    blocker.process_file(StringIO(class_source))
    module_tree = compiler.parse(class_source)
    # The class definition is the first statement of the parsed source.
    class_node = module_tree.node.nodes[0]
    for stmt in class_node.code.nodes:
        # FIXME: handle other kinds of assignments?
        if not isinstance(stmt, compiler.ast.Assign):
            continue
        trait_name = stmt.nodes[0].name
        trait_rhs = unparse(stmt.expr).strip()
        comment = blocker.search_for_comment(stmt.lineno, default='')
        yield trait_name, trait_rhs, strip_comment_marker(comment)
control.py 文件源码 项目:esdc-ce 作者: erigones 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def local_settings_update(self, changes):
        """Update local_settings.py with new content created according to the changes parameter.

        The changes parameter should be a list generated by check_modules().

        The current source of the module is captured before the file is
        overwritten and is written back if reloading the module raises
        ImportError.  On a successful reload the cached Django settings are
        reset so that they are re-read.

        Raises:
            SystemError: if the local_settings module is not available.
        """
        if not local_settings:
            raise SystemError('Missing local_settings.py!')

        logger.info('Creating new local_settings.py with following changes: %s', self._show_changes(changes))
        target = inspect.getsourcefile(local_settings)
        data = self._local_settings_new(changes)
        # Snapshot the existing content so a failed reload can be rolled back.
        backup = inspect.getsource(local_settings)

        # Logger.warn is a deprecated alias; warning() is the supported name.
        logger.warning('Updating %s', target)
        self._save_file(target, data)

        try:
            reload_module(local_settings)
        except ImportError as e:
            logger.exception(e)
            logger.warning('Restoring %s from backup', target)
            self._save_file(target, backup)
        else:
            # Force reloading of django settings
            settings._wrapped = empty
setup.py 文件源码 项目:contact_map 作者: dwhswenson 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def write_version_py(filename):
    """Render ``VERSION_PY_CONTENT`` and write it to ``filename``.

    The template is filled with the package version, the git revision
    ('Unknown' when there is no ``.git`` entry in the current directory),
    the release flag and the source code of ``get_git_version``.
    """
    # Adding the git rev number needs to be done inside write_version_py(),
    # otherwise the import of numpy.version messes up the build under Python 3.
    git_version_code = inspect.getsource(get_git_version)
    GIT_REVISION = get_git_version() if os.path.exists('.git') else 'Unknown'

    substitutions = {}
    substitutions['version'] = PACKAGE_VERSION
    substitutions['git_revision'] = GIT_REVISION
    substitutions['is_release'] = str(IS_RELEASE)
    substitutions['git_version_code'] = str(git_version_code)
    rendered = VERSION_PY_CONTENT % substitutions

    with open(filename, 'w') as version_file:
        version_file.write(rendered)


################################################################################
# Installation
################################################################################
queries.py 文件源码 项目:esper 作者: scanner-research 项目源码 文件源码 阅读 42 收藏 0 点赞 0 评论 0
def query(name):
    """Return a decorator that records the decorated function's source as a query.

    The entry is appended to ``queries[dataset]`` as ``[name, source]``, where
    ``dataset`` is the second-to-last '/'-separated component of the absolute
    path of the file that called ``query``.  The decorated function itself is
    returned unchanged.
    """
    caller_frame = inspect.stack()[1]
    caller_module = inspect.getmodule(caller_frame[0])
    caller_path = os.path.abspath(caller_module.__file__)
    dataset = caller_path.split('/')[-2]

    def wrapper(f):
        # Seems to include a trailing newline, hence the dropped last element
        src_lines = inspect.getsource(f).split('\n')[:-1]

        # Hacky way to get just the function body: skip everything before
        # the first line that contains "():"
        start = 0
        while "():" not in src_lines[start]:
            start += 1

        snippet = src_lines[start:] + ['FN = ' + f.__name__]
        queries[dataset].append([name, '\n'.join(snippet)])

        return f

    return wrapper
test_inspect.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_proceed_with_fake_filename(self):
        '''doctest monkeypatches linecache to enable inspection'''
        # Source that is compiled under a filename which is not a real file.
        fn, source = '<test>', 'def x(): pass\n'
        # Keep the real implementation so it can be delegated to and restored.
        getlines = linecache.getlines
        def monkey(filename, module_globals=None):
            # Serve the in-memory source for the fake filename and defer to
            # the real linecache.getlines for everything else.
            if filename == fn:
                return source.splitlines(keepends=True)
            else:
                return getlines(filename, module_globals)
        linecache.getlines = monkey
        try:
            ns = {}
            exec(compile(source, fn, 'single'), ns)
            # No assertion here: the call only has to complete without raising.
            inspect.getsource(ns["x"])
        finally:
            # Always undo the monkeypatch, even if getsource() raised.
            linecache.getlines = getlines
dispycos.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def _run_request(self, request, where, cpu, gen, *args, **kwargs):
        """Internal use only.

        Builds a 'job' message for ``gen`` (given either as a name or as a
        function object), hands it to the scheduler from a helper task and
        yields that task's ``finish()``.
        """
        name = gen if isinstance(gen, str) else gen.func_name

        # No source is attached when the name is listed in self._xfer_funcs.
        # if not inspect.isgeneratorfunction(gen):
        #     logger.warning('"%s" is not a valid generator function', name)
        #     raise StopIteration([])
        code = None if name in self._xfer_funcs else inspect.getsource(gen).lstrip()

        def _run_req(task=None):
            msg = {'req': 'job', 'auth': self._auth,
                   'job': _DispycosJob_(request, task, name, where, cpu, code, args, kwargs)}
            delivered = yield self.scheduler.deliver(msg, timeout=MsgTimeout)
            if delivered != 1:
                # Delivery did not report exactly one recipient: no reply.
                raise StopIteration(None)
            reply = yield task.receive()
            if isinstance(reply, Task) and self.status_task:
                info = DispycosTaskInfo(reply, args, kwargs, time.time())
                self.status_task.send(DispycosStatus(Scheduler.TaskCreated, info))
            if not request.endswith('async'):
                # Requests not ending in 'async' wait for a second message.
                reply = yield task.receive()
            raise StopIteration(reply)

        yield Task(_run_req).finish()
dispycos.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def _run_request(self, request, where, cpu, gen, *args, **kwargs):
        """Internal use only.

        Builds a 'job' message for ``gen`` (given either as a name or as a
        function object), delivers it to the scheduler from a helper task and
        yields that task's ``finish()``.  The helper task's result is the
        reply received for the job, or None when delivery did not report
        exactly one recipient.
        """
        if isinstance(gen, str):
            name = gen
        else:
            name = gen.__name__

        if name in self._xfer_funcs:
            # No source is attached for names listed in self._xfer_funcs.
            code = None
        else:
            # if not inspect.isgeneratorfunction(gen):
            #     logger.warning('"%s" is not a valid generator function', name)
            #     raise StopIteration([])
            code = inspect.getsource(gen).lstrip()

        def _run_req(task=None):
            msg = {'req': 'job', 'auth': self._auth,
                   'job': _DispycosJob_(request, task, name, where, cpu, code, args, kwargs)}
            if (yield self.scheduler.deliver(msg, timeout=MsgTimeout)) == 1:
                reply = yield task.receive()
                if isinstance(reply, Task):
                    if self.status_task:
                        msg = DispycosTaskInfo(reply, args, kwargs, time.time())
                        self.status_task.send(DispycosStatus(Scheduler.TaskCreated, msg))
                if not request.endswith('async'):
                    # Requests not ending in 'async' wait for a second message.
                    reply = yield task.receive()
            else:
                reply = None
            # PEP 479: since Python 3.7 a StopIteration raised inside a
            # generator is turned into RuntimeError, so the value has to be
            # handed back with ``return``.  The driver of the generator still
            # sees StopIteration(reply).
            return reply

        yield Task(_run_req).finish()
recipe-525492.py 文件源码 项目:code 作者: ActiveState 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def __new__(cls, name, bases, dct):
    """Create the class, deriving ``__slots__`` from assignments in ``__init__``.

    Every attribute assigned in the body of the class's ``__init__``
    (``something.attr = value``) is added to ``__slots__`` unless a base
    class already lists it in its own ``__slots__``.  Slots declared
    explicitly in the class body are kept.  Classes without ``__init__`` are
    created unchanged.

    Args:
        cls: the metaclass.
        name: name of the class being created.
        bases: tuple of base classes.
        dct: the class namespace.

    Returns:
        The newly created class.
    """
    import textwrap

    def _as_list(value):
        # A bare string is a single slot name, not an iterable of characters.
        return [value] if isinstance(value, str) else list(value)

    # Work on a copy so tuples are accepted and the caller's object is not
    # mutated in place.
    slots = _as_list(dct.get('__slots__', []))
    orig_slots = []
    for base in bases:
        if hasattr(base, "__slots__"):
            orig_slots += _as_list(base.__slots__)
    if '__init__' in dct:
        # A throwaway class is built only so inspect can locate the source.
        initproc = type.__new__(cls, name, bases, dct)
        # Nested classes come back indented; dedent so the text compiles.
        initproc_source = textwrap.dedent(inspect.getsource(initproc))
        tree = compile(initproc_source, "dont_care", 'exec', _ast.PyCF_ONLY_AST)
        classdef = tree.body[0]
        for declaration in classdef.body:
            if not isinstance(declaration, _ast.FunctionDef):
                continue
            # delete this check if you do not initialize all instance
            # variables in __init__
            if declaration.name != '__init__':
                continue
            for statement in declaration.body:
                if not isinstance(statement, _ast.Assign):
                    continue
                for target in statement.targets:
                    # Plain names, subscripts and tuples have no ``attr``.
                    if not isinstance(target, _ast.Attribute):
                        continue
                    attr_name = target.attr
                    # Skip inherited slots and avoid duplicate entries.
                    if attr_name not in orig_slots and attr_name not in slots:
                        slots.append(attr_name)
        if slots:
            dct['__slots__'] = slots
    return type.__new__(cls, name, bases, dct)
recipe-576751.py 文件源码 项目:code 作者: ActiveState 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def runtime(f):
    """Evaluates the given function each time it is called.

    The function's source (with decorators removed) is re-executed on every
    call, so anything evaluated at definition time, such as default argument
    values, is recomputed for each call.  The freshly defined function is
    then called with the caller's arguments and its result returned.
    """
    # get the function's name
    name = f.__name__
    # and its source code, sans decorator
    source = remove_decorators(getsource(f))
    @wraps(f)
    def wrapped(*args, **kwargs):
        # Execute the declaration into an explicit scratch namespace.
        # Relying on exec() to add names to this function's locals is not
        # dependable (locals() is a snapshot on newer Pythons), and calling
        # through eval() of repr(args) breaks for any argument whose repr is
        # not valid source.  Name lookups inside the function use the
        # globals of the module that defined ``f``.
        namespace = {}
        exec(source, f.__globals__, namespace)
        return namespace[name](*args, **kwargs)
    return wrapped
tree.py 文件源码 项目:mycroft-light 作者: MatthewScholefield 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def get_info(obj):
    """Return a compact parameter string for ``obj``, or None if unavailable.

    The first non-decorator line of the object's source is taken and the
    ``def`` keyword, the trailing colon, double spaces, the object's name
    and any ``self`` parameter are removed, e.g. ``def f(self, a):`` becomes
    ``(a)``.

    Returns:
        The reduced signature text, or None when the source cannot be
        obtained or processed.
    """
    try:
        lines = inspect.getsource(obj).split('\n')
        # Filter out decorator lines only; a ``def`` line that merely
        # contains '@' (for example in a default value) must be kept.
        lines = [i for i in lines if not i.lstrip().startswith('@')]
        return lines[0].replace('def ', '').replace('):', ')').replace('  ', '').replace(
            obj.__name__, '').replace('self, ', '').replace('self', '')
    except Exception:
        # Best effort: any ordinary failure means "no info".  Unlike a bare
        # ``except``, this lets KeyboardInterrupt and SystemExit propagate.
        return None
wrapper.py 文件源码 项目:catalearn 作者: Catalearn 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def get_source(func):
    """Return the source of ``func`` after ``correct_indentation`` is applied.

    Both the raw and the corrected source are printed to standard output.
    """
    original_text = inspect.getsource(func)
    print(original_text)
    fixed_text = correct_indentation(original_text)
    print(fixed_text)
    return fixed_text
source.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def getsource(obj, **kwargs):
    """Return a ``Source`` built from the source text of ``obj``.

    ``obj`` is first resolved with ``py.code.getrawcode``.  If ``inspect``
    raises IndentationError, a placeholder string literal is used as the
    source text instead.  Extra keyword arguments are passed to ``Source``.
    """
    raw_code = py.code.getrawcode(obj)
    try:
        text = inspect.getsource(raw_code)
    except IndentationError:
        text = "\"Buggy python version consider upgrading, cannot get source\""
    assert isinstance(text, str)
    return Source(text, **kwargs)
source.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def getstatementrange_ast(lineno, source, assertion=False, astnode=None):
    """Return ``(astnode, start, end)`` for the statement containing ``lineno``.

    ``source`` must provide a ``lines`` list and be convertible with ``str()``.
    When ``astnode`` is not given, the source is compiled to an AST here; if
    that raises ValueError, the range from ``getstatementrange_old`` is
    returned with ``None`` in place of the AST.

    ``start`` and ``end`` index into ``source.lines``; ``end`` is moved back
    past trailing comment and blank lines.
    """
    if astnode is None:
        content = str(source)
        if sys.version_info < (2,7):
            content += "\n"
        try:
            astnode = compile(content, "source", "exec", 1024)  # 1024 for AST
        except ValueError:
            # Fall back to the older range finder; no AST is available.
            start, end = getstatementrange_old(lineno, source, assertion)
            return None, start, end
    start, end = get_statement_startend2(lineno, astnode)
    # we need to correct the end:
    # - ast-parsing strips comments
    # - there might be empty lines
    # - we might have lesser indented code blocks at the end
    if end is None:
        # No end was reported: let the range run to the last source line.
        end = len(source.lines)

    if end > start + 1:
        # make sure we don't span differently indented code blocks
        # by using the BlockFinder helper used which inspect.getsource() uses itself
        block_finder = inspect.BlockFinder()
        # if we start with an indented line, put blockfinder to "started" mode
        block_finder.started = source.lines[start][0].isspace()
        # Feed the candidate lines to the tokenizer one at a time.
        it = ((x + "\n") for x in source.lines[start:end])
        try:
            for tok in tokenize.generate_tokens(lambda: next(it)):
                block_finder.tokeneater(*tok)
        except (inspect.EndOfBlock, IndentationError):
            # The block finder stopped: cut the range at its last line.
            end = block_finder.last + start
        except Exception:
            # Best effort: any other failure keeps the current end.
            pass

    # the end might still point to a comment or empty line, correct it
    while end:
        line = source.lines[end - 1].lstrip()
        if line.startswith("#") or not line:
            end -= 1
        else:
            break
    return astnode, start, end
source.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def getsource(obj, **kwargs):
    """Return a ``Source`` built from the source text of ``obj``.

    ``obj`` is first resolved with ``_pytest._code.getrawcode``.  If
    ``inspect`` raises IndentationError, a placeholder string literal is used
    as the source text instead.  Extra keyword arguments are passed to
    ``Source``.
    """
    import _pytest._code
    raw_code = _pytest._code.getrawcode(obj)
    try:
        text = inspect.getsource(raw_code)
    except IndentationError:
        text = "\"Buggy python version consider upgrading, cannot get source\""
    assert isinstance(text, str)
    return Source(text, **kwargs)
migration.py 文件源码 项目:django-modeltrans 作者: zostera 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def get_helper_functions(self):
        """Yield every helper as text.

        Entries of ``self.helper_functions`` come first, followed by those of
        ``self.get_extra_helper_functions()``.  Callable entries are replaced
        by their source code; anything else is yielded unchanged.
        """
        def render(helper):
            if callable(helper):
                return inspect.getsource(helper)
            return helper

        for builtin_helper in self.helper_functions:
            yield render(builtin_helper)

        for extra_helper in self.get_extra_helper_functions():
            yield render(extra_helper)
dijkstra.py 文件源码 项目:pygorithm 作者: OmkarPathak 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def get_code():
        """
        Return the source code of the Dijkstra class as a string
        """
        class_source = inspect.getsource(Dijkstra)
        return class_source
astar.py 文件源码 项目:pygorithm 作者: OmkarPathak 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def get_code():
        """
        Return the source code of the OneDirectionalAStar class as a string
        """
        class_source = inspect.getsource(OneDirectionalAStar)
        return class_source


问题


面经


文章

微信
公众号

扫码关注公众号