python类replace()的实例源码

recipe-52217.py 文件源码 项目:code 作者: ActiveState 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def eval_python(self, result):
    """Evaluate the Python expression captured by a regex match.

    The expression is read from the match's ``code`` group, its tab
    characters are expanded to four spaces, and it is evaluated against
    ``self.global_dict`` (plus ``self.local_dict`` when that is truthy).

    :param result: match object exposing a named group ``code``.
    :returns: ``str()`` of the evaluated value.
    :raises: whatever the evaluated expression raises; the offending code
        is reported through ``self.errorLogger`` before re-raising.
    """
    # NOTE(review): eval() executes arbitrary code -- this is only safe if
    # the template text comes from a trusted source; confirm with callers.
    code = result.group('code').replace('\t', '    ')
    try:
        if self.local_dict:
            value = eval(code, self.global_dict, self.local_dict)
        else:
            value = eval(code, self.global_dict)
        return str(value)
    except:
        # Deliberately broad: log the failing snippet, then re-raise the
        # original exception untouched.
        self.errorLogger('\n---- Error parsing: ----\n')
        self.errorLogger(code)
        self.errorLogger('\n------------------------\n')
        raise

#---------------------------------------------------------------------------
# This routine is only called when OUTPUT() is included in executed Python
# code from the templates.  It evaluates its parameter as if it was a
# template and appends the result to the OUTPUT_TEXT variable in the global
# dictionary.
dropbox.py 文件源码 项目:FRG-Crowdsourcing 作者: 97amarnathk 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def _extract_file_info(self, _file):
    """Extract file information.

    :param _file: JSON document (text) with at least ``name`` and ``link``.
    :returns: ``{'info': {...}}`` holding ``filename``, ``link`` and
        ``link_raw`` (``link`` with ``dl=0`` replaced by ``raw=1``), plus
        type-specific URL fields for every ``self._is_*_file`` predicate
        that accepts the file name.
    """
    _file = json.loads(_file)
    name = _file['name']
    link = _file['link']
    link_raw = link.replace('dl=0', 'raw=1')
    info = {'filename': name,
            'link_raw': link_raw,
            'link': link}
    if self._is_image_file(name):
        info.update({'url_m': link_raw,
                     'url_b': link_raw,
                     'title': name})
    # A name may satisfy several predicates, so each one is checked
    # independently, in the same order as before.
    media_kinds = ((self._is_video_file, 'video_url'),
                   (self._is_audio_file, 'audio_url'),
                   (self._is_pdf_file, 'pdf_url'))
    for is_kind, field in media_kinds:
        if is_kind(name):
            info[field] = self._create_raw_cors_link(link)
    return {'info': info}
test_s3_importer.py 文件源码 项目:FRG-Crowdsourcing 作者: 97amarnathk 项目源码 文件源码 阅读 44 收藏 0 点赞 0 评论 0
def test_tasks_attributes_for_image_files(self):
    """Image files imported from S3 expose link, filename, url, url_m, url_b and title."""
    image_ext = ['png', 'jpg', 'jpeg', 'gif']
    file_data = 'myfile.extension'

    for ext in image_ext:
        data = file_data.replace('extension', ext)
        form_data = {
            'files': [data],
            'bucket': 'mybucket'
        }
        info = BulkTaskS3Import(**form_data).tasks()[0]['info']
        expected_name = "myfile.%s" % ext
        expected_url = "https://mybucket.s3.amazonaws.com/myfile.%s" % ext

        assert info['filename'] == expected_name
        assert info['link'] == expected_url
        assert info['url'] == expected_url
        assert info['url_m'] == expected_url
        assert info['url_b'] == expected_url
        assert info['title'] == expected_name
test_s3_importer.py 文件源码 项目:FRG-Crowdsourcing 作者: 97amarnathk 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_tasks_attributes_for_video_files(self):
    """Video files imported from S3 expose link, filename, url and video_url."""
    video_ext = ['mp4', 'm4v', 'ogg', 'ogv', 'webm', 'avi']
    file_data = 'myfile.extension'

    for ext in video_ext:
        data = file_data.replace('extension', ext)
        form_data = {
            'files': [data],
            'bucket': 'mybucket'
        }
        info = BulkTaskS3Import(**form_data).tasks()[0]['info']
        expected_url = "https://mybucket.s3.amazonaws.com/myfile.%s" % ext

        assert info['filename'] == "myfile.%s" % ext
        assert info['link'] == expected_url
        assert info['url'] == expected_url
        assert info['video_url'] == expected_url
