python类time()的实例源码

store.py 文件源码 项目:dsq 作者: baverman 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def reschedule(self, now=None):
        """Move due scheduled entries onto their queues.

        Every member of ``SCHEDULE_KEY`` with a score up to ``now`` is read,
        removed from the schedule and pushed to the queue named by the part
        of the member that precedes the first ``b':'``.

        :param now: upper score bound; any falsy value means ``time()``.
        :return: the ``zcard`` result for ``SCHEDULE_KEY``, queued after the
            removal in the same pipeline.
        """
        deadline = now or time()

        # Read, delete and count within one pipeline.
        due, _, remaining = (self.client.pipeline()
                             .zrangebyscore(SCHEDULE_KEY, '-inf', deadline)
                             .zremrangebyscore(SCHEDULE_KEY, '-inf', deadline)
                             .zcard(SCHEDULE_KEY)
                             .execute())

        # Push in batches of 5000 so a single pipeline never grows unbounded.
        for batch in iter_chunks(due, 5000):
            pusher = self.client.pipeline(False)
            for entry in batch:
                queue_name, _, payload = entry.partition(b':')
                pusher.rpush(rqname(queue_name), payload)
            pusher.execute()

        return remaining
gen.py 文件源码 项目:tree-gen 作者: friggog 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def make(self):
        """Generate the tree: parent object, then branches, then leaf mesh.

        Progress and the elapsed generation time are printed when the
        ``__logging__`` flag is truthy.
        """
        began = time()
        if __logging__:
            print('** Generating Tree **')

        # Empty parent object, linked into the current scene and made active.
        self.tree_obj = bpy.data.objects.new('Tree', None)
        scene = bpy.context.scene
        scene.objects.link(self.tree_obj)
        scene.objects.active = self.tree_obj

        # Branches first, then the leaf mesh (created only if needed).
        self.create_branches()
        self.create_leaf_mesh()

        elapsed = time() - began
        if __logging__:
            print('Tree generated in %f seconds' % elapsed)
ui.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def __init__(self, *args, **kwargs):
        """Initialise the progress bar with Windows-specific adjustments.

        On Windows the cursor is never hidden, and when colorama is available
        the output file is wrapped so ANSI codes are translated.
        """
        # The Windows terminal does not support the hide/show cursor ANSI
        # codes, even with colorama, so hide_cursor is forced to False there.
        # This needs to happen before the super() call: the base progress bar
        # class writes the "hide cursor" code in its own init, and setting the
        # flag any later would leave a "hide" with no corresponding "show".
        if WINDOWS and self.hide_cursor:
            self.hide_cursor = False

        super(WindowsMixin, self).__init__(*args, **kwargs)

        # Nothing else to do unless we are on Windows with colorama present.
        if not (WINDOWS and colorama):
            return

        self.file = colorama.AnsiToWin32(self.file)
        # The progress code calls self.file.isatty() and self.file.flush(),
        # neither of which the colorama.AnsiToWin32() object provides, so
        # both are forwarded to the wrapped stream.
        self.file.isatty = lambda: self.file.wrapped.isatty()
        self.file.flush = lambda: self.file.wrapped.flush()
random.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _test_generator(n, func, args):
    """Call ``func(*args)`` ``n`` times and print timing plus summary stats.

    Prints the call count and function name, the elapsed wall-clock time,
    and the average, standard deviation, minimum and maximum of the samples.

    ``n`` must be positive; ``n == 0`` raises ``ZeroDivisionError`` when the
    average is computed.
    """
    import time
    print(n, 'times', func.__name__)
    total = 0.0
    sqsum = 0.0
    smallest = 1e10
    largest = -1e10
    t0 = time.time()
    for _ in range(n):
        x = func(*args)
        total += x
        sqsum = sqsum + x*x
        smallest = min(x, smallest)
        largest = max(x, largest)
    t1 = time.time()
    print(round(t1-t0, 3), 'sec,', end=' ')
    avg = total/n
    # E[x^2] - E[x]^2 can come out as a tiny negative number through float
    # rounding when the samples are (nearly) constant; clamp it so the square
    # root does not raise "math domain error".
    variance = max(sqsum/n - avg*avg, 0.0)
    stddev = _sqrt(variance)
    print('avg %g, stddev %g, min %g, max %g\n' % \
              (avg, stddev, smallest, largest))
