python类seconds()的实例源码

entities.py 文件源码 项目:piqueserver 作者: piqueserver 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def get_progress(self, set=False):
        """
        Compute the capture progress at the current reactor time.

        Progress is clamped to the range 0..1 (0 is full blue control,
        1 is full green control). It is extrapolated from the stored
        ``self.progress`` using ``self.rate_value`` and the time elapsed
        since ``self.start``.

        :param set: when true, the extrapolated value is also written back
            to ``self.progress``.
        :return: the stored progress when the rate is zero or no start time
            is recorded, otherwise the extrapolated and clamped progress.
        """
        # TODO: a getter that optionally mutates state is surprising;
        # consider splitting this into `update_progress` and `get_progress`.
        rate = self.rate_value
        if rate == 0.0 or self.start is None:
            return self.progress
        elapsed = reactor.seconds() - self.start
        extrapolated = self.progress + rate * elapsed
        clamped = max(0, min(1, extrapolated))
        if set:
            self.progress = clamped
        return clamped
weapon.py 文件源码 项目:piqueserver 作者: piqueserver 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def set_shoot(self, value):
        """
        Switch the firing state on or off.

        Nothing happens when ``value`` equals the current ``self.shoot``.
        When firing starts, the start time is recorded; the call then bails
        out (leaving ``self.shoot`` untouched) if there is no ammo, or if a
        reload is in progress and ``self.slow_reload`` is not set. Otherwise
        the shoot time is fixed and any running reload is cancelled. When
        firing stops, the ammo count is refreshed and the earliest time for
        the next shot is derived from the number of rounds used.

        :param value: the new firing state.
        """
        if value == self.shoot:
            return
        now = reactor.seconds()
        if not value:
            # Firing stopped: settle the ammo used and push back the time
            # of the next allowed shot by one delay per round consumed.
            ammo_before = self.current_ammo
            self.current_ammo = self.get_ammo(True)
            rounds_used = ammo_before - self.current_ammo
            self.next_shot = self.shoot_time + self.delay * rounds_used
        else:
            self.start = now
            if self.current_ammo <= 0:
                return
            if self.reloading and not self.slow_reload:
                return
            # Never start earlier than the previously computed next shot.
            self.shoot_time = max(now, self.next_shot)
            if self.reloading:
                self.reloading = False
                self.reload_call.cancel()
        self.shoot = value
player.py 文件源码 项目:piqueserver 作者: piqueserver 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def hit(self, value, by=None, kill_type=WEAPON_KILL):
        """
        Apply ``value`` points of damage to this player.

        The call is ignored when ``self.hp`` is ``None``. Damage from a
        player on the same team is filtered by ``protocol.friendly_fire``:

        * ``'on_grief'``: melee hits are dropped unless
          ``protocol.spade_teamkills_on_grief`` is set, and any hit is
          dropped unless this player destroyed a block less than
          ``protocol.friendly_fire_time`` seconds ago;
        * any other falsy value: the hit is dropped;
        * any other truthy value: the hit goes through.

        :param value: amount subtracted from ``self.hp``.
        :param by: the attacking player, or ``None``.
        :param kill_type: kill type forwarded to ``set_hp``.
        """
        if self.hp is None:
            return
        same_team = by is not None and self.team is by.team
        if same_team:
            mode = self.protocol.friendly_fire
            if mode == 'on_grief':
                if (kill_type == MELEE_KILL and
                        not self.protocol.spade_teamkills_on_grief):
                    return
                window = self.protocol.friendly_fire_time
                last_destroy = self.last_block_destroy
                if last_destroy is None:
                    return
                if reactor.seconds() - last_destroy >= window:
                    return
            elif not mode:
                return
        self.set_hp(self.hp - value, by, kill_type=kill_type)
