def __init__(self, attachment_file, mime_type, attachment_filename=None):
    """
    Derive the attachment's file name when the caller did not pass one.

    :param attachment_file: the attachment source. The code below accepts a
        path string or a file object; the original (mis-encoded) text also
        mentions StringIO, which has no name to fall back on.
    :param mime_type: MIME type of the attachment, e.g.
        application/octet-stream (not used by the code visible here).
    :param attachment_filename: file name to present for the attachment.
        When None it is taken from the last path component of the path
        string or of the file object's ``name``.
    :raises InvalidArgumentException: when no file name was given and none
        can be derived from ``attachment_file``.

    NOTE(review): the visible code never stores ``attachment_file``,
    ``mime_type`` or an explicitly supplied ``attachment_filename`` --
    presumably that happens in code outside this excerpt; confirm.
    """
    if attachment_filename is None:
        if isinstance(attachment_file, types.StringTypes):
            # A plain path: keep only its final component.
            source_path = attachment_file
        elif isinstance(attachment_file, types.FileType):
            # A real file object remembers the path it was opened with.
            source_path = attachment_file.name
        else:
            raise InvalidArgumentException(
                u"????attachement_filename?????????")
        self._attachment_filename = os.path.basename(source_path)
# python类FileType()的实例源码 (example source code using Python's types.FileType)
def save(self, width='100%', height='100%'):
    """Write the XML string to ``self.filename``.

    ``self.filename`` may be an already-open text stream, which is written
    to and left open, or anything else, which is converted with ``str()``
    and opened as a UTF-8 text file for writing.

    :param width: value stored under ``self['width']`` before writing.
    :param height: value stored under ``self['height']`` before writing.
    """
    import io

    # Fix height and width
    self['height'] = height
    self['width'] = width

    target = self.filename
    if sys.version_info[0] == 2:
        # Built-in file objects and old-style class instances (e.g. the
        # classic StringIO) are treated as writable streams.
        is_stream = isinstance(target, (types.FileType, types.InstanceType))
    else:
        # Any text stream counts, not just TextIOWrapper, so io.StringIO is
        # written to instead of being mistaken for a file name.
        is_stream = isinstance(target, io.TextIOBase)

    if is_stream:
        self.write(target)
    else:
        # The context manager closes the file even if write() raises.
        with io.open(str(target), mode='w', encoding='utf-8') as fileobj:
            self.write(fileobj)
############################################################################
def __init__(self, basis_file, delta_file):
    """Set up a PatchedFile from a basis file and a delta stream.

    basis_file has to be a true Python file (or wrap one in its ``file``
    attribute) because it is handed to the C patch maker, which may seek
    around in it a lot.  delta_file only needs read() and close() methods.

    Raises TypeError when no true file can be obtained from basis_file and
    librsyncError when the underlying library rejects it.
    """
    LikeFile.__init__(self, delta_file)
    if not isinstance(basis_file, types.FileType):
        # tempfile.TemporaryFile() only guarantees a true file object on
        # posix platforms.  On cygwin/windows it returns a file-like object
        # whose ``file`` attribute is the underlying true file object.
        wraps_true_file = (hasattr(basis_file, 'file') and
                           isinstance(basis_file.file, types.FileType))
        if not wraps_true_file:
            raise TypeError(_("basis_file must be a (true) file or an object whose "
                              "file attribute is the underlying true file object"))
        basis_file = basis_file.file
    try:
        self.maker = _librsync.new_patchmaker(basis_file)
    except _librsync.librsyncError as e:
        raise librsyncError(str(e))
def is_seq(seq):
    """Test if `seq` is some kind of sequence, based on calling iter(seq), i.e.
    if the object is iterable.

    Exclude cases which are iterable but that we still don't like:
        strings (byte and unicode strings)
        file objects
    Lists of strings such as [u'aaa', u'bbb'] are still sequences. In fact, we
    wish to catch list, tuple, numpy array.

    Parameters
    ----------
    seq : (nested) sequence of arbitrary objects

    Returns
    -------
    bool
        True if `seq` is iterable and neither a string nor a file.
    """
    import io

    # Python 2 exposes these in ``types``; on Python 3 the names are gone, so
    # fall back to the equivalent built-in / io types.
    string_types = getattr(types, 'StringTypes', (str, bytes))
    file_type = getattr(types, 'FileType', io.IOBase)

    if isinstance(seq, string_types) or isinstance(seq, file_type):
        return False
    try:
        iter(seq)
    except TypeError:
        # iter() signals "not iterable" with TypeError; anything else is a
        # real error and should not be hidden.
        return False
    return True
