def setup_limit(self):
    """Set up the process limits configured on this object.

    Must run in the main thread (asserted).  The process is made leader of
    its own process group, then -- only if ``self._limit_set`` is not yet
    positive -- a limit is installed for each of ``self.max_time``,
    ``self.max_cpu_time`` and ``self.max_memory`` that is not ``None``.
    The signal handlers and rlimits that get replaced are saved on ``self``.
    ``self._limit_set`` is incremented on every call.
    """
    assert currentThread().getName() == 'MainThread'
    # A session leader already leads its own process group and setpgrp()
    # would fail with EPERM for it, so only call it when not yet the leader.
    if os.getpgrp() != os.getpid():
        os.setpgrp()
    if self._limit_set <= 0:
        if self.max_time is not None:
            self._old_usr2_hdlr = signal(SIGUSR2, self._hangle_sig_timeout)
            # max_time minus the time already elapsed, never less than 1.
            self._timer = Timer(max(1, int(self.max_time) - self._elapse_time),
                                self._time_out)
            self._start_time = int(time())
            self._timer.start()
        if self.max_cpu_time is not None:
            self._old_max_cpu_time = getrlimit(RLIMIT_CPU)
            # Replace only the soft limit; the current hard limit is kept.
            cpu_limit = (int(self.max_cpu_time), self._old_max_cpu_time[1])
            self._old_sigxcpu_hdlr = signal(SIGXCPU, self._handle_sigxcpu)
            setrlimit(RLIMIT_CPU, cpu_limit)
        if self.max_memory is not None:
            # NOTE(review): MemorySentinel is defined elsewhere; the meaning
            # of its first argument (1) is not visible here -- confirm.
            self._msentinel = MemorySentinel(1, int(self.max_memory))
            self._old_max_memory = getrlimit(RLIMIT_AS)
            self._old_usr1_hdlr = signal(SIGUSR1, self._hangle_sig_memory)
            # Replace only the soft limit; the current hard limit is kept.
            as_limit = (int(self.max_memory), self._old_max_memory[1])
            setrlimit(RLIMIT_AS, as_limit)
            self._msentinel.start()
    self._limit_set += 1
# Example source code using Python's RLIMIT_AS
def clean_limit(self):
    """Reinstall the old process limits and signal handlers.

    While ``self._limit_set`` is positive, every limit whose ``max_*``
    attribute is not ``None`` is undone using the values saved on ``self``.
    ``self._limit_set`` is decremented on every call, whether or not
    anything was restored.

    NOTE(review): teardown runs whenever the counter is positive, so with
    nested set-ups the limits are removed by the first call rather than the
    last -- confirm this is intended.
    """
    if self._limit_set > 0:
        if self.max_time is not None:
            self._timer.cancel()
            # Remember the whole seconds spent under the limit.
            self._elapse_time += int(time()) - self._start_time
            self._timer = None
            signal(SIGUSR2, self._old_usr2_hdlr)
        if self.max_cpu_time is not None:
            setrlimit(RLIMIT_CPU, self._old_max_cpu_time)
            signal(SIGXCPU, self._old_sigxcpu_hdlr)
        if self.max_memory is not None:
            self._msentinel.stop()
            self._msentinel = None
            setrlimit(RLIMIT_AS, self._old_max_memory)
            signal(SIGUSR1, self._old_usr1_hdlr)
    self._limit_set -= 1
def setup_limit(self):
    """Set up the process limits configured on this object.

    Must run in the main thread (asserted).  The process is made leader of
    its own process group, then -- only if ``self._limit_set`` is not yet
    positive -- a limit is installed for each of ``self.max_time``,
    ``self.max_cpu_time`` and ``self.max_memory`` that is not ``None``.
    The signal handlers and rlimits that get replaced are saved on ``self``.
    ``self._limit_set`` is incremented on every call.
    """
    assert currentThread().getName() == 'MainThread'
    # A session leader already leads its own process group and setpgrp()
    # would fail with EPERM for it, so only call it when not yet the leader.
    if os.getpgrp() != os.getpid():
        os.setpgrp()
    if self._limit_set <= 0:
        if self.max_time is not None:
            self._old_usr2_hdlr = signal(SIGUSR2, self._hangle_sig_timeout)
            # max_time minus the time already elapsed, never less than 1.
            self._timer = Timer(max(1, int(self.max_time) - self._elapse_time),
                                self._time_out)
            self._start_time = int(time())
            self._timer.start()
        if self.max_cpu_time is not None:
            self._old_max_cpu_time = getrlimit(RLIMIT_CPU)
            # Replace only the soft limit; the current hard limit is kept.
            cpu_limit = (int(self.max_cpu_time), self._old_max_cpu_time[1])
            self._old_sigxcpu_hdlr = signal(SIGXCPU, self._handle_sigxcpu)
            setrlimit(RLIMIT_CPU, cpu_limit)
        if self.max_memory is not None:
            # NOTE(review): MemorySentinel is defined elsewhere; the meaning
            # of its first argument (1) is not visible here -- confirm.
            self._msentinel = MemorySentinel(1, int(self.max_memory))
            self._old_max_memory = getrlimit(RLIMIT_AS)
            self._old_usr1_hdlr = signal(SIGUSR1, self._hangle_sig_memory)
            # Replace only the soft limit; the current hard limit is kept.
            as_limit = (int(self.max_memory), self._old_max_memory[1])
            setrlimit(RLIMIT_AS, as_limit)
            self._msentinel.start()
    self._limit_set += 1
