def get_progress(self, set=False):
    """
    Return the capture progress, optionally storing it on the instance.

    The value is clamped to the range 0..1, where 0 is full blue control
    and 1 is full green control. While a capture is running the value is
    extrapolated from the stored progress, the capture rate and the time
    elapsed since ``self.start``.

    :param set: when true, write the computed value back to
        ``self.progress`` before returning it
    :return: the current progress
    """
    # TODO: a getter that can also write state is surprising; consider
    # splitting this into `update_progress` and `get_progress`.
    capture_rate = self.rate_value
    if capture_rate == 0.0 or self.start is None:
        # Nothing is moving, so the stored value is returned untouched.
        return self.progress
    elapsed = reactor.seconds() - self.start
    extrapolated = self.progress + capture_rate * elapsed
    clamped = max(0, min(1, extrapolated))
    if set:
        self.progress = clamped
    return clamped
# Example source code using Python's seconds()
def set_shoot(self, value):
    """
    Switch the firing state of this weapon on or off.

    Does nothing when ``value`` equals the current state. When firing
    starts, the start time is recorded and, provided there is ammo and no
    blocking reload, the first shot time is set and any running reload is
    cancelled. When firing stops, the ammo used so far is committed and
    the earliest time of the next shot is derived from it.

    :param value: the new firing state
    """
    if value == self.shoot:
        return
    now = reactor.seconds()
    if not value:
        ammo_before = self.current_ammo
        # get_ammo() is called while self.shoot still holds the old state,
        # so it accounts for the shots fired during this burst.
        self.current_ammo = self.get_ammo(True)
        shots_fired = ammo_before - self.current_ammo
        self.next_shot = self.shoot_time + self.delay * shots_fired
    else:
        self.start = now
        if self.current_ammo <= 0:
            return
        if self.reloading and not self.slow_reload:
            return
        self.shoot_time = max(now, self.next_shot)
        if self.reloading:
            # Only reachable for slow reloads: firing interrupts them.
            self.reloading = False
            self.reload_call.cancel()
    self.shoot = value
def hit(self, value, by=None, kill_type=WEAPON_KILL):
    """
    Apply ``value`` points of damage to this player, honouring the
    protocol's friendly-fire setting.

    Nothing happens when ``self.hp`` is None. Damage from a team mate is
    dropped unless friendly fire is enabled; in ``'on_grief'`` mode it is
    only applied when this player destroyed a block less than
    ``friendly_fire_time`` seconds ago (and, for melee hits, only when
    ``spade_teamkills_on_grief`` is set).

    :param value: amount of damage
    :param by: the attacking player, or None
    :param kill_type: kill type passed on to ``set_hp``
    """
    if self.hp is None:
        return
    same_team = by is not None and self.team is by.team
    if same_team:
        ff_mode = self.protocol.friendly_fire
        if ff_mode == 'on_grief':
            melee_blocked = (
                kill_type == MELEE_KILL and
                not self.protocol.spade_teamkills_on_grief)
            if melee_blocked:
                return
            window = self.protocol.friendly_fire_time
            if self.last_block_destroy is None:
                return
            if reactor.seconds() - self.last_block_destroy >= window:
                return
        elif not ff_mode:
            return
    self.set_hp(self.hp - value, by, kill_type=kill_type)
def apply_script(protocol, connection, config):
    """
    Build a connection class that rejects hits on freshly spawned players.

    :param protocol: protocol class, returned unchanged
    :param connection: connection class to derive from
    :param config: mapping; ``'spawn_protect_time'`` gives the protection
        window in seconds (default 3.0)
    :return: ``(protocol, SpawnProtectConnection)``
    """
    spawn_protect_time = config.get('spawn_protect_time', 3.0)

    class SpawnProtectConnection(connection):
        # Time of the most recent spawn; None until on_spawn has run.
        spawn_timestamp = None

        def on_spawn(self, pos):
            self.spawn_timestamp = reactor.seconds()
            return connection.on_spawn(self, pos)

        def on_hit(self, hit_amount, hit_player, type, grenade):
            spawned_at = hit_player.spawn_timestamp
            # A target without a recorded spawn has no protection window;
            # comparing a float with None would raise on Python 3.
            if spawned_at is not None:
                cur_timestamp = reactor.seconds() - spawn_protect_time
                if cur_timestamp < spawned_at:
                    timespan = -(cur_timestamp - spawned_at)
                    self.send_chat(
                        "%s is spawn-protected for %s." %
                        (hit_player.name, prettify_timespan(
                            timespan, True)))
                    return False
            return connection.on_hit(
                self, hit_amount, hit_player, type, grenade)

    return protocol, SpawnProtectConnection