def getpager():
    """Decide what method to use for paging through text.

    Returns ``plainpager`` whenever interactive paging is not possible:
    stdout is not a real file object, stdin has no ``isatty`` method, either
    stream is not a terminal, or ``TERM`` is ``dumb``/``emacs`` with no
    ``PAGER`` set.  Otherwise returns a one-argument callable built on
    ``tempfilepager`` or ``pipepager``, or ``ttypager`` as the last resort.
    """
    if type(sys.stdout) is not types.FileType:
        return plainpager
    # sys.stdin may have been replaced by an object without isatty(); treat
    # that as non-interactive instead of failing with AttributeError.
    if not hasattr(sys.stdin, "isatty"):
        return plainpager
    if not sys.stdin.isatty() or not sys.stdout.isatty():
        return plainpager
    if 'PAGER' in os.environ:
        # $PAGER is read again when the returned pager runs.
        if sys.platform == 'win32':  # pipes completely broken in Windows
            return lambda text: tempfilepager(plain(text), os.environ['PAGER'])
        elif os.environ.get('TERM') in ('dumb', 'emacs'):
            return lambda text: pipepager(plain(text), os.environ['PAGER'])
        else:
            return lambda text: pipepager(text, os.environ['PAGER'])
    if os.environ.get('TERM') in ('dumb', 'emacs'):
        return plainpager
    if sys.platform == 'win32' or sys.platform.startswith('os2'):
        return lambda text: tempfilepager(plain(text), 'more <')
    # Probe for ``less`` by running it in a subshell with stderr discarded.
    if hasattr(os, 'system') and os.system('(less) 2>/dev/null') == 0:
        return lambda text: pipepager(text, 'less')

    import tempfile
    # Probe for ``more`` by running it on an empty scratch file.
    (fd, filename) = tempfile.mkstemp()
    os.close(fd)
    try:
        if hasattr(os, 'system') and os.system('more "%s"' % filename) == 0:
            return lambda text: pipepager(text, 'more')
        else:
            return ttypager
    finally:
        os.unlink(filename)
def _gen_payload(self):
    """Return the raw content of ``self._attachment_file``.

    Three kinds of source are handled:

    * a real file object: read from its current position, then closed;
    * a path string: the file is opened, read completely and closed;
    * anything else (StringIO or cStringIO): rewound to the start and read.
    """
    source = self._attachment_file
    # An already-open file object; it is closed even if read() fails.
    if isinstance(source, types.FileType):
        try:
            return source.read()
        finally:
            source.close()
    # A path on disk.  Binary mode keeps the attachment bytes intact on
    # platforms that translate newlines in text mode.
    elif isinstance(source, types.StringTypes):
        with open(source, "rb") as f:
            return f.read()
    # StringIO or cStringIO
    else:
        source.seek(0)
        return source.read()
def getpager():
    """Choose the pager callable used to display long text.

    Returns ``plainpager`` whenever interactive paging is not possible:
    stdout is not a real file object, stdin has no ``isatty`` method, either
    stream is not a terminal, or ``TERM`` is ``dumb``/``emacs`` with no
    ``PAGER`` set.  Otherwise returns a one-argument callable built on
    ``tempfilepager`` or ``pipepager``, or ``ttypager`` as the last resort.
    """
    if type(sys.stdout) is not types.FileType:
        return plainpager
    if not hasattr(sys.stdin, "isatty"):
        return plainpager
    if not (sys.stdin.isatty() and sys.stdout.isatty()):
        return plainpager

    on_windows = sys.platform == 'win32'
    limited_terminal = os.environ.get('TERM') in ('dumb', 'emacs')

    if 'PAGER' in os.environ:
        # $PAGER is looked up again when the pager runs, not captured here.
        if on_windows:  # pipes completely broken in Windows
            return lambda text: tempfilepager(plain(text), os.environ['PAGER'])
        if limited_terminal:
            return lambda text: pipepager(plain(text), os.environ['PAGER'])
        return lambda text: pipepager(text, os.environ['PAGER'])

    if limited_terminal:
        return plainpager
    if on_windows or sys.platform.startswith('os2'):
        return lambda text: tempfilepager(plain(text), 'more <')

    can_shell = hasattr(os, 'system')
    # Probe for ``less`` by running it in a subshell with stderr discarded.
    if can_shell and os.system('(less) 2>/dev/null') == 0:
        return lambda text: pipepager(text, 'less')

    import tempfile
    # Probe for ``more`` by running it on an empty scratch file.
    handle, scratch_path = tempfile.mkstemp()
    os.close(handle)
    try:
        if can_shell and os.system('more "%s"' % scratch_path) == 0:
            return lambda text: pipepager(text, 'more')
        return ttypager
    finally:
        os.unlink(scratch_path)