def clean_limit(self):
    """Reinstall the old process limits and signal handlers.

    While ``self._limit_set`` is positive, every limit whose ``max_*``
    attribute is not ``None`` is undone using the values saved on ``self``.
    ``self._limit_set`` is decremented on every call, whether or not
    anything was restored.

    NOTE(review): teardown runs whenever the counter is positive, so with
    nested set-ups the limits are removed by the first call rather than the
    last -- confirm this is intended.
    """
    if self._limit_set > 0:
        if self.max_time is not None:
            self._timer.cancel()
            # Remember the whole seconds spent under the limit.
            self._elapse_time += int(time()) - self._start_time
            self._timer = None
            signal(SIGUSR2, self._old_usr2_hdlr)
        if self.max_cpu_time is not None:
            setrlimit(RLIMIT_CPU, self._old_max_cpu_time)
            signal(SIGXCPU, self._old_sigxcpu_hdlr)
        if self.max_memory is not None:
            self._msentinel.stop()
            self._msentinel = None
            setrlimit(RLIMIT_AS, self._old_max_memory)
            signal(SIGUSR1, self._old_usr1_hdlr)
    self._limit_set -= 1
def setup_limit(self):
    """Set up the process limits configured on this object.

    Must run in the main thread (asserted).  The process is made leader of
    its own process group, then -- only if ``self._limit_set`` is not yet
    positive -- a limit is installed for each of ``self.max_time``,
    ``self.max_cpu_time`` and ``self.max_memory`` that is not ``None``.
    The signal handlers and rlimits that get replaced are saved on ``self``.
    ``self._limit_set`` is incremented on every call.
    """
    assert currentThread().getName() == 'MainThread'
    # A session leader already leads its own process group and setpgrp()
    # would fail with EPERM for it, so only call it when not yet the leader.
    if os.getpgrp() != os.getpid():
        os.setpgrp()
    if self._limit_set <= 0:
        if self.max_time is not None:
            self._old_usr2_hdlr = signal(SIGUSR2, self._hangle_sig_timeout)
            # max_time minus the time already elapsed, never less than 1.
            self._timer = Timer(max(1, int(self.max_time) - self._elapse_time),
                                self._time_out)
            self._start_time = int(time())
            self._timer.start()
        if self.max_cpu_time is not None:
            self._old_max_cpu_time = getrlimit(RLIMIT_CPU)
            # Replace only the soft limit; the current hard limit is kept.
            cpu_limit = (int(self.max_cpu_time), self._old_max_cpu_time[1])
            self._old_sigxcpu_hdlr = signal(SIGXCPU, self._handle_sigxcpu)
            setrlimit(RLIMIT_CPU, cpu_limit)
        if self.max_memory is not None:
            # NOTE(review): MemorySentinel is defined elsewhere; the meaning
            # of its first argument (1) is not visible here -- confirm.
            self._msentinel = MemorySentinel(1, int(self.max_memory))
            self._old_max_memory = getrlimit(RLIMIT_AS)
            self._old_usr1_hdlr = signal(SIGUSR1, self._hangle_sig_memory)
            # Replace only the soft limit; the current hard limit is kept.
            as_limit = (int(self.max_memory), self._old_max_memory[1])
            setrlimit(RLIMIT_AS, as_limit)
            self._msentinel.start()
    self._limit_set += 1
