async def http_handler(self, request):
    """Dispatch plain HTTP requests to the matching sub-handler.

    Routes by suffix of the request path; unmatched paths yield 404.
    Declared ``async`` because it awaits the websocket and oauth2
    handlers — the original plain ``def`` made those ``await``
    expressions a SyntaxError.
    """
    if request.path.endswith("api.sock"):
        # Hand off to the websocket handler.
        return await self.ws_handler(request)
    if request.path.endswith("/monitor/"):
        # Serve the monitor page bundled as package data.
        data = pkgutil.get_data("rci.services.monitor",
                                "monitor.html").decode("utf8")
        return web.Response(text=data, content_type="text/html")
    if request.path.endswith("/login/github"):
        if request.method == "POST":
            # Start the GitHub OAuth dance with read:org scope.
            url = self.oauth.generate_request_url(("read:org", ))
            return web.HTTPFound(url)
    if request.path.endswith("/oauth2/github"):
        return (await self._oauth2_handler(request))
    if request.path.endswith("logout"):
        if request.method == "POST":
            sid = request.cookies.get(self.config["cookie_name"])
            # ``del`` is a statement, not a function call.
            del self.sessions[sid]
            return web.HTTPFound("/monitor/")
    return web.HTTPNotFound()
Example source code for Python's pkgutil.get_data()
def _monkey_patch_httplib2(extract_dir):
    """Patch things so that httplib2 works properly in a PAR.

    Manually extract certificates to file to make OpenSSL happy and avoid error:
    ssl.SSLError: [Errno 185090050] _ssl.c:344: error:0B084002:x509 ...

    Args:
      extract_dir: the directory into which we extract the necessary files.
    """
    if os.path.isfile(httplib2.CA_CERTS):
        # Not running from a PAR: the cert bundle is already a real
        # file on disk, so there is nothing to do.
        return
    certs = pkgutil.get_data('httplib2', 'cacerts.txt')
    target = os.path.join(extract_dir, 'cacerts.txt')
    with open(target, 'wb') as out:
        out.write(certs)
    # Point httplib2 at the extracted copy.
    httplib2.CA_CERTS = target
def get_filename(package, resource):
    """Rewrite of pkgutil.get_data() that return the file path.

    Returns None when the package has no data-capable loader or no
    ``__file__`` attribute to anchor the resource path against.
    """
    loader = pkgutil.get_loader(package)
    if loader is None or not hasattr(loader, 'get_data'):
        return None
    module = sys.modules.get(package)
    if module is None:
        module = loader.load_module(package)
    if module is None or not hasattr(module, '__file__'):
        return None
    # Build an os.path-format "filename" rooted at the package's
    # directory, which is the shape loader.get_data() expects.
    pieces = [os.path.dirname(module.__file__)] + resource.split('/')
    return os.path.normpath(os.path.join(*pieces))
def install_kernel_resources(destination,
                             resource='gnuplot_kernel',
                             files=None):
    """
    Copy the resource files to the kernelspec folder.

    Args:
        destination: directory the files are written into.
        resource: package whose bundled ``images/`` data is copied.
        files: filenames to copy; defaults to the kernel logo images.
    """
    if files is None:
        files = ['logo-64x64.png', 'logo-32x32.png']
    for filename in files:
        try:
            # pkgutil resource names always use '/' separators;
            # os.path.join would produce backslashes on Windows and
            # fail to locate the resource.
            data = pkgutil.get_data(resource, 'images/' + filename)
            with open(os.path.join(destination, filename), 'wb') as fp:
                fp.write(data)
        except Exception as e:
            # Best-effort copy: report the failure and continue with
            # the remaining files.
            sys.stderr.write(str(e))
