def get_mimetype(file_path):
    """Return the MIME type guessed for *file_path*.

    Only the type half of the ``mimetypes.guess_type`` result is returned;
    it is ``None`` when no type can be guessed.
    """
    # The module tables are re-initialised on every call.
    mimetypes.init()
    guessed_type, _encoding = mimetypes.guess_type(file_path)
    return guessed_type
# Example source code for Python's mimetypes.init()  (python类init()的实例源码)
def setUp(self):
    """Reset the global MIME map and build a fresh ``MimeTypes`` database.

    A copy of ``mimetypes.types_map`` as found on entry is kept in
    ``self.original_types_map``; the new database is stored in ``self.db``.
    """
    # Snapshot, then empty the live map before re-initialising, so that
    # all entries actually come from the Windows registry.
    live_map = mimetypes.types_map
    self.original_types_map = live_map.copy()
    live_map.clear()
    mimetypes.init()
    self.db = mimetypes.MimeTypes()
def setUp(self):
    """Reset the global MIME map and build a fresh ``MimeTypes`` database.

    A copy of ``mimetypes.types_map`` as found on entry is kept in
    ``self.original_types_map``; the new database is stored in ``self.db``.
    """
    # Snapshot, then empty the live map before re-initialising, so that
    # all entries actually come from the Windows registry.
    live_map = mimetypes.types_map
    self.original_types_map = live_map.copy()
    live_map.clear()
    mimetypes.init()
    self.db = mimetypes.MimeTypes()
def __init__(self, config_file=None, mimetype_files=None):
    """Configure the service from its service definition.

    :param config_file: Handed to the ``ScriptBase`` initializer and to
        ``ServiceDef``.
    :param mimetype_files: Optional collection of extra MIME type files;
        when truthy it is passed to ``mimetypes.init``.
    """
    ScriptBase.__init__(self, config_file)
    self.name = self.__class__.__name__
    self.working_dir = boto.config.get('Pyami', 'working_dir')
    self.sd = ServiceDef(config_file)
    # Integer options; the second getint argument is presumably the
    # value used when the option is absent -- TODO confirm.
    for option, second_arg in (('retry_count', 5),
                               ('loop_delay', 30),
                               ('processing_time', 60)):
        setattr(self, option, self.sd.getint(option, second_arg))
    # Objects looked up by name through the service definition.
    for option in ('input_queue', 'output_queue', 'output_domain'):
        setattr(self, option, self.sd.get_obj(option))
    if mimetype_files:
        mimetypes.init(mimetype_files)
def get_image_format(extension):
    """Return the MIME type registered for *extension*.

    The extension is lower-cased and used as a key into
    ``mimetypes.types_map``; a ``KeyError`` is raised when the key is not
    present.
    """
    mimetypes.init()
    known_types = mimetypes.types_map
    lookup_key = extension.lower()
    return known_types[lookup_key]
def __init__(self, config_file=None, mimetype_files=None):
    """Configure the service from its service definition.

    :param config_file: Handed to the ``ScriptBase`` initializer and to
        ``ServiceDef``.
    :param mimetype_files: Optional collection of extra MIME type files;
        when truthy it is passed to ``mimetypes.init``.
    """
    ScriptBase.__init__(self, config_file)
    self.name = self.__class__.__name__
    self.working_dir = boto.config.get('Pyami', 'working_dir')
    self.sd = ServiceDef(config_file)
    # Integer options; the second getint argument is presumably the
    # value used when the option is absent -- TODO confirm.
    for option, second_arg in (('retry_count', 5),
                               ('loop_delay', 30),
                               ('processing_time', 60)):
        setattr(self, option, self.sd.getint(option, second_arg))
    # Objects looked up by name through the service definition.
    for option in ('input_queue', 'output_queue', 'output_domain'):
        setattr(self, option, self.sd.get_obj(option))
    if mimetype_files:
        mimetypes.init(mimetype_files)