test_s3_importer.py 文件源码 项目:FRG-Crowdsourcing 作者: 97amarnathk 项目源码 文件源码 阅读 46 收藏 0 点赞 0 评论 0
def test_tasks_attributes_for_audio_files(self):
    """Audio files imported from S3 expose link, filename, url and audio_url."""
    audio_ext = ['mp4', 'm4a', 'mp3', 'ogg', 'oga', 'webm', 'wav']
    file_data = 'myfile.extension'

    for ext in audio_ext:
        data = file_data.replace('extension', ext)
        form_data = {
            'files': [data],
            'bucket': 'mybucket'
        }
        info = BulkTaskS3Import(**form_data).tasks()[0]['info']
        expected_url = "https://mybucket.s3.amazonaws.com/myfile.%s" % ext

        assert info['filename'] == "myfile.%s" % ext
        assert info['link'] == expected_url
        assert info['url'] == expected_url
        assert info['audio_url'] == expected_url
test_dropbox_importer.py 文件源码 项目:FRG-Crowdsourcing 作者: 97amarnathk 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_tasks_attributes_for_video_files(self):
    """Video files imported from Dropbox expose link, filename, link_raw and video_url."""
    video_ext = ['mp4', 'm4v', 'ogg', 'ogv', 'webm', 'avi']
    file_data = (u'{"bytes":286,'
    u'"link":"https://www.dropbox.com/s/l2b77qvlrequ6gl/test.extension?dl=0",'
    u'"name":"test.extension",'
    u'"icon":"https://www.dropbox.com/static/images/icons64/page_white_text.png"}')

    for ext in video_ext:
        # Replaces the placeholder in both the "link" and the "name" fields.
        data = file_data.replace('extension', ext)
        form_data = {'files': [data]}
        info = BulkTaskDropboxImport(**form_data).tasks()[0]['info']

        assert info['filename'] == "test.%s" % ext
        assert info['link'] == "https://www.dropbox.com/s/l2b77qvlrequ6gl/test.%s?dl=0" % ext
        assert info['link_raw'] == "https://www.dropbox.com/s/l2b77qvlrequ6gl/test.%s?raw=1" % ext
        assert info['video_url'] == "https://dl.dropboxusercontent.com/s/l2b77qvlrequ6gl/test.%s" % ext
test_dropbox_importer.py 文件源码 项目:FRG-Crowdsourcing 作者: 97amarnathk 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_tasks_attributes_for_audio_files(self):
    """Audio files imported from Dropbox expose link, filename, link_raw and audio_url."""
    audio_ext = ['mp4', 'm4a', 'mp3', 'ogg', 'oga', 'webm', 'wav']
    file_data = (u'{"bytes":286,'
    u'"link":"https://www.dropbox.com/s/l2b77qvlrequ6gl/test.extension?dl=0",'
    u'"name":"test.extension",'
    u'"icon":"https://www.dropbox.com/static/images/icons64/page_white_text.png"}')

    for ext in audio_ext:
        # Replaces the placeholder in both the "link" and the "name" fields.
        data = file_data.replace('extension', ext)
        form_data = {'files': [data]}
        info = BulkTaskDropboxImport(**form_data).tasks()[0]['info']

        assert info['filename'] == "test.%s" % ext
        assert info['link'] == "https://www.dropbox.com/s/l2b77qvlrequ6gl/test.%s?dl=0" % ext
        assert info['link_raw'] == "https://www.dropbox.com/s/l2b77qvlrequ6gl/test.%s?raw=1" % ext
        assert info['audio_url'] == "https://dl.dropboxusercontent.com/s/l2b77qvlrequ6gl/test.%s" % ext
rwhois.py 文件源码 项目:darkc0de-old-stuff 作者: tuwid 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def ParseWhois_INT(self):
    """Scan ``self.page`` for the contact sections of a .INT whois reply.

    Line endings are normalised to ``\\n`` and each contact heading is
    searched for with a regular expression.  Nothing is stored or
    returned; one separator line is printed per contact type.
    """
    int_contacts = (
        {"page_field": "Registrant", "rec_field": "registrant"},
        {"page_field": "Administrative Contact", "rec_field": "administrative"},
        {"page_field": "Technical Contact", "rec_field": "technical"})

    page = self.page.replace("\r\n", "\n")
    for contact in int_contacts:
        page_field = contact['page_field']
        # "\\W" spells out the backslash that the old "\W" only kept by
        # accident (invalid string escape); the pattern text is unchanged.
        s = "%s:(.*)\n\\W" % page_field
        # NOTE(review): the match is not consumed yet -- parsing looks
        # unfinished; "rec_field" is likewise unused here.
        m = re.search(s, page, re.DOTALL)
        # Parenthesised form prints the same text on Python 2 and 3.
        print("-------------------")

