python类run()的实例源码

test.py 文件源码 项目:emot 作者: NeelShah18 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def main():
    """Profile ``test_emo()`` with cProfile and print the stats sorted by time."""
    statement = 'test_emo()'
    cProfile.run(statement, sort='time')
pipetteServer.py 文件源码 项目:firecloud_developer_toolkit 作者: broadinstitute 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def __init__(self, commdir):
        """Prepare the launch and archive directories below *commdir*.

        Creates ``<commdir>/launch`` and ``<commdir>/archive`` if needed and
        reports how many launch files are already waiting.

        :param commdir: base communication directory.
        :raises Exception: if the launch directory cannot be created, or if
            the archive directory already contains files.
        """
        # This code is run when starting a new instance, and not when the instance is unpickled.
        self._file_ext = '.launch.txt'
        self._job_by_jobId = {}
        self._commdir = commdir
        self._launch_dir = os.path.join(self._commdir, 'launch')
        self._archive_dir = os.path.join(self._commdir, 'archive')
        try:
            os.makedirs(self._launch_dir, exist_ok=True)
        except OSError as err:
            # Only filesystem failures are translated; the original cause is
            # chained so the real reason (permissions, bad path, ...) is kept.
            raise Exception('Failed to create launchdir; communicationDirBase may be read-only') from err

        os.makedirs(self._archive_dir, exist_ok=True)

        existing_launchfiles = os.listdir(self._launch_dir)
        if existing_launchfiles:
            print ('Starting with ' + str(len(existing_launchfiles)) + ' launched but unprocessed pipelines.')
        else:
            print ('Starting with 0 pipelines')

        # A non-empty archive directory is treated as a fatal start-up error.
        existing_archivefiles = os.listdir(self._archive_dir)
        if existing_archivefiles:
            raise Exception ('starting with ' + str(len(existing_archivefiles)) + ' in-process jobs... either these should be purged from:' +
                             self._archive_dir + ' or things shoud be restarted from the persisted state')
profiler.py 文件源码 项目:panflute 作者: sergiocorreia 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def run():
    """Round-trip the benchmark document through panflute with a trivial filter.

    Loads ``benchmark.json``, walks the document with ``empty_test`` and
    dumps the result, followed by a newline, to ``panflute.json``.
    """
    source_path, target_path = 'benchmark.json', 'panflute.json'

    print('\nLoading JSON...')
    with open(source_path, encoding='utf-8') as src:
        doc = pf.load(src)

    print('\nApplying trivial filter...')
    doc = doc.walk(action=empty_test, doc=doc)

    print('Dumping JSON...')
    with open(target_path, mode='w', encoding='utf-8') as dst:
        pf.dump(doc, dst)
        dst.write('\n')

    print(' - Done!')
execution.py 文件源码 项目:leetcode 作者: thomasyimgit 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __str__(self):
        """Return a one-line summary of the timing result.

        The text reports mean and standard deviation per loop together with
        the number of runs and loops. The plus-minus sign is used only when
        the stdout encoding can represent it; otherwise ``+-`` is used.
        """
        pm = '+-'
        if hasattr(sys.stdout, 'encoding') and sys.stdout.encoding:
            try:
                u'\xb1'.encode(sys.stdout.encoding)
                pm = u'\xb1'
            except Exception:
                # Best effort: any encoding problem keeps the ASCII fallback.
                # Unlike a bare except, KeyboardInterrupt and SystemExit
                # still propagate.
                pass
        return (
            u"{mean} {pm} {std} per loop (mean {pm} std. dev. of {runs} run{run_plural}, {loops} loop{loop_plural} each)"
                .format(
                    pm = pm,
                    runs = self.repeat,
                    loops = self.loops,
                    loop_plural = "" if self.loops == 1 else "s",
                    run_plural = "" if self.repeat == 1 else "s",
                    mean = _format_time(self.average, self._precision),
                    std = _format_time(self.stdev, self._precision))
                )
common.py 文件源码 项目:utils 作者: jianbing 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def profile_func(call_func_str):
    """Profile a statement with cProfile and print its stats sorted by time.

    The raw profile data is written to ``prof.txt`` and then loaded back
    with pstats for printing.

    Example::

        def f():
            d = AndroidDevice("192.168.1.120")
            d.swipe_position(650, 700, 50, 700, 30)
            d.swipe_position(130, 800, 850, 800, 50)

        profile_func("f()")

    :param call_func_str: statement string handed to ``cProfile.run``
    :return: None
    """
    import cProfile
    import pstats

    stats_path = "prof.txt"
    cProfile.run(call_func_str, stats_path)
    report = pstats.Stats(stats_path)
    report.sort_stats("time")
    report.print_stats()