def _generate_form(self):
    """Build the signature form and merge it into the PDF.

    Merges the bundled seal and sign-form templates into the document,
    sizes the form to the signature bounding box, substitutes the
    ``${...}`` placeholders in its content stream, and returns the
    xref of the resulting sign-form object.
    """
    font_xref = self._get_font_reference()
    # Merge the seal template first; the sign form references its xref.
    seal_template = PDFTemplate(pkgutil.get_data("llpdf.resources", "seal.pdft"))
    seal_xref = seal_template.merge_into_pdf(self._pdf)["SealObject"]
    sign_template = PDFTemplate(pkgutil.get_data("llpdf.resources", "sign_form.pdft"))
    sign_template["FontXRef"] = font_xref
    sign_template["SealFormXRef"] = seal_xref
    signform_xref = sign_template.merge_into_pdf(self._pdf)["SignFormObject"]
    signform = self._pdf.lookup(signform_xref)
    signform.content[PDFName("/BBox")] = self._get_signature_bbox()
    # NOTE(review): .decode() here appears to be the stream object's own
    # decompression, returning bytes (the replace() below uses byte
    # keys) — confirm against the stream class.
    signform_data = signform.stream.decode()
    (posx, posy, width, height) = self._get_signature_bbox()
    # Placeholder values substituted into the form's content stream.
    signform_vars = {
        "WIDTH": b"%.0f" % (width - 1),
        "HEIGHT": b"%.0f" % (height - 1),
        "TEXT": self._get_signing_text(),
    }
    for (varname, replacement) in signform_vars.items():
        key = ("${" + varname + "}").encode("ascii")
        signform_data = signform_data.replace(key, replacement)
    # Re-encode (compressed) and store the patched stream back.
    signform.set_stream(EncodedObject.create(signform_data, compress = True))
    return signform_xref
async def initialize_kinto(loop, kinto_client, bucket, collection):
    """
    Initialize the remote server with the initialization.yml file.

    Declared ``async`` because it awaits ``initialize_server`` — the
    original plain ``def`` made that ``await`` a SyntaxError.

    Raises:
        ValueError: if *bucket* or *collection* is absent from the
            bundled manifest.
    """
    # Leverage kinto-wizard async client.
    thread_pool = ThreadPoolExecutor()
    async_client = AsyncKintoClient(kinto_client, loop, thread_pool)
    initialization_manifest = pkgutil.get_data('buildhub', 'initialization.yml')
    config = yaml.safe_load(initialization_manifest)
    # Check that we push the records at the right place.
    if bucket not in config:
        raise ValueError(f"Bucket '{bucket}' not specified in `initialization.yml`.")
    if collection not in config[bucket]['collections']:
        raise ValueError(f"Collection '{collection}' not specified in `initialization.yml`.")
    await initialize_server(async_client,
                            config,
                            bucket=bucket,
                            collection=collection,
                            force=False)
def create_file_from_template (self, relpath, unique=False, template_name=None, append_data=None, subst=True, pathtype='temp'):
    """ Create file from app template using app's conf dict.
    If subst=False no template operations will be performed and the file is copied verbatim. """
    if not template_name:
        tname = template_name = os.path.basename(relpath)
    else:
        tname = template_name
    # Try pkgutil resource locator.  Resource names are always
    # '/'-separated (os.path.join would break this on Windows).
    tpath = '/'.join(('apps', self.__class__.__name__,
                      tname + '.template'))
    filedata = pkgutil.get_data('trivup', tpath)
    if filedata is None:
        # Report the actual app class so the missing template is
        # attributable (the original hard-coded 'trivup' here).
        raise FileNotFoundError('Class %s resource %s not found' %
                                (self.__class__.__name__, tpath))
    if subst:
        rendered = Template(filedata.decode('ascii')).substitute(self.conf)
    else:
        rendered = filedata.decode('ascii')
    if append_data is not None:
        rendered += '\n' + append_data
    return self.create_file(relpath, unique, data=rendered, pathtype=pathtype)
def test_load(self):
    """Round-trip the bundled SMEFT LHA sample files through the io helpers.

    Checks that Wilson-coefficient and SM dicts survive
    dict -> LHA -> dict conversion unchanged, and that
    SMEFT.load_initial() reproduces the symmetrized inputs.
    """
    # Bundled sample data, decoded to LHA text.
    sm = pkgutil.get_data('smeftrunner', 'tests/data/SMInput-CPV.dat').decode('utf-8')
    wc = pkgutil.get_data('smeftrunner', 'tests/data/WCsInput-CPV-SMEFT.dat').decode('utf-8')
    wcout = pkgutil.get_data('smeftrunner', 'tests/data/Output_SMEFTrunner.dat').decode('utf-8')
    # Smoke-parse the pure input files (results intentionally unused).
    io.sm_lha2dict(pylha.load(sm))
    io.wc_lha2dict(pylha.load(wc))
    CSM = io.sm_lha2dict(pylha.load(wcout))
    C = io.wc_lha2dict(pylha.load(wcout))
    # dict -> LHA -> dict must be lossless.
    C2 = io.wc_lha2dict(io.wc_dict2lha(C))
    for k in C:
        npt.assert_array_equal(C[k], C2[k])
    smeft = SMEFT()
    smeft.load_initial((wcout,))
    # Loaded initial values must equal the symmetrized dicts key-by-key.
    for k in C:
        npt.assert_array_equal(definitions.symmetrize(C)[k], smeft.C_in[k], err_msg="Failed for {}".format(k))
    for k in CSM:
        npt.assert_array_equal(definitions.symmetrize(CSM)[k], smeft.C_in[k], err_msg="Failed for {}".format(k))
    # Same losslessness check for the SM-parameter dict.
    CSM2 = io.sm_lha2dict(io.sm_dict2lha(CSM))
    for k in CSM:
        npt.assert_array_equal(CSM[k], CSM2[k], err_msg="Failed for {}".format(k))