def getpager():
    """Choose the pager callable used to display long text.

    Returns ``plainpager`` whenever interactive paging is not possible:
    stdout is not a real file object, stdin has no ``isatty`` method, either
    stream is not a terminal, or ``TERM`` is ``dumb``/``emacs`` with no
    ``PAGER`` set.  Otherwise returns a one-argument callable built on
    ``tempfilepager`` or ``pipepager``, or ``ttypager`` as the last resort.
    """
    if type(sys.stdout) is not types.FileType:
        return plainpager
    if not hasattr(sys.stdin, "isatty"):
        return plainpager
    if not (sys.stdin.isatty() and sys.stdout.isatty()):
        return plainpager

    on_windows = sys.platform == 'win32'
    limited_terminal = os.environ.get('TERM') in ('dumb', 'emacs')

    if 'PAGER' in os.environ:
        # $PAGER is looked up again when the pager runs, not captured here.
        if on_windows:  # pipes completely broken in Windows
            return lambda text: tempfilepager(plain(text), os.environ['PAGER'])
        if limited_terminal:
            return lambda text: pipepager(plain(text), os.environ['PAGER'])
        return lambda text: pipepager(text, os.environ['PAGER'])

    if limited_terminal:
        return plainpager
    if on_windows or sys.platform.startswith('os2'):
        return lambda text: tempfilepager(plain(text), 'more <')

    can_shell = hasattr(os, 'system')
    # Probe for ``less`` by running it in a subshell with stderr discarded.
    if can_shell and os.system('(less) 2>/dev/null') == 0:
        return lambda text: pipepager(text, 'less')

    import tempfile
    # Probe for ``more`` by running it on an empty scratch file.
    handle, scratch_path = tempfile.mkstemp()
    os.close(handle)
    try:
        if can_shell and os.system('more "%s"' % scratch_path) == 0:
            return lambda text: pipepager(text, 'more')
        return ttypager
    finally:
        os.unlink(scratch_path)
def getpager():
    """Choose the pager callable used to display long text.

    Returns ``plainpager`` whenever interactive paging is not possible:
    stdout is not a real file object, stdin has no ``isatty`` method, either
    stream is not a terminal, or ``TERM`` is ``dumb``/``emacs`` with no
    ``PAGER`` set.  Otherwise returns a one-argument callable built on
    ``tempfilepager`` or ``pipepager``, or ``ttypager`` as the last resort.
    """
    if type(sys.stdout) is not types.FileType:
        return plainpager
    if not hasattr(sys.stdin, "isatty"):
        return plainpager
    if not (sys.stdin.isatty() and sys.stdout.isatty()):
        return plainpager

    on_windows = sys.platform == 'win32'
    limited_terminal = os.environ.get('TERM') in ('dumb', 'emacs')

    if 'PAGER' in os.environ:
        # $PAGER is looked up again when the pager runs, not captured here.
        if on_windows:  # pipes completely broken in Windows
            return lambda text: tempfilepager(plain(text), os.environ['PAGER'])
        if limited_terminal:
            return lambda text: pipepager(plain(text), os.environ['PAGER'])
        return lambda text: pipepager(text, os.environ['PAGER'])

    if limited_terminal:
        return plainpager
    if on_windows or sys.platform.startswith('os2'):
        return lambda text: tempfilepager(plain(text), 'more <')

    can_shell = hasattr(os, 'system')
    # Probe for ``less`` by running it in a subshell with stderr discarded.
    if can_shell and os.system('(less) 2>/dev/null') == 0:
        return lambda text: pipepager(text, 'less')

    import tempfile
    # Probe for ``more`` by running it on an empty scratch file.
    handle, scratch_path = tempfile.mkstemp()
    os.close(handle)
    try:
        if can_shell and os.system('more "%s"' % scratch_path) == 0:
            return lambda text: pipepager(text, 'more')
        return ttypager
    finally:
        os.unlink(scratch_path)