def load_config(pathname=None, server=None):
    """Load the ViewVC configuration file and return the resulting
    configuration object.  SERVER is the server object that will be
    using this configuration.  The configuration path is the first of
    these that is set: the environment variable VIEWVC_CONF_PATHNAME,
    the environment variable VIEWCVS_CONF_PATHNAME (its legacy name),
    the PATHNAME argument, and finally a hardcoded default
    ("viewvc.conf" in the parent of this module's directory)."""

    debug.t_start('load-config')

    # Environment lookups go through SERVER when one was supplied, and
    # through the OS environment otherwise.
    env_get = (server.getenv if server else None) or os.environ.get

    # Ordered search: environment, passed-in PATHNAME, hard-coded default.
    conf_path = (env_get("VIEWVC_CONF_PATHNAME")
                 or env_get("VIEWCVS_CONF_PATHNAME")
                 or pathname)
    if not conf_path:
        conf_path = os.path.join(
            os.path.dirname(os.path.dirname(__file__)), "viewvc.conf")

    # Load the configuration!
    cfg = config.Config()
    cfg.set_defaults()
    cfg.load_config(conf_path, env_get("HTTP_HOST"))

    # The configuration lists MIME type files from most to least
    # preferred, but the 'mimetypes' package wants the opposite order,
    # so hand them over reversed.  Each entry is joined onto the
    # directory of the configuration file.
    mime_files = cfg.general.mime_types_files
    if mime_files:
        conf_dir = os.path.dirname(conf_path)
        mimetypes.init([os.path.join(conf_dir, name)
                        for name in reversed(mime_files)])

    debug.t_end('load-config')
    return cfg
def __init__(self, config_file=None, mimetype_files=None):
    """Configure the service from its service definition.

    :param config_file: Handed to the parent initializer and to
        ``ServiceDef``.
    :param mimetype_files: Optional collection of extra MIME type files;
        when truthy it is passed to ``mimetypes.init``.
    """
    super(Service, self).__init__(config_file)
    self.name = self.__class__.__name__
    self.working_dir = boto.config.get('Pyami', 'working_dir')
    self.sd = ServiceDef(config_file)
    # Integer options; the second getint argument is presumably the
    # value used when the option is absent -- TODO confirm.
    for option, second_arg in (('retry_count', 5),
                               ('loop_delay', 30),
                               ('processing_time', 60)):
        setattr(self, option, self.sd.getint(option, second_arg))
    # Objects looked up by name through the service definition.
    for option in ('input_queue', 'output_queue', 'output_domain'):
        setattr(self, option, self.sd.get_obj(option))
    if mimetype_files:
        mimetypes.init(mimetype_files)
def _fakeInit(self, paths):
    """
    Stand-in for L{mimetypes.init}: instead of loading anything, remember
    the C{paths} argument on C{self.paths} so a test can inspect it.

    @param paths: The value to record.
    """
    self.paths = paths
def test_defaultArgumentIsNone(self):
    """
    When no MIME type locations are given, the C{init} callable receives
    L{None}.
    """
    static.loadMimeTypes(init=self._fakeInit)
    self.assertIdentical(self.paths, None)
def test_extraLocationsWork(self):
    """
    A list of MIME type files given to C{loadMimeTypes} reaches the
    C{init} callable as the very same object.
    """
    locations = ["x", "y", "z"]
    static.loadMimeTypes(locations, init=self._fakeInit)
    self.assertIdentical(self.paths, locations)
def test_usesGlobalInitFunction(self):
    """
    The C{init} parameter of C{loadMimeTypes} defaults to
    C{mimetypes.init}.
    """
    # The default is read from the function signature because checking
    # mimetypes.inited doesn't always work: something, somewhere, may
    # already have called mimetypes.init (global mutable state).
    if not _PY3:
        args, _, _, defaults = inspect.getargspec(static.loadMimeTypes)
        defaultInit = defaults[args.index("init")]
    else:
        signature = inspect.signature(static.loadMimeTypes)
        defaultInit = signature.parameters["init"].default
    self.assertIs(defaultInit, mimetypes.init)