spawn_protect.py 文件源码 项目:piqueserver 作者: piqueserver 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def apply_script(protocol, connection, config):
    """
    Build a connection class that refuses damage to freshly spawned players.

    :param protocol: protocol class; returned unchanged.
    :param connection: connection base class that is extended.
    :param config: mapping read for ``spawn_protect_time`` (seconds,
        default ``3.0``).
    :return: the tuple ``(protocol, SpawnProtectConnection)``.
    """
    spawn_protect_time = config.get('spawn_protect_time', 3.0)

    class SpawnProtectConnection(connection):
        # Reactor time of this connection's most recent spawn; stays None
        # until on_spawn has run at least once.
        spawn_timestamp = None

        def on_spawn(self, pos):
            """Record the spawn time, then defer to the base class."""
            self.spawn_timestamp = reactor.seconds()
            return connection.on_spawn(self, pos)

        def on_hit(self, hit_amount, hit_player, type, grenade):
            """
            Reject the hit while ``hit_player`` is still spawn-protected.

            Returns ``False`` (after telling the attacker via chat) when
            the victim spawned less than ``spawn_protect_time`` seconds
            ago; otherwise the base class decides.
            """
            spawned_at = hit_player.spawn_timestamp
            # A victim without a recorded spawn time cannot be protected;
            # comparing a float with None would raise TypeError on Python 3.
            if spawned_at is not None:
                cutoff = reactor.seconds() - spawn_protect_time
                if cutoff < spawned_at:
                    timespan = -(cutoff - spawned_at)
                    self.send_chat(
                        "%s is spawn-protected for %s." %
                        (hit_player.name, prettify_timespan(
                            timespan, True)))
                    return False
            return connection.on_hit(
                self, hit_amount, hit_player, type, grenade)
    return protocol, SpawnProtectConnection
votekick.py 文件源码 项目:piqueserver 作者: piqueserver 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def start(cls, instigator, victim, reason=None):
        """
        Validate a votekick request and create the votekick object.

        The checks run in a fixed order and the first failing one raises
        ``VotekickFailure`` with the matching message: a votekick already
        in progress, self-votekick, not enough players, an immune victim,
        the instigator still being on cooldown (admins are exempt), and a
        missing reason when ``REQUIRE_REASON`` is set. Afterwards
        ``protocol.on_votekick_start`` may veto by returning a non-``None``
        value, which becomes the failure message.

        :param instigator: player starting the vote.
        :param victim: player to be kicked.
        :param reason: optional reason text; surrounding whitespace is
            stripped and ``S_DEFAULT_REASON`` is used when it is empty.
        :return: a new instance created via ``cls(instigator, victim, reason)``.
        :raises VotekickFailure: when any check or the protocol hook fails.
        """
        protocol = instigator.protocol
        last_votekick = instigator.last_votekick
        reason = reason.strip() if reason else None

        # Every branch raises, so independent guards behave exactly like
        # an if/elif chain.
        if protocol.votekick:
            raise VotekickFailure(S_IN_PROGRESS)
        if instigator is victim:
            raise VotekickFailure(S_SELF_VOTEKICK)
        if protocol.get_required_votes() <= 0:
            raise VotekickFailure(S_NOT_ENOUGH_PLAYERS)
        if victim.admin or victim.rights.cancel or victim.local:
            raise VotekickFailure(S_VOTEKICK_IMMUNE)
        if not instigator.admin and last_votekick is not None:
            if seconds() - last_votekick < cls.interval:
                raise VotekickFailure(S_NOT_YET)
        if REQUIRE_REASON and not reason:
            raise VotekickFailure(S_NEED_REASON)

        veto = protocol.on_votekick_start(instigator, victim, reason)
        if veto is not None:
            raise VotekickFailure(veto)

        return cls(instigator, victim, reason or S_DEFAULT_REASON)