def getpager():
    """Choose the pager callable used to display long text.

    Returns ``plainpager`` whenever interactive paging is not possible:
    stdout is not a real file object, stdin has no ``isatty`` method, either
    stream is not a terminal, or ``TERM`` is ``dumb``/``emacs`` with no
    ``PAGER`` set.  Otherwise returns a one-argument callable built on
    ``tempfilepager`` or ``pipepager``, or ``ttypager`` as the last resort.
    """
    if type(sys.stdout) is not types.FileType:
        return plainpager
    if not hasattr(sys.stdin, "isatty"):
        return plainpager
    if not (sys.stdin.isatty() and sys.stdout.isatty()):
        return plainpager

    on_windows = sys.platform == 'win32'
    limited_terminal = os.environ.get('TERM') in ('dumb', 'emacs')

    if 'PAGER' in os.environ:
        # $PAGER is looked up again when the pager runs, not captured here.
        if on_windows:  # pipes completely broken in Windows
            return lambda text: tempfilepager(plain(text), os.environ['PAGER'])
        if limited_terminal:
            return lambda text: pipepager(plain(text), os.environ['PAGER'])
        return lambda text: pipepager(text, os.environ['PAGER'])

    if limited_terminal:
        return plainpager
    if on_windows or sys.platform.startswith('os2'):
        return lambda text: tempfilepager(plain(text), 'more <')

    can_shell = hasattr(os, 'system')
    # Probe for ``less`` by running it in a subshell with stderr discarded.
    if can_shell and os.system('(less) 2>/dev/null') == 0:
        return lambda text: pipepager(text, 'less')

    import tempfile
    # Probe for ``more`` by running it on an empty scratch file.
    handle, scratch_path = tempfile.mkstemp()
    os.close(handle)
    try:
        if can_shell and os.system('more "%s"' % scratch_path) == 0:
            return lambda text: pipepager(text, 'more')
        return ttypager
    finally:
        os.unlink(scratch_path)
def __init__(self, file, skey='END', struct=0, show=0, check=0, verbose=0, mode=1):
    """
    Initialise the reader state, attach the input and analyse its structure.

    file    -- a file name (str), an open file object, or a cStringIO
               input object; any other type raises Exception.
    skey    -- extra keyword pattern appended to ``self.KKeys`` unless it
               is 'END'.
    struct, show, check, verbose -- flags, coerced to int and stored.
    mode    -- stored as ``self.Mode``; if 0 it is assumed that the input
               does not contain data (.hdr file).
    """
    # Flags controlling how much work is done and reported.
    self.verbose = int(verbose)
    self.nbytes = long(0)       # number of bytes read so far
    self.POS = []               # position of headers
    self.SIZE = []
    self.datasum = []           # datasum of headers if check!=0
    self.show = int(show)       # print the header if show!=0
    self.struct = int(struct)   # examine the structure of the file
    self.check = int(check)     # calculate datasums
    self.Extension = []         # list of HeadDict instances
    self.Mode = mode            # if 0 it is assumed that the input does
                                # not contain data (.hdr file)
    self.KKeys = ['SIMPLE', 'EXTEND', 'NAXIS[0-9]{0,2}', 'BITPIX', 'XTENSION', 'END']
    if skey != 'END':
        self.KKeys.append(skey)

    source_type = type(file)
    if source_type == types.StringType:
        # A file name: openFile() hands back the descriptor and the size,
        # with size -1 meaning the file could not be found.
        self.fd, self.size = self.openFile(file)
        if self.size == -1:
            raise Exception("*** File %s does not exists ****" % file)
    elif source_type == types.FileType:
        # An already-open file object is used as passed.
        self.fd, self.size = file, file.tell()
        self.ID = self.fd.name
        self.name = self.fd.name
    elif source_type.__name__ == 'StringI':
        # A cStringIO input object: it has no name to record.
        self.fd, self.size = file, file.tell()
        self.ID = ""
        self.name = ""
    else:
        raise Exception("Invalid type passed to file parameter during __init__")

    self.HEAD = []              # list of list(s) of header cards
    self.analyzeStruct()
def to(self, *args):
    '''
    get/set the 'device' to which messages are sent.

    Called without arguments, returns the current target. Called with one
    argument, closes the previously opened handle (unless it is a standard
    stream or the handle being installed) and switches to the new target.

    Valid targets are:

        string filenames:       '/tmp/test.log'
        remote hosts:           'pretoria:1701'
        system devices:         sys.stdout, sys.stderr
        special names:          'stdout'
        file handles:           open('/tmp/test.log')
    '''
    if not args:
        return self._logFile

    self._logFile = args[0]

    previous = self._logHandle
    # Never close the process-wide standard streams, and never close the
    # very handle that is about to be installed again.
    if (previous
            and previous is not sys.stdout
            and previous is not sys.stderr
            and previous is not self._logFile):
        previous.close()

    if isinstance(self._logFile, IOBase):
        # An already-open stream is used directly.
        self._logHandle = self._logFile
    elif self._logFile == 'stdout':
        self._logHandle = sys.stdout
    elif self.socket_parse(self._logFile):
        # socket_parse() is expected to have filled in the remote host and
        # port for a 'host:port' style target.
        self._logHandle = C_dgmsocket(
            self._socketRemote,
            int(self._socketPort))
    else:
        # Anything else is treated as a file name and opened for appending.
        self._logHandle = open(self._logFile, "a")
    self._sys_stdout = self._logHandle