def get_source(self, environment, template_name):
    """Jinja2 loader hook: locate *template_name* and return its source.

    Search order: each configured template dir (bare name, then with
    the template extension), then the parent loader, then package data
    bundled with "onering".

    Returns:
        ``(source, path, uptodate)`` per the jinja2 loader contract.
    """
    final_path = template_name
    if not template_name.startswith("/"):
        for tdir in self.template_dirs:
            full_path = os.path.join(tdir, template_name)
            if os.path.isfile(full_path):
                final_path = full_path
                break
            full_path = os.path.join(tdir, template_name + self.template_extension)
            if os.path.isfile(full_path):
                final_path = full_path
                break
        else:
            # See if parent can return it
            if self.parent_loader:
                return self.parent_loader.get_source(environment, template_name)
            # Package data has no mtime: always considered fresh.
            source = pkgutil.get_data("onering", "data/templates/" + template_name).decode('utf-8')
            return source, final_path, lambda: True
    # Python 3: the py2 ``file()`` builtin is gone; read as UTF-8 text.
    with open(final_path, encoding='utf-8') as f:
        source = f.read()
    # Capture the mtime now — the original closed over an undefined
    # ``mtime`` name, making the uptodate callback raise NameError.
    mtime = os.path.getmtime(final_path)
    return source, final_path, lambda: mtime == os.path.getmtime(final_path)
def get_source(self, environment, template_name):
    """Jinja2 loader hook: locate *template_name* and return its source.

    Tries the configured template dirs (with and without the template
    extension), falls back to the parent loader, and finally to the
    "onering" package data.

    Returns:
        ``(source, path, uptodate)`` per the jinja2 loader contract.
    """
    final_path = template_name
    if not template_name.startswith("/"):
        for tdir in self.template_dirs:
            full_path = os.path.join(tdir, template_name)
            if os.path.isfile(full_path):
                final_path = full_path
                break
            full_path = os.path.join(tdir, template_name + self.template_extension)
            if os.path.isfile(full_path):
                final_path = full_path
                break
        else:
            # See if parent can return it
            if self.parent_loader:
                return self.parent_loader.get_source(environment, template_name)
            # Package data carries no mtime: report it as always fresh.
            source = pkgutil.get_data("onering", "data/templates/" + template_name).decode('utf-8')
            return source, final_path, lambda: True
    # ``file()`` no longer exists on Python 3; use open() as UTF-8 text.
    with open(final_path, encoding='utf-8') as f:
        source = f.read()
    # Record the mtime before returning — ``mtime`` was previously an
    # undefined name inside the uptodate lambda (NameError when called).
    mtime = os.path.getmtime(final_path)
    return source, final_path, lambda: mtime == os.path.getmtime(final_path)
def __init__(self, cls, name):
    """Load the named CSV knowledge table bundled with this package."""
    self.cls = cls
    self.name = name
    self.filename = '%s.csv' % self.name
    self.keys_by_name = {}
    self.raw_by_key = {}
    self.processed_by_key = {}
    raw_bytes = pkgutil.get_data(__name__, self.filename)
    text = io.StringIO(raw_bytes.decode('ascii'), newline=u'')
    reader = csv.DictReader(text, lineterminator=u'\n')
    self.fieldnames = reader.fieldnames
    for row in reader:
        key = self.cls(row['key'])
        # Keys and their derived names must both be unique.
        assert key not in self.raw_by_key
        self.raw_by_key[key] = row
        row_name = self.name_from_raw(key, row)
        assert row_name not in self.keys_by_name
        self.keys_by_name[row_name] = key
    self.accessor = KnowledgeAccessor(self)