statusserver.py 文件源码 项目:piqueserver 作者: piqueserver 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_overview(self):
        """
        Return the PNG-encoded map overview, regenerating it when stale.

        The cached image is rebuilt when none has been rendered yet, when
        the map name differs from the one used for the cached image, or
        when more than ``OVERVIEW_UPDATE_INTERVAL`` seconds have passed
        since the last render.

        :return: the PNG data held in ``self.overview``.
        """
        now = reactor.seconds()
        stale = (
            self.last_overview is None
            or self.last_map_name != self.protocol.map_info.name
            or now - self.last_overview > OVERVIEW_UPDATE_INTERVAL
        )
        if not stale:
            return self.overview

        raw_pixels = self.protocol.map.get_overview(rgba=True)
        buffer = BytesIO()
        Image.frombytes('RGBA', (512, 512), raw_pixels).save(buffer, 'png')
        self.overview = buffer.getvalue()
        self.last_overview = now
        self.last_map_name = self.protocol.map_info.name
        return self.overview
server.py 文件源码 项目:piqueserver 作者: piqueserver 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def add_ban(self, ip, reason, duration, name=None):
        """
        Ban an IP address or network and kick every matching connection.

        :param ip: address or network to ban; parsed non-strictly as a
            network.
        :param reason: reason stored alongside the ban.
        :param duration: ban length in minutes; a falsy value makes the ban
            permanent (stored expiry is ``None``).
        :param name: name recorded for the ban; replaced by the name of a
            matching connection if one is found, ``'(unknown)'`` if empty.
        """
        network = ip_network(text_type(ip), strict=False)
        # Iterate over a snapshot because kicking may alter the mapping.
        for connection in list(self.connections.values()):
            if ip_address(connection.address[0]) not in network:
                continue
            name = connection.name
            connection.kick(silent=True)
        # Convert the relative duration into an absolute reactor timestamp.
        expires = reactor.seconds() + duration * 60 if duration else None
        self.bans[ip] = (name or '(unknown)', reason, expires)
        self.save_bans()
server.py 文件源码 项目:piqueserver 作者: piqueserver 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def update_world(self):
        """
        Run one world update and print warnings about slow iterations.

        A warning is printed when more than one second passed since the
        previous call, and another when the update itself took more than
        one second.
        """
        previous_tick = self.last_time
        tick_start = reactor.seconds()
        if previous_tick is not None:
            gap = tick_start - previous_tick
            if gap > 1.0:
                print('(warning: high CPU usage detected - %s)' % gap)
        self.last_time = tick_start
        ServerProtocol.update_world(self)
        elapsed = reactor.seconds() - tick_start
        if elapsed > 1.0:
            print(
                'World update iteration took %s, objects: %s' %
                (elapsed, self.world.objects))

    # events
appcache.py 文件源码 项目:privacyidea-ldap-proxy 作者: NetKnights-GmbH 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def add_to_cache(self, dn, marker):
        """
        Store ``marker`` for ``dn`` in the app cache and schedule its removal.

        An existing entry for ``dn`` is overwritten (this is logged). The
        entry is stamped with the current reactor time and a call to
        ``remove_from_cache(dn, marker)`` is scheduled ``self.timeout``
        seconds from now.

        NOTE(review): earlier documentation stated that overwriting can
        later provoke a "Removal from app cache failed" log message, that
        re-adding the same marker does not extend the eviction timeout, and
        that the ``case_insensitive`` option is respected. None of that is
        visible in this method; confirm against ``remove_from_cache`` and
        the callers.

        :param dn: DN
        :param marker: App marker (a string)
        """
        entries = self._entries
        if dn in entries:
            log.info('Entry {dn!r} already cached {marker!r}, overwriting ...',
                     dn=dn, marker=entries[dn])
        now = reactor.seconds()
        log.info('Adding to app cache: dn={dn!r}, marker={marker!r}, time={time!r}',
                 dn=dn, time=now, marker=marker)
        entries[dn] = (marker, now)
        self.callLater(self.timeout, self.remove_from_cache, dn, marker)