random.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def _test(N=2000):
    """Run ``_test_generator`` with ``N`` samples for each distribution."""
    cases = (
        (random, ()),
        (normalvariate, (0.0, 1.0)),
        (lognormvariate, (0.0, 1.0)),
        (vonmisesvariate, (0.0, 1.0)),
        (gammavariate, (0.01, 1.0)),
        (gammavariate, (0.1, 1.0)),
        (gammavariate, (0.1, 2.0)),
        (gammavariate, (0.5, 1.0)),
        (gammavariate, (0.9, 1.0)),
        (gammavariate, (1.0, 1.0)),
        (gammavariate, (2.0, 1.0)),
        (gammavariate, (20.0, 1.0)),
        (gammavariate, (200.0, 1.0)),
        (gauss, (0.0, 1.0)),
        (betavariate, (3.0, 3.0)),
        (triangular, (0.0, 1.0, 1.0/3.0)),
    )
    for generator, params in cases:
        _test_generator(N, generator, params)

# Create one instance, seeded from current time, and export its methods
# as module-level functions.  The functions share state across all uses
# (both in the user's code and in the Python libraries), but that's fine
# for most programs and is easier for the casual user than making them
# instantiate their own Random() instance.
main0.py 文件源码 项目:PGO-mapscan-opt 作者: seikur0 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def do_full_login(account):
    """Log ``account`` in and store its session data in the accounts table.

    The login routine is picked from ``account['type']`` ('ptc' or
    'google'); any other value logs an error and exits the process via
    ``sys.exit()``. The account row is then written with INSERT OR REPLACE,
    retrying for as long as sqlite raises OperationalError or InterfaceError.
    """
    # Hold the network lock for `locktime` seconds before logging in. Using
    # `with` guarantees the lock is released even if the sleep is interrupted
    # (the original acquire()/release() pair would have left it held).
    # Assumes lock_network supports the context-manager protocol, as the
    # threading lock types do.
    with lock_network:
        time.sleep(locktime)

    if account['type'] == 'ptc':
        login_ptc(account)
    elif account['type'] == 'google':
        login_google(account)
        # NOTE(review): new_session() only runs for google accounts here;
        # confirm that ptc accounts are not meant to get one as well.
        new_session(account)
    else:
        lprint('[{}] Error: Login type should be either ptc or google.'.format(account['num']))
        sys.exit()

    cursor_accs = db_accs.cursor()
    # Retry until the row is written and committed; there is no delay or
    # retry limit between attempts.
    while True:
        try:
            cursor_accs.execute("INSERT OR REPLACE INTO accounts VALUES(?,?,?,?,?,?,?)", [account['user'], account['access_token'], account['access_expire_timestamp'], account['api_url'], 0, '0', '0'])
            db_accs.commit()
            return
        except sqlite3.OperationalError as e:
            lprint('[-] Sqlite operational error: {}, account: {} Retrying...'.format(e, account['user']))
        except sqlite3.InterfaceError as e:
            lprint('[-] Sqlite interface error: {}, account: {} Retrying...'.format(e, account['user']))
main0.py 文件源码 项目:PGO-mapscan-opt 作者: seikur0 项目源码 文件源码 阅读 40 收藏 0 点赞 0 评论 0
def update_data():
    """Write every buffered spawn into the ``spawns`` table, then commit.

    As many rows are popped from ``data_buffer`` as it held on entry. Each
    insert, and the final commit, is retried for as long as sqlite raises
    OperationalError.
    """
    timenow = int(round(time.time(),0))
    cursor_data = db_data.cursor()

    for _ in range(len(data_buffer)):
        pokeid, spawnid, latitude, longitude, expiretime, addinfo = data_buffer.pop()
        while True:
            try:
                cursor_data.execute("INSERT OR REPLACE INTO spawns VALUES(?,?,?,?,?,?,?,?)", [spawnid, round(latitude, 5), round(longitude, 5), addinfo, pokeid, expiretime, timenow, wID])
            except sqlite3.OperationalError as e:
                lprint('[-] Sqlite operational error: {} Retrying...'.format(e))
            else:
                break

    committed = False
    while not committed:
        try:
            db_data.commit()
            committed = True
        except sqlite3.OperationalError as e:
            lprint('[-] Sqlite operational error: {} Retrying...'.format(e))