def _load_notices():
    """Parse the bundled notices.xml into an {id: Notice} map.

    Returns the map together with the configured XML parser.
    """
    lookup = lxml.etree.ElementNamespaceClassLookup()
    parser = lxml.etree.XMLParser()
    parser.set_element_class_lookup(lookup)
    ns = lookup.get_namespace(None)
    # Register element classes for every tag we expect to see.
    for severity in Severity:
        ns[severity.name] = Notice
    for tag, element_cls in (('title', Title), ('explain', Paragraph),
                             ('exception', ExceptionDetails), ('var', Var),
                             ('ref', Ref), ('cite', Cite), ('rfc', CiteRFC)):
        ns[tag] = element_cls
    for tag in known_map:
        ns[tag] = Known
    notices_xml = pkgutil.get_data('httpolice', 'notices.xml')
    root = lxml.etree.fromstring(notices_xml, parser)
    notices = {}
    for elem in root:
        if isinstance(elem, Notice):
            # Notice ids must be unique within the file.
            assert elem.id not in notices
            notices[elem.id] = elem
    return notices, parser
def reduce_domains(domains):
    """Reduce domains like 'www.google.com' to 'google.com'.

    Walks each domain from its last label outwards, keeping the
    shortest suffix that extends one label past the (possibly
    multi-label) TLD; domains whose last label is not a known TLD are
    dropped as invalid.
    """
    # get_data returns bytes on Python 3; decode so the set holds str
    # entries comparable to the (str) domain labels.
    tld_content = pkgutil.get_data('gfwlist2pac', 'resources/tld.txt').decode('utf-8')
    tlds = set(tld_content.splitlines())
    new_domains = set()
    for domain in domains:
        domain_parts = domain.split('.')
        last_root_domain = None
        # range() replaces the Python-2-only xrange().
        for i in range(len(domain_parts)):
            root_domain = '.'.join(domain_parts[len(domain_parts) - i - 1:])
            if i == 0 and root_domain not in tlds:
                # The last label is not a valid TLD: drop this domain.
                break
            last_root_domain = root_domain
            if root_domain not in tlds:
                # First suffix beyond the TLD: this is the root domain.
                break
        if last_root_domain is not None:
            new_domains.add(last_root_domain)
    return new_domains
def generate_pac_precise(rules, proxy):
    """Render the precise (ABP-rule-based) PAC file for *proxy*."""
    def grep_rule(rule):
        # Keep only real rules: skip blanks, comments ('!') and
        # section headers ('[').
        if rule:
            if rule.startswith('!'):
                return None
            if rule.startswith('['):
                return None
            return rule
        return None
    # render the pac file
    # get_data returns bytes on Python 3: decode before str.replace().
    proxy_content = pkgutil.get_data('gfwlist2pac', 'resources/abp.js').decode('utf-8')
    # Materialize the filtered rules: on Python 3 ``filter`` returns a
    # lazy iterator, which json.dumps cannot serialize.
    rules = [rule for rule in rules if grep_rule(rule)]
    proxy_content = proxy_content.replace('__PROXY__', json.dumps(str(proxy)))
    proxy_content = proxy_content.replace('__RULES__',
                                          json.dumps(rules, indent=2))
    return proxy_content
def _read_emodic(self):
    """ Load emotion dictionaries bundled with the mlask package. """
    self.emodic = {'emotem': {}, 'emotion': {}}
    # Reading dictionaries of syntactical indicator of emotiveness.
    emotemy = ('interjections', 'exclamation', 'vulgar', 'endearments', 'emotikony', 'gitaigo')
    for emotem_class in emotemy:
        # pkgutil resource names are always '/'-separated; the original
        # os.path.join would produce backslashes on Windows.
        data = pkgutil.get_data('mlask', 'emotemes/%s_uncoded.txt' % emotem_class)
        phrases = data.decode('utf8').splitlines()
        self.emodic['emotem'][emotem_class] = phrases
    # Reading dictionaries of emotion.
    emotions = ('aware', 'haji', 'ikari', 'iya', 'kowa', 'odoroki', 'suki', 'takaburi', 'yasu', 'yorokobi')
    for emotion_class in emotions:
        data = pkgutil.get_data('mlask', 'emotions/%s_uncoded.txt' % emotion_class)
        phrases = data.decode('utf8').splitlines()
        self.emodic['emotion'][emotion_class] = phrases
def has_sorted_training_set(self):
    """Return True when this round's sorted training CSV is bundled."""
    resource = 'r{}_numerai_sorted_training_data.csv'.format(self.round_number)
    try:
        pkgutil.get_data('numerai.data', resource)
    except IOError:
        # Resource file missing for this round.
        return False
    return True