appcache.py 文件源码 项目:privacyidea-ldap-proxy 作者: NetKnights-GmbH 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def get_cached_marker(self, dn):
        """
        Look up the cached app marker for the distinguished name ``dn``.

        The stored timestamp is checked as well: an entry older than
        ``self.timeout`` seconds is reported as an inconsistency and
        treated as missing. A missing entry is logged too.

        NOTE(review): earlier documentation stated that the
        ``case_insensitive`` option is respected; no such handling is
        visible in this method.

        :param dn: DN
        :return: string or None
        """
        if dn not in self._entries:
            log.info('No entry in app cache for dn={dn!r}', dn=dn)
            return None
        marker, inserted = self._entries[dn]
        now = reactor.seconds()
        if now - inserted < self.timeout:
            return marker
        log.warn('Inconsistent app cache: dn={dn!r}, inserted={inserted!r}, current={current!r}',
            dn=dn, inserted=inserted, current=now
        )
        return None
bindcache.py 文件源码 项目:privacyidea-ldap-proxy 作者: NetKnights-GmbH 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def add_to_cache(self, dn, app_marker, password):
        """
        Remember a credential triple in the bind cache.

        A new triple is stamped with the current reactor time and a call to
        ``remove_from_cache`` is scheduled ``self.timeout`` seconds from
        now. A triple that is already cached is left untouched, so its
        removal time is **not** extended.

        :param dn: user distinguished name
        :param app_marker: app marker
        :param password: user password
        """
        key = (dn, app_marker, password)
        if key in self._cache:
            log.info('Already in the bind cache: dn={dn!r}, marker={marker!r}',
                     dn=dn, marker=app_marker)
            return
        now = reactor.seconds()
        log.info('Adding to bind cache: dn={dn!r}, marker={marker!r}, time={time!r}',
                 dn=dn, marker=app_marker, time=now)
        self._cache[key] = now
        self.callLater(self.timeout, self.remove_from_cache, dn, app_marker, password)
bindcache.py 文件源码 项目:privacyidea-ldap-proxy 作者: NetKnights-GmbH 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def is_cached(self, dn, app_marker, password):
        """
        Determine whether the given credentials are found in the bind cache.

        An entry whose stored timestamp is ``self.timeout`` seconds old or
        older is logged as an inconsistency and reported as not cached.

        :param dn: user distinguished name
        :param app_marker: app marker as string
        :param password: user password
        :return: a boolean
        """
        item = (dn, app_marker, password)
        if item not in self._cache:
            return False
        current_time = reactor.seconds()
        inserted_time = self._cache[item]
        # Even though credentials **should** be removed automatically by
        # ``callLater``, check the stored timestamp.
        if current_time - inserted_time < self.timeout:
            return True
        # The first literal ends with a space so that the concatenated
        # message reads "marker=..., inserted=..." instead of running the
        # two fields together.
        log.info('Inconsistent bind cache: dn={dn!r}, marker={marker!r}, '
                 'inserted={inserted!r}, current={current!r}',
            dn=dn, marker=app_marker, inserted=inserted_time, current=current_time,
        )
        return False
test_jobs.py 文件源码 项目:ccs-twistedextensions 作者: apple 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def advanceCompletely(self, amount):
        """
        Advance this clock by C{amount} seconds and run every call that has
        become due, yielding the result of each call so that it can be
        waited on before the next one runs.

        @type amount: C{float}
        @param amount: The number of seconds which to advance this clock's
        time.
        """
        self.rightNow += amount
        self._sortCalls()
        while True:
            if not self.calls:
                break
            if not (self.calls[0].getTime() <= self.seconds()):
                break
            due = self.calls.pop(0)
            due.called = 1
            yield due.func(*due.args, **due.kw)
            # Running a call may have scheduled new ones; restore order.
            self._sortCalls()
test_web.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def test_startCheckingExpiration(self):
        """
        L{server.Session.startCheckingExpiration} makes an idle session
        expire once L{server.Session.sessionTimeout} seconds have elapsed.
        """
        session = self.session
        session.startCheckingExpiration()

        # One second short of the timeout: the session is still registered.
        self.clock.advance(session.sessionTimeout - 1)
        self.assertIn(self.uid, self.site.sessions)

        # The final second reaches the timeout and the session is dropped.
        self.clock.advance(1)
        self.assertNotIn(self.uid, self.site.sessions)

        # No scheduled calls may remain on the clock afterwards.
        self.assertFalse(self.clock.calls)