cve_scan_v2.py 文件源码 项目:nova 作者: hubblestack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _get_cache(ttl, cache_path):
    '''
    Return the JSON content stored at ``cache_path`` if the file's
    modification time is less than ``ttl`` before ``current_time()``;
    otherwise return an empty list.

    An empty list is also returned when the file is missing, cannot be read,
    or does not contain valid JSON.
    '''
    try:
        modified = os.path.getmtime(cache_path)
    except OSError:
        # No cache file, or it cannot be stat'ed.
        return []

    fresh = current_time() - modified < ttl
    if not fresh:
        log.debug('%s was older than ttl', cache_path)
        return []

    log.debug('%s is less than ttl', cache_path)
    try:
        with open(cache_path) as json_file:
            return json.load(json_file)
    except IOError:
        return []
    except ValueError:
        log.error('%s was not json formatted', cache_path)
        return []
rca.py 文件源码 项目:rca-evaluation 作者: sieve-microservices 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def run_demo(updated_services):
    """Build the two dummy graphs, dump them, then compute and dump their diff.

    The diff is computed by ``graph_a.get_diff(graph_b, updated_services)``
    and the time that call takes is printed.
    """
    # Build both graphs from the edge descriptions.
    graph_a = Graph()
    build_dummy_graph(GRAPH_A, graph_a)
    graph_b = Graph()
    build_dummy_graph(GRAPH_B, graph_b)

    # Print them as .dot files for reference.
    for label, graph, flag in (("graph-a", graph_a, True),
                               ("graph-b", graph_b, False)):
        graph.print_graph(label, flag, True)

    # Time only the diff computation itself.
    began = time.time()
    diff_graph = graph_a.get_diff(graph_b, updated_services)
    elapsed = time.time() - began
    print("--- graph differences calculated in %s seconds --- " % elapsed)
    diff_graph.print_graph("graph-diff", False, True)
zhihu.py 文件源码 项目:ArticleSpider 作者: mtianyan 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def login(self, response):
        """Extract the ``_xsrf`` token and request a login captcha.

        Yields a single ``scrapy.Request`` for the captcha image, carrying
        the prepared login form in ``meta["post_data"]`` and handled by
        ``self.login_after_captcha``. Yields nothing when no token is found
        in ``response.text``.
        """
        found = re.match('.*name="_xsrf" value="(.*?)"', response.text, re.DOTALL)
        xsrf = found.group(1) if found else ''
        if not xsrf:
            return

        post_url = "https://www.zhihu.com/login/phone_num"
        # NOTE(review): credentials are hard-coded in source; they should be
        # moved to settings or the environment.
        post_data = {
            "_xsrf": xsrf,
            "phone_num": "18487255487",
            "password": "ty158917",
            "captcha": ""
        }

        import time
        # Millisecond timestamp used as the captcha URL's "r" parameter.
        stamp = str(int(time.time() * 1000))
        captcha_url = "https://www.zhihu.com/captcha.gif?r={0}&type=login".format(stamp)
        yield scrapy.Request(captcha_url, headers=self.headers, meta={"post_data":post_data}, callback=self.login_after_captcha)
tasks.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def pycos_proc(n, task=None):
    """Sleep for a random 0.5-3 second interval, printing before and after.

    Generator function: it yields ``task.sleep(delay)`` once, so ``task``
    must provide ``sleep``. ``n`` only labels the printed lines.
    """
    delay = random.uniform(0.5, 3)
    starting = '%f: process %d sleeping for %f seconds' % (time.time(), n, delay)
    print(starting)
    yield task.sleep(delay)
    finished = '%f: process %d terminating' % (time.time(), n)
    print(finished)