def loadMimeTypes(mimetype_locations=None, init=mimetypes.init):
    """
    Build the mapping from file extension (leading dot included) to MIME
    type.

    The work is delegated to C{init} (by default the L{mimetypes} module's
    own C{init}), after which a handful of additional extensions are merged
    into the module-level C{mimetypes.types_map}.  That global mapping is
    therefore modified as a side effect, and it is the object returned.

    Several C{mime.types} style files may be supplied as a list.  They are
    sourced in the given order; a later file overrides an earlier one only
    where it explicitly redefines an entry.

    @param mimetype_locations: Optional. List of paths to C{mime.types} style
        files that should be used.
    @type mimetype_locations: iterable of paths or L{None}

    @param init: The init function to call. Defaults to the global C{init}
        function of the C{mimetypes} module. For internal use (testing) only.
    @type init: callable
    """
    init(mimetype_locations)

    # Extensions merged in on top of whatever init() loaded.
    extraTypes = {
        '.conf': 'text/plain',
        '.diff': 'text/plain',
        '.flac': 'audio/x-flac',
        '.java': 'text/plain',
        '.oz': 'text/x-oz',
        '.swf': 'application/x-shockwave-flash',
        '.wml': 'text/vnd.wap.wml',
        '.xul': 'application/vnd.mozilla.xul+xml',
        '.patch': 'text/plain',
    }
    mimetypes.types_map.update(extraTypes)

    return mimetypes.types_map
def setUp(self):
    """Reset the global MIME map and build a fresh ``MimeTypes`` database.

    A copy of ``mimetypes.types_map`` as found on entry is kept in
    ``self.original_types_map``; the new database is stored in ``self.db``.
    """
    # Snapshot, then empty the live map before re-initialising, so that
    # all entries actually come from the Windows registry.
    live_map = mimetypes.types_map
    self.original_types_map = live_map.copy()
    live_map.clear()
    mimetypes.init()
    self.db = mimetypes.MimeTypes()
def __init__(self, config_file=None, mimetype_files=None):
    """Configure the service from its service definition.

    :param config_file: Handed to the ``ScriptBase`` initializer and to
        ``ServiceDef``.
    :param mimetype_files: Optional collection of extra MIME type files;
        when truthy it is passed to ``mimetypes.init``.
    """
    ScriptBase.__init__(self, config_file)
    self.name = self.__class__.__name__
    self.working_dir = boto.config.get('Pyami', 'working_dir')
    self.sd = ServiceDef(config_file)
    # Integer options; the second getint argument is presumably the
    # value used when the option is absent -- TODO confirm.
    for option, second_arg in (('retry_count', 5),
                               ('loop_delay', 30),
                               ('processing_time', 60)):
        setattr(self, option, self.sd.getint(option, second_arg))
    # Objects looked up by name through the service definition.
    for option in ('input_queue', 'output_queue', 'output_domain'):
        setattr(self, option, self.sd.get_obj(option))
    if mimetype_files:
        mimetypes.init(mimetype_files)