def clean_limit(self):
    """Reinstall the old process limits and signal handlers.

    While ``self._limit_set`` is positive, every limit whose ``max_*``
    attribute is not ``None`` is undone using the values saved on ``self``.
    ``self._limit_set`` is decremented on every call, whether or not
    anything was restored.

    NOTE(review): teardown runs whenever the counter is positive, so with
    nested set-ups the limits are removed by the first call rather than the
    last -- confirm this is intended.
    """
    if self._limit_set > 0:
        if self.max_time is not None:
            self._timer.cancel()
            # Remember the whole seconds spent under the limit.
            self._elapse_time += int(time()) - self._start_time
            self._timer = None
            signal(SIGUSR2, self._old_usr2_hdlr)
        if self.max_cpu_time is not None:
            setrlimit(RLIMIT_CPU, self._old_max_cpu_time)
            signal(SIGXCPU, self._old_sigxcpu_hdlr)
        if self.max_memory is not None:
            self._msentinel.stop()
            self._msentinel = None
            setrlimit(RLIMIT_AS, self._old_max_memory)
            signal(SIGUSR1, self._old_usr1_hdlr)
    self._limit_set -= 1
def setup_limit(self):
    """Set up the process limits configured on this object.

    Must run in the main thread (asserted).  The process is made leader of
    its own process group, then -- only if ``self._limit_set`` is not yet
    positive -- a limit is installed for each of ``self.max_time``,
    ``self.max_cpu_time`` and ``self.max_memory`` that is not ``None``.
    The signal handlers and rlimits that get replaced are saved on ``self``.
    ``self._limit_set`` is incremented on every call.
    """
    assert currentThread().getName() == 'MainThread'
    # A session leader already leads its own process group and setpgrp()
    # would fail with EPERM for it, so only call it when not yet the leader.
    if os.getpgrp() != os.getpid():
        os.setpgrp()
    if self._limit_set <= 0:
        if self.max_time is not None:
            self._old_usr2_hdlr = signal(SIGUSR2, self._hangle_sig_timeout)
            # max_time minus the time already elapsed, never less than 1.
            self._timer = Timer(max(1, int(self.max_time) - self._elapse_time),
                                self._time_out)
            self._start_time = int(time())
            self._timer.start()
        if self.max_cpu_time is not None:
            self._old_max_cpu_time = getrlimit(RLIMIT_CPU)
            # Replace only the soft limit; the current hard limit is kept.
            cpu_limit = (int(self.max_cpu_time), self._old_max_cpu_time[1])
            self._old_sigxcpu_hdlr = signal(SIGXCPU, self._handle_sigxcpu)
            setrlimit(RLIMIT_CPU, cpu_limit)
        if self.max_memory is not None:
            # NOTE(review): MemorySentinel is defined elsewhere; the meaning
            # of its first argument (1) is not visible here -- confirm.
            self._msentinel = MemorySentinel(1, int(self.max_memory))
            self._old_max_memory = getrlimit(RLIMIT_AS)
            self._old_usr1_hdlr = signal(SIGUSR1, self._hangle_sig_memory)
            # Replace only the soft limit; the current hard limit is kept.
            as_limit = (int(self.max_memory), self._old_max_memory[1])
            setrlimit(RLIMIT_AS, as_limit)
            self._msentinel.start()
    self._limit_set += 1
def clean_limit(self):
    """Reinstall the old process limits and signal handlers.

    While ``self._limit_set`` is positive, every limit whose ``max_*``
    attribute is not ``None`` is undone using the values saved on ``self``.
    ``self._limit_set`` is decremented on every call, whether or not
    anything was restored.

    NOTE(review): teardown runs whenever the counter is positive, so with
    nested set-ups the limits are removed by the first call rather than the
    last -- confirm this is intended.
    """
    if self._limit_set > 0:
        if self.max_time is not None:
            self._timer.cancel()
            # Remember the whole seconds spent under the limit.
            self._elapse_time += int(time()) - self._start_time
            self._timer = None
            signal(SIGUSR2, self._old_usr2_hdlr)
        if self.max_cpu_time is not None:
            setrlimit(RLIMIT_CPU, self._old_max_cpu_time)
            signal(SIGXCPU, self._old_sigxcpu_hdlr)
        if self.max_memory is not None:
            self._msentinel.stop()
            self._msentinel = None
            setrlimit(RLIMIT_AS, self._old_max_memory)
            signal(SIGUSR1, self._old_usr1_hdlr)
    self._limit_set -= 1
def limitMemory(nbytes):
    """Set the soft address-space limit (RLIMIT_AS) to *nbytes*.

    The original request -- soft limit *nbytes*, hard limit -1 -- is tried
    first.  If ``setrlimit`` rejects it with ValueError, the call is
    retried with the current hard limit left as it is, so that only the
    soft limit changes.

    Raises:
        ValueError: if the retry is rejected as well.
    """
    try:
        setrlimit(RLIMIT_AS, (nbytes, -1))
    except ValueError:
        # Imported locally because the original block did not use getrlimit.
        from resource import getrlimit
        _, current_hard = getrlimit(RLIMIT_AS)
        setrlimit(RLIMIT_AS, (nbytes, current_hard))
