def __init__(self, name, sensors_list, pwm_path, minPWM, startPWM,
             lock, pwm_enable, algo):
    """Store the fan's configuration and initialise its runtime state.

    ``minPWM``, ``startPWM`` and ``lock`` are coerced with ``int()``; every
    other argument is stored unchanged on the instance.
    """
    # Identity and configuration handed in by the caller.
    self.name = name
    self.sensors_list = sensors_list
    self.pwm_path = pwm_path
    self.pwm_enable = pwm_enable
    self.algo = algo

    # Numeric settings may arrive as strings, so normalise them to int.
    self.minPWM = int(minPWM)
    self.startPWM = int(startPWM)
    self.lock = int(lock)

    # Runtime state.
    # -100 is outside the valid limits, which forces the fan to write a
    # new value on the first update.
    self.lastvalue = -100
    self.sleep = False
    self.alock = 0
    # Bounded history of ten entries, seeded with the characters '9'..'0'.
    self.pwm_history = deque('9876543210', 10)
    self.logger = logging.getLogger("fanicontrol")
# Example source code using Python's deque() class
def set_config(self):
    """Parse the captured session in ``New_Session.txt`` into two dicts.

    Only the last 18 lines of the file are considered. A line containing a
    colon is stored as a header entry: the text before the first colon is
    the key and the remainder, with newlines removed, is the value. Every
    other line is kept as a candidate request line. The first candidate is
    treated as the ``GET`` request line and its query string is split into
    key/value pairs.

    Returns:
        list: ``[header, final_request]`` where both items are dicts.

    Raises:
        IndexError: if no request line was found, or the request line has
            no ``?`` introducing a query string.
    """
    # Read inside ``with`` so the handle is closed deterministically; the
    # bounded deque keeps only the trailing 18 lines.
    with open('New_Session.txt', 'r') as session_file:
        data = deque(session_file, 18)

    header = {}
    final_request = {}
    form_request = []
    for i in data:
        print(i)
        line = str(i)
        if ':' in line:
            # Split on the first colon only, so values that themselves
            # contain ':' (host:port, URLs) are not truncated.
            key, value = line.split(':', 1)
            header[key] = value.replace('\n', '')
        else:
            form_request.append(i)

    if ('CONNECT mp.weixin.qq.com' in header
            and 'Request header' in header
            and 'Request body' in header):
        # Drop the capture bookkeeping entries. The last two are not part
        # of the guard above, so tolerate their absence instead of raising
        # KeyError.
        for unwanted in ('CONNECT mp.weixin.qq.com', 'Request header',
                         'Request body', 'Request url', 'Proxy-Connection'):
            header.pop(unwanted, None)

    split_request = form_request[0].replace('GET ', '').split('&')
    # The first chunk still carries the path; its parameter follows the '?'.
    first_pair = split_request[0].split('?')[1]
    # Keep the original insertion order: trailing parameters first, then
    # the parameter taken from the first chunk.
    for pair in split_request[1:] + [first_pair]:
        # partition() splits on the first '=' only, so values containing
        # '=' (e.g. base64 padding) survive intact; a pair without '='
        # maps to an empty value.
        key, _, value = str(pair).partition('=')
        final_request[key] = value
    return [header, final_request]
def __init__(self, lock=None):
    """Bind the condition to ``lock``, creating an ``RLock`` when none is given.

    The lock's ``acquire`` and ``release`` are re-exported on the instance,
    and an empty waiter queue is created.
    """
    if lock is None:
        lock = RLock()
    self._lock = lock
    # Re-export the lock's acquire() and release() methods.
    self.acquire = lock.acquire
    self.release = lock.release
    # When the lock provides any of these hooks they replace the default
    # implementations (which just call release() and acquire() on the
    # lock); a lock lacking a hook leaves the default in place.
    for hook in ('_release_save', '_acquire_restore', '_is_owned'):
        try:
            setattr(self, hook, getattr(lock, hook))
        except AttributeError:
            pass
    self._waiters = _deque()