##
## ----------------------------------------------------------------------
##




##
## ----------------------------------------------------------------------
##
common.py 文件源码 项目:darkc0de-old-stuff 作者: tuwid 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def strToHex(string):
    """
    Encode every character of a string as upper-case hexadecimal.

    Newline characters are encoded as if they were spaces, and each code
    point is zero-padded to at least two hex digits.

    @param string: string to be converted into its hexadecimal value.
    @type string: C{str}

    @return: the hexadecimal converted string.
    @rtype: C{str}
    """

    def encode(char):
        # A newline is treated as a blank before encoding.
        code = ord(" " if char == "\n" else char)
        return "%02X" % code

    return "".join(encode(char) for char in string)
common.py 文件源码 项目:darkc0de-old-stuff 作者: tuwid 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def fileToStr(fileName):
    """
    Read a text file and flatten it to a single line.

    Runs of four spaces, TABs and carriage returns are removed and every
    NEWLINE becomes a single space.

    @param fileName: file path to read the content and return as a no
    NEWLINE string.
    @type fileName: C{file.open}

    @return: the file content as a string without TAB and NEWLINE.
    @rtype: C{str}
    """

    # The context manager closes the handle, which was previously leaked.
    with open(fileName, "r") as filePointer:
        fileText = filePointer.read()

    # Order matters: four-space indents go first, newlines become blanks last.
    for old, new in (("    ", ""), ("\t", ""), ("\r", ""), ("\n", " ")):
        fileText = fileText.replace(old, new)

    return fileText
c14n.py 文件源码 项目:p2pool-bch 作者: amarian12 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def _do_attr(self, n, value):
    """_do_attr(self, n, value) -> None
    Process an attribute: write `` n="value"`` through ``self.write``.

    The value is escaped as Canonical XML requires for attribute nodes:
    ``&``, ``<`` and ``"`` become entity references; TAB, LF and CR become
    the character references ``&#x9;``, ``&#xA;`` and ``&#xD;``.
    """

    W = self.write
    W(' ')
    W(n)
    W('="')
    # "&" must be escaped first so that the ampersands introduced by the
    # later replacements are not escaped again.
    s = value.replace("&", "&amp;")
    s = s.replace("<", "&lt;")
    s = s.replace('"', '&quot;')
    # Character references need the terminating ";" to be well-formed.
    s = s.replace('\011', '&#x9;')
    s = s.replace('\012', '&#xA;')
    s = s.replace('\015', '&#xD;')
    W(s)
    W('"')
c14n.py 文件源码 项目:touch-pay-client 作者: HackPucBemobi 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def _do_attr(self, n, value):
    """_do_attr(self, n, value) -> None
    Process an attribute: write `` n="value"`` through ``self.write``.

    The value is escaped as Canonical XML requires for attribute nodes:
    ``&``, ``<`` and ``"`` become entity references; TAB, LF and CR become
    the character references ``&#x9;``, ``&#xA;`` and ``&#xD;``.
    """

    W = self.write
    W(' ')
    W(n)
    W('="')
    # "&" must be escaped first so that the ampersands introduced by the
    # later replacements are not escaped again.
    s = value.replace("&", "&amp;")
    s = s.replace("<", "&lt;")
    s = s.replace('"', '&quot;')
    # Character references need the terminating ";" to be well-formed.
    s = s.replace('\011', '&#x9;')
    s = s.replace('\012', '&#xA;')
    s = s.replace('\015', '&#xD;')
    W(s)
    W('"')
producers.py 文件源码 项目:aquests 作者: hansroh 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def more (self):
    """Return the next escaped chunk from the wrapped producer.

    Pending data (``self.buffer``) is joined with the producer's next
    chunk and every ``self.esc_from`` is replaced by ``self.esc_to``.  When
    ``self.find_prefix_at_end`` reports that the chunk ends with the first
    ``i`` bytes of ``esc_from``, those bytes are held back in
    ``self.buffer`` for the next call.

    :returns: the escaped data that is ready now; empty when the producer
        delivered nothing and nothing was pending.
    """
    esc_from = self.esc_from
    esc_to = self.esc_to

    data = self.buffer + self.producer.more()
    if not data:
        return data

    # The replace *method* works for bytes on Python 3, unlike the removed
    # string.replace() function.
    data = data.replace(esc_from, esc_to)
    i = self.find_prefix_at_end (data, esc_from)
    if i:
        # we found a prefix
        self.buffer = data[-i:]
        return data[:-i]
    # no prefix, return it all
    self.buffer = b''
    return data