def start(cls, instigator, victim, reason=None):
    """
    Validate a votekick request and create the votekick object.

    :param instigator: player starting the vote
    :param victim: player to be kicked
    :param reason: optional reason text; surrounding whitespace is removed
    :return: a new ``cls(instigator, victim, reason)``
    :raises VotekickFailure: when any precondition fails or when
        ``protocol.on_votekick_start`` returns a value other than None
    """
    protocol = instigator.protocol
    last_votekick = instigator.last_votekick
    if reason:
        reason = reason.strip()
    else:
        reason = None

    # Every failed check raises, so independent guards are equivalent to
    # one if/elif chain evaluated in the same order.
    if protocol.votekick:
        raise VotekickFailure(S_IN_PROGRESS)
    if instigator is victim:
        raise VotekickFailure(S_SELF_VOTEKICK)
    if protocol.get_required_votes() <= 0:
        raise VotekickFailure(S_NOT_ENOUGH_PLAYERS)
    if victim.admin or victim.rights.cancel or victim.local:
        raise VotekickFailure(S_VOTEKICK_IMMUNE)
    if not instigator.admin and (
            last_votekick is not None and
            seconds() - last_votekick < cls.interval):
        raise VotekickFailure(S_NOT_YET)
    if REQUIRE_REASON and not reason:
        raise VotekickFailure(S_NEED_REASON)

    veto = protocol.on_votekick_start(instigator, victim, reason)
    if veto is not None:
        raise VotekickFailure(veto)

    return cls(instigator, victim, reason or S_DEFAULT_REASON)
def get_overview(self):
    """
    Return the map overview as PNG bytes, re-rendering it when needed.

    The cached image is rebuilt when none exists yet, when the map name
    changed, or when it is older than ``OVERVIEW_UPDATE_INTERVAL``.
    """
    now = reactor.seconds()
    stale = (
        self.last_overview is None or
        self.last_map_name != self.protocol.map_info.name or
        now - self.last_overview > OVERVIEW_UPDATE_INTERVAL)
    if not stale:
        return self.overview

    raw = self.protocol.map.get_overview(rgba=True)
    buffer = BytesIO()
    Image.frombytes('RGBA', (512, 512), raw).save(buffer, 'png')
    self.overview = buffer.getvalue()
    self.last_overview = now
    self.last_map_name = self.protocol.map_info.name
    return self.overview
def add_ban(self, ip, reason, duration, name=None):
    """
    Ban an ip (or network) and kick every connection that falls inside it.

    :param ip: address or network to ban
    :param reason: reason stored with the ban
    :param duration: ban length in minutes; any false value (None, 0)
        makes the ban permanent
    :param name: name stored with the ban; replaced by the name of a
        matching connection, ``'(unknown)'`` when neither is available
    """
    network = ip_network(text_type(ip), strict=False)
    # Iterate over a copy so the mapping may change while kicking.
    for connection in list(self.connections.values()):
        if ip_address(connection.address[0]) not in network:
            continue
        name = connection.name
        connection.kick(silent=True)
    expires = reactor.seconds() + duration * 60 if duration else None
    self.bans[ip] = (name or '(unknown)', reason, expires)
    self.save_bans()
def update_world(self):
    """
    Run one world update and print a warning when timing looks unhealthy.

    A warning is printed when more than one second passed since the
    previous update started, and another when the update itself took more
    than one second.
    """
    previous = self.last_time
    started = reactor.seconds()
    if previous is not None and started - previous > 1.0:
        print('(warning: high CPU usage detected - %s)' % (started - previous))
    self.last_time = started
    ServerProtocol.update_world(self)
    time_taken = reactor.seconds() - started
    if time_taken > 1.0:
        report = 'World update iteration took %s, objects: %s' % (
            time_taken, self.world.objects)
        print(report)