def isNonPrimitiveInstance(x):
    """Return True for old-style class instances and for any object whose
    type is not one of the built-in/interpreter types listed below."""
    if isinstance(x, types.InstanceType):
        return True
    builtin_kinds = (
        float, int, long, type, tuple, list, dict, bool, unicode, str,
        buffer, complex, slice, types.NoneType,
        types.FunctionType, types.LambdaType, types.CodeType,
        types.GeneratorType,
        types.ClassType, types.UnboundMethodType, types.MethodType,
        types.BuiltinFunctionType,
        types.BuiltinMethodType, types.ModuleType, types.FileType,
        types.XRangeType,
        types.TracebackType, types.FrameType, types.EllipsisType,
        types.DictProxyType,
        types.NotImplementedType, types.GetSetDescriptorType,
        types.MemberDescriptorType,
    )
    return not isinstance(x, builtin_kinds)
def getpager():
    """Choose the pager callable used to display long text.

    Returns ``plainpager`` whenever interactive paging is not possible:
    stdout is not a real file object, stdin has no ``isatty`` method, either
    stream is not a terminal, or ``TERM`` is ``dumb``/``emacs`` with no
    ``PAGER`` set.  Otherwise returns a one-argument callable built on
    ``tempfilepager`` or ``pipepager``, or ``ttypager`` as the last resort.
    """
    if type(sys.stdout) is not types.FileType:
        return plainpager
    if not hasattr(sys.stdin, "isatty"):
        return plainpager
    if not (sys.stdin.isatty() and sys.stdout.isatty()):
        return plainpager

    on_windows = sys.platform == 'win32'
    limited_terminal = os.environ.get('TERM') in ('dumb', 'emacs')

    if 'PAGER' in os.environ:
        # $PAGER is looked up again when the pager runs, not captured here.
        if on_windows:  # pipes completely broken in Windows
            return lambda text: tempfilepager(plain(text), os.environ['PAGER'])
        if limited_terminal:
            return lambda text: pipepager(plain(text), os.environ['PAGER'])
        return lambda text: pipepager(text, os.environ['PAGER'])

    if limited_terminal:
        return plainpager
    if on_windows or sys.platform.startswith('os2'):
        return lambda text: tempfilepager(plain(text), 'more <')

    can_shell = hasattr(os, 'system')
    # Probe for ``less`` by running it in a subshell with stderr discarded.
    if can_shell and os.system('(less) 2>/dev/null') == 0:
        return lambda text: pipepager(text, 'less')

    import tempfile
    # Probe for ``more`` by running it on an empty scratch file.
    handle, scratch_path = tempfile.mkstemp()
    os.close(handle)
    try:
        if can_shell and os.system('more "%s"' % scratch_path) == 0:
            return lambda text: pipepager(text, 'more')
        return ttypager
    finally:
        os.unlink(scratch_path)
def getpager():
    """Decide what method to use for paging through text.

    Returns ``plainpager`` whenever interactive paging is not possible:
    stdout is not a real file object, stdin has no ``isatty`` method, either
    stream is not a terminal, or ``TERM`` is ``dumb``/``emacs`` with no
    ``PAGER`` set.  Otherwise returns a one-argument callable built on
    ``tempfilepager`` or ``pipepager``, or ``ttypager`` as the last resort.
    """
    if type(sys.stdout) is not types.FileType:
        return plainpager
    # sys.stdin may have been replaced by an object without isatty(); treat
    # that as non-interactive instead of failing with AttributeError.
    if not hasattr(sys.stdin, "isatty"):
        return plainpager
    if not sys.stdin.isatty() or not sys.stdout.isatty():
        return plainpager
    if 'PAGER' in os.environ:
        # $PAGER is read again when the returned pager runs.
        if sys.platform == 'win32':  # pipes completely broken in Windows
            return lambda text: tempfilepager(plain(text), os.environ['PAGER'])
        elif os.environ.get('TERM') in ('dumb', 'emacs'):
            return lambda text: pipepager(plain(text), os.environ['PAGER'])
        else:
            return lambda text: pipepager(text, os.environ['PAGER'])
    if os.environ.get('TERM') in ('dumb', 'emacs'):
        return plainpager
    if sys.platform == 'win32' or sys.platform.startswith('os2'):
        return lambda text: tempfilepager(plain(text), 'more <')
    # Probe for ``less`` by running it in a subshell with stderr discarded.
    if hasattr(os, 'system') and os.system('(less) 2>/dev/null') == 0:
        return lambda text: pipepager(text, 'less')

    import tempfile
    # Probe for ``more`` by running it on an empty scratch file.
    (fd, filename) = tempfile.mkstemp()
    os.close(fd)
    try:
        if hasattr(os, 'system') and os.system('more "%s"' % filename) == 0:
            return lambda text: pipepager(text, 'more')
        else:
            return ttypager
    finally:
        os.unlink(filename)
