def eval_python(self, result):
    """Evaluate the Python expression captured by a template match.

    ``result`` is a regular-expression match object exposing a ``code``
    group. Tabs in the captured code are replaced by single spaces, the code
    is evaluated against ``self.global_dict`` (and ``self.local_dict`` when
    that is truthy), and ``str()`` of the value is returned.

    On any failure the offending code is passed to ``self.errorLogger``
    between two banner lines and the original exception is re-raised.
    """
    code = result.group('code').replace('\t', ' ')
    try:
        # NOTE(review): eval() executes arbitrary code; this is only safe
        # if the templates come from a trusted source.
        if self.local_dict:
            value = eval(code, self.global_dict, self.local_dict)
        else:
            value = eval(code, self.global_dict)
        return str(value)
    except:  # deliberately bare: log the code, then re-raise whatever it was
        self.errorLogger('\n---- Error parsing: ----\n')
        self.errorLogger(code)
        self.errorLogger('\n------------------------\n')
        raise
#---------------------------------------------------------------------------
# This routine is only called when OUTPUT() is included in executed Python
# code from the templates. It evaluates its parameter as if it was a
# template and appends the result to the OUTPUT_TEXT variable in the global
# dictionary.
# Example source code using Python's replace()
def _extract_file_info(self, _file):
"""Extract file information."""
_file = json.loads(_file)
info = {'filename': _file['name'],
'link_raw': string.replace(_file['link'], 'dl=0', 'raw=1'),
'link': _file['link']}
if self._is_image_file(_file['name']):
extra_fields = {'url_m': info['link_raw'],
'url_b': info['link_raw'],
'title': info['filename']}
info.update(extra_fields)
if self._is_video_file(_file['name']):
url = self._create_raw_cors_link(_file['link'])
extra_fields = {'video_url': url}
info.update(extra_fields)
if self._is_audio_file(_file['name']):
url = self._create_raw_cors_link(_file['link'])
extra_fields = {'audio_url': url}
info.update(extra_fields)
if self._is_pdf_file(_file['name']):
url = self._create_raw_cors_link(_file['link'])
extra_fields = {'pdf_url': url}
info.update(extra_fields)
return {'info': info}
def test_tasks_attributes_for_image_files(self):
    """Image files from S3 expose link, filename, url, url_m, url_b, title."""
    image_ext = ['png', 'jpg', 'jpeg', 'gif']
    file_data = 'myfile.extension'
    for ext in image_ext:
        data = file_data.replace('extension', ext)
        form_data = {
            'files': [data],
            'bucket': 'mybucket'
        }
        tasks = BulkTaskS3Import(**form_data).tasks()
        info = tasks[0]['info']
        expected_url = "https://mybucket.s3.amazonaws.com/myfile.%s" % ext
        assert info['filename'] == "myfile.%s" % ext
        assert info['link'] == expected_url
        assert info['url'] == expected_url
        assert info['url_m'] == expected_url
        assert info['url_b'] == expected_url
        assert info['title'] == "myfile.%s" % ext
def test_tasks_attributes_for_video_files(self):
    """Video files from S3 expose link, filename, url and video_url."""
    video_ext = ['mp4', 'm4v', 'ogg', 'ogv', 'webm', 'avi']
    file_data = 'myfile.extension'
    for ext in video_ext:
        data = file_data.replace('extension', ext)
        form_data = {
            'files': [data],
            'bucket': 'mybucket'
        }
        tasks = BulkTaskS3Import(**form_data).tasks()
        info = tasks[0]['info']
        expected_url = "https://mybucket.s3.amazonaws.com/myfile.%s" % ext
        assert info['filename'] == "myfile.%s" % ext
        assert info['link'] == expected_url
        assert info['url'] == expected_url
        assert info['video_url'] == expected_url