def notify(self, n=1):
    """Wake up one or more threads waiting on this condition, if any.

    If the calling thread has not acquired the lock when this method is
    called, a RuntimeError is raised.

    This method wakes up at most n of the threads waiting for the condition
    variable; it is a no-op if no threads are waiting.
    """
    if not self._is_owned():
        raise RuntimeError("cannot notify on un-acquired lock")
    waiters = self._waiters
    while waiters and n > 0:
        waiter = waiters[0]
        try:
            waiter.release()
        except RuntimeError:
            # The waiter lock was already released: an earlier notify()
            # was interrupted (e.g. by KeyboardInterrupt from a signal
            # handler) after release() but before the entry was removed.
            # Discard the stale entry without counting it, instead of
            # failing and leaving it queued forever.
            pass
        else:
            n -= 1
        try:
            waiters.remove(waiter)
        except ValueError:
            # Already removed elsewhere; nothing left to do.
            pass
def __init__(self, lock=None):
    """Bind the condition to ``lock``, creating an ``RLock`` when none is given.

    The lock's ``acquire`` and ``release`` are re-exported on the instance,
    and an empty waiter queue is created.
    """
    if lock is None:
        lock = RLock()
    self._lock = lock
    # Re-export the lock's acquire() and release() methods.
    self.acquire = lock.acquire
    self.release = lock.release
    # When the lock provides any of these hooks they replace the default
    # implementations (which just call release() and acquire() on the
    # lock); a lock lacking a hook leaves the default in place.
    for hook in ('_release_save', '_acquire_restore', '_is_owned'):
        try:
            setattr(self, hook, getattr(lock, hook))
        except AttributeError:
            pass
    self._waiters = _deque()
def notify(self, n=1):
    """Wake up one or more threads waiting on this condition, if any.

    If the calling thread has not acquired the lock when this method is
    called, a RuntimeError is raised.

    This method wakes up at most n of the threads waiting for the condition
    variable; it is a no-op if no threads are waiting.
    """
    if not self._is_owned():
        raise RuntimeError("cannot notify on un-acquired lock")
    waiters = self._waiters
    while waiters and n > 0:
        waiter = waiters[0]
        try:
            waiter.release()
        except RuntimeError:
            # The waiter lock was already released: an earlier notify()
            # was interrupted (e.g. by KeyboardInterrupt from a signal
            # handler) after release() but before the entry was removed.
            # Discard the stale entry without counting it, instead of
            # failing and leaving it queued forever.
            pass
        else:
            n -= 1
        try:
            waiters.remove(waiter)
        except ValueError:
            # Already removed elsewhere; nothing left to do.
            pass
def __init__(self, trace, title='Optimizations', **kwargs):
    """Set up the optimization viewer for ``trace``.

    Keeps a reference to the caller's trace, a working deep copy, and an
    undo history bounded to three entries. An optional ``save`` keyword
    argument is stored as ``self.save`` (``None`` when absent).
    """
    # NOTE(review): the original comment here said "context should be a
    # dictionary containing the backward traced result of each relevant
    # register", but no ``context`` parameter exists -- confirm intent.
    super(OptimizationViewer, self).__init__(title)
    self.orig_trace = trace
    self.trace = deepcopy(trace)
    # Seed the undo history with three independent snapshots of the trace.
    snapshots = [deepcopy(trace), deepcopy(trace), deepcopy(trace)]
    self.undo_stack = deque(snapshots, maxlen=3)
    # Map each optimization name to its implementation.
    name_to_optimization = zip(optimization_names, optimizations)
    self.opti_map = dict(name_to_optimization)
    self.order = []
    self.foldable_regs = []
    self.save = kwargs.get('save')