mmcexec.py 文件源码 项目:PiBunny 作者: tholum 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def do_put(self, s):
    """Upload a local file: ``put <src_path> [dst_path]``.

    ``dst_path`` (forward slashes allowed) is resolved against
    ``self.__pwd``; the drive letter of the result selects the ``X$``
    share handed to ``self.__transferClient.putFile``.  Any failure is
    logged at CRITICAL level and swallowed.

    :param s: space-separated arguments; paths containing spaces are not
        supported by this parsing.
    """
    try:
        # str.split always yields at least one element.
        params = s.split(' ')
        src_path = params[0]
        dst_path = params[1] if len(params) > 1 else ''

        src_file = os.path.basename(src_path)
        # "with" closes the local file even when the upload fails.
        with open(src_path, 'rb') as fh:
            dst_path = dst_path.replace('/', '\\')
            import ntpath
            pathname = ntpath.join(ntpath.join(self.__pwd,dst_path), src_file)
            drive, tail = ntpath.splitdrive(pathname)
            logging.info("Uploading %s to %s" % (src_file, pathname))
            # "C:" -> "C$": the matching administrative share.
            self.__transferClient.putFile(drive[:-1]+'$', tail, fh.read)
    except Exception as e:
        logging.critical(str(e))
ntfs-read.py 文件源码 项目:PiBunny 作者: tholum 项目源码 文件源码 阅读 41 收藏 0 点赞 0 评论 0
def do_cat(self, line, command = sys.stdout.write):
    """Stream the contents of a file to ``command``.

    :param line: path of the file, relative to ``self.pwd``; forward
        slashes are accepted and converted to backslashes.
    :param command: callable that receives each chunk that is read
        (defaults to writing to standard output).

    Directories and compressed/encrypted/sparse files are refused with a
    logged error.  Data is read in chunks of 40960 bytes.
    """
    pathName = line.replace('/', '\\')
    pathName = ntpath.normpath(ntpath.join(self.pwd,pathName))
    res = self.findPathName(pathName)
    if res is None:
        logging.error("Not found!")
        return
    if res.isDirectory() > 0:
        logging.error("It's a directory!")
        return
    if res.isCompressed() or res.isEncrypted() or res.isSparse():
        logging.error('Cannot handle compressed/encrypted/sparse files! :(')
        return
    stream = res.getStream(None)
    chunks = 4096*10
    size = stream.getDataSize()
    # divmod uses floor division, so range() gets an int on Python 3 too
    # (plain "/" produced a float there and raised TypeError).
    full_chunks, remainder = divmod(size, chunks)
    written = 0
    for i in range(full_chunks):
        buf = stream.read(i*chunks, chunks)
        written += len(buf)
        command(buf)
    if remainder:
        # The tail starts where the bytes actually read so far end.
        buf = stream.read(written, remainder)
        command(buf)
    logging.info("%d bytes read" % size)
wmiexec.py 文件源码 项目:PiBunny 作者: tholum 项目源码 文件源码 阅读 40 收藏 0 点赞 0 评论 0
def do_put(self, s):
    """Upload a local file: ``put <src_path> [dst_path]``.

    ``dst_path`` (forward slashes allowed) is resolved against
    ``self.__pwd``; the drive letter of the result selects the ``X$``
    share handed to ``self.__transferClient.putFile``.  Any failure is
    logged at CRITICAL level and swallowed.

    :param s: space-separated arguments; paths containing spaces are not
        supported by this parsing.
    """
    try:
        # str.split always yields at least one element.
        params = s.split(' ')
        src_path = params[0]
        dst_path = params[1] if len(params) > 1 else ''

        src_file = os.path.basename(src_path)
        # "with" closes the local file even when the upload fails.
        with open(src_path, 'rb') as fh:
            dst_path = dst_path.replace('/', '\\')
            import ntpath
            pathname = ntpath.join(ntpath.join(self.__pwd,dst_path), src_file)
            drive, tail = ntpath.splitdrive(pathname)
            logging.info("Uploading %s to %s" % (src_file, pathname))
            # "C:" -> "C$": the matching administrative share.
            self.__transferClient.putFile(drive[:-1]+'$', tail, fh.read)
    except Exception as e:
        logging.critical(str(e))