def __init__(self, basis_file, delta_file):
    """Set up a PatchedFile from a basis file and a delta stream.

    basis_file has to be a true Python file, because it is handed to the C
    patch maker, which may seek() around in it a lot.  An object exposing a
    ``file`` attribute is replaced by that attribute before the check.
    delta_file only needs read() and close() methods.

    Raises TypeError when the (unwrapped) basis is not a true file and
    librsyncError when the underlying library rejects it.
    """
    LikeFile.__init__(self, delta_file)
    # Unwrap objects that carry the real file in their ``file`` attribute.
    underlying = basis_file.file if hasattr(basis_file, 'file') else basis_file
    if type(underlying) is not types.FileType:
        raise TypeError("basis_file must be a (true) file")
    try:
        self.maker = _librsync.new_patchmaker(underlying)
    except _librsync.librsyncError as e:
        raise librsyncError(str(e))
def _install_fake_file(config, python_lib_paths, path_override_hook):
    """Replace the built-in file type with the sandboxed ``stubs.FakeFile``.

    FakeFile is first configured from ``config`` (application root, skip
    files, static files) plus every library path except the first and the
    hook's extra accessible paths; it is then installed as
    ``__builtin__.file``, ``__builtin__.open`` and ``types.FileType``.
    """
    fake_file = stubs.FakeFile

    app_root = config.application_root
    readable_paths = (python_lib_paths[1:] +
                      path_override_hook.extra_accessible_paths)
    fake_file.set_allowed_paths(app_root, readable_paths)
    fake_file.set_skip_files(config.skip_files)
    fake_file.set_static_files(config.static_files)

    # Every usual route to the real file type now yields the stub.
    __builtin__.file = stubs.FakeFile
    __builtin__.open = stubs.FakeFile
    types.FileType = stubs.FakeFile
def getpager():
    """Choose the pager callable used to display long text.

    Returns ``plainpager`` whenever interactive paging is not possible:
    stdout is not a real file object, stdin has no ``isatty`` method, either
    stream is not a terminal, or ``TERM`` is ``dumb``/``emacs`` with no
    ``PAGER`` set.  Otherwise returns a one-argument callable built on
    ``tempfilepager`` or ``pipepager``, or ``ttypager`` as the last resort.
    """
    if type(sys.stdout) is not types.FileType:
        return plainpager
    if not hasattr(sys.stdin, "isatty"):
        return plainpager
    if not (sys.stdin.isatty() and sys.stdout.isatty()):
        return plainpager

    on_windows = sys.platform == 'win32'
    limited_terminal = os.environ.get('TERM') in ('dumb', 'emacs')

    if 'PAGER' in os.environ:
        # $PAGER is looked up again when the pager runs, not captured here.
        if on_windows:  # pipes completely broken in Windows
            return lambda text: tempfilepager(plain(text), os.environ['PAGER'])
        if limited_terminal:
            return lambda text: pipepager(plain(text), os.environ['PAGER'])
        return lambda text: pipepager(text, os.environ['PAGER'])

    if limited_terminal:
        return plainpager
    if on_windows or sys.platform.startswith('os2'):
        return lambda text: tempfilepager(plain(text), 'more <')

    can_shell = hasattr(os, 'system')
    # Probe for ``less`` by running it in a subshell with stderr discarded.
    if can_shell and os.system('(less) 2>/dev/null') == 0:
        return lambda text: pipepager(text, 'less')

    import tempfile
    # Probe for ``more`` by running it on an empty scratch file.
    handle, scratch_path = tempfile.mkstemp()
    os.close(handle)
    try:
        if can_shell and os.system('more "%s"' % scratch_path) == 0:
            return lambda text: pipepager(text, 'more')
        return ttypager
    finally:
        os.unlink(scratch_path)