# events
def add_to_cache(self, dn, marker):
    """
    Store ``marker`` for ``dn`` in the app cache together with the current
    time, and schedule ``remove_from_cache(dn, marker)`` after
    ``self.timeout`` seconds.

    An existing entry for ``dn`` is overwritten (and logged). The removal
    scheduled for the overwritten entry is not cancelled here.

    NOTE(review): the previous documentation said this function respects
    the ``case_insensitive`` option; nothing in this body shows that, so
    it is presumably handled outside - confirm.

    :param dn: DN
    :param marker: App marker (a string)
    """
    entries = self._entries
    if dn in entries:
        log.info('Entry {dn!r} already cached {marker!r}, overwriting ...',
                 dn=dn, marker=entries[dn])
    now = reactor.seconds()
    log.info('Adding to app cache: dn={dn!r}, marker={marker!r}, time={time!r}',
             dn=dn, time=now, marker=marker)
    entries[dn] = (marker, now)
    self.callLater(self.timeout, self.remove_from_cache, dn, marker)
def get_cached_marker(self, dn):
    """
    Look up the app marker cached for the distinguished name ``dn``.

    The marker is only returned while the entry is younger than
    ``self.timeout`` seconds. A missing or expired entry is logged and
    yields ``None``.

    :param dn: DN
    :return: string or None
    """
    if dn not in self._entries:
        log.info('No entry in app cache for dn={dn!r}', dn=dn)
        return None
    marker, inserted = self._entries[dn]
    now = reactor.seconds()
    if now - inserted < self.timeout:
        return marker
    # The entry is still stored although it is older than the timeout.
    log.warn('Inconsistent app cache: dn={dn!r}, inserted={inserted!r}, current={current!r}',
             dn=dn, inserted=inserted, current=now
             )
    return None
def add_to_cache(self, dn, app_marker, password):
    """
    Remember a set of credentials in the bind cache.

    New credentials are stored with the current time and their removal is
    scheduled through ``self.callLater`` after ``self.timeout`` seconds.
    Credentials that are already cached are left alone, so the time until
    their removal is **not** extended.

    :param dn: user distinguished name
    :param app_marker: app marker
    :param password: user password
    """
    item = (dn, app_marker, password)
    if item in self._cache:
        log.info('Already in the bind cache: dn={dn!r}, marker={marker!r}',
                 dn=dn, marker=app_marker)
        return
    now = reactor.seconds()
    log.info('Adding to bind cache: dn={dn!r}, marker={marker!r}, time={time!r}',
             dn=dn, marker=app_marker, time=now)
    self._cache[item] = now
    self.callLater(self.timeout, self.remove_from_cache, dn, app_marker, password)
def is_cached(self, dn, app_marker, password):
    """
    Tell whether the given credentials are in the bind cache and still
    younger than ``self.timeout`` seconds.

    :param dn: user distinguished name
    :param app_marker: app marker as string
    :param password: user password
    :return: a boolean
    """
    item = (dn, app_marker, password)
    if item not in self._cache:
        return False
    now = reactor.seconds()
    inserted = self._cache[item]
    # Credentials **should** have been removed by the scheduled call once
    # they are too old; the stored timestamp is checked regardless.
    if now - inserted < self.timeout:
        return True
    log.info('Inconsistent bind cache: dn={dn!r}, marker={marker!r},'
             'inserted={inserted!r}, current={current!r}',
             dn=dn, marker=app_marker, inserted=inserted, current=now,
             )
    return False
def advanceCompletely(self, amount):
    """
    Advance this clock by C{amount} seconds and run every call that has
    become due, yielding the result of each call in turn.

    The pending calls are re-sorted after each call has run.

    @type amount: C{float}
    @param amount: The number of seconds which to advance this clock's
        time.
    """
    self.rightNow += amount
    self._sortCalls()
    while self.calls:
        due = self.calls[0]
        if not (due.getTime() <= self.seconds()):
            break
        self.calls.pop(0)
        due.called = 1
        yield due.func(*due.args, **due.kw)
        self._sortCalls()
def test_startCheckingExpiration(self):
    """
    L{server.Session.startCheckingExpiration} causes the session to expire
    after L{server.Session.sessionTimeout} seconds without activity.
    """
    session = self.session
    clock = self.clock
    session.startCheckingExpiration()

    # One second short of the timeout the session must still be there.
    clock.advance(session.sessionTimeout - 1)
    self.assertIn(self.uid, self.site.sessions)

    # The final second reaches the timeout and expires the session.
    clock.advance(1)
    self.assertNotIn(self.uid, self.site.sessions)

    # No calls may remain scheduled afterwards.
    self.assertFalse(clock.calls)