goldenPac.py 文件源码 项目:PiBunny 作者: tholum 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def do_put(self, s):
    """Upload a local file to ``self.share``: ``put <src_path> [dst_path]``.

    The transfer client is connected on first use.  ``dst_path`` defaults
    to ``/``; the remote name is ``dst_path + '/' + basename(src_path)``
    with every forward slash turned into a backslash.  Failures are logged
    at ERROR level and swallowed; ``'\\r\\n'`` is always sent through
    ``self.send_data`` afterwards.

    :param s: space-separated arguments; paths containing spaces are not
        supported by this parsing.
    """
    try:
        if self.transferClient is None:
            self.connect_transferClient()
        # str.split always yields at least one element.
        params = s.split(' ')
        src_path = params[0]
        dst_path = params[1] if len(params) > 1 else '/'

        src_file = os.path.basename(src_path)
        # "with" closes the local file even when the upload fails.
        with open(src_path, 'rb') as fh:
            pathname = (dst_path + '/' + src_file).replace('/', '\\')
            logging.info("Uploading %s to %s\\%s" % (src_file, self.share, dst_path))
            self.transferClient.putFile(self.share, pathname, fh.read)
    except Exception as e:
        logging.error(str(e))

    self.send_data('\r\n')
psexec.py 文件源码 项目:PiBunny 作者: tholum 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def do_put(self, s):
    """Upload a local file to ``self.share``: ``put <src_path> [dst_path]``.

    The transfer client is connected on first use.  ``dst_path`` defaults
    to ``/``; the remote name is ``dst_path + '/' + basename(src_path)``
    with every forward slash turned into a backslash.  Failures are logged
    at ERROR level and swallowed; ``'\\r\\n'`` is always sent through
    ``self.send_data`` afterwards.

    :param s: space-separated arguments; paths containing spaces are not
        supported by this parsing.
    """
    try:
        if self.transferClient is None:
            self.connect_transferClient()
        # str.split always yields at least one element.
        params = s.split(' ')
        src_path = params[0]
        dst_path = params[1] if len(params) > 1 else '/'

        src_file = os.path.basename(src_path)
        # "with" closes the local file even when the upload fails.
        with open(src_path, 'rb') as fh:
            pathname = (dst_path + '/' + src_file).replace('/', '\\')
            if isinstance(pathname, bytes):
                # Byte strings (the Python 2 ``str``) are decoded with the
                # console encoding as before; text is passed through.
                pathname = pathname.decode(sys.stdin.encoding)
            logging.info("Uploading %s to %s\\%s" % (src_file, self.share, dst_path))
            self.transferClient.putFile(self.share, pathname, fh.read)
    except Exception as e:
        logging.error(str(e))

    self.send_data('\r\n')
serviceinstall.py 文件源码 项目:PiBunny 作者: tholum 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def copy_file(self, src, tree, dst):
    """Upload ``src`` to ``dst`` on share ``tree`` via ``self.connection``.

    :param src: a file name (``str``), opened here in binary mode, or any
        object providing ``read`` and ``close``.
    :param tree: passed through to ``putFile`` as its first argument.
    :param dst: remote path; forward slashes are converted to backslashes.
    :raises: whatever ``putFile`` raises, after logging a CRITICAL message.
    """
    LOG.info("Uploading file %s" % dst)
    opened_here = isinstance(src, str)
    if opened_here:
        # We have a filename
        fh = open(src, 'rb')
    else:
        # We have a class instance, it must have a read method
        fh = src
    pathname = dst.replace('/', '\\')
    try:
        self.connection.putFile(tree, pathname, fh.read)
    except:
        LOG.critical("Error uploading file %s, aborting....." % dst)
        if opened_here:
            # Do not leak the handle we created; a caller-supplied object
            # is left untouched on failure, as before.
            fh.close()
        raise
    fh.close()
smb.py 文件源码 项目:PiBunny 作者: tholum 项目源码 文件源码 阅读 42 收藏 0 点赞 0 评论 0
def retr_file(self, service, filename, callback, mode = FILE_OPEN, offset = 0, password = None, shareAccessMode = SMB_ACCESS_READ):
    """Retrieve ``filename`` from share ``service``.

    The file is opened with ``nt_create_andx``, its size is taken from the
    ``EndOfFile`` field of the standard file info, and the data from
    ``offset`` on is delivered through ``callback`` by
    ``__nonraw_retr_file``.  The file handle (when one was obtained) and
    the tree connection are always released.

    :param filename: remote path; forward slashes become backslashes.
    :param mode: accepted but not used in this method.
    """
    filename = filename.replace('/', '\\')

    fid = -1
    tid = self.tree_connect_andx('\\\\' + self.__remote_name + '\\' + service, password)
    try:
        fid = self.nt_create_andx(tid, filename, shareAccessMode = shareAccessMode, accessMask = 0x20089)

        res = self.query_file_info(tid, fid)
        datasize = SMBQueryFileStandardInfo(res)['EndOfFile']

        self.__nonraw_retr_file(tid, fid, offset, datasize, callback)
    finally:
        # fid stays -1 when the open itself failed.
        if fid >= 0:
            self.close(tid, fid)
        self.disconnect_tree(tid)