jt65.py 文件源码 项目:weakmon 作者: rtmrtmrtmrtm 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __init__(self):
      """Initialise flags, the message list and its lock, rates and start time."""
      # control flags
      self.enabled = True # True -> run process(); False -> don't
      self.verbose = False
      self.done = False

      # message list and the lock created alongside it
      self.msgs = [ ]
      self.msgs_lock = threading.Lock()

      self.jrate = 11025 // 2 # sample rate for processing (FFT &c)
      self.jblock = 4096 // 2 # samples per symbol

      # self.start_time is the UNIX time at which the current
      # UTC minute began.
      now = int(time.time())
      self.start_time = now - time.gmtime(now).tm_sec

  # seconds per cycle
test_profile_py_lets_be_rational.py 文件源码 项目:py_lets_be_rational 作者: vollib 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def run_norm_cdf(lets_be_rational_version):
    """Time ``norm_cdf`` over the first ``TestCases`` entries of ``_z``.

    :param lets_be_rational_version: object exposing ``norm_cdf(z)``.
    :return: elapsed time in seconds as a float.
    """
    # time.clock() was removed in Python 3.8; perf_counter() is the
    # documented replacement for measuring short durations.
    start = time.perf_counter()
    for i in range(TestCases):
        z = _z[i]
        # The result is discarded on purpose: only the call time matters.
        lets_be_rational_version.norm_cdf(z)
    end = time.perf_counter()
    return end - start

# profile.run("run_black(py_lets_be_rational)")
# profile.run("run_implied_volatility_from_a_transformed_rational_guess(py_lets_be_rational)")
# profile.run("run_implied_volatility_from_a_transformed_rational_guess_with_limited_iterations(py_lets_be_rational)")
# profile.run("run_normalised_black(py_lets_be_rational)")
# profile.run("run_normalised_black_call(py_lets_be_rational)")
# profile.run("run_normalised_vega(py_lets_be_rational)")
# profile.run("run_normalised_implied_volatility_from_a_transformed_rational_guess(py_lets_be_rational)")
# profile.run("run_normalised_implied_volatility_from_a_transformed_rational_guess_with_limited_iterations(py_lets_be_rational)")
# profile.run("run_norm_cdf(py_lets_be_rational)")
benchmark_performance.py 文件源码 项目:py-keycolval 作者: travisfischer 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def _run_get_on_all_columns(self):
        """Call ``self._store.get`` for every key/column pair and print the time taken.

        The key/column pairs come from ``self._get_key_column_lookup()``.
        The printed column count is that of the first key, or 0 when the
        lookup is empty.
        """
        column_lookup = self._get_key_column_lookup()

        start_time = datetime.now()

        for key, columns in column_lookup.items():
            for col in columns:
                self._store.get(key, col)

        total_time = datetime.now() - start_time

        # keys()[0] indexing fails on Python 3 and on an empty lookup, so
        # take the first value through an iterator with an empty default.
        first_columns = next(iter(column_lookup.values()), ())

        # A single parenthesised argument prints the same text on Python 2 and 3.
        print("Took %s to run get on all %s keys and %s columns." % (
            total_time,
            len(column_lookup),
            len(first_columns)))
MyBot.py 文件源码 项目:halite_bot 作者: ewirkerman 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def main_loop():
    """Run ``main()`` forever, optionally profiling every call with cProfile.

    Profiling is a manual developer toggle. When enabled, stale files in
    the ``stats`` directory are removed once, then each ``main()`` call
    writes its own stats file named after ``turnCounter + 1``.
    """
    global last_map
    # Developer toggle: set to True to profile every call to main().
    profiling = False
    #import objgraph

    if not profiling:
        while True:
            main()
    else:
        import glob  # only needed on the profiling path

        stats_dir = "stats"
        os.makedirs(stats_dir, exist_ok=True)
        # os.path.exists()/os.remove() do not expand wildcards, so stale
        # stats files are enumerated with glob before being deleted.
        for stale in glob.glob(os.path.join(stats_dir, "*")):
            if os.path.isfile(stale):
                os.remove(stale)
        while True:
            # os.path.join avoids the invalid "\m" escape of a hand-written
            # backslash path.
            currpath = os.path.join(stats_dir, 'mybot-%s.stats' % (turnCounter+1))
            cProfile.run('main()', currpath)