# create 10 clients
dispycos.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def _run_request(self, request, where, cpu, gen, *args, **kwargs):
        """Internal use only.

        Package 'gen' (a generator function, or the name of one) as a
        '_DispycosJob_', deliver it to 'self.scheduler' from a helper task
        and yield that helper task's 'finish()'.

        The helper's result is the reply received from the scheduler, or
        None if delivery did not return 1.
        """
        if isinstance(gen, str):
            name = gen
        else:
            # 'func_name' is the Python 2 spelling of '__name__'.
            name = gen.func_name

        if name in self._xfer_funcs:
            # presumably the function's code is already available remotely,
            # so no source is sent - confirm against _xfer_funcs usage
            code = None
        else:
            # if not inspect.isgeneratorfunction(gen):
            #     logger.warning('"%s" is not a valid generator function', name)
            #     raise StopIteration([])
            code = inspect.getsource(gen).lstrip()

        def _run_req(task=None):
            # Runs as its own task; 'task' is the job's reply channel.
            msg = {'req': 'job', 'auth': self._auth,
                   'job': _DispycosJob_(request, task, name, where, cpu, code, args, kwargs)}
            if (yield self.scheduler.deliver(msg, timeout=MsgTimeout)) == 1:
                # First reply; when it is a Task, report it to status_task.
                reply = yield task.receive()
                if isinstance(reply, Task):
                    if self.status_task:
                        msg = DispycosTaskInfo(reply, args, kwargs, time.time())
                        self.status_task.send(DispycosStatus(Scheduler.TaskCreated, msg))
                # Requests not ending in 'async' wait for a second message,
                # which replaces the first as the result.
                if not request.endswith('async'):
                    reply = yield task.receive()
            else:
                reply = None
            # Python 2 generators cannot 'return' a value; the result is
            # carried by StopIteration instead.
            raise StopIteration(reply)

        yield Task(_run_req).finish()
tasks.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def pycos_proc(n, task=None):
    """Sleep for a random 0.5-3 second interval, printing before and after.

    Generator function: it yields ``task.sleep(delay)`` once, so ``task``
    must provide ``sleep``. ``n`` only labels the printed lines.
    """
    delay = random.uniform(0.5, 3)
    starting = '%f: process %d sleeping for %f seconds' % (time.time(), n, delay)
    print(starting)
    yield task.sleep(delay)
    finished = '%f: process %d terminating' % (time.time(), n)
    print(finished)

# create 10 clients
httpd.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def __init__(self, name, addr):
            """Create a record for ``name``/``addr`` with empty state.

            Status and availability info start as None, the server map is
            empty, both task counters are zero and ``update_time`` is set to
            the current time.
            """
            self.name, self.addr = name, addr
            self.status = None
            self.servers = {}
            self.update_time = time.time()
            self.tasks_submitted = self.tasks_done = 0
            self.avail_info = None
__init__.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _add_timeout(self, fd):
                """Register the timeout deadline for ``fd``, if it has one.

                When ``fd._timeout`` is truthy, its absolute deadline (now +
                timeout + 0.0001) is stored in ``fd._timeout_id`` and the pair
                ``(deadline, fd)`` is inserted into the sorted
                ``self._timeouts`` list under ``self._lock``; if
                ``self._polling`` is set, ``self.interrupt()`` is called as
                well. Otherwise ``fd._timeout_id`` is reset to None.
                """
                if not fd._timeout:
                    fd._timeout_id = None
                    return

                self._lock.acquire()
                try:
                    fd._timeout_id = _time() + fd._timeout + 0.0001
                    entry = (fd._timeout_id, fd)
                    # bisect keeps self._timeouts ordered by deadline.
                    self._timeouts.insert(bisect_left(self._timeouts, entry), entry)
                    if self._polling:
                        self.interrupt()
                finally:
                    # Release even if the insert or interrupt() raises, so a
                    # failure here cannot leave the lock held forever.
                    self._lock.release()
