python类default_timer()的实例源码

callbacks.py 文件源码 项目:vinci 作者: Phylliade 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def on_episode_end(self, episode, logs):
        duration = timeit.default_timer() - self.starts[episode]

        metrics = self.metrics[episode]
        if np.isnan(metrics).all():
            mean_metrics = np.array([np.nan for _ in self.metrics_names])
        else:
            mean_metrics = np.nanmean(metrics, axis=0)
        assert len(mean_metrics) == len(self.metrics_names)

        data = list(zip(self.metrics_names, mean_metrics))
        data += list(logs.items())
        data += [('episode', episode), ('duration', duration)]
        for key, value in data:
            if key not in self.data:
                self.data[key] = []
            self.data[key].append(value)

        if self.interval is not None and episode % self.interval == 0:
            self.save_data()

        # Clean up.
        del self.metrics[episode]
        del self.starts[episode]
util.py 文件源码 项目:sporco 作者: bwohlberg 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def elapsed(self, total=True):
        """Return the elapsed time for the timer.

        Parameters
        ----------
        total : bool, optional (default True)
          If ``True`` return the total elapsed time since the first
          call of :meth:`start` for the selected timer, otherwise
          return the elapsed time since the most recent call of
          :meth:`start` for which there has not been a corresponding
          call to :meth:`stop`.

        Returns
        -------
        dlt : float
          Elapsed time
        """

        return self.timer.elapsed(self.label, total=total)
vanitygen.py 文件源码 项目:py-vanitygen 作者: 1200wd 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def address_search(pipeout, search_for='12o'):
    privkey = random.randrange(2**256)
    address = ''
    count = 0
    start = timeit.default_timer()

    os.write(pipeout, "Searching for %s (pid %s)" % (search_for, os.getpid()))

    while not search_for in address:
        privkey += 1
        pubkey_point = fast_multiply(G, privkey)
        address = pubkey_to_address(pubkey_point)
        count += 1
        if not count % 1000:
            os.write(pipeout, "Searched %d in %d seconds (pid %d)" % (count, timeit.default_timer()-start, os.getpid()))

    os.write(pipeout, "Found address %s" % address)
    os.write(pipeout, "Private key HEX %s" % encode_privkey(privkey,'hex'))
extratest_zeta.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_zetazero():
    cases = [\
    (399999999, 156762524.6750591511),
    (241389216, 97490234.2276711795),
    (526196239, 202950727.691229534),
    (542964976, 209039046.578535272),
    (1048449112, 388858885.231056486),
    (1048449113, 388858885.384337406),
    (1048449114, 388858886.002285122),
    (1048449115, 388858886.00239369),
    (1048449116, 388858886.690745053)
    ]
    for n, v in cases:
        print(n, v)
        t1 = clock()
        ok = zetazero(n).ae(complex(0.5,v))
        t2 = clock()
        print("ok =", ok, ("(time = %s)" % round(t2-t1,3)))
    print("Now computing two huge zeros (this may take hours)")
    print("Computing zetazero(8637740722917)")
    ok = zetazero(8637740722917).ae(complex(0.5,2124447368584.39296466152))
    print("ok =", ok)
    ok = zetazero(8637740722918).ae(complex(0.5,2124447368584.39298170604))
    print("ok =", ok)
__init__.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def doctests(filter=[]):
    try:
        import psyco; psyco.full()
    except ImportError:
        pass
    import sys
    from timeit import default_timer as clock
    for i, arg in enumerate(sys.argv):
        if '__init__.py' in arg:
            filter = [sn for sn in sys.argv[i+1:] if not sn.startswith("-")]
            break
    import doctest
    globs = globals().copy()
    for obj in globs: #sorted(globs.keys()):
        if filter:
            if not sum([pat in obj for pat in filter]):
                continue
        sys.stdout.write(str(obj) + " ")
        sys.stdout.flush()
        t1 = clock()
        doctest.run_docstring_examples(globs[obj], {}, verbose=("-v" in sys.argv))
        t2 = clock()
        print(round(t2-t1, 3))
util.py 文件源码 项目:treecat 作者: posterior 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def __enter__(self):
        self.elapsed -= default_timer()
util.py 文件源码 项目:treecat 作者: posterior 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def __exit__(self, type, value, traceback):
        self.elapsed += default_timer()
        self.count += 1