def test_nonASCII(self):
    """
    Bytes in fields of the request which are not part of ASCII are escaped
    in the result.
    """
    clock = Clock()
    clock.advance(1234567890)
    timestamp = http.datetimeToLogString(clock.seconds())

    factory = http.HTTPFactory(reactor=clock)
    request = DummyRequestForLogTest(factory)
    request.client = IPv4Address("TCP", b"evil x-forwarded-for \x80", 12345)
    request.method = b"POS\x81"
    request.protocol = b"HTTP/1.\x82"
    headers = request.requestHeaders
    headers.addRawHeader(b"referer", b"evil \x83")
    headers.addRawHeader(b"user-agent", b"evil \x84")

    expected = (
        u'"evil x-forwarded-for \\x80" - - [13/Feb/2009:23:31:30 +0000] '
        u'"POS\\x81 /dummy HTTP/1.0" 123 - "evil \\x83" "evil \\x84"')
    line = http.combinedLogFormatter(timestamp, request)
    self.assertEqual(expected, line)
def _xforwardedforTest(self, header):
    """
    Assert that L{proxiedLogFormatter} logs a request carrying the given
    I{X-Forwarded-For} header exactly as L{combinedLogFormatter} would,
    except that 172.16.1.2 appears as the client address.

    @param header: An I{X-Forwarded-For} header with left-most address of
        172.16.1.2.
    """
    clock = Clock()
    clock.advance(1234567890)
    timestamp = http.datetimeToLogString(clock.seconds())
    request = DummyRequestForLogTest(http.HTTPFactory(reactor=clock))

    # The reference line is formatted before the header is added.
    baseline = http.combinedLogFormatter(timestamp, request)
    expected = baseline.replace(u"1.2.3.4", u"172.16.1.2")

    request.requestHeaders.setRawHeaders(b"x-forwarded-for", [header])
    line = http.proxiedLogFormatter(timestamp, request)
    self.assertEqual(expected, line)
def test_seconds(self):
    """
    L{twisted.internet.reactor.seconds} should return something
    like a number.

    1. No relation to the "system time" as returned by L{time.time} or
       L{twisted.python.runtime.seconds} is asserted, because at some
       point a better option for scheduling calls than wallclock-time
       may be found.

    2. Nothing is asserted about the type of the result either, because
       operations may not return ints or floats: For example,
       datetime-datetime == timedelta(0).
    """
    now = reactor.seconds()
    round_trip = now - now + now
    self.assertEqual(round_trip, now)
def do_ping(self):
    """
    Time a ``get_shares`` request and hand the elapsed reactor time to
    ``defer.returnValue``.
    """
    started = reactor.seconds()
    yield self.get_shares(hashes=[0], parents=0, stops=[])
    elapsed = reactor.seconds() - started
    defer.returnValue(elapsed)
def update_rate(self):
    """
    Recompute the capture rate from the players present and reschedule
    the capture-finished call.

    Each player whose team id is true adds one to the rate, every other
    player subtracts one. Nothing changes when the progress is already at
    the end the rate is pushing towards (or the rate is zero at either
    end).
    """
    rate = sum(1 if player.team.id else -1 for player in self.players)
    progress = self.progress
    pinned_high = progress == 1.0 and rate >= 0
    pinned_low = progress == 0.0 and rate <= 0
    if pinned_high or pinned_low:
        return

    self.rate = rate
    self.rate_value = rate * TC_CAPTURE_RATE

    pending = self.finish_call
    if pending is not None:
        pending.cancel()
        self.finish_call = None

    if rate != 0:
        self.start = reactor.seconds()
        rate_value = self.rate_value
        if rate_value < 0:
            # Negative rates move the progress towards 0 (blue).
            self.capturing_team = self.protocol.blue_team
            end_time = progress / -rate_value
        else:
            self.capturing_team = self.protocol.green_team
            end_time = (1.0 - progress) / rate_value
        if self.capturing_team is not self.team:
            self.finish_call = reactor.callLater(end_time, self.finish)
    self.send_progress()
def get_ammo(self, no_max=False):
    """
    Return the ammo left, accounting for shots fired while the trigger
    is held.

    :param no_max: when true the raw (possibly negative) count is
        returned; otherwise the result is never below zero
    """
    remaining = self.current_ammo
    if self.shoot:
        held_for = reactor.seconds() - self.shoot_time
        shots = int(math.ceil(held_for / self.delay))
        remaining = remaining - max(0, shots)
    return remaining if no_max else max(0, remaining)