def __init__(self, trace, title='Optimizations (legacy)', **kwargs):
    """Set up the legacy optimization viewer for ``trace``.

    Keeps a reference to the caller's trace, a working deep copy, and an
    undo history bounded to three entries. An optional ``save`` keyword
    argument is stored as ``self.save`` (``None`` when absent).
    """
    # NOTE(review): the original comment here said "context should be a
    # dictionary containing the backward traced result of each relevant
    # register", but no ``context`` parameter exists -- confirm intent.
    super(OptimizationViewer, self).__init__(title)
    self.orig_trace = trace
    self.trace = deepcopy(trace)
    # Seed the undo history with three independent snapshots of the trace.
    snapshots = [deepcopy(trace), deepcopy(trace), deepcopy(trace)]
    self.undo_stack = deque(snapshots, maxlen=3)
    # Map each optimization name to its implementation.
    name_to_optimization = zip(optimization_names, optimizations)
    self.opti_map = dict(name_to_optimization)
    self.order = []
    self.foldable_regs = []
    self.save = kwargs.get('save')
def __init__(self, clustered_trace, bb_func, ctx_reg_size, title='Clustering Analysis Result (legacy)', save_func=None):
    """Set up the legacy cluster viewer for ``clustered_trace``.

    Stores the caller's trace untouched, works on a deep copy of it, and
    starts a three-entry undo history holding a snapshot of that copy.
    """
    # NOTE(review): the original comment here said "context should be a
    # dictionary containing the backward traced result of each relevant
    # register", but no ``context`` parameter exists -- confirm intent.
    super(ClusterViewer, self).__init__(title)
    # Plain pass-through settings.
    self.bb_func = bb_func
    self.ctx_reg_size = ctx_reg_size
    self.save = save_func
    # Trace state: pristine original, editable copy, bounded undo history.
    self.orig_trace = clustered_trace
    self.trace = deepcopy(self.orig_trace)
    initial_snapshot = deepcopy(self.trace)
    self.undo_stack = deque([initial_snapshot], maxlen=3)
def Restore(self):
    """Reset the working trace to a fresh copy of the original trace.

    The undo history is replaced by a single snapshot of the trace as it
    was before the reset, then the model is repopulated.
    """
    # Snapshot the current trace before it is overwritten.
    pre_restore = deepcopy(self.trace)
    self.undo_stack = deque([pre_restore], maxlen=3)
    self.trace = deepcopy(self.orig_trace)
    self.PopulateModel()
def __init__(self, clustered_trace, bb_func, ctx_reg_size, title='Clustering Analysis Result', save_func=None):
    """Set up the cluster viewer for ``clustered_trace``.

    Stores the caller's trace untouched, works on a deep copy of it, and
    starts a three-entry undo history holding a snapshot of that copy.
    """
    # NOTE(review): the original comment here said "context should be a
    # dictionary containing the backward traced result of each relevant
    # register", but no ``context`` parameter exists -- confirm intent.
    super(ClusterViewer, self).__init__(title)
    # Plain pass-through settings.
    self.bb_func = bb_func
    self.ctx_reg_size = ctx_reg_size
    self.save = save_func
    # Trace state: pristine original, editable copy, bounded undo history.
    self.orig_trace = clustered_trace
    self.trace = deepcopy(self.orig_trace)
    initial_snapshot = deepcopy(self.trace)
    self.undo_stack = deque([initial_snapshot], maxlen=3)
def test_startbit(self):
    """Queue START then NOP and expect exactly one START on the I2C agent.

    The clock-count initial value is fed 4 and the simulation is run for
    ``600 * Time.ns`` before the recorded bits are checked.
    """
    unit = self.u
    commands = [(START, 0), (NOP, 0)]
    unit.cntrl._ag.data.extend(commands)
    unit.clk_cnt_initVal._ag.data.append(4)

    self.doSim(600 * Time.ns)

    expected_bits = deque([I2cAgent.START])
    self.assertEqual(unit.i2c._ag.bits, expected_bits)