def SetMemoryLimits():
    """Raise the soft RLIMIT_AS limit to the current hard limit.

    Best effort: any ordinary exception (including a failing import of
    ``resource``) is swallowed and the function returns None either way.
    """
    try:
        from resource import getrlimit, setrlimit, RLIMIT_AS
        _, hard = getrlimit(RLIMIT_AS)
        setrlimit(RLIMIT_AS, (hard, hard))
    except Exception:
        # Deliberately ignored; unlike a bare ``except`` this lets
        # KeyboardInterrupt and SystemExit propagate.
        pass
def set_memory_limit(memory):
    """Limit the address space (RLIMIT_AS) to *memory*.

    *memory* must be given in bytes or None.  ``None`` returns immediately
    without touching any limit.  Otherwise ``can_set_limits()`` must hold
    (asserted) and the value is handed to ``_set_limit`` together with
    ``resource.RLIMIT_AS``.
    """
    if memory is None:
        return
    assert can_set_limits()
    _set_limit(resource.RLIMIT_AS, memory)
def _get_external_memory_limit():
    """Return external soft memory limit in bytes or None if not set.

    Returns None right away when ``can_set_limits()`` is false; otherwise
    the result of ``_get_external_limit(resource.RLIMIT_AS)`` is returned
    unchanged.
    """
    if not can_set_limits():
        return None
    return _get_external_limit(resource.RLIMIT_AS)
def set_memory_limit(memory):
    """Limit the address space (RLIMIT_AS) to *memory*.

    *memory* must be given in bytes or None.  ``None`` returns immediately
    without touching any limit.  Otherwise ``can_set_limits()`` must hold
    (asserted) and the value is handed to ``_set_limit`` together with
    ``resource.RLIMIT_AS``.
    """
    if memory is None:
        return
    assert can_set_limits()
    _set_limit(resource.RLIMIT_AS, memory)
def _get_external_memory_limit():
    """Return external soft memory limit in bytes or None if not set.

    Returns None right away when ``can_set_limits()`` is false; otherwise
    the result of ``_get_external_limit(resource.RLIMIT_AS)`` is returned
    unchanged.
    """
    if not can_set_limits():
        return None
    return _get_external_limit(resource.RLIMIT_AS)
def getMemoryLimit():  # noqa
    """Return the soft RLIMIT_AS limit, or None if it cannot be read.

    The value is returned exactly as ``getrlimit`` reports it.  RLIMIT_AS
    is expressed in bytes, so no page-size scaling is applied; this keeps
    the result usable as the argument of ``setMemoryLimit``, which passes
    its value to ``setrlimit`` unscaled.

    Returns:
        The soft limit, or None when ``getrlimit`` raises ValueError.
    """
    try:
        return getrlimit(RLIMIT_AS)[0]
    except ValueError:
        return None
def setMemoryLimit(max_mem):  # noqa
    """Set the soft RLIMIT_AS limit to *max_mem*.

    ``None`` is replaced by -1 before the call.  The hard limit passed to
    ``setrlimit`` is always -1.

    Returns:
        True when ``setrlimit`` succeeds, False when it raises ValueError.
    """
    if max_mem is None:
        max_mem = -1
    try:
        setrlimit(RLIMIT_AS, (max_mem, -1))
        return True
    except ValueError:
        return False
def getMemoryLimit():
    """Return the soft RLIMIT_AS limit, or None if it cannot be read.

    The value is returned exactly as ``getrlimit`` reports it.  RLIMIT_AS
    is expressed in bytes, so no page-size scaling is applied; this keeps
    the result usable as the argument of ``setMemoryLimit``, which passes
    its value to ``setrlimit`` unscaled.

    Returns:
        The soft limit, or None when ``getrlimit`` raises ValueError.
    """
    try:
        return getrlimit(RLIMIT_AS)[0]
    except ValueError:
        return None
def setMemoryLimit(max_mem):
    """Set the soft RLIMIT_AS limit to *max_mem*.

    ``None`` is replaced by -1 before the call.  The hard limit passed to
    ``setrlimit`` is always -1.

    Returns:
        True when ``setrlimit`` succeeds, False when it raises ValueError.
    """
    if max_mem is None:
        max_mem = -1
    try:
        setrlimit(RLIMIT_AS, (max_mem, -1))
        return True
    except ValueError:
        return False