cubester.py 文件源码 项目:CubeSter 作者: BlendingJake 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def execute(self, context):
        verts, faces = [], []

        start = timeit.default_timer()         
        scene = bpy.context.scene
        error = False

        if scene.cubester_audio_image == "image":
            if create_mesh_from_image(self, scene, verts, faces) == -1:
                error = True

            frames = find_sequence_images(self, context)
            created = len(frames[0])
        else:
            create_mesh_from_audio(self, scene, verts, faces)
            created = int(scene.cubester_audio_file_length)

        stop = timeit.default_timer()    

        if not error:
            if scene.cubester_mesh_style == "blocks" or scene.cubester_audio_image == "audio":
                self.report({"INFO"}, "CubeSter: {} blocks and {} frame(s) in {}s".format(str(int(len(verts) / 8)),
                                                                                          str(created),
                                                                                          str(round(stop - start, 4))))
            else:
                self.report({"INFO"}, "CubeSter: {} points and {} frame(s) in {}s" .format(str(len(verts)),
                                                                                           str(created),
                                                                                           str(round(stop - start, 4))))

        return {"FINISHED"}
call.py 文件源码 项目:run_lambda 作者: ethantkoenig 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def __init__(self, context):
            self._context = context

            self._start_mem = memory_profiler.memory_usage()[0]

            self._log = StringIO()
            self._log.write("START RequestId: {r} Version: {v}\n".format(
                r=context.aws_request_id, v=context.function_version
            ))
            self._start_time = timeit.default_timer()
            self._previous_stdout = sys.stdout

            handler = logging.StreamHandler(stream=self._log)
            logging.getLogger().addHandler(handler)
            sys.stdout = self._log
benchmark.py 文件源码 项目:public-dns 作者: ssut 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def main():
    sys.path.append(os.path.join(os.path.dirname(__file__), '..'))
    from publicdns.client import PublicDNS

    domains = []
    filename = os.path.join(os.path.dirname(__file__), 'google_domains.txt')
    with open(filename, 'r') as f:
        domains = f.read().split('\n')
    size = len(domains)

    tqdmargs = {
        'total': 100,
        'unit': 'it',
        'unit_scale': True,
        'leave': True,
    }

    with ThreadPoolExecutor(max_workers=4) as pool:
        print('- dns.resolver')
        started = timeit.default_timer()
        resolver = dns_resolver.Resolver()
        resolver.nameservers = ['8.8.8.8', '8.8.4.4']
        futures = [pool.submit(resolver.query, domains[i % size], 'A')
                   for i in range(100)]
        for _ in tqdm(as_completed(futures), **tqdmargs):
            pass
        elapsed = timeit.default_timer() - started
        print('dns.resolver * 100 - took {}s'.format(elapsed))

    with ThreadPoolExecutor(max_workers=4) as pool:
        print('- PublicDNS')
        started = timeit.default_timer()
        client = PublicDNS()
        futures = [pool.submit(client.query, domains[i % size], 'A')
                   for i in range(100)]
        for _ in tqdm(as_completed(futures), **tqdmargs):
            pass
        elapsed = timeit.default_timer() - started
        print('\nPublicDNS * 100 - took {}s'.format(elapsed))
roles.py 文件源码 项目:metadataproxy 作者: lyft 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __enter__(self):
        self.start_time = timeit.default_timer()
        return self
roles.py 文件源码 项目:metadataproxy 作者: lyft 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __exit__(self, *args):
        self.end_time = timeit.default_timer()
        self.exec_duration = self.end_time - self.start_time
aa_tasks.py 文件源码 项目:saapy 作者: ashapochka 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def export_scitools(ctx, udb_path, output_path):
    if os.path.exists(output_path):
        try:
            os.remove(output_path)
        except OSError as e:
            print("Error: %s - %s." % (e.filename, e.strerror))
    scitools_db = scitools_to_structs(udb_path)
    start = timer()
    with open(output_path, 'w') as output_stream:
        yaml.dump(scitools_db, output_stream)
    end = timer()
    execution_time = end - start
    print('transfer time:', timedelta(seconds=execution_time))
test_metadata.py 文件源码 项目:deb-python-cassandra-driver 作者: openstack 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_nts_token_performance(self):
        """
        Tests to ensure that when rf exceeds the number of nodes available, that we dont'
        needlessly iterate trying to construct tokens for nodes that don't exist.

        @since 3.7
        @jira_ticket PYTHON-379
        @expected_result timing with 1500 rf should be same/similar to 3rf if we have 3 nodes

        @test_category metadata
        """

        token_to_host_owner = {}
        ring = []
        dc1hostnum = 3
        current_token = 0
        vnodes_per_host = 500
        for i in range(dc1hostnum):

            host = Host('dc1.{0}'.format(i), SimpleConvictionPolicy)
            host.set_location_info('dc1', "rack1")
            for vnode_num in range(vnodes_per_host):
                md5_token = MD5Token(current_token+vnode_num)
                token_to_host_owner[md5_token] = host
                ring.append(md5_token)
            current_token += 1000

        nts = NetworkTopologyStrategy({'dc1': 3})
        start_time = timeit.default_timer()
        nts.make_token_replica_map(token_to_host_owner, ring)
        elapsed_base = timeit.default_timer() - start_time

        nts = NetworkTopologyStrategy({'dc1': 1500})
        start_time = timeit.default_timer()
        nts.make_token_replica_map(token_to_host_owner, ring)
        elapsed_bad = timeit.default_timer() - start_time
        difference = elapsed_bad - elapsed_base
        self.assertTrue(difference < 1 and difference > -1)