test_web.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_nonASCII(self):
        """
        Non-ASCII bytes in the request fields show up escaped in the
        formatted log line.
        """
        clock = Clock()
        clock.advance(1234567890)
        timestamp = http.datetimeToLogString(clock.seconds())

        # Put one distinct non-ASCII byte into every logged field.
        request = DummyRequestForLogTest(http.HTTPFactory(reactor=clock))
        request.client = IPv4Address("TCP", b"evil x-forwarded-for \x80", 12345)
        request.method = b"POS\x81"
        request.protocol = b"HTTP/1.\x82"
        headers = request.requestHeaders
        headers.addRawHeader(b"referer", b"evil \x83")
        headers.addRawHeader(b"user-agent", b"evil \x84")

        expected = (
            u'"evil x-forwarded-for \\x80" - - [13/Feb/2009:23:31:30 +0000] '
            u'"POS\\x81 /dummy HTTP/1.0" 123 - "evil \\x83" "evil \\x84"')
        line = http.combinedLogFormatter(timestamp, request)
        self.assertEqual(expected, line)
test_web.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _xforwardedforTest(self, header):
        """
        Check that L{proxiedLogFormatter} logs a request carrying the given
        I{X-Forwarded-For} header exactly like L{combinedLogFormatter}
        would, except that 172.16.1.2 replaces the normal client address.

        @param header: An I{X-Forwarded-For} header with left-most address of
            172.16.1.2.
        """
        clock = Clock()
        clock.advance(1234567890)
        timestamp = http.datetimeToLogString(clock.seconds())
        request = DummyRequestForLogTest(http.HTTPFactory(reactor=clock))

        # The baseline is taken before the header is set and then patched
        # with the address that is expected to be logged.
        baseline = http.combinedLogFormatter(timestamp, request)
        expected = baseline.replace(u"1.2.3.4", u"172.16.1.2")

        request.requestHeaders.setRawHeaders(b"x-forwarded-for", [header])
        actual = http.proxiedLogFormatter(timestamp, request)

        self.assertEqual(expected, actual)
test_internet.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_seconds(self):
        """
        L{twisted.internet.reactor.seconds} returns something that behaves
        like a number.

        1. Deliberately no relation to the "system time" reported by
           L{time.time} or L{twisted.python.runtime.seconds} is asserted,
           because a better basis than wallclock-time may be chosen for
           scheduling calls at some point.
        2. Deliberately nothing about the type of the result is asserted
           either, because operations may not return ints or floats: for
           example, datetime-datetime == timedelta(0).
        """
        current = reactor.seconds()
        roundTrip = current - current + current
        self.assertEqual(roundTrip, current)
p2p.py 文件源码 项目:p2pool-bch 作者: amarian12 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def do_ping(self):
        """
        Time a ``get_shares`` round trip on the reactor clock.

        Generator that yields the ``get_shares`` request and then hands the
        elapsed seconds to ``defer.returnValue`` (presumably driven by an
        ``inlineCallbacks`` decorator that is not visible here).
        """
        sent_at = reactor.seconds()
        yield self.get_shares(hashes=[0], parents=0, stops=[])
        elapsed = reactor.seconds() - sent_at
        defer.returnValue(elapsed)