def getzoneinfofile_stream():
    """Return a BytesIO over the bundled zoneinfo archive, or None."""
    try:
        data = get_data(__name__, _ZONEFILENAME)
    except IOError as e:  # TODO switch to FileNotFoundError?
        # Missing/unreadable archive is non-fatal: warn and signal None.
        warnings.warn("I/O error({0}): {1}".format(e.errno, e.strerror))
        return None
    return BytesIO(data)
async def _http_settings(self, request):
    """Render the GitHub settings page for the authenticated user.

    Declared ``async`` because it awaits the GitHub API client — the
    original plain ``def`` made that ``await`` a SyntaxError.
    """
    import jinja2
    template = jinja2.Template(
        pkgutil.get_data("rci.services.github", "github_settings.html").decode("utf8"))
    client = self._get_client(request)
    if client is None:
        # No session/client for this request: refuse.
        return web.HTTPUnauthorized(text="fail")
    orgs = []
    for org in (await client.get("user/orgs")):
        orgs.append(org)
    return web.Response(text=template.render(orgs=orgs), content_type="text/html")
def pkgdata(name):
    """Return a pythonwhois data file: bytes on py2, text on py3."""
    data = pkgutil.get_data("pythonwhois", name)
    if sys.version_info >= (3, 0):
        return data.decode("utf-8")
    return data
def get_content(file_name, pkg_name='onedrivee', is_text=True):
    """
    Read a resource file in data/.
    :param str file_name:
    :param str pkg_name:
    :param True | False is_text: True to indicate the text is UTF-8 encoded.
    :return str | bytes: Content of the file.
    """
    raw = pkgutil.get_data(pkg_name, 'store/' + file_name)
    return raw.decode('utf-8') if is_text else raw
def pkgdata(name):
    """Fetch a pythonwhois resource, decoding to text on Python 3."""
    data = pkgutil.get_data("pythonwhois", name)
    on_py3 = sys.version_info >= (3, 0)
    return data.decode("utf-8") if on_py3 else data
def getzoneinfofile_stream():
    """Open the packaged zoneinfo archive as an in-memory byte stream."""
    try:
        raw = get_data(__name__, ZONEFILENAME)
        return BytesIO(raw)
    except IOError as e:  # TODO switch to FileNotFoundError?
        # Non-fatal: warn and let the caller handle the missing data.
        warnings.warn("I/O error({0}): {1}".format(e.errno, e.strerror))
        return None
def getzoneinfofile_stream():
    """Load the bundled zoneinfo archive; None if it cannot be read."""
    try:
        payload = get_data(__name__, ZONEFILENAME)
    except IOError as e:  # TODO switch to FileNotFoundError?
        warnings.warn("I/O error({0}): {1}".format(e.errno, e.strerror))
        return None
    return BytesIO(payload)
def write_images(self):
    """Write the SVG images."""
    # Copy each bundled SVG into the output directory.
    for image_name in ('file.svg', 'back.svg'):
        data = pkgutil.get_data('flake8_html', 'images/' + image_name)
        target = os.path.join(self.outdir, image_name)
        with open(target, 'wb') as out:
            out.write(data)
def getzoneinfofile_stream():
    """Return the packaged zoneinfo data wrapped in BytesIO, or None."""
    try:
        contents = get_data(__name__, ZONEFILENAME)
    except IOError as e:  # TODO switch to FileNotFoundError?
        # Degrade gracefully when the archive is absent or unreadable.
        warnings.warn("I/O error({0}): {1}".format(e.errno, e.strerror))
        return None
    else:
        return BytesIO(contents)
def lib():
    """Self-test that dir_shadowing_lib's package data is extractable."""
    print('In dir_shadowing_lib.py lib()')
    # Test resource extraction
    expected = b'Dummy data file for dir_shadowing_lib.py\n'
    lib_dat = pkgutil.get_data('test_dir_shadowing',
                               'dir_shadowing_lib_dat.txt')
    assert lib_dat == expected, lib_dat
def main():
    """Exercise the shadowed library, then verify our own package data."""
    print('In dir_shadowing_main.py main()')
    dir_shadowing_lib.lib()
    # Test resource extraction
    expected = b'Dummy data file for dir_shadowing_main.py\n'
    dat = pkgutil.get_data('test_dir_shadowing', 'dir_shadowing_main_dat.txt')
    assert dat == expected, dat
def lib():
    """Self-test that package_a's bundled data file is extractable."""
    print('In a_lib.py lib()')
    # Test resource extraction
    expected = b'Dummy data file for a_lib.py\n'
    a_lib_dat = pkgutil.get_data('subpar.tests.package_a', 'a_lib_dat.txt')
    assert a_lib_dat == expected, a_lib_dat