# with open("bot." + "debug", "a") as f:
    # logger.debug("Baselining at %s" % getframeinfo(currentframe()).lineno)
    # objgraph.show_growth(limit=20,file=f)
MyBot.py 文件源码 项目:halite_bot 作者: ewirkerman 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def main_loop():
    """Run ``main()`` forever, optionally profiling every call with cProfile.

    Profiling is a manual developer toggle. When enabled, stale files in
    the ``stats`` directory are removed once, then each ``main()`` call
    writes its own stats file named after ``turnCounter + 1``.
    """
    global last_map
    # Developer toggle: set to True to profile every call to main().
    profiling = False
    #import objgraph

    if not profiling:
        while True:
            main()
    else:
        import glob  # only needed on the profiling path

        stats_dir = "stats"
        os.makedirs(stats_dir, exist_ok=True)
        # os.path.exists()/os.remove() do not expand wildcards, so stale
        # stats files are enumerated with glob before being deleted.
        for stale in glob.glob(os.path.join(stats_dir, "*")):
            if os.path.isfile(stale):
                os.remove(stale)
        while True:
            # os.path.join avoids the invalid "\m" escape of a hand-written
            # backslash path.
            currpath = os.path.join(stats_dir, 'mybot-%s.stats' % (turnCounter+1))
            cProfile.run('main()', currpath)


# with open("bot." + "debug", "a") as f:
    # logger.debug("Baselining at %s" % getframeinfo(currentframe()).lineno)
    # objgraph.show_growth(limit=20,file=f)
MyBot.py 文件源码 项目:halite_bot 作者: ewirkerman 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def main_loop():
    """Run ``main()`` forever, optionally profiling every call with cProfile.

    Profiling is a manual developer toggle. When enabled, stale files in
    the ``stats`` directory are removed once, then each ``main()`` call
    writes its own stats file named after ``turnCounter + 1``.
    """
    # Developer toggle: set to True to profile every call to main().
    profiling = False
    #import objgraph

    if not profiling:
        while True:
            main()
    else:
        import glob  # only needed on the profiling path

        stats_dir = "stats"
        os.makedirs(stats_dir, exist_ok=True)
        # os.path.exists()/os.remove() do not expand wildcards, so stale
        # stats files are enumerated with glob before being deleted.
        for stale in glob.glob(os.path.join(stats_dir, "*")):
            if os.path.isfile(stale):
                os.remove(stale)
        while True:
            # os.path.join avoids the invalid "\m" escape of a hand-written
            # backslash path.
            currpath = os.path.join(stats_dir, 'mybot-%s.stats' % (turnCounter+1))
            cProfile.run('main()', currpath)


# with open("bot." + "debug", "a") as f:
    # logger.error("Baselining at %s" % getframeinfo(currentframe()).lineno)
    # objgraph.show_growth(limit=20,file=f)
MyBot.py 文件源码 项目:halite_bot 作者: ewirkerman 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def main_loop():
    """Run ``main()`` forever, optionally profiling every call with cProfile.

    Profiling is a manual developer toggle. When enabled, stale files in
    the ``stats`` directory are removed once, then each ``main()`` call
    writes its own stats file named after ``turnCounter + 1``.
    """
    global last_map
    # Developer toggle: set to True to profile every call to main().
    profiling = False
    #import objgraph

    if not profiling:
        while True:
            main()
    else:
        import glob  # only needed on the profiling path

        stats_dir = "stats"
        os.makedirs(stats_dir, exist_ok=True)
        # os.path.exists()/os.remove() do not expand wildcards, so stale
        # stats files are enumerated with glob before being deleted.
        for stale in glob.glob(os.path.join(stats_dir, "*")):
            if os.path.isfile(stale):
                os.remove(stale)
        while True:
            # os.path.join avoids the invalid "\m" escape of a hand-written
            # backslash path.
            currpath = os.path.join(stats_dir, 'mybot-%s.stats' % (turnCounter+1))
            cProfile.run('main()', currpath)


# with open("bot." + "debug", "a") as f:
#   # logger.debug("Baselining at %s" % getframeinfo(currentframe()).lineno)
    # objgraph.show_growth(limit=20,file=f)