entities.py 文件源码 项目:piqueserver 作者: piqueserver 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def update_rate(self):
        """
        Recompute the capture rate from the players currently present.

        Each player with a truthy ``team.id`` adds one to the rate and every
        other player subtracts one. Nothing changes when the progress is
        already at the end the rate pushes towards (or the rate is zero at
        either end). Otherwise the rate is stored, any pending finish call
        is cancelled and, for a non-zero rate, the capturing team and the
        time until completion are determined; a finish call is scheduled
        only when the capturing team is not the current owner. The progress
        is sent out at the end.
        """
        rate = sum(1 if player.team.id else -1 for player in self.players)
        progress = self.progress
        pinned_high = progress == 1.0 and rate >= 0
        pinned_low = progress == 0.0 and rate <= 0
        if pinned_high or pinned_low:
            return

        self.rate = rate
        self.rate_value = rate * TC_CAPTURE_RATE
        pending = self.finish_call
        if pending is not None:
            pending.cancel()
            self.finish_call = None

        if rate != 0:
            self.start = reactor.seconds()
            rate_value = self.rate_value
            if rate_value < 0:
                # Negative rate: progress falls towards 0.
                self.capturing_team = self.protocol.blue_team
                time_left = progress / -rate_value
            else:
                # Positive rate: progress rises towards 1.
                self.capturing_team = self.protocol.green_team
                time_left = (1.0 - progress) / rate_value
            if self.capturing_team is not self.team:
                self.finish_call = reactor.callLater(time_left, self.finish)
        self.send_progress()
weapon.py 文件源码 项目:piqueserver 作者: piqueserver 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def get_ammo(self, no_max=False):
        """
        Return the ammo count, accounting for shots fired while shooting.

        While ``self.shoot`` is set, one round is deducted per started
        ``self.delay`` interval since ``self.shoot_time``.

        :param no_max: when true the raw (possibly negative) count is
            returned; otherwise the result is floored at zero.
        :return: the ammo count.
        """
        ammo = self.current_ammo
        if self.shoot:
            elapsed = reactor.seconds() - self.shoot_time
            rounds_fired = int(math.ceil(elapsed / self.delay))
            ammo = ammo - max(0, rounds_fired)
        return ammo if no_max else max(0, ammo)
player.py 文件源码 项目:piqueserver 作者: piqueserver 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def get_respawn_time(self):
        """
        Return the number of seconds until this player may respawn.

        Zero is returned when no respawn time is configured. With
        ``protocol.respawn_waves`` enabled the wait is shortened so that it
        ends on the next multiple of the respawn time on the reactor clock.
        """
        if not self.respawn_time:
            return 0
        waves = self.protocol.respawn_waves
        offset = reactor.seconds() % self.respawn_time if waves else 0
        return self.respawn_time - offset
aimbot2.py 文件源码 项目:piqueserver 作者: piqueserver 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def hackinfo_player(player):
    """
    Build a one-line report about ``player`` for cheat investigation.

    The report contains the name, id and address, the accuracy text from
    ``accuracy_player``, the kill-death ratio (deaths are counted as at
    least one to avoid dividing by zero) and the recent kill and
    headshot-snap counts.

    :param player: the player to describe.
    :return: the report string.
    """
    header = "%s #%s (%s) has an accuracy of: " % (
        player.name, player.player_id, player.address[0])
    accuracy = accuracy_player(player, False)
    kd_ratio = player.ratio_kills / float(max(1, player.ratio_deaths))
    kd_text = " Kill-death ratio of %.2f (%s kills, %s deaths)." % (
        kd_ratio, player.ratio_kills, player.ratio_deaths)
    kills_text = " %i kills in the last %i seconds." % (
        player.get_kill_count(), KILL_TIME)
    snaps_text = " %i headshot snaps in the last %i seconds." % (
        player.get_headshot_snap_count(), HEADSHOT_SNAP_TIME)
    return header + accuracy + kd_text + kills_text + snaps_text
votemap.py 文件源码 项目:piqueserver 作者: piqueserver 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def verify(self):
        """
        Check whether the instigator is allowed to start this vote now.

        :return: ``True`` when there is no instigator, when the instigator
            has no recorded previous vote, or when at least
            ``self.vote_interval`` seconds have passed since it; otherwise
            an error message string.
        """
        instigator = self.instigator
        if instigator is None:
            return True
        previous = instigator.last_votemap
        if previous is None:
            return True
        if reactor.seconds() - previous < self.vote_interval:
            return "You can't start a vote now."
        return True