smb.py 文件源码 项目:PiBunny 作者: tholum 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def check_dir(self, service, path, password = None):
    """Send an SMB_COM_CHECK_DIRECTORY request for ``path`` on ``service``.

    Returns ``None`` once a valid answer is received; the tree connection
    is always released.

    :param path: remote directory; forward slashes become backslashes.
    """
    path = path.replace('/', '\\')
    tid = self.tree_connect_andx('\\\\' + self.__remote_name + '\\' + service, password)
    try:
        smb = NewSMBPacket()
        smb['Tid'] = tid
        smb['Mid'] = 0

        cmd = SMBCommand(SMB.SMB_COM_CHECK_DIRECTORY)
        cmd['Parameters'] = ''
        cmd['Data'] = SMBCheckDirectory_Data(flags = self.__flags2)
        # The name is sent as UTF-16LE only when the unicode flag is set.
        cmd['Data']['DirectoryName'] = path.encode('utf-16le') if self.__flags2 & SMB.FLAGS2_UNICODE else path
        smb.addCommand(cmd)

        self.sendSMB(smb)

        # Keep reading until a valid answer for this command shows up.
        while True:
            s = self.recvSMB()
            if s.isValidAnswer(SMB.SMB_COM_CHECK_DIRECTORY):
                return
    finally:
        self.disconnect_tree(tid)
smb.py 文件源码 项目:PiBunny 作者: tholum 项目源码 文件源码 阅读 44 收藏 0 点赞 0 评论 0
def rmdir(self, service, path, password = None):
    """Delete directory ``path`` on share ``service``.

    ``check_dir`` is called first, then an SMB_COM_DELETE_DIRECTORY
    request is sent.  Returns ``None`` once a valid answer is received;
    the tree connection is always released.

    :param path: remote directory; forward slashes become backslashes.
    """
    path = path.replace('/', '\\')
    # Check that the directory exists
    self.check_dir(service, path, password)

    tid = self.tree_connect_andx('\\\\' + self.__remote_name + '\\' + service, password)
    try:
        # The name is sent as UTF-16LE only when the unicode flag is set.
        path = path.encode('utf-16le') if self.__flags2 & SMB.FLAGS2_UNICODE else path

        smb = NewSMBPacket()
        smb['Tid'] = tid
        deleteDir = SMBCommand(SMB.SMB_COM_DELETE_DIRECTORY)
        deleteDir['Data'] = SMBDeleteDirectory_Data(flags=self.__flags2)
        deleteDir['Data']['DirectoryName'] = path
        smb.addCommand(deleteDir)

        self.sendSMB(smb)

        # Keep reading until a valid answer for this command shows up.
        while True:
            s = self.recvSMB()
            if s.isValidAnswer(SMB.SMB_COM_DELETE_DIRECTORY):
                return
    finally:
        self.disconnect_tree(tid)
smb.py 文件源码 项目:PiBunny 作者: tholum 项目源码 文件源码 阅读 42 收藏 0 点赞 0 评论 0
def mkdir(self, service, path, password = None):
    """Create directory ``path`` on share ``service``.

    :param path: remote directory; forward slashes become backslashes.
    :returns: 1 when the reply is a valid SMB_COM_CREATE_DIRECTORY answer,
        0 otherwise.  The tree connection is always released.
    """
    path = path.replace('/', '\\')
    tid = self.tree_connect_andx('\\\\' + self.__remote_name + '\\' + service, password)
    try:
        # The name is sent as UTF-16LE only when the unicode flag is set.
        path = path.encode('utf-16le') if self.__flags2 & SMB.FLAGS2_UNICODE else path

        smb = NewSMBPacket()
        smb['Tid'] = tid
        smb['Mid'] = 0

        createDir = SMBCommand(SMB.SMB_COM_CREATE_DIRECTORY)
        createDir['Data'] = SMBCreateDirectory_Data(flags=self.__flags2)
        createDir['Data']['DirectoryName'] = path
        smb.addCommand(createDir)

        self.sendSMB(smb)

        smb = self.recvSMB()
        if smb.isValidAnswer(SMB.SMB_COM_CREATE_DIRECTORY):
            return 1
        return 0
    finally:
        self.disconnect_tree(tid)