MyBot.py 文件源码 项目:halite_bot 作者: ewirkerman 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def main_loop():
    """Run ``main()`` forever, profiling each call when the toggle is on.

    With profiling enabled, the stats of the current call are written to
    ``stats/mybot-currturn.stats``. The previous call's file is first moved
    to ``stats/mybot-lastturn.stats``, so the last two turns stay available.
    """
    # os.path.join avoids the invalid "\m" escape of hand-written backslash paths.
    currpath = os.path.join('stats', 'mybot-currturn.stats')
    lastpath = os.path.join('stats', 'mybot-lastturn.stats')

    while True:
        profiling = True
        # profiling = False
        if not profiling:
            main()
        else:
            if os.path.exists(currpath):
                # os.replace overwrites an existing target on every platform,
                # so no separate remove step is needed.
                os.replace(currpath, lastpath)
            cProfile.run('main()', currpath)
            # if "pypy" not in sys.executable:
            # else:
                # with open(currpath, "w") as f:
                    # vmprof.enable(f.fileno(), period=0.00099, memory=False)
                    # main()
                    # vmprof.disable()

#cProfile.run('main_loop()', 'stats\mybot.stats')
gametoy.py 文件源码 项目:GameToy 作者: rukai 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def main():
    """Dispatch command-line arguments to ``run``.

    Usage: ``<path> [debug [max_cycles]]``. ``debug`` defaults to ``"NONE"``
    and ``max_cycles`` to ``-1``. Without any argument ``help`` is printed.
    """
    argv = sys.argv
    argc = len(argv)
    if argc <= 1:
        print(help)
        return

    path = os.path.abspath(argv[1])
    debug = argv[2] if argc > 2 else "NONE"
    max_cycles = int(argv[3]) if argc > 3 else -1
    run(path, debug, max_cycles)
execution.py 文件源码 项目:yatta_reader 作者: sound88 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def __str__(self):
        """Return a one-line summary of the timing result.

        The text reports mean and standard deviation per loop together with
        the number of runs and loops. The plus-minus sign is used only when
        the stdout encoding can represent it; otherwise ``+-`` is used.
        """
        pm = '+-'
        if hasattr(sys.stdout, 'encoding') and sys.stdout.encoding:
            try:
                u'\xb1'.encode(sys.stdout.encoding)
                pm = u'\xb1'
            except Exception:
                # Best effort: any encoding problem keeps the ASCII fallback.
                # Unlike a bare except, KeyboardInterrupt and SystemExit
                # still propagate.
                pass
        return (
            u"{mean} {pm} {std} per loop (mean {pm} std. dev. of {runs} run{run_plural}, {loops} loop{loop_plural} each)"
                .format(
                    pm = pm,
                    runs = self.repeat,
                    loops = self.loops,
                    loop_plural = "" if self.loops == 1 else "s",
                    run_plural = "" if self.repeat == 1 else "s",
                    mean = _format_time(self.average, self._precision),
                    std = _format_time(self.stdev, self._precision))
                )
debug.py 文件源码 项目:NeoAnalysis 作者: neoanalysis 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def profile(code, name='profile_run', sort='cumulative', num=30):
    """Common-use for cProfile.

    Runs *code* under cProfile, dumping the data to the file *name*, then
    prints the first *num* entries sorted by *sort* and returns the
    ``pstats.Stats`` object.
    """
    cProfile.run(code, name)
    result = pstats.Stats(name)
    result.sort_stats(sort).print_stats(num)
    return result



#### Code for listing (nearly) all objects in the known universe
#### http://utcc.utoronto.ca/~cks/space/blog/python/GetAllObjects
# Recursively expand slist's objects
# into olist, using seen to track
# already processed objects.
debug.py 文件源码 项目:NeoAnalysis 作者: neoanalysis 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def findNew(self, regex):
        """Return the objects in ``self.newRefs`` (those considered 'new' at the last diff()) that match regex."""
        lookup = self.findTypes
        return lookup(self.newRefs, regex)
debug.py 文件源码 项目:NeoAnalysis 作者: neoanalysis 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def findPersistent(self, regex):
        """Return the objects in ``self.persistentRefs`` (those considered 'persistent' at the last diff()) that match regex."""
        lookup = self.findTypes
        return lookup(self.persistentRefs, regex)
debug.py 文件源码 项目:NeoAnalysis 作者: neoanalysis 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def start(self, interval=None):
        """Launch ``self.run`` in a daemon thread.

        If *interval* is given it replaces ``self.interval`` first. The stop
        flag is cleared before the thread starts.
        """
        if interval is not None:
            self.interval = interval
        self._stop = False

        worker = threading.Thread(target=self.run)
        worker.daemon = True
        self.thread = worker
        worker.start()