__init__.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _add_timeout(self, fd):
            """Register the timeout deadline for ``fd``, if it has one.

            A truthy ``fd._timeout`` yields an absolute deadline (now +
            timeout + 0.0001) that is stored in ``fd._timeout_id`` and
            inserted, paired with ``fd``, into the sorted ``self._timeouts``
            list. Otherwise ``fd._timeout_id`` is reset to None.
            """
            if not fd._timeout:
                fd._timeout_id = None
                return

            deadline = _time() + fd._timeout + 0.0001
            fd._timeout_id = deadline
            entry = (deadline, fd)
            position = bisect_left(self._timeouts, entry)
            self._timeouts.insert(position, entry)
__init__.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def wait(self, timeout=None):
        """Must be used with 'yield' as 'yield cv.wait()'.

        Gives up the ownership held by the current task, suspends the task
        with 'task._await_(timeout)' and, once resumed, takes ownership back
        at the depth it had before.

        Finishes with 'StopIteration(True)' when ownership has been
        re-acquired, and with 'StopIteration(False)' when an await returns
        None or the remaining timeout drops to zero or below. Raises
        RuntimeError if the calling task is not the current owner.
        """
        task = Pycos.cur_task(self._scheduler)
        if self._owner != task:
            # NOTE(review): the message says "lock release" although this is
            # wait(); left untouched because it is runtime text.
            raise RuntimeError('"%s"/%s: invalid lock release - owned by "%s"/%s' %
                               (task._name, task._id, self._owner._name, self._owner._id))
        assert self._depth > 0
        # Remember the hold depth so it can be restored on re-acquire, then
        # release completely.
        depth = self._depth
        self._depth = 0
        self._owner = None
        # Hand over to the first task in the wait list, if any.
        if self._waitlist:
            wake = self._waitlist.pop(0)
            wake._proceed_(True)
        self._notifylist.append(task)
        start = _time()
        if (yield task._await_(timeout)) is None:
            # presumably None means the await timed out - confirm against
            # _await_; the task may already have been taken off the list.
            try:
                self._notifylist.remove(task)
            except ValueError:
                pass
            raise StopIteration(False)
        # Resumed: wait until nobody owns it, queueing at the front of the
        # wait list.
        while self._owner is not None:
            self._waitlist.insert(0, task)
            if timeout is not None:
                # Charge the time already spent waiting against the timeout.
                timeout -= (_time() - start)
                if timeout <= 0:
                    raise StopIteration(False)
                start = _time()
            if (yield task._await_(timeout)) is None:
                try:
                    self._waitlist.remove(task)
                except ValueError:
                    pass
                raise StopIteration(False)
        assert self._depth == 0
        # Re-acquire with the depth saved on entry.
        self._owner = task
        self._depth = depth
        # NOTE(review): on Python 3.7+ (PEP 479) a StopIteration raised inside
        # a generator becomes RuntimeError - confirm the targeted version.
        raise StopIteration(True)
__init__.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 41 收藏 0 点赞 0 评论 0
def receive(self, category=None, timeout=None, alarm_value=None):
        """Similar to 'receive' of Task, except it retrieves (waiting, if
        necessary) messages in given 'category'.

        A message already stored for 'category' is returned at once.
        Otherwise messages are received from the underlying task; each one is
        classified by the functions in 'self._categorize' and either returned
        (when its category matches) or stored under its own category for a
        later call. A message equal to 'alarm_value' is returned as is.

        The result is delivered with 'raise StopIteration(msg)'.
        """
        # assert Pycos.cur_task() == self._task
        c = self._categories.get(category, None)
        if c:
            # A non-empty bucket already holds a message of this category.
            msg = c.popleft()
            raise StopIteration(msg)
        if timeout:
            start = _time()
        while 1:
            msg = yield self._task.receive(timeout=timeout, alarm_value=alarm_value)
            # NOTE(review): a genuine message equal to alarm_value (None by
            # default) cannot be told apart from an alarm here.
            if msg == alarm_value:
                raise StopIteration(msg)
            for categorize in self._categorize:
                c = categorize(msg)
                if c == category:
                    raise StopIteration(msg)
                if c is not None:
                    # Belongs to another category: store it there and stop
                    # classifying this message.
                    bucket = self._categories.get(c, None)
                    if not bucket:
                        bucket = self._categories[c] = collections.deque()
                    bucket.append(msg)
                    break
            else:
                # No categorizer claimed the message: store it under None.
                self._categories[None].append(msg)
            if timeout:
                # Shrink the timeout by the time spent on this round; it may
                # reach zero or below before the next receive.
                now = _time()
                timeout -= now - start
                start = now