smb.py 文件源码 项目:PiBunny 作者: tholum 项目源码 文件源码 阅读 45 收藏 0 点赞 0 评论 0
def rename(self, service, old_path, new_path, password = None):
    """Rename ``old_path`` to ``new_path`` on share ``service``.

    Both paths accept forward slashes, which become backslashes.

    :returns: 1 when the reply is a valid SMB_COM_RENAME answer, 0
        otherwise.  The tree connection is always released.
    """
    old_path = old_path.replace('/', '\\')
    new_path = new_path.replace('/', '\\')
    tid = self.tree_connect_andx('\\\\' + self.__remote_name + '\\' + service, password)
    try:
        smb = NewSMBPacket()
        smb['Tid'] = tid
        smb['Mid'] = 0

        renameCmd = SMBCommand(SMB.SMB_COM_RENAME)
        renameCmd['Parameters'] = SMBRename_Parameters()
        renameCmd['Parameters']['SearchAttributes'] = ATTR_SYSTEM | ATTR_HIDDEN | ATTR_DIRECTORY
        renameCmd['Data'] = SMBRename_Data(flags = self.__flags2)
        # Names are sent as UTF-16LE only when the unicode flag is set.
        renameCmd['Data']['OldFileName'] = old_path.encode('utf-16le') if self.__flags2 & SMB.FLAGS2_UNICODE else old_path
        renameCmd['Data']['NewFileName'] = new_path.encode('utf-16le') if self.__flags2 & SMB.FLAGS2_UNICODE else new_path
        smb.addCommand(renameCmd)

        self.sendSMB(smb)

        smb = self.recvSMB()
        if smb.isValidAnswer(SMB.SMB_COM_RENAME):
            return 1
        return 0
    finally:
        self.disconnect_tree(tid)
smb3.py 文件源码 项目:PiBunny 作者: tholum 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def rmdir(self, shareName, pathName, password = None):
    """Delete directory ``pathName`` on ``shareName``.

    The directory is opened with DELETE access and marked delete-pending
    through a file-disposition set-info request; the handle (when one was
    obtained) and the tree connection are always released.

    :param pathName: remote path; forward slashes become backslashes, the
        path is normalised and one leading backslash is stripped.
    :param password: accepted but not used (see the ToDo below).
    :returns: ``True`` when no exception was raised.
    """
    # ToDo: Handle situations where share is password protected
    pathName = pathName.replace('/', '\\')
    pathName = ntpath.normpath(pathName)
    if len(pathName) > 0 and pathName[0] == '\\':
        pathName = pathName[1:]

    treeId = self.connectTree(shareName)

    fileId = None
    try:
        fileId = self.create(treeId, pathName, desiredAccess=DELETE | FILE_READ_ATTRIBUTES | SYNCHRONIZE,
                             shareMode=FILE_SHARE_DELETE | FILE_SHARE_READ | FILE_SHARE_WRITE,
                             creationOptions=FILE_DIRECTORY_FILE | FILE_OPEN_REPARSE_POINT,
                             creationDisposition=FILE_OPEN, fileAttributes=0)
        from impacket import smb
        delete_req = smb.SMBSetFileDispositionInfo()
        delete_req['DeletePending'] = True
        self.setInfo(treeId, fileId, inputBlob=delete_req, fileInfoClass=SMB2_FILE_DISPOSITION_INFO)
    finally:
        # fileId stays None when the create call itself failed.
        if fileId is not None:
            self.close(treeId, fileId)
        self.disconnectTree(treeId)

    return True
smb3.py 文件源码 项目:PiBunny 作者: tholum 项目源码 文件源码 阅读 41 收藏 0 点赞 0 评论 0
def remove(self, shareName, pathName, password = None):
    """Delete file ``pathName`` on ``shareName``.

    The file is opened with the FILE_DELETE_ON_CLOSE option and closed
    again straight away; the tree connection is always released.

    :param pathName: remote path; forward slashes become backslashes, the
        path is normalised and one leading backslash is stripped.
    :param password: accepted but not used (see the ToDo below).
    :returns: ``True`` when no exception was raised.
    """
    # ToDo: Handle situations where share is password protected
    pathName = pathName.replace('/', '\\')
    pathName = ntpath.normpath(pathName)
    if len(pathName) > 0 and pathName[0] == '\\':
        pathName = pathName[1:]

    treeId = self.connectTree(shareName)

    fileId = None
    try:
        fileId = self.create(treeId, pathName,DELETE | FILE_READ_ATTRIBUTES, FILE_SHARE_DELETE, FILE_NON_DIRECTORY_FILE | FILE_DELETE_ON_CLOSE, FILE_OPEN, 0)
    finally:
        # fileId stays None when the create call itself failed.
        if fileId is not None:
            self.close(treeId, fileId)
        self.disconnectTree(treeId)

    return True