def get_respawn_time(self):
    """
    Return the number of seconds to wait before respawning.

    Returns 0 when no respawn time is configured. With
    ``protocol.respawn_waves`` enabled, the part of the current wave that
    has already passed is subtracted from the configured time.
    """
    wave_length = self.respawn_time
    if not wave_length:
        return 0
    offset = (reactor.seconds() % wave_length
              if self.protocol.respawn_waves else 0)
    return wave_length - offset
def hackinfo_player(player):
    """
    Build a one-line summary of a player's accuracy, kill-death ratio,
    recent kills and recent headshot snaps.

    :param player: the player to describe
    :return: the summary text
    """
    header = "%s #%s (%s) has an accuracy of: " % (
        player.name, player.player_id, player.address[0])
    accuracy = accuracy_player(player, False)
    # Dividing by at least one avoids a division by zero.
    ratio = player.ratio_kills / float(max(1, player.ratio_deaths))
    kd_text = " Kill-death ratio of %.2f (%s kills, %s deaths)." % (
        ratio, player.ratio_kills, player.ratio_deaths)
    kills_text = " %i kills in the last %i seconds." % (
        player.get_kill_count(), KILL_TIME)
    snaps_text = " %i headshot snaps in the last %i seconds." % (
        player.get_headshot_snap_count(), HEADSHOT_SNAP_TIME)
    return header + accuracy + kd_text + kills_text + snaps_text
def verify(self):
    """
    Check whether this vote may be started.

    :return: True when allowed, otherwise a message explaining the
        refusal (the instigator voted less than ``vote_interval``
        seconds ago)
    """
    instigator = self.instigator
    if instigator is None:
        return True
    previous = instigator.last_votemap
    if previous is None:
        return True
    if reactor.seconds() - previous < self.vote_interval:
        return "You can't start a vote now."
    return True
def set_cooldown(self):
    """
    Record the current time as the instigator's last map vote.

    Nothing is recorded when there is no instigator or the instigator is
    an admin.
    """
    starter = self.instigator
    if starter is None or starter.admin:
        return
    starter.last_votemap = reactor.seconds()
def afk(connection, player):
    """
    Report how long a player has been inactive.

    :param connection: the connection issuing the command
    :param player: player specifier, resolved with ``get_player``
    :return: ``S_AFK_CHECK`` filled in with the player's name and idle
        time
    """
    target = get_player(connection.protocol, player)
    idle_for = reactor.seconds() - target.last_activity
    elapsed = prettify_timespan(idle_for, True)
    return S_AFK_CHECK.format(player=target.name, time=elapsed)
def kick_afk(connection, minutes, amount=None):
    """
    Kick non-admin connections that were inactive for at least
    ``minutes`` minutes.

    Named players are kicked with ``afk_kick()``; connections without a
    name are disconnected. The longest-idle named players go first.

    :param connection: the connection issuing the command
    :param minutes: inactivity threshold; converted with ``int()``
    :param amount: optional maximum number of connections to remove;
        converted with ``int()`` (presumably it arrives as text like
        ``minutes`` - confirm against the command dispatcher). A false
        value or zero means "all"
    :return: a status message when nobody qualifies or when
        ``connection`` is one of ``protocol.players``, otherwise None
    :raises ValueError: when ``minutes`` is below 1, ``amount`` is
        negative, or either value is not a valid integer
    """
    protocol = connection.protocol
    minutes = int(minutes)
    if minutes < 1:
        raise ValueError()
    seconds = minutes * 60.0
    minutes_s = prettify_timespan(seconds)
    lower_bound = reactor.seconds() - seconds
    to_kick = [conn for conn in protocol.connections.values()
               if not conn.admin and conn.last_activity < lower_bound]
    if not to_kick:
        return S_NO_PLAYERS_INACTIVE.format(time=minutes_s)
    # Oldest activity first; the second (stable) sort then moves nameless
    # connections behind the named ones.
    to_kick.sort(key=attrgetter('last_activity'))
    to_kick.sort(key=lambda conn: conn.name is None)
    # A slice bound must be an integer, so convert before slicing.
    limit = int(amount) if amount else 0
    if limit < 0:
        raise ValueError()
    selected = to_kick[:limit] if limit else to_kick
    kicks = 0
    for conn in selected:
        if conn.name:
            conn.afk_kick()
            kicks += 1
        else:
            conn.disconnect()
    # Count what was actually processed: the requested amount may exceed
    # the number of inactive connections.
    message = S_AFK_KICKED.format(
        num_players=kicks,
        num_connections=len(selected) - kicks,
        time=minutes_s)
    protocol.irc_say('* ' + message)
    if connection in protocol.players:
        return message
    return None