def getpager():
    """Choose the pager callable used to display long text.

    Returns ``plainpager`` whenever interactive paging is not possible:
    stdout is not a real file object, stdin has no ``isatty`` method, either
    stream is not a terminal, or ``TERM`` is ``dumb``/``emacs`` with no
    ``PAGER`` set.  Otherwise returns a one-argument callable built on
    ``tempfilepager`` or ``pipepager``, or ``ttypager`` as the last resort.
    """
    if type(sys.stdout) is not types.FileType:
        return plainpager
    if not hasattr(sys.stdin, "isatty"):
        return plainpager
    if not (sys.stdin.isatty() and sys.stdout.isatty()):
        return plainpager

    on_windows = sys.platform == 'win32'
    limited_terminal = os.environ.get('TERM') in ('dumb', 'emacs')

    if 'PAGER' in os.environ:
        # $PAGER is looked up again when the pager runs, not captured here.
        if on_windows:  # pipes completely broken in Windows
            return lambda text: tempfilepager(plain(text), os.environ['PAGER'])
        if limited_terminal:
            return lambda text: pipepager(plain(text), os.environ['PAGER'])
        return lambda text: pipepager(text, os.environ['PAGER'])

    if limited_terminal:
        return plainpager
    if on_windows or sys.platform.startswith('os2'):
        return lambda text: tempfilepager(plain(text), 'more <')

    can_shell = hasattr(os, 'system')
    # Probe for ``less`` by running it in a subshell with stderr discarded.
    if can_shell and os.system('(less) 2>/dev/null') == 0:
        return lambda text: pipepager(text, 'less')

    import tempfile
    # Probe for ``more`` by running it on an empty scratch file.
    handle, scratch_path = tempfile.mkstemp()
    os.close(handle)
    try:
        if can_shell and os.system('more "%s"' % scratch_path) == 0:
            return lambda text: pipepager(text, 'more')
        return ttypager
    finally:
        os.unlink(scratch_path)
