def detectValidExtensions(self,extensions,maxN,extList=None) :
self.logger.info("### Starting detection of valid extensions ...")
n = 0
if extList :
tmpExtList = []
for e in extList :
tmpExtList.append((e,getMime(extensions,e)))
else :
tmpExtList = extensions
validExtensions = []
extensionsToTest = tmpExtList[0:maxN]
with concurrent.futures.ThreadPoolExecutor(max_workers=self.threads) as executor :
futures = []
try :
for ext in extensionsToTest:
f = executor.submit(self.uploadFile,"."+ext[0],ext[1],os.urandom(self.size))
f.ext = ext
f.add_done_callback(self.detectValidExtension)
futures.append(f)
for future in concurrent.futures.as_completed(futures) :
a = future.result()
n += 1
except KeyboardInterrupt :
self.shouldLog = False
executor.shutdown(wait=False)
self.stopThreads = True
executor._threads.clear()
concurrent.futures.thread._threads_queues.clear()
return n
#detects if code execution is gained, given an url to request and a regex supposed to match the executed code output
评论列表
文章目录