def test_tasks_attributes_for_audio_files(self):
    """Audio files from S3 expose link, filename, url and audio_url."""
    audio_ext = ['mp4', 'm4a', 'mp3', 'ogg', 'oga', 'webm', 'wav']
    file_data = 'myfile.extension'
    for ext in audio_ext:
        data = file_data.replace('extension', ext)
        form_data = {
            'files': [data],
            'bucket': 'mybucket'
        }
        tasks = BulkTaskS3Import(**form_data).tasks()
        info = tasks[0]['info']
        expected_url = "https://mybucket.s3.amazonaws.com/myfile.%s" % ext
        assert info['filename'] == "myfile.%s" % ext
        assert info['link'] == expected_url
        assert info['url'] == expected_url
        assert info['audio_url'] == expected_url
def test_tasks_attributes_for_video_files(self):
    """Video files from Dropbox expose link, filename, link_raw, video_url."""
    video_ext = ['mp4', 'm4v', 'ogg', 'ogv', 'webm', 'avi']
    file_data = (u'{"bytes":286,'
                 u'"link":"https://www.dropbox.com/s/l2b77qvlrequ6gl/test.extension?dl=0",'
                 u'"name":"test.extension",'
                 u'"icon":"https://www.dropbox.com/static/images/icons64/page_white_text.png"}')
    for ext in video_ext:
        # Substitutes the placeholder in both the "link" and "name" fields.
        data = file_data.replace('extension', ext)
        form_data = {'files': [data]}
        tasks = BulkTaskDropboxImport(**form_data).tasks()
        info = tasks[0]['info']
        assert info['filename'] == "test.%s" % ext
        assert info['link'] == "https://www.dropbox.com/s/l2b77qvlrequ6gl/test.%s?dl=0" % ext
        assert info['link_raw'] == "https://www.dropbox.com/s/l2b77qvlrequ6gl/test.%s?raw=1" % ext
        assert info['video_url'] == "https://dl.dropboxusercontent.com/s/l2b77qvlrequ6gl/test.%s" % ext
def test_tasks_attributes_for_audio_files(self):
    """Audio files from Dropbox expose link, filename, link_raw, audio_url."""
    audio_ext = ['mp4', 'm4a', 'mp3', 'ogg', 'oga', 'webm', 'wav']
    file_data = (u'{"bytes":286,'
                 u'"link":"https://www.dropbox.com/s/l2b77qvlrequ6gl/test.extension?dl=0",'
                 u'"name":"test.extension",'
                 u'"icon":"https://www.dropbox.com/static/images/icons64/page_white_text.png"}')
    for ext in audio_ext:
        # Substitutes the placeholder in both the "link" and "name" fields.
        data = file_data.replace('extension', ext)
        form_data = {'files': [data]}
        tasks = BulkTaskDropboxImport(**form_data).tasks()
        info = tasks[0]['info']
        assert info['filename'] == "test.%s" % ext
        assert info['link'] == "https://www.dropbox.com/s/l2b77qvlrequ6gl/test.%s?dl=0" % ext
        assert info['link_raw'] == "https://www.dropbox.com/s/l2b77qvlrequ6gl/test.%s?raw=1" % ext
        assert info['audio_url'] == "https://dl.dropboxusercontent.com/s/l2b77qvlrequ6gl/test.%s" % ext
def ParseWhois_INT(self):
    """Scan ``self.page`` for the contact sections of a .int whois record.

    CRLF line endings are normalised to LF, then each contact heading is
    searched for with a regular expression. The match is currently unused
    (the debugging print of it is disabled); only a separator line is
    printed.
    """
    int_contacts = (
        {"page_field": "Registrant", "rec_field": "registrant"},
        {"page_field": "Administrative Contact", "rec_field": "administrative"},
        {"page_field": "Technical Contact", "rec_field": "technical"})
    page = self.page.replace("\r\n", "\n")
    for contact in int_contacts:
        page_field = contact['page_field']
        # "\\W" keeps the regex token \W without relying on an invalid
        # string escape.
        s = "%s:(.*)\n\\W" % page_field
        m = re.search(s, page, re.DOTALL)
        #if m: print(m.group(1))
        # NOTE(review): indentation was lost in this copy; the separator is
        # assumed to be printed once per contact -- confirm.
        print("-------------------")