smb3.py 文件源码 项目:PiBunny 作者: tholum 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def storeFile(self, shareName, path, callback, mode = FILE_OVERWRITE_IF, offset = 0, password = None, shareAccessMode = FILE_SHARE_WRITE):
    """Write the data supplied by ``callback`` to ``path`` on ``shareName``.

    :param path: remote path; forward slashes become backslashes, the path
        is normalised and one leading backslash is stripped.
    :param callback: called with ``self._Connection['MaxWriteSize']``;
        must return the next block of data, or an empty one to finish.
    :param mode: passed to ``create`` in the position following the create
        options.
    :param offset: file offset at which writing starts.
    :param password: accepted but not used (see the ToDo below).

    The file handle (when one was obtained) and the tree connection are
    always released.
    """
    # ToDo: Handle situations where share is password protected
    path = path.replace('/', '\\')
    path = ntpath.normpath(path)
    if len(path) > 0 and path[0] == '\\':
        path = path[1:]

    treeId = self.connectTree(shareName)
    fileId = None
    try:
        fileId = self.create(treeId, path, FILE_WRITE_DATA, shareAccessMode, FILE_NON_DIRECTORY_FILE, mode, 0)
        writeOffset = offset
        # An empty block from the callback is the only exit from the loop.
        while True:
            data = callback(self._Connection['MaxWriteSize'])
            if len(data) == 0:
                break
            written = self.write(treeId, fileId, data, writeOffset, len(data))
            writeOffset += written
    finally:
        # fileId stays None when the create call itself failed.
        if fileId is not None:
            self.close(treeId, fileId)
        self.disconnectTree(treeId)
logging_plotting.py 文件源码 项目:merlin 作者: CSTR-Edinburgh 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def save_plot(self,plot_name,**kwargs):
    """Render the plot registered as ``plot_name`` into a PDF file.

    The file is ``<self.plot_path>/<plot_name with spaces as "_">.pdf``;
    ``self.plot_path`` is created when missing.  An unknown ``plot_name``
    only produces a warning on the "plotting" logger.

    :param plot_name: key into ``self.plots``.
    :param kwargs: forwarded unchanged to the plot's ``generate_plot``.
    """
    logger = logging.getLogger("plotting")
    if plot_name not in self.plots:
        # Logger.warn is a deprecated alias of Logger.warning.
        logger.warning('Tried to generate a plot called %s that does not exist' % plot_name)
        # raise an exception here?
        return

    # TO DO - need to be sure of safe file names
    if not os.path.isdir(self.plot_path):
        os.makedirs(self.plot_path)
    filename = self.plot_path + "/" + plot_name.replace(" ", "_") + ".pdf"
    logger.info('Generating a plot in file %s' % filename)
    self.plots[plot_name].generate_plot(filename,**kwargs)
c14n.py 文件源码 项目:p2pool-unitus 作者: amarian12 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def _do_attr(self, n, value):
    """_do_attr(self, n, value) -> None
    Process an attribute: write `` n="value"`` through ``self.write``.

    The value is escaped as Canonical XML requires for attribute nodes:
    ``&``, ``<`` and ``"`` become entity references; TAB, LF and CR become
    the character references ``&#x9;``, ``&#xA;`` and ``&#xD;``.
    """

    W = self.write
    W(' ')
    W(n)
    W('="')
    # "&" must be escaped first so that the ampersands introduced by the
    # later replacements are not escaped again.
    s = value.replace("&", "&amp;")
    s = s.replace("<", "&lt;")
    s = s.replace('"', '&quot;')
    # Character references need the terminating ";" to be well-formed.
    s = s.replace('\011', '&#x9;')
    s = s.replace('\012', '&#xA;')
    s = s.replace('\015', '&#xD;')
    W(s)
    W('"')
st.py 文件源码 项目:WebExploitationTool 作者: AutoSecTools 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def parseResp(self, resp):
    """Return the part of ``resp`` enclosed by the emitter's prefix and suffix.

    ``resp`` is returned unchanged when the emitter has no prefix or when
    no prefix variant splits the response.  Each marker is tried as-is and
    with backslashes and double quotes escaped, each of those in original,
    upper and lower case.
    """
    def fan(v):
        # Candidate spellings of a marker: plain and backslash-escaped.
        escaped = v.replace("\\", "\\\\").replace("\"", "\\\"")

        def case_variants(x):
            return [x, x.upper(), x.lower()]

        return distinct(selectMany(case_variants, [v, escaped]))

    def split(v, t):
        # Try each spelling until one cuts the text into exactly two parts.
        for tag in fan(t):
            if len(v) == 2:
                break
            v = v[0].split(tag)
        return v

    e = self.injector.emitter
    if e.prefix == None:
        return resp
    p = split([resp], e.prefix)
    if len(p) < 2:
        return resp
    return split([p[1]], e.suffix)[0]


问题


面经


文章

微信
公众号

扫码关注公众号