def put(self, file='', content_type='', content_enc='',
isbin=re.compile(r'[\000-\006\177-\277]').search,
**kw):
headers = self.__get_headers(kw)
filetype = type(file)
if isinstance(filetype, str) and (isbin(file) is None) and \
os.path.exists(file):
ob = open(file, 'rb')
body = ob.read()
ob.close()
c_type, c_enc = guess_type(file)
elif filetype is FileType:
body = file.read()
c_type, c_enc = guess_type(file.name)
elif isinstance(filetype, str):
body = file
c_type, c_enc = guess_type(self.url)
else:
raise ValueError('File must be a filename, file or string.')
content_type = content_type or c_type
content_enc = content_enc or c_enc
if content_type:
headers['Content-Type'] = content_type
if content_enc:
headers['Content-Encoding'] = content_enc
headers['Content-Length'] = str(len(body))
return self.__snd_request('PUT', self.uri, headers, body)
def objgrep(start, goal, eq=isLike, path='', paths=None, seen=None, showUnknowns=0, maxDepth=None):
'''An insanely CPU-intensive process for finding stuff.
'''
if paths is None:
paths = []
if seen is None:
seen = {}
if eq(start, goal):
paths.append(path)
if seen.has_key(id(start)):
if seen[id(start)] is start:
return
if maxDepth is not None:
if maxDepth == 0:
return
maxDepth -= 1
seen[id(start)] = start
if isinstance(start, types.DictionaryType):
r = []
for k, v in start.items():
objgrep(k, goal, eq, path+'{'+repr(v)+'}', paths, seen, showUnknowns, maxDepth)
objgrep(v, goal, eq, path+'['+repr(k)+']', paths, seen, showUnknowns, maxDepth)
elif isinstance(start, types.ListType) or isinstance(start, types.TupleType):
for idx in xrange(len(start)):
objgrep(start[idx], goal, eq, path+'['+str(idx)+']', paths, seen, showUnknowns, maxDepth)
elif isinstance(start, types.MethodType):
objgrep(start.im_self, goal, eq, path+'.im_self', paths, seen, showUnknowns, maxDepth)
objgrep(start.im_func, goal, eq, path+'.im_func', paths, seen, showUnknowns, maxDepth)
objgrep(start.im_class, goal, eq, path+'.im_class', paths, seen, showUnknowns, maxDepth)
elif hasattr(start, '__dict__'):
for k, v in start.__dict__.items():
objgrep(v, goal, eq, path+'.'+k, paths, seen, showUnknowns, maxDepth)
if isinstance(start, types.InstanceType):
objgrep(start.__class__, goal, eq, path+'.__class__', paths, seen, showUnknowns, maxDepth)
elif isinstance(start, weakref.ReferenceType):
objgrep(start(), goal, eq, path+'()', paths, seen, showUnknowns, maxDepth)
elif (isinstance(start, types.StringTypes+
(types.IntType, types.FunctionType,
types.BuiltinMethodType, RegexType, types.FloatType,
types.NoneType, types.FileType)) or
type(start).__name__ in ('wrapper_descriptor', 'method_descriptor',
'member_descriptor', 'getset_descriptor')):
pass
elif showUnknowns:
print 'unknown type', type(start), start
return paths
def objgrep(start, goal, eq=isLike, path='', paths=None, seen=None, showUnknowns=0, maxDepth=None):
'''An insanely CPU-intensive process for finding stuff.
'''
if paths is None:
paths = []
if seen is None:
seen = {}
if eq(start, goal):
paths.append(path)
if seen.has_key(id(start)):
if seen[id(start)] is start:
return
if maxDepth is not None:
if maxDepth == 0:
return
maxDepth -= 1
seen[id(start)] = start
if isinstance(start, types.DictionaryType):
r = []
for k, v in start.items():
objgrep(k, goal, eq, path+'{'+repr(v)+'}', paths, seen, showUnknowns, maxDepth)
objgrep(v, goal, eq, path+'['+repr(k)+']', paths, seen, showUnknowns, maxDepth)
elif isinstance(start, types.ListType) or isinstance(start, types.TupleType):
for idx in xrange(len(start)):
objgrep(start[idx], goal, eq, path+'['+str(idx)+']', paths, seen, showUnknowns, maxDepth)
elif isinstance(start, types.MethodType):
objgrep(start.im_self, goal, eq, path+'.im_self', paths, seen, showUnknowns, maxDepth)
objgrep(start.im_func, goal, eq, path+'.im_func', paths, seen, showUnknowns, maxDepth)
objgrep(start.im_class, goal, eq, path+'.im_class', paths, seen, showUnknowns, maxDepth)
elif hasattr(start, '__dict__'):
for k, v in start.__dict__.items():
objgrep(v, goal, eq, path+'.'+k, paths, seen, showUnknowns, maxDepth)
if isinstance(start, types.InstanceType):
objgrep(start.__class__, goal, eq, path+'.__class__', paths, seen, showUnknowns, maxDepth)
elif isinstance(start, weakref.ReferenceType):
objgrep(start(), goal, eq, path+'()', paths, seen, showUnknowns, maxDepth)
elif (isinstance(start, types.StringTypes+
(types.IntType, types.FunctionType,
types.BuiltinMethodType, RegexType, types.FloatType,
types.NoneType, types.FileType)) or
type(start).__name__ in ('wrapper_descriptor', 'method_descriptor',
'member_descriptor', 'getset_descriptor')):
pass
elif showUnknowns:
print 'unknown type', type(start), start
return paths
def csv(self, csv=None):
    """
    Create CSV output from projects

    The rows come from ``t.csv()`` of every task in ``self.tasks``; tasks
    returning None are skipped.  When ``csv`` is given, the text starts
    with a UTF-8 byte-order mark and a header row and is also written out.

    Keyword arguments:
    csv -- string, filename to save to OR file object OR None

    Returns the CSV text, or None (after logging a warning) when the
    project has no tasks.
    """
    import io

    if len(self.tasks) == 0:
        __LOG__.warning('** Empty project : {0}'.format(self.name))
        return

    running_py2 = sys.version_info[0] == 2

    pieces = []
    if csv is not None:
        # The BOM and header row are only emitted when saving.
        pieces.append(codecs.BOM_UTF8.decode('utf-8'))
        pieces.append('"State";"Task Name";"Start date";"End date";"Duration";"Resources";\r\n')

    for t in self.tasks:
        c = t.csv()
        if c is None:
            continue
        if running_py2:
            try:
                c = unicode(c, "utf-8")
            except TypeError:
                # Already unicode: nothing to decode.
                pass
        pieces.append(c)
    # Join once instead of growing the string row by row.
    csv_text = ''.join(pieces)

    if csv is not None:
        if running_py2:
            # Built-in file objects and old-style class instances (e.g. the
            # classic StringIO) are treated as writable streams.
            is_stream = isinstance(csv, (types.FileType, types.InstanceType))
        else:
            # Any text stream counts, not just TextIOWrapper, so io.StringIO
            # is written to instead of being mistaken for a file name.
            is_stream = isinstance(csv, io.TextIOBase)

        if is_stream:
            csv.write(csv_text)
        else:
            # The context manager closes the file even if write() raises.
            with io.open(csv, mode='w', encoding='utf-8') as fileobj:
                fileobj.write(csv_text)
    return csv_text
# MAIN -------------------