recipe-577896.py 文件源码 项目:code 作者: ActiveState 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __init__(self, timer=None, disable_gc=False, verbose=True):
        if timer is None:
            timer = timeit.default_timer
        self.timer = timer
        self.disable_gc = disable_gc
        self.verbose = verbose
        self.start = self.end = self.interval = None
aesop.py 文件源码 项目:aesop 作者: BioMoDeL 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def run_parallel(self, n_workers=None):
        """Summary
        Perform a computational alanine scan on the initialized Alascan class
        using multiple processes in parallel.

        Parameters
        ----------
        n_workers : int
            Number of processes to run. If None, method will use all available
            threads.

        Returns
        -------
        None
            Outputs text to STDOUT when run is complete, will be made optional
            in the future.
        """
        start = ti.default_timer()
        self.logs = []
        self.genTruncatedPQR()
        self.calcAPBS_parallel(n_workers)
        self.calcCoulomb_parallel(n_workers)
        self.status = 1
        stop = ti.default_timer()
        print '%s:\tAESOP alanine scan completed in %.2f seconds' % (
            self.jobname, stop - start)
        warn = self.checkwarnings()
        err = self.checkerrors()
        if warn != 0:
            print 'WARNINGS detected, please view log files!'
        if err != 0:
            print 'ERRORS detected, please view log files!'
aesop.py 文件源码 项目:aesop 作者: BioMoDeL 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def run_parallel(self, n_workers=None):
        """Summary
        Perform a computational directed mutagenesis scan on the initialized
        class using multiple processes in parallel.

        Parameters
        ----------
        n_workers : int
            Number of processes to run. If None, method will use all
            available threads.

        Returns
        -------
        None
            Outputs text to STDOUT when run is complete, will be made
            optional in the future.
        """
        start = ti.default_timer()
        self.logs = []
        self.genPDB()
        self.genPQR()
        self.calcAPBS_parallel()
        self.calcCoulomb_parallel()
        stop = ti.default_timer()
        print '%s:\tAESOP directed mutagenesis scan completed' \
            ' in %.2f seconds' % (self.jobname, stop - start)
        warn = self.checkwarnings()
        err = self.checkerrors()
        if warn != 0:
            print 'WARNINGS detected, please view log files!'
        if err != 0:
            print 'ERRORS detected, please view log files!'
aesop.py 文件源码 项目:aesop 作者: BioMoDeL 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def run(self,
            center=False,
            superpose=False,
            esi=False,
            esd=True,
            selstr=None,
            idx=0,
            minim=False):
        start = ti.default_timer()
        self.logs = []
        if center:
            self.centerPDB()
        if self.minim or minim:
            self.minimPDB()
        if superpose:
            self.superposePDB()
        self.initializeGrid()
        self.genPQR()
        if selstr is not None:
            self.mutatePQR(selstr=selstr)
        if len(self.pdbfiles) == 1 and selstr is None:
            self.mutatePQR()
        self.genDX()
        if esd:
            self.calcESD()
        if esi:
            self.calcESI(idx=idx)
        stop = ti.default_timer()
        print '%s:\tAESOP electrostatic similarity comparison ' \
            'completed in %.2f seconds' % (self.jobname, stop - start)
        warn = self.checkwarnings()
        err = self.checkerrors()
        if warn != 0:
            print 'WARNINGS detected, please view log files!'
        if err != 0:
            print 'ERRORS detected, please view log files!'
speedtest.py 文件源码 项目:SmartSocks 作者: waylybaye 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def run(self):
        try:
            if (timeit.default_timer() - self.starttime) <= self.timeout:
                f = urlopen(self.request)
                while (not SHUTDOWN_EVENT.isSet() and
                        (timeit.default_timer() - self.starttime) <=
                        self.timeout):
                    self.result.append(len(f.read(10240)))
                    if self.result[-1] == 0:
                        break
                f.close()
        except IOError:
            pass
speedtest.py 文件源码 项目:SmartSocks 作者: waylybaye 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def read(self, n=10240):
        if ((timeit.default_timer() - self.start) <= self.timeout and
                not SHUTDOWN_EVENT.isSet()):
            chunk = self.data.read(n)
            self.total.append(len(chunk))
            return chunk
        else:
            raise SpeedtestUploadTimeout()


问题


面经


文章

微信
公众号

扫码关注公众号