##
## ----------------------------------------------------------------------
##
##
## ----------------------------------------------------------------------
##
def strToHex(string):
    """
    @param string: string to be converted into its hexadecimal value.
    @type string: C{str}

    @return: the hexadecimal converted string; every character becomes its
        upper-case, zero-padded (at least two digits) code point, and a
        NEWLINE is encoded as a space (C{20}).
    @rtype: C{str}
    """
    # "%02X" pads and upper-cases in one step; join avoids repeated string
    # concatenation.
    return "".join("%02X" % ord(" " if character == "\n" else character)
                   for character in string)
def fileToStr(fileName):
    """
    @param fileName: path of the file whose content is read.
    @type fileName: C{str}

    @return: the file content with every space, TAB and carriage return
        removed and every NEWLINE replaced by a single space.
    @rtype: C{str}
    """
    # The context manager closes the file, which the previous version never
    # did.
    with open(fileName, "r") as filePointer:
        fileText = filePointer.read()
    # Order matters: existing spaces are dropped before newlines become
    # spaces.
    fileText = fileText.replace(" ", "")
    fileText = fileText.replace("\t", "")
    fileText = fileText.replace("\r", "")
    fileText = fileText.replace("\n", " ")
    return fileText
def _do_attr(self, n, value):
''''_do_attr(self, node) -> None
Process an attribute.'''
W = self.write
W(' ')
W(n)
W('="')
s = string.replace(value, "&", "&")
s = string.replace(s, "<", "<")
s = string.replace(s, '"', '"')
s = string.replace(s, '\011', '	')
s = string.replace(s, '\012', '
')
s = string.replace(s, '\015', '
')
W(s)
W('"')
def _do_attr(self, n, value):
''''_do_attr(self, node) -> None
Process an attribute.'''
W = self.write
W(' ')
W(n)
W('="')
s = string.replace(value, "&", "&")
s = string.replace(s, "<", "<")
s = string.replace(s, '"', '"')
s = string.replace(s, '\011', '	')
s = string.replace(s, '\012', '
')
s = string.replace(s, '\015', '
')
W(s)
W('"')
def more(self):
    """Return the next chunk from the wrapped producer with escaping applied.

    Data held back in ``self.buffer`` is prepended to
    ``self.producer.more()`` and every occurrence of ``self.esc_from`` is
    replaced by ``self.esc_to``. If ``self.find_prefix_at_end`` reports a
    non-zero length, that many trailing bytes are kept in ``self.buffer``
    for the next call and the rest is returned; otherwise the buffer is
    cleared and everything is returned. Empty input is returned unchanged.
    """
    data = self.buffer + self.producer.more()
    if not data:
        return data
    esc_from = self.esc_from
    # Method form works for bytes as well as str, unlike string.replace().
    data = data.replace(esc_from, self.esc_to)
    held = self.find_prefix_at_end(data, esc_from)
    if held:
        # we found a prefix: hold it back so it can be re-examined once
        # more data has arrived
        self.buffer = data[-held:]
        return data[:-held]
    # no prefix, return it all
    self.buffer = b''
    return data
def do_put(self, s):
    """Upload a local file to the remote host.

    ``s`` is ``"<src_path> [<dst_path>]"`` split on single spaces; a missing
    destination means the current remote directory (``self.__pwd``). The
    remote drive letter becomes the administrative share name (``C:`` ->
    ``C$``) passed to the transfer client's ``putFile``.

    Any exception is logged at critical level and swallowed.
    """
    try:
        params = s.split(' ')
        src_path = params[0]
        dst_path = params[1] if len(params) > 1 else ''
        src_file = os.path.basename(src_path)
        import ntpath
        # The context manager closes the file even when the upload fails.
        with open(src_path, 'rb') as fh:
            dst_path = dst_path.replace('/', '\\')
            pathname = ntpath.join(ntpath.join(self.__pwd, dst_path), src_file)
            drive, tail = ntpath.splitdrive(pathname)
            logging.info("Uploading %s to %s" % (src_file, pathname))
            self.__transferClient.putFile(drive[:-1] + '$', tail, fh.read)
    except Exception as e:
        logging.critical(str(e))
def do_cat(self, line, command = sys.stdout.write):
    """Stream the content of a file to ``command`` in 40 KiB chunks.

    ``line`` is a path relative to ``self.pwd`` (forward slashes accepted).
    Directories and compressed, encrypted or sparse files are rejected with
    a logged error. ``command`` is called once per chunk and defaults to
    writing on standard output.
    """
    pathName = line.replace('/', '\\')
    pathName = ntpath.normpath(ntpath.join(self.pwd, pathName))
    res = self.findPathName(pathName)
    if res is None:
        logging.error("Not found!")
        return
    if res.isDirectory() > 0:
        logging.error("It's a directory!")
        return
    if res.isCompressed() or res.isEncrypted() or res.isSparse():
        logging.error('Cannot handle compressed/encrypted/sparse files! :(')
        return
    stream = res.getStream(None)
    size = stream.getDataSize()
    chunks = 4096*10
    written = 0
    # Floor division: range() needs an int, and "/" yields a float on
    # Python 3.
    for i in range(size // chunks):
        buf = stream.read(i*chunks, chunks)
        written += len(buf)
        command(buf)
    remainder = size % chunks
    if remainder:
        # Final partial chunk, read from where the full chunks stopped.
        command(stream.read(written, remainder))
    logging.info("%d bytes read" % size)
def do_put(self, s):
    """Upload a local file to the remote host.

    ``s`` is ``"<src_path> [<dst_path>]"`` split on single spaces; a missing
    destination means the current remote directory (``self.__pwd``). The
    remote drive letter becomes the administrative share name (``C:`` ->
    ``C$``) passed to the transfer client's ``putFile``.

    Any exception is logged at critical level and swallowed.
    """
    try:
        params = s.split(' ')
        src_path = params[0]
        dst_path = params[1] if len(params) > 1 else ''
        src_file = os.path.basename(src_path)
        import ntpath
        # The context manager closes the file even when the upload fails.
        with open(src_path, 'rb') as fh:
            dst_path = dst_path.replace('/', '\\')
            pathname = ntpath.join(ntpath.join(self.__pwd, dst_path), src_file)
            drive, tail = ntpath.splitdrive(pathname)
            logging.info("Uploading %s to %s" % (src_file, pathname))
            self.__transferClient.putFile(drive[:-1] + '$', tail, fh.read)
    except Exception as e:
        logging.critical(str(e))
def do_put(self, s):
    """Upload a local file to ``self.share`` and acknowledge with CRLF.

    ``s`` is ``"<src_path> [<dst_path>]"`` split on single spaces; the
    destination defaults to ``/``. The transfer client is connected on first
    use. Any exception is logged at error level and swallowed;
    ``self.send_data('\\r\\n')`` is issued afterwards.

    NOTE(review): indentation was lost in this copy; ``send_data`` is assumed
    to run after the try/except regardless of outcome -- confirm.
    """
    try:
        if self.transferClient is None:
            self.connect_transferClient()
        params = s.split(' ')
        src_path = params[0]
        dst_path = params[1] if len(params) > 1 else '/'
        src_file = os.path.basename(src_path)
        # The context manager closes the file even when the upload fails.
        with open(src_path, 'rb') as fh:
            pathname = (dst_path + '/' + src_file).replace('/', '\\')
            logging.info("Uploading %s to %s\\%s" % (src_file, self.share, dst_path))
            self.transferClient.putFile(self.share, pathname, fh.read)
    except Exception as e:
        logging.error(str(e))
    self.send_data('\r\n')
def do_put(self, s):
    """Upload a local file to ``self.share`` and acknowledge with CRLF.

    ``s`` is ``"<src_path> [<dst_path>]"`` split on single spaces; the
    destination defaults to ``/``. The transfer client is connected on first
    use. A byte-string remote path is decoded with the terminal encoding
    before it is handed to ``putFile``. Any exception is logged at error
    level and swallowed; ``self.send_data('\\r\\n')`` is issued afterwards.

    NOTE(review): indentation was lost in this copy; ``send_data`` is assumed
    to run after the try/except regardless of outcome -- confirm.
    """
    try:
        if self.transferClient is None:
            self.connect_transferClient()
        params = s.split(' ')
        src_path = params[0]
        dst_path = params[1] if len(params) > 1 else '/'
        src_file = os.path.basename(src_path)
        # The context manager closes the file even when the upload fails.
        with open(src_path, 'rb') as fh:
            pathname = (dst_path + '/' + src_file).replace('/', '\\')
            if isinstance(pathname, bytes):
                # Python 2 byte string: decode as before. Text strings
                # (Python 3) have no decode() and are passed as they are.
                pathname = pathname.decode(sys.stdin.encoding)
            logging.info("Uploading %s to %s\\%s" % (src_file, self.share, dst_path))
            self.transferClient.putFile(self.share, pathname, fh.read)
    except Exception as e:
        logging.error(str(e))
    self.send_data('\r\n')
def copy_file(self, src, tree, dst):
    """Upload ``src`` to ``dst`` on the share ``tree``.

    ``src`` is either a local filename (opened here in binary mode) or an
    object with a ``read`` method. Forward slashes in ``dst`` are converted
    to backslashes. On failure a critical message is logged and the
    exception is re-raised. The source handle is closed in both cases.
    """
    LOG.info("Uploading file %s" % dst)
    if isinstance(src, str):
        # We have a filename
        fh = open(src, 'rb')
    else:
        # We have a class instance, it must have a read method
        fh = src
    pathname = dst.replace('/', '\\')
    try:
        self.connection.putFile(tree, pathname, fh.read)
    except:  # deliberately bare: log, then re-raise whatever it was
        LOG.critical("Error uploading file %s, aborting....." % dst)
        raise
    finally:
        # Previously the handle was only closed after a successful upload.
        fh.close()
def retr_file(self, service, filename, callback, mode = FILE_OPEN, offset = 0, password = None, shareAccessMode = SMB_ACCESS_READ):
    """Retrieve ``filename`` from share ``service``, feeding data to ``callback``.

    Forward slashes in ``filename`` are converted to backslashes. The tree is
    connected, the file is opened, its ``EndOfFile`` size is queried and the
    content from ``offset`` onwards is fetched through
    ``self.__nonraw_retr_file``. The file (if it was opened) and the tree
    are always released. ``mode`` is accepted but not used here.
    """
    filename = filename.replace('/', '\\')
    fid = -1
    tid = self.tree_connect_andx('\\\\' + self.__remote_name + '\\' + service, password)
    try:
        fid = self.nt_create_andx(tid, filename, shareAccessMode = shareAccessMode, accessMask = 0x20089)
        res = self.query_file_info(tid, fid)
        datasize = SMBQueryFileStandardInfo(res)['EndOfFile']
        self.__nonraw_retr_file(tid, fid, offset, datasize, callback)
    finally:
        # fid stays -1 when the open itself failed; nothing to close then.
        if fid >= 0:
            self.close(tid, fid)
        self.disconnect_tree(tid)
def check_dir(self, service, path, password = None):
    """Send an SMB_COM_CHECK_DIRECTORY request for ``path`` on ``service``.

    Forward slashes in ``path`` are converted to backslashes and the name is
    encoded as UTF-16LE when the unicode flag is set in ``self.__flags2``.
    Returns ``None`` once a valid answer is received; the tree is always
    disconnected.

    NOTE(review): failure presumably surfaces as an exception raised by
    ``isValidAnswer`` -- confirm.
    """
    path = path.replace('/', '\\')
    tid = self.tree_connect_andx('\\\\' + self.__remote_name + '\\' + service, password)
    try:
        smb = NewSMBPacket()
        smb['Tid'] = tid
        smb['Mid'] = 0
        cmd = SMBCommand(SMB.SMB_COM_CHECK_DIRECTORY)
        cmd['Parameters'] = ''
        cmd['Data'] = SMBCheckDirectory_Data(flags = self.__flags2)
        cmd['Data']['DirectoryName'] = path.encode('utf-16le') if self.__flags2 & SMB.FLAGS2_UNICODE else path
        smb.addCommand(cmd)
        self.sendSMB(smb)
        # Keep reading until the matching answer arrives.
        while True:
            s = self.recvSMB()
            if s.isValidAnswer(SMB.SMB_COM_CHECK_DIRECTORY):
                return
    finally:
        self.disconnect_tree(tid)
def rmdir(self, service, path, password = None):
    """Delete directory ``path`` on share ``service``.

    Forward slashes in ``path`` are converted to backslashes. The directory
    is first checked with ``check_dir``; then an SMB_COM_DELETE_DIRECTORY
    request is sent. Returns ``None`` once a valid answer is received; the
    tree is always disconnected.
    """
    path = path.replace('/', '\\')
    # Check that the directory exists
    self.check_dir(service, path, password)
    tid = self.tree_connect_andx('\\\\' + self.__remote_name + '\\' + service, password)
    try:
        path = path.encode('utf-16le') if self.__flags2 & SMB.FLAGS2_UNICODE else path
        smb = NewSMBPacket()
        smb['Tid'] = tid
        deleteDir = SMBCommand(SMB.SMB_COM_DELETE_DIRECTORY)
        deleteDir['Data'] = SMBDeleteDirectory_Data(flags=self.__flags2)
        deleteDir['Data']['DirectoryName'] = path
        smb.addCommand(deleteDir)
        self.sendSMB(smb)
        # Keep reading until the matching answer arrives.
        while True:
            s = self.recvSMB()
            if s.isValidAnswer(SMB.SMB_COM_DELETE_DIRECTORY):
                return
    finally:
        self.disconnect_tree(tid)
def mkdir(self, service, path, password = None):
    """Create directory ``path`` on share ``service``.

    Forward slashes in ``path`` are converted to backslashes and the name is
    encoded as UTF-16LE when the unicode flag is set in ``self.__flags2``.
    Returns 1 when the server's reply is a valid answer to
    SMB_COM_CREATE_DIRECTORY, otherwise 0; the tree is always disconnected.
    """
    path = path.replace('/', '\\')
    tid = self.tree_connect_andx('\\\\' + self.__remote_name + '\\' + service, password)
    try:
        path = path.encode('utf-16le') if self.__flags2 & SMB.FLAGS2_UNICODE else path
        smb = NewSMBPacket()
        smb['Tid'] = tid
        smb['Mid'] = 0
        createDir = SMBCommand(SMB.SMB_COM_CREATE_DIRECTORY)
        createDir['Data'] = SMBCreateDirectory_Data(flags=self.__flags2)
        createDir['Data']['DirectoryName'] = path
        smb.addCommand(createDir)
        self.sendSMB(smb)
        smb = self.recvSMB()
        if smb.isValidAnswer(SMB.SMB_COM_CREATE_DIRECTORY):
            return 1
        return 0
    finally:
        self.disconnect_tree(tid)
def rename(self, service, old_path, new_path, password = None):
    """Rename ``old_path`` to ``new_path`` on share ``service``.

    Forward slashes in both paths are converted to backslashes and the names
    are encoded as UTF-16LE when the unicode flag is set in
    ``self.__flags2``. Returns 1 when the server's reply is a valid answer
    to SMB_COM_RENAME, otherwise 0; the tree is always disconnected.
    """
    old_path = old_path.replace('/', '\\')
    new_path = new_path.replace('/', '\\')
    tid = self.tree_connect_andx('\\\\' + self.__remote_name + '\\' + service, password)
    try:
        smb = NewSMBPacket()
        smb['Tid'] = tid
        smb['Mid'] = 0
        renameCmd = SMBCommand(SMB.SMB_COM_RENAME)
        renameCmd['Parameters'] = SMBRename_Parameters()
        renameCmd['Parameters']['SearchAttributes'] = ATTR_SYSTEM | ATTR_HIDDEN | ATTR_DIRECTORY
        renameCmd['Data'] = SMBRename_Data(flags = self.__flags2)
        renameCmd['Data']['OldFileName'] = old_path.encode('utf-16le') if self.__flags2 & SMB.FLAGS2_UNICODE else old_path
        renameCmd['Data']['NewFileName'] = new_path.encode('utf-16le') if self.__flags2 & SMB.FLAGS2_UNICODE else new_path
        smb.addCommand(renameCmd)
        self.sendSMB(smb)
        smb = self.recvSMB()
        if smb.isValidAnswer(SMB.SMB_COM_RENAME):
            return 1
        return 0
    finally:
        self.disconnect_tree(tid)
def rmdir(self, shareName, pathName, password = None):
    """Delete directory ``pathName`` on ``shareName``.

    The path is converted to backslashes, normalised and stripped of one
    leading backslash. The directory is opened with DELETE access and marked
    delete-pending through a file-disposition set-info request. The handle
    (if one was opened) and the tree are always released. Returns ``True``.
    ``password`` is currently unused.
    """
    # ToDo: Handle situations where share is password protected
    pathName = ntpath.normpath(pathName.replace('/', '\\'))
    if pathName.startswith('\\'):
        pathName = pathName[1:]
    treeId = self.connectTree(shareName)
    fileId = None
    try:
        fileId = self.create(treeId, pathName, desiredAccess=DELETE | FILE_READ_ATTRIBUTES | SYNCHRONIZE,
                             shareMode=FILE_SHARE_DELETE | FILE_SHARE_READ | FILE_SHARE_WRITE,
                             creationOptions=FILE_DIRECTORY_FILE | FILE_OPEN_REPARSE_POINT,
                             creationDisposition=FILE_OPEN, fileAttributes=0)
        from impacket import smb
        delete_req = smb.SMBSetFileDispositionInfo()
        delete_req['DeletePending'] = True
        self.setInfo(treeId, fileId, inputBlob=delete_req, fileInfoClass=SMB2_FILE_DISPOSITION_INFO)
    finally:
        if fileId is not None:
            self.close(treeId, fileId)
        self.disconnectTree(treeId)
    return True
def remove(self, shareName, pathName, password = None):
    """Delete file ``pathName`` on ``shareName``.

    The path is converted to backslashes, normalised and stripped of one
    leading backslash. The file is opened with the delete-on-close option,
    so closing the handle performs the removal. The handle (if one was
    opened) and the tree are always released. Returns ``True``.
    ``password`` is currently unused.
    """
    # ToDo: Handle situations where share is password protected
    pathName = ntpath.normpath(pathName.replace('/', '\\'))
    if pathName.startswith('\\'):
        pathName = pathName[1:]
    treeId = self.connectTree(shareName)
    fileId = None
    try:
        fileId = self.create(treeId, pathName,DELETE | FILE_READ_ATTRIBUTES, FILE_SHARE_DELETE, FILE_NON_DIRECTORY_FILE | FILE_DELETE_ON_CLOSE, FILE_OPEN, 0)
    finally:
        if fileId is not None:
            self.close(treeId, fileId)
        self.disconnectTree(treeId)
    return True
def storeFile(self, shareName, path, callback, mode = FILE_OVERWRITE_IF, offset = 0, password = None, shareAccessMode = FILE_SHARE_WRITE):
    """Write data supplied by ``callback`` to ``path`` on ``shareName``.

    The path is converted to backslashes, normalised and stripped of one
    leading backslash. ``callback(size)`` is called repeatedly with the
    connection's ``MaxWriteSize`` and must return the next chunk; an empty
    chunk ends the transfer. Writing starts at ``offset``. The handle (if
    one was opened) and the tree are always released. ``password`` is
    currently unused.
    """
    # ToDo: Handle situations where share is password protected
    path = ntpath.normpath(path.replace('/', '\\'))
    if path.startswith('\\'):
        path = path[1:]
    treeId = self.connectTree(shareName)
    fileId = None
    try:
        fileId = self.create(treeId, path, FILE_WRITE_DATA, shareAccessMode, FILE_NON_DIRECTORY_FILE, mode, 0)
        writeOffset = offset
        while True:
            data = callback(self._Connection['MaxWriteSize'])
            if len(data) == 0:
                break
            # Advance by what the server reports as written, not by
            # len(data).
            written = self.write(treeId, fileId, data, writeOffset, len(data))
            writeOffset += written
    finally:
        if fileId is not None:
            self.close(treeId, fileId)
        self.disconnectTree(treeId)
def save_plot(self,plot_name,**kwargs):
    """Render the plot registered as ``plot_name`` to a PDF file.

    The file is written to ``self.plot_path`` (created if missing) and named
    after the plot, with spaces replaced by underscores. Extra keyword
    arguments are forwarded to the plot's ``generate_plot``. An unknown plot
    name only produces a warning on the "plotting" logger.
    """
    logger = logging.getLogger("plotting")
    if plot_name not in self.plots:
        logger.warning('Tried to generate a plot called %s that does not exist' % plot_name)
        # raise an exception here?
        return
    # NOTE: an earlier design looked up the target file name from a
    # PlotHandler attached to this logger; that lookup is disabled and
    # self.plot_path is used directly.
    # TO DO - need to be sure of safe file names
    if not os.path.isdir(self.plot_path):
        os.makedirs(self.plot_path)
    filename = self.plot_path + "/" + plot_name.replace(" ", "_") + ".pdf"
    logger.info('Generating a plot in file %s' % filename)
    self.plots[plot_name].generate_plot(filename,**kwargs)
def _do_attr(self, n, value):
''''_do_attr(self, node) -> None
Process an attribute.'''
W = self.write
W(' ')
W(n)
W('="')
s = string.replace(value, "&", "&")
s = string.replace(s, "<", "<")
s = string.replace(s, '"', '"')
s = string.replace(s, '\011', '	')
s = string.replace(s, '\012', '
')
s = string.replace(s, '\015', '
')
W(s)
W('"')
def parseResp(self, resp):
    """Extract the text between the emitter's prefix and suffix in ``resp``.

    The markers come from ``self.injector.emitter``. Each marker is tried in
    several spellings (as is, upper-cased, lower-cased, and the same three
    for its backslash/double-quote escaped form). ``resp`` is returned
    unchanged when no prefix is configured or no prefix spelling splits it.
    """
    def fan(v):
        # Candidate spellings of marker v, de-duplicated by distinct().
        def spellings(x):
            return [x, x.upper(), x.lower()]
        escaped = v.replace("\\", "\\\\").replace("\"", "\\\"")
        return distinct(selectMany(spellings, [v, escaped]))

    def split(v, t):
        # Re-split the first element on successive spellings of t until
        # exactly two pieces result (or the spellings run out).
        for tag in fan(t):
            if len(v) == 2:
                break
            v = v[0].split(tag)
        return v

    e = self.injector.emitter
    if e.prefix is None:
        return resp
    p = split([resp], e.prefix)
    if len(p) < 2:
        return resp
    return split([p[1]], e.suffix)[0]