votemap.py 文件源码 项目:piqueserver 作者: piqueserver 项目源码 文件源码 阅读 43 收藏 0 点赞 0 评论 0
def set_cooldown(self):
        """
        Stamp the instigator with the current reactor time as the time of
        the last map vote. Nothing happens without an instigator or when
        the instigator is an admin.
        """
        instigator = self.instigator
        if instigator is None or instigator.admin:
            return
        instigator.last_votemap = reactor.seconds()
afk.py 文件源码 项目:piqueserver 作者: piqueserver 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def afk(connection, player):
    """
    Report how long a player has been inactive.

    :param connection: the connection issuing the command; its protocol is
        used to resolve the player.
    :param player: identifier handed to ``get_player``.
    :return: ``S_AFK_CHECK`` formatted with the player's name and the
        prettified time since ``last_activity``.
    """
    target = get_player(connection.protocol, player)
    idle_for = reactor.seconds() - target.last_activity
    return S_AFK_CHECK.format(
        player=target.name, time=prettify_timespan(idle_for, True))
afk.py 文件源码 项目:piqueserver 作者: piqueserver 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def kick_afk(connection, minutes, amount=None):
    """
    Kick non-admin connections that have been inactive for too long.

    Candidates are ordered so that named players come before nameless
    connections and, within each group, the longest-inactive come first.
    Named players are removed with ``afk_kick``; nameless connections are
    simply disconnected.

    :param connection: the connection issuing the command.
    :param minutes: inactivity threshold in minutes; coerced with ``int``.
    :param amount: maximum number of connections to remove; coerced with
        ``int`` like ``minutes``. A falsy value means "all candidates".
    :return: a message when nobody qualifies, or the summary message when
        the caller is one of ``protocol.players``; otherwise ``None``.
    :raises ValueError: if ``minutes`` is below 1 or a value cannot be
        converted to an integer.
    """
    protocol = connection.protocol
    minutes = int(minutes)
    if minutes < 1:
        raise ValueError()
    # `amount` gets the same coercion as `minutes`; without it a textual
    # value would break the slice below.
    limit = int(amount) if amount else 0
    idle_seconds = minutes * 60.0
    minutes_s = prettify_timespan(idle_seconds)
    lower_bound = reactor.seconds() - idle_seconds
    to_kick = [conn for conn in protocol.connections.values()
               if not conn.admin and conn.last_activity < lower_bound]
    if not to_kick:
        return S_NO_PLAYERS_INACTIVE.format(time=minutes_s)
    # Two stable sorts: oldest activity first, then nameless ones last.
    to_kick.sort(key=attrgetter('last_activity'))
    to_kick.sort(key=lambda conn: conn.name is None)
    limit = limit or len(to_kick)
    kicks = 0
    disconnects = 0
    for conn in to_kick[:limit]:
        if conn.name:
            conn.afk_kick()
            kicks += 1
        else:
            conn.disconnect()
            disconnects += 1
    # Report what was actually done; deriving the second count from the
    # requested amount over-reports when fewer candidates exist.
    message = S_AFK_KICKED.format(
        num_players=kicks,
        num_connections=disconnects,
        time=minutes_s)
    protocol.irc_say('* ' + message)
    if connection in protocol.players:
        return message
airstrike2.py 文件源码 项目:piqueserver 作者: piqueserver 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def __init__(self, secs, f, *args, **kw):
        """
        Store a delay and a callable together with its arguments.

        :param secs: delay in seconds, kept as ``self.seconds``.
        :param f: the callable, kept as ``self.f``.
        :param args: positional arguments kept for ``f``.
        :param kw: keyword arguments kept for ``f``.
        """
        self.seconds, self.f = secs, f
        self.args, self.kw = args, kw