debug.py 文件源码 项目:NeoAnalysis 作者: neoanalysis 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def run(self):
        """Print the stack of every other thread, sleep ``self.interval``, repeat.

        The loop returns as soon as ``self._stop`` is True; the flag is read
        while holding ``self.lock``.
        """
        header = "\n=============  THREAD FRAMES:  ================"
        footer = "===============================================\n"
        while True:
            with self.lock:
                if self._stop is True:
                    return

            print(header)
            for thread_id, frame in sys._current_frames().items():
                # skip the thread that is doing the printing
                if thread_id != threading.current_thread().ident:
                    print("<< thread %d >>" % thread_id)
                    traceback.print_stack(frame)
            print(footer)

            time.sleep(self.interval)
debug.py 文件源码 项目:NeoAnalysis 作者: neoanalysis 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def findNew(self, regex):
        """Return the objects in ``self.newRefs`` (those considered 'new' at the last diff()) that match regex."""
        lookup = self.findTypes
        return lookup(self.newRefs, regex)
debug.py 文件源码 项目:NeoAnalysis 作者: neoanalysis 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def findPersistent(self, regex):
        """Return the objects in ``self.persistentRefs`` (those considered 'persistent' at the last diff()) that match regex."""
        lookup = self.findTypes
        return lookup(self.persistentRefs, regex)
debug.py 文件源码 项目:NeoAnalysis 作者: neoanalysis 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def start(self, interval=None):
        """Launch ``self.run`` in a daemon thread.

        If *interval* is given it replaces ``self.interval`` first. The stop
        flag is cleared before the thread starts.
        """
        if interval is not None:
            self.interval = interval
        self._stop = False

        worker = threading.Thread(target=self.run)
        worker.daemon = True
        self.thread = worker
        worker.start()
debug.py 文件源码 项目:NeoAnalysis 作者: neoanalysis 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def run(self):
        """Print the stack of every other thread, sleep ``self.interval``, repeat.

        The loop returns as soon as ``self._stop`` is True; the flag is read
        while holding ``self.lock``.
        """
        header = "\n=============  THREAD FRAMES:  ================"
        footer = "===============================================\n"
        while True:
            with self.lock:
                if self._stop is True:
                    return

            print(header)
            for thread_id, frame in sys._current_frames().items():
                # skip the thread that is doing the printing
                if thread_id != threading.current_thread().ident:
                    print("<< thread %d >>" % thread_id)
                    traceback.print_stack(frame)
            print(footer)

            time.sleep(self.interval)
transform_hsa.py 文件源码 项目:pynufft 作者: jyhmiinlin 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_cAddScalar():
    """Smoke-test the ``cAddScalar`` OpenCL kernel on 128 random complex64 values.

    Builds the kernel from ``cl_subroutine.cAddScalar.R`` and runs it with
    the scalar ``0.1+0.1j`` on a device copy of the data. It then prints the
    first device element minus the first host element.
    """
    dtype = numpy.complex64
    # get_simd_group_size expects the element size in bytes. ``.size`` on a
    # numpy scalar type is not a byte count, so use the dtype's itemsize.
    type_size = numpy.dtype(dtype).itemsize

    try:
        device=pyopencl.get_platforms()[1].get_devices()
    except Exception:
        # Fall back to the first platform when a second one cannot be used.
        # KeyboardInterrupt and SystemExit are no longer swallowed.
        device=pyopencl.get_platforms()[0].get_devices()

    # Query the SIMD group size once and reuse it for the report below.
    wavefront = pyopencl.characterize.get_simd_group_size(device[0], type_size)
    print('using cl device=',device,device[0].max_work_group_size, device[0].max_compute_units,wavefront)

    ctx = pyopencl.Context(device) #pyopencl.create_some_context()
    queue = pyopencl.CommandQueue(ctx)

#     B = routine(wavefront)
    import cl_subroutine.cAddScalar
    prg = pyopencl.Program(ctx, cl_subroutine.cAddScalar.R).build()

    AddScalar = prg.cAddScalar
    AddScalar.set_scalar_arg_dtypes(cl_subroutine.cAddScalar.scalar_arg_dtypes)