def __init__(self, secs, f, *args, **kw):
    """
    Store a delay and a callable together with its arguments.

    :param secs: delay in seconds
    :param f: the callable
    :param args: positional arguments for ``f``
    :param kw: keyword arguments for ``f``
    """
    self.seconds, self.f = secs, f
    self.args, self.kw = args, kw
def start_or_reset(self):
    """
    Schedule the stored callable, or push back the call that is already
    pending.

    An active pending call is reset to fire ``self.seconds`` from now,
    the same delay used when scheduling, instead of a module-level
    constant. Otherwise a new call is scheduled with
    ``reactor.callLater`` and kept in ``self.call``.
    """
    pending = self.call
    if pending and pending.active():
        pending.reset(self.seconds)
    else:
        self.call = reactor.callLater(
            self.seconds, self.f, *self.args, **self.kw)
def __init__(self, protocol, team, x, y):
    """
    Create a marker at ``(x, y)`` for ``team`` and register it with the
    protocol.

    Blocks already claimed by other markers are removed from this one.
    When the marker has a duration, its expiry is moved forward to the
    earliest expiry among the markers it collides with.

    :param protocol: the owning protocol; its ``markers`` list is
        searched for collisions and extended with this marker
    :param team: the owning team; its ``marker_count`` is incremented
    :param x: x coordinate
    :param y: y coordinate
    """
    self.protocol = protocol
    self.team = team
    self.x = x
    self.y = y
    if self.random_colors:
        self.color = choice(self.random_colors)
    elif self.team_color:
        self.color = make_color(*team.color)
    self.blocks = set()

    # self.lines / self.points are read before this instance assigns
    # them (presumably class-level templates - confirm), then replaced by
    # fresh per-instance lists before being replayed.
    template_lines, template_points = self.lines, self.points
    self.lines, self.points = [], []
    for line in template_lines:
        self.make_line(*line)
    for point in template_points:
        self.make_block(*point)

    # find markers we're colliding with
    timed = self.duration is not None
    colliding = []
    now = seconds()
    expires_at = now + self.duration if timed else None
    for other in protocol.markers:
        overlap = other.blocks & self.blocks
        if not overlap:
            continue
        self.blocks -= overlap
        colliding.append(other)
        if timed and other.expire_call:
            expires_at = min(expires_at, other.expire_call.getTime())

    # forward expiration time so that colliding markers vanish all at once
    if timed:
        self.expire_call = callLater(expires_at - now, self.expire)

    self.build()
    team.marker_count[self.__class__] += 1
    protocol.markers.append(self)
    if self.background_class:
        self.background = self.background_class(protocol, team, x, y)
def render_GET(self, request):
    """
    Answer a GET request with the server status encoded as JSON.

    :param request: the request; its ``Content-Type`` header is set to
        ``application/json``
    :return: the encoded JSON document as bytes
    """
    protocol = self.protocol
    request.setHeader("Content-Type", "application/json")

    players = [
        {
            'name': player.name,
            'latency': player.latency,
            'kills': player.kills,
            'team': player.team.name,
        }
        for player in protocol.players.values()
    ]

    map_details = {
        "name": protocol.map_info.name,
        "version": protocol.map_info.version,
        "author": protocol.map_info.author
    }
    score_details = {
        "currentBlueScore": protocol.blue_team.score,
        "currentGreenScore": protocol.green_team.score,
        "maxScore": protocol.max_score
    }
    status = {
        "serverIdentifier": protocol.identifier,
        "serverName": protocol.name,
        "serverVersion": protocol.version,
        "serverUptime": reactor.seconds() - protocol.start_time,
        "gameMode": protocol.game_mode_name,
        "map": map_details,
        "scripts": protocol.config.get("scripts", []),
        "players": players,
        "maxPlayers": protocol.max_players,
        "scores": score_details,
    }
    return json.dumps(status).encode()