def __init__(self, queue, mutex, slaves):
    """
    Initialization.

    Args:
        queue (queue.Queue): global message queue
        mutex: forwarded, together with ``queue``, to the parent initializer
        slaves (dict): Dictionary of slaves

    Raises:
        ValueError: if reading the token from ``config`` (or creating the
            updater with it) raises ``AttributeError`` or ``KeyError``.
    """
    super().__init__(queue, mutex)
    self.slaves = slaves

    try:
        token = getattr(config, self.channel_id)['token']
        self.bot = telegram.ext.Updater(token)
    except (AttributeError, KeyError):
        raise ValueError("Token is not properly defined. Please define it in `config.py`.")

    mimetypes.init(files=["mimetypes"])
    self.admins = getattr(config, self.channel_id)['admins']
    self.logger = logging.getLogger("plugins.%s.TelegramChannel" % self.channel_id)
    self.me = self.bot.bot.get_me()

    # Handlers are registered in exactly this order.
    ext = telegram.ext
    register = self.bot.dispatcher.add_handler

    register(WhitelistHandler(self.admins))
    register(ext.CommandHandler("link", self.link_chat_show_list, pass_args=True))
    register(ext.CommandHandler("chat", self.start_chat_list, pass_args=True))
    register(ext.CommandHandler("recog", self.recognize_speech, pass_args=True))
    register(ext.CallbackQueryHandler(self.callback_query_dispatcher))
    register(ext.CommandHandler("start", self.start, pass_args=True))
    register(ext.CommandHandler("extra", self.extra_help))
    register(ext.CommandHandler("help", self.help))
    register(ext.CommandHandler("unlink_all", self.unlink_all))
    register(ext.CommandHandler("info", self.info))

    # Commands of the form "/<digits>_<name>" are routed to extra_call
    # with the named groups passed along.
    register(ext.RegexHandler(r"^/(?P<id>[0-9]+)_(?P<command>[a-z0-9_-]+)",
                              self.extra_call,
                              pass_groupdict=True))

    # Every message kind listed here is delivered to self.msg.
    filters = ext.Filters
    accepted = (filters.text |
                filters.photo |
                filters.sticker |
                filters.document |
                filters.venue |
                filters.location |
                filters.audio |
                filters.voice |
                filters.video)
    register(ext.MessageHandler(accepted, self.msg))

    self.bot.dispatcher.add_error_handler(self.error)

    # Fill the channel id into the MUTE_CHAT_ID template.
    self.MUTE_CHAT_ID = self.MUTE_CHAT_ID.format(chat_id=self.channel_id)
# Truncate string by bytes
# Written by Mark Tolonen
# http://stackoverflow.com/a/13738452/1989455
def __init__(self, url, working_dir='./.module_installer', root_dir=None):
    '''Initialize ModuleInstaller.

    :param url: URL of file to install. It should contain a fragment (#)
        with query parameters, e.g.
        http(s)://url/file.tar.gz#module_name=modname&module_path=Some/Path&move_to=/

        module_name = local name of the module (required)
        save_as = force download to save as name (optional)
        module_path = relative path, once the module has been extracted,
            to the module dir to "install" (copy the folder/file to the
            move_to path) (required)
        move_to = path to extract the module to, relative to install_root
            (required)
    :param working_dir: working directory; stored as an absolute path
    :param root_dir: install root; the current working directory is used
        when this is not given
    :raises ModuleDownloadException: if module_name, module_path or
        move_to is missing or empty in the URL fragment
    '''
    mimetypes.init()
    self.url = url
    parsed_url = urlparse.urlparse(url)
    qs = urlparse.parse_qs(parsed_url.fragment)

    # Make sure required query params exist
    required = ('module_name', 'module_path', 'move_to')
    if not all(qs.get(key) for key in required):
        raise ModuleDownloadException('ModuleInstaller: Missing query string parameters')

    self.module_path = qs['module_path'][0]
    self.move_to = qs['move_to'][0]
    self.mime_type = mimetypes.guess_type(parsed_url.path)
    self.working_dir = os.path.abspath(working_dir)
    self.module_name = qs['module_name'][0]

    # Split the extension twice so a double suffix is kept whole
    # (in case it's a .tar.gz).
    stem, last_ext = os.path.splitext(parsed_url.path)
    ext = os.path.splitext(stem)[1] + last_ext

    save_as = qs.get('save_as')
    self.download_name = save_as[0] if save_as else self.module_name + ext

    # Try to get the mime type from save_as name, if one doesn't exist
    if not self.mime_type[0]:
        self.mime_type = mimetypes.guess_type(self.download_name)

    self.install_root = root_dir if root_dir else os.getcwd()

    # Absolute path where the modules will be installed.
    # os.path.join throws away install_root when the second component is
    # absolute (e.g. the documented "move_to=/"), so leading separators
    # are stripped to keep move_to relative to install_root.
    # NOTE(review): ".." segments in move_to are not filtered here.
    relative_target = self.move_to.lstrip('/' + os.sep)
    if relative_target:
        self.full_install_path = os.path.join(self.install_root, relative_target)
    else:
        self.full_install_path = self.install_root