#     indata= numpy.arange(0,128).astype(dtype)
    indata = (numpy.random.randn(128,)+numpy.random.randn(128,)*1.0j).astype(dtype)
    indata_g = pyopencl.array.to_device(queue, indata)
    scal= 0.1+0.1j
    AddScalar(queue, (128,),None,scal, indata_g.data)
    # NOTE(review): presumably this difference should equal ``scal`` - confirm.
    print(-indata[0]+indata_g.get()[0])

# if __name__ == '__main__':
#     import cProfile
# #     cProfile.run('benchmark()')
#     test_init()
#     test_cAddScalar()
#     cProfile.run('test_init()')
main.py 文件源码 项目:PYKE 作者: muddyfish 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def run(code):
    """Execute *code* through a fresh ``lang_ast.AST`` and return the resulting stack.

    When ``args.hex`` is set, *code* is first stripped of spaces and decoded
    from hex. The code and its length are reported on stderr before running.
    """
    if args.hex:
        stripped = code.replace(b" ", b"")
        code = codecs.decode(stripped, 'hex_codec')
    sys.stderr.write("RUNNING: {} ({} bytes)\n".format(repr(code), len(code)))

    tree = lang_ast.AST()
    tree.setup(bytearray(code), first=True)
    return tree.run()
main.py 文件源码 项目:PYKE 作者: muddyfish 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def run_file(filename):
    """Read *filename* as bytes and return the result of ``run`` on its contents."""
    with open(filename, "rb") as source:
        program = source.read()
        return run(program)
pipetteServer.py 文件源码 项目:firecloud_developer_toolkit 作者: broadinstitute 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def __setstate__(self, state):
        """Restore pickled state and reconcile every job with its module directory.

        Each job in ``self._status_by_jobId`` is compared with the status
        reported for its module directory. Interrupted transitions are reset
        so they can be retried.

        :param state: attribute dictionary produced by pickling.
        :raises Exception: if a module directory reports a status that is not
            handled here.
        """
        # This code is run when an instance is created by unpickling, and not when it is initially created.
        self.__dict__.update(state)
        # roll back jobs that were sent into some state? make these transitions tolerant of being called twice in a row?
        # how do jobs get purged that finished?

        for jobId in self._status_by_jobId:
            directory_status = self._get_module_directory_status(jobId)
            if directory_status == self._status_by_jobId[jobId]:
                continue
            if directory_status in ['AboutToFail','AboutToAbort']:
                # just try again
                self._set_module_directory_status(jobId, jobId)
            elif directory_status in ['Pass','Fail', jobId]:
                pass
            elif directory_status in ['AboutToPass']:
                # move all the stuff back into the jobdir and try again
                jobdir = self._jobDirectory_by_jobId[jobId]
                moddir = self._moduleDirectory_by_jobId[jobId]
                if not os.path.exists(jobdir):
                    os.mkdir(jobdir)
                # The files to move back are in the module directory (the
                # rename source). Listing jobdir, the destination, would name
                # files that are absent from moddir.
                module_files = os.listdir(moddir)
                for fn in module_files:
                    # files prefixed 'pipette.' stay in the module directory
                    if fn.startswith('pipette.'):
                        continue
                    old_path = os.path.join(moddir, fn)
                    new_path = os.path.join(jobdir, fn)
                    os.rename(old_path, new_path)
                # _after_ the files are moved, update the status
                self._set_module_directory_status(jobId, jobId)
            else:
                raise Exception('Module directory left in an unrepairable state: ' + \
                                self._moduleDirectory_by_jobId[jobId])
cprofiles_do.py 文件源码 项目:Python_Study 作者: thsheep 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def main():
    """Profile ``very_slow()`` into ``prof.txt`` and print the stats sorted by time."""
    dump_file = 'prof.txt'
    cProfile.run('very_slow()', dump_file)

    import pstats
    report = pstats.Stats(dump_file)
    report.sort_stats('time')
    report.print_stats()
execution.py 文件源码 项目:leetcode 作者: thomasyimgit 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def __init__(self, shell):
        """Initialise the magics with *shell* and set up the default runner.

        If the module-level ``profile`` is None, ``prun`` is replaced by
        ``profile_missing_notice``.
        """
        super(ExecutionMagics, self).__init__(shell)
        # Default execution function used to actually run user code.
        self.default_runner = None
        if profile is None:
            # NOTE(review): presumably the profiler import failed - confirm.
            self.prun = self.profile_missing_notice


问题


面经


文章

微信
公众号

扫码关注公众号