airstrike2.py 文件源码 项目:piqueserver 作者: piqueserver 项目源码 文件源码 阅读 56 收藏 0 点赞 0 评论 0
def start_or_reset(self):
        """
        Schedule the stored callable, or restart the countdown of an
        already pending call.

        A still-active call is reset to fire ``self.seconds`` from now;
        otherwise a new call is scheduled after ``self.seconds`` with the
        stored arguments.
        """
        if self.call and self.call.active():
            # Restart with this instance's own delay so that the reset and
            # the initial scheduling agree, rather than with an unrelated
            # module-level constant.
            self.call.reset(self.seconds)
        else:
            self.call = reactor.callLater(
                self.seconds, self.f, *self.args, **self.kw)
markers.py 文件源码 项目:piqueserver 作者: piqueserver 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __init__(self, protocol, team, x, y):
        """
        Create a marker for ``team`` at ``(x, y)`` and build it.

        The colour is picked from ``random_colors`` or derived from the
        team colour, the line and point templates found on ``self.lines``
        and ``self.points`` are expanded through ``make_line`` and
        ``make_block``, and blocks already claimed by existing markers are
        removed from this one. For markers with a ``duration`` an expiry
        call is scheduled; it is brought forward to the earliest expiry of
        any overlapping marker. Finally the marker is built and registered,
        and a background marker is created when ``background_class`` is
        set.

        :param protocol: protocol holding the list of existing markers.
        :param team: owning team.
        :param x: x coordinate of the marker.
        :param y: y coordinate of the marker.
        """
        self.protocol = protocol
        self.team = team
        self.x = x
        self.y = y
        if self.random_colors:
            self.color = choice(self.random_colors)
        elif self.team_color:
            self.color = make_color(*team.color)
        self.blocks = set()

        # Expand the templates into fresh per-instance lists.
        template_lines, template_points = self.lines, self.points
        self.lines, self.points = [], []
        for line_spec in template_lines:
            self.make_line(*line_spec)
        for point_spec in template_points:
            self.make_block(*point_spec)

        # Give up blocks that already belong to other markers.
        timed = self.duration is not None
        overlapping = []
        now = seconds()
        expire_at = now + self.duration if timed else None
        for other in protocol.markers:
            shared = other.blocks & self.blocks
            if not shared:
                continue
            self.blocks -= shared
            overlapping.append(other)
            if timed and other.expire_call:
                expire_at = min(expire_at, other.expire_call.getTime())

        # Expire together with the earliest overlapping marker so that
        # colliding markers vanish all at once.
        if timed:
            self.expire_call = callLater(expire_at - now, self.expire)

        self.build()
        team.marker_count[self.__class__] += 1
        protocol.markers.append(self)
        if self.background_class:
            self.background = self.background_class(protocol, team, x, y)
statusserver.py 文件源码 项目:piqueserver 作者: piqueserver 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def render_GET(self, request):
        """
        Answer a GET request with the server status as JSON.

        The document contains the server identity, uptime, game mode, map
        information, configured scripts, one entry per player (name,
        latency, kills, team name) and the current scores.

        :param request: the request; its ``Content-Type`` header is set to
            ``application/json``.
        :return: the encoded JSON document.
        """
        protocol = self.protocol

        request.setHeader("Content-Type", "application/json")

        players = [
            {
                'name': player.name,
                'latency': player.latency,
                'kills': player.kills,
                'team': player.team.name,
            }
            for player in protocol.players.values()
        ]

        status = {
            "serverIdentifier": protocol.identifier,
            "serverName": protocol.name,
            "serverVersion": protocol.version,
            "serverUptime": reactor.seconds() - protocol.start_time,
            "gameMode": protocol.game_mode_name,
            "map": {
                "name": protocol.map_info.name,
                "version": protocol.map_info.version,
                "author": protocol.map_info.author,
            },
            "scripts": protocol.config.get("scripts", []),
            "players": players,
            "maxPlayers": protocol.max_players,
            "scores": {
                "currentBlueScore": protocol.blue_team.score,
                "currentGreenScore": protocol.green_team.score,
                "maxScore": protocol.max_score,
            },
        }

        body = json.dumps(status)
        return body.encode()


问题


面经


文章

微信
公众号

扫码关注公众号