__init__.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def _suspend(self, task, timeout, alarm_value, state):
        """Internal use only. See sleep/suspend in Task.

        Moves 'task' from the scheduled set to the suspended set in the given
        'state', optionally registering a timeout.

        Returns -1 if 'task' is not the current task or 'timeout' is not a
        number, the queued update if a pending message already matches
        'state', 'alarm_value' if 'timeout' is zero or negative, and 0 once
        the task has been suspended.
        """
        self._lock.acquire()
        if self.__cur_task != task:
            self._lock.release()
            logger.warning('invalid "suspend" - "%s" != "%s"', task, self.__cur_task)
            return -1
        tid = task._id
        if state == Pycos._AwaitMsg_ and task._msgs:
            # A message is already queued; if it is for this state, consume it
            # instead of suspending.
            s, update = task._msgs[0]
            if s == state:
                task._msgs.popleft()
                self._lock.release()
                return update
        if timeout is None:
            task._timeout = None
        else:
            if not isinstance(timeout, (float, int)):
                logger.warning('invalid timeout %s', timeout)
                self._lock.release()
                return -1
            if timeout <= 0:
                # Already expired: report the alarm without suspending.
                self._lock.release()
                return alarm_value
            else:
                # Absolute deadline (with a small 0.0001 margin) pushed onto
                # the timeout heap together with the value to deliver.
                task._timeout = _time() + timeout + 0.0001
                heappush(self._timeouts, (task._timeout, tid, alarm_value))
        self._scheduled.discard(tid)
        self._suspended.add(tid)
        task._state = state
        self._lock.release()
        return 0
dispycos.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def _run_request(self, request, where, cpu, gen, *args, **kwargs):
        """Internal use only.

        Package 'gen' (a generator function, or the name of one) as a
        '_DispycosJob_', deliver it to 'self.scheduler' from a helper task
        and yield that helper task's 'finish()'.

        The helper's result is the reply received from the scheduler, or
        None if delivery did not return 1.
        """
        if isinstance(gen, str):
            name = gen
        else:
            name = gen.__name__

        if name in self._xfer_funcs:
            # presumably the function's code is already available remotely,
            # so no source is sent - confirm against _xfer_funcs usage
            code = None
        else:
            # if not inspect.isgeneratorfunction(gen):
            #     logger.warning('"%s" is not a valid generator function', name)
            #     raise StopIteration([])
            code = inspect.getsource(gen).lstrip()

        def _run_req(task=None):
            # Runs as its own task; 'task' is the job's reply channel.
            msg = {'req': 'job', 'auth': self._auth,
                   'job': _DispycosJob_(request, task, name, where, cpu, code, args, kwargs)}
            if (yield self.scheduler.deliver(msg, timeout=MsgTimeout)) == 1:
                # First reply; when it is a Task, report it to status_task.
                reply = yield task.receive()
                if isinstance(reply, Task):
                    if self.status_task:
                        msg = DispycosTaskInfo(reply, args, kwargs, time.time())
                        self.status_task.send(DispycosStatus(Scheduler.TaskCreated, msg))
                # Requests not ending in 'async' wait for a second message,
                # which replaces the first as the result.
                if not request.endswith('async'):
                    reply = yield task.receive()
            else:
                reply = None
            # NOTE(review): on Python 3.7+ (PEP 479) a StopIteration raised
            # inside a generator is converted to RuntimeError - confirm the
            # scheduler copes with that, or use 'return reply' instead.
            raise StopIteration(reply)

        yield Task(_run_req).finish()


问题


面经


文章

微信
公众号

扫码关注公众号