def metadata(self):
    """Read and parse the metadata file stored inside this wheel archive.

    The wheel is opened from ``os.path.join(self.dirname, self.filename)``
    and the metadata member is looked up in the archive's
    ``<name>-<version>.dist-info`` directory.  Wheels whose
    ``Wheel-Version`` is lower than 1.1 use the ``METADATA`` member; all
    others use ``METADATA_FILENAME``.

    :returns: the ``Metadata`` object built from the UTF-8 decoded member.
    :raises ValueError: if a ``KeyError`` is raised while opening or
        parsing the member (e.g. it is absent from the archive).
    """
    wheel_path = os.path.join(self.dirname, self.filename)
    dist_info = '%s-%s.dist-info' % (self.name, self.version)
    utf8_reader = codecs.getreader('utf-8')
    with ZipFile(wheel_path, 'r') as archive:
        wheel_info = self.get_wheel_metadata(archive)
        version_parts = wheel_info['Wheel-Version'].split('.', 1)
        wheel_version = tuple(int(part) for part in version_parts)
        # The metadata member name depends on the wheel format version.
        fn = 'METADATA' if wheel_version < (1, 1) else METADATA_FILENAME
        try:
            with archive.open(posixpath.join(dist_info, fn)) as raw:
                result = Metadata(fileobj=utf8_reader(raw))
        except KeyError:
            raise ValueError('Invalid wheel, because %s is missing' % fn)
    return result
# Example source code using Python's codecs.getreader()
def metadata(self):
    """Read and parse the metadata file stored inside this wheel archive.

    The wheel is opened from ``os.path.join(self.dirname, self.filename)``
    and the metadata member is looked up in the archive's
    ``<name>-<version>.dist-info`` directory.  Wheels whose
    ``Wheel-Version`` is lower than 1.1 use the ``METADATA`` member; all
    others use ``METADATA_FILENAME``.

    :returns: the ``Metadata`` object built from the UTF-8 decoded member.
    :raises ValueError: if a ``KeyError`` is raised while opening or
        parsing the member (e.g. it is absent from the archive).
    """
    wheel_path = os.path.join(self.dirname, self.filename)
    dist_info = '%s-%s.dist-info' % (self.name, self.version)
    utf8_reader = codecs.getreader('utf-8')
    with ZipFile(wheel_path, 'r') as archive:
        wheel_info = self.get_wheel_metadata(archive)
        version_parts = wheel_info['Wheel-Version'].split('.', 1)
        wheel_version = tuple(int(part) for part in version_parts)
        # The metadata member name depends on the wheel format version.
        fn = 'METADATA' if wheel_version < (1, 1) else METADATA_FILENAME
        try:
            with archive.open(posixpath.join(dist_info, fn)) as raw:
                result = Metadata(fileobj=utf8_reader(raw))
        except KeyError:
            raise ValueError('Invalid wheel, because %s is missing' % fn)
    return result
def sensu_event_resolve(message):
    """Ask the Sensu API to resolve the event named by ``message['entity']``.

    The entity is split on ``':'`` into a client name and a check name,
    which are POSTed as JSON to ``settings.SENSU_API_URL + '/resolve'``
    with an HTTP basic ``Authorization`` header built from
    ``settings.SENSU_API_USER`` and ``settings.SENSU_API_PASSWORD``.

    A 202 response releases the connection; any other status is logged as
    an error.

    :param message: mapping that provides the ``'entity'`` key.
    :raises Exception: any failure is logged and then re-raised unchanged.
    """
    API_URL = settings.SENSU_API_URL + '/resolve'
    credentials = "%s:%s" % (settings.SENSU_API_USER, settings.SENSU_API_PASSWORD)
    userAndPass = base64.b64encode(credentials.encode()).decode("ascii")
    headers = {
        'X_REQUESTED_WITH': 'XMLHttpRequest',
        'Accept': 'application/json, text/javascript, */*; q=0.01',
        'Authorization': 'Basic %s' % userAndPass,
    }
    # Remember the entity outside the handler: re-reading message['entity']
    # while handling an error would raise again (and hide the real failure)
    # when the key itself is what is missing.
    entity = None
    try:
        entity = message['entity']
        client_name, check_name = entity.split(':')
        post_params = {"client": client_name, "check": check_name}
        request = http.request('POST', API_URL, body=json.dumps(post_params), headers=headers)
        response = request.status
        if response == 202:
            request.release_conn()
        else:
            logger.error('response: %s', response)
    except Exception:
        logger.error("sensu_event_resolve failed resolving entity: %s" % entity)
        raise
def _get_external_data(url):
result = {}
try:
# urlopen might fail if it runs into redirections,
# because of Python issue #13696. Fixed in locators
# using a custom redirect handler.
resp = urlopen(url)
headers = resp.info()
if headers.get('Content-Type') != 'application/json':
logger.debug('Unexpected response for JSON request')
else:
reader = codecs.getreader('utf-8')(resp)
#data = reader.read().decode('utf-8')
#result = json.loads(data)
result = json.load(reader)
except Exception as e:
logger.exception('Failed to get external data for %s: %s', url, e)
return result
def reset(self):
    """Rewind the decoding state so the raw stream is read afresh.

    A new stream reader for ``self.charEncoding[0]`` is wrapped around
    ``self.rawStream`` with the ``'replace'`` error handler, and all chunk
    buffers, the error list and the position counters are cleared.
    """
    make_reader = codecs.getreader(self.charEncoding[0])
    self.dataStream = make_reader(self.rawStream, 'replace')

    # Current chunk buffer and the read position inside it.
    self.chunk = u""
    self.chunkSize = self.chunkOffset = 0
    self.errors = []

    # Number of (complete) lines in previous chunks, and number of columns
    # in the last line of the previous chunk.
    self.prevNumLines = self.prevNumCols = 0

    # Deal with CR LF and surrogates split over chunk boundaries.
    self._bufferedCharacter = None
def metadata(self):
    """Read and parse the metadata file stored inside this wheel archive.

    The wheel is opened from ``os.path.join(self.dirname, self.filename)``
    and the metadata member is looked up in the archive's
    ``<name>-<version>.dist-info`` directory.  Wheels whose
    ``Wheel-Version`` is lower than 1.1 use the ``METADATA`` member; all
    others use ``METADATA_FILENAME``.

    :returns: the ``Metadata`` object built from the UTF-8 decoded member.
    :raises ValueError: if a ``KeyError`` is raised while opening or
        parsing the member (e.g. it is absent from the archive).
    """
    wheel_path = os.path.join(self.dirname, self.filename)
    dist_info = '%s-%s.dist-info' % (self.name, self.version)
    utf8_reader = codecs.getreader('utf-8')
    with ZipFile(wheel_path, 'r') as archive:
        wheel_info = self.get_wheel_metadata(archive)
        version_parts = wheel_info['Wheel-Version'].split('.', 1)
        wheel_version = tuple(int(part) for part in version_parts)
        # The metadata member name depends on the wheel format version.
        fn = 'METADATA' if wheel_version < (1, 1) else METADATA_FILENAME
        try:
            with archive.open(posixpath.join(dist_info, fn)) as raw:
                result = Metadata(fileobj=utf8_reader(raw))
        except KeyError:
            raise ValueError('Invalid wheel, because %s is missing' % fn)
    return result
def main():
    """Command-line entry point: lint every file named on the command line.

    The file names come from ``ParseArguments(sys.argv[1:])``.  Each one is
    passed to ``ProcessFile`` with the configured verbosity, the error
    counts are printed, and the process exits with status 1 if any error
    was counted and 0 otherwise.
    """
    filenames = ParseArguments(sys.argv[1:])

    # Change stderr to write with replacement characters so we don't die
    # if we try to print something containing non-ASCII characters.
    # The UTF-8 stream writer emits encoded bytes, so on Python 3 it has to
    # wrap the binary stream behind sys.stderr (writing bytes to the text
    # stream raises TypeError).  Streams without a ``buffer`` attribute
    # (e.g. Python 2 file objects) are wrapped directly, as before.
    raw_stderr = getattr(sys.stderr, 'buffer', sys.stderr)
    sys.stderr = codecs.StreamReaderWriter(raw_stderr,
                                           codecs.getreader('utf8'),
                                           codecs.getwriter('utf8'),
                                           'replace')

    _cpplint_state.ResetErrorCounts()
    for filename in filenames:
        ProcessFile(filename, _cpplint_state.verbose_level)
    _cpplint_state.PrintErrorCounts()

    sys.exit(_cpplint_state.error_count > 0)
def metadata(self):
    """Read and parse the metadata file stored inside this wheel archive.

    The wheel is opened from ``os.path.join(self.dirname, self.filename)``
    and the metadata member is looked up in the archive's
    ``<name>-<version>.dist-info`` directory.  Wheels whose
    ``Wheel-Version`` is lower than 1.1 use the ``METADATA`` member; all
    others use ``METADATA_FILENAME``.

    :returns: the ``Metadata`` object built from the UTF-8 decoded member.
    :raises ValueError: if a ``KeyError`` is raised while opening or
        parsing the member (e.g. it is absent from the archive).
    """
    wheel_path = os.path.join(self.dirname, self.filename)
    dist_info = '%s-%s.dist-info' % (self.name, self.version)
    utf8_reader = codecs.getreader('utf-8')
    with ZipFile(wheel_path, 'r') as archive:
        wheel_info = self.get_wheel_metadata(archive)
        version_parts = wheel_info['Wheel-Version'].split('.', 1)
        wheel_version = tuple(int(part) for part in version_parts)
        # The metadata member name depends on the wheel format version.
        fn = 'METADATA' if wheel_version < (1, 1) else METADATA_FILENAME
        try:
            with archive.open(posixpath.join(dist_info, fn)) as raw:
                result = Metadata(fileobj=utf8_reader(raw))
        except KeyError:
            raise ValueError('Invalid wheel, because %s is missing' % fn)
    return result
def metadata(self):
    """Read and parse the metadata file stored inside this wheel archive.

    The wheel is opened from ``os.path.join(self.dirname, self.filename)``
    and the metadata member is looked up in the archive's
    ``<name>-<version>.dist-info`` directory.  Wheels whose
    ``Wheel-Version`` is lower than 1.1 use the ``METADATA`` member; all
    others use ``METADATA_FILENAME``.

    :returns: the ``Metadata`` object built from the UTF-8 decoded member.
    :raises ValueError: if a ``KeyError`` is raised while opening or
        parsing the member (e.g. it is absent from the archive).
    """
    wheel_path = os.path.join(self.dirname, self.filename)
    dist_info = '%s-%s.dist-info' % (self.name, self.version)
    utf8_reader = codecs.getreader('utf-8')
    with ZipFile(wheel_path, 'r') as archive:
        wheel_info = self.get_wheel_metadata(archive)
        version_parts = wheel_info['Wheel-Version'].split('.', 1)
        wheel_version = tuple(int(part) for part in version_parts)
        # The metadata member name depends on the wheel format version.
        fn = 'METADATA' if wheel_version < (1, 1) else METADATA_FILENAME
        try:
            with archive.open(posixpath.join(dist_info, fn)) as raw:
                result = Metadata(fileobj=utf8_reader(raw))
        except KeyError:
            raise ValueError('Invalid wheel, because %s is missing' % fn)
    return result
def _get_external_data(url):
result = {}
try:
# urlopen might fail if it runs into redirections,
# because of Python issue #13696. Fixed in locators
# using a custom redirect handler.
resp = urlopen(url)
headers = resp.info()
ct = headers.get('Content-Type')
if not ct.startswith('application/json'):
logger.debug('Unexpected response for JSON request: %s', ct)
else:
reader = codecs.getreader('utf-8')(resp)
#data = reader.read().decode('utf-8')
#result = json.loads(data)
result = json.load(reader)
except Exception as e:
logger.exception('Failed to get external data for %s: %s', url, e)
return result
def metadata(self):
    """Read and parse the metadata file stored inside this wheel archive.

    The wheel is opened from ``os.path.join(self.dirname, self.filename)``
    and the metadata member is looked up in the archive's
    ``<name>-<version>.dist-info`` directory.  Wheels whose
    ``Wheel-Version`` is lower than 1.1 use the ``METADATA`` member; all
    others use ``METADATA_FILENAME``.

    :returns: the ``Metadata`` object built from the UTF-8 decoded member.
    :raises ValueError: if a ``KeyError`` is raised while opening or
        parsing the member (e.g. it is absent from the archive).
    """
    wheel_path = os.path.join(self.dirname, self.filename)
    dist_info = '%s-%s.dist-info' % (self.name, self.version)
    utf8_reader = codecs.getreader('utf-8')
    with ZipFile(wheel_path, 'r') as archive:
        wheel_info = self.get_wheel_metadata(archive)
        version_parts = wheel_info['Wheel-Version'].split('.', 1)
        wheel_version = tuple(int(part) for part in version_parts)
        # The metadata member name depends on the wheel format version.
        fn = 'METADATA' if wheel_version < (1, 1) else METADATA_FILENAME
        try:
            with archive.open(posixpath.join(dist_info, fn)) as raw:
                result = Metadata(fileobj=utf8_reader(raw))
        except KeyError:
            raise ValueError('Invalid wheel, because %s is missing' % fn)
    return result
def _get_external_data(url):
result = {}
try:
# urlopen might fail if it runs into redirections,
# because of Python issue #13696. Fixed in locators
# using a custom redirect handler.
resp = urlopen(url)
headers = resp.info()
ct = headers.get('Content-Type')
if not ct.startswith('application/json'):
logger.debug('Unexpected response for JSON request: %s', ct)
else:
reader = codecs.getreader('utf-8')(resp)
#data = reader.read().decode('utf-8')
#result = json.loads(data)
result = json.load(reader)
except Exception as e:
logger.exception('Failed to get external data for %s: %s', url, e)
return result
def metadata(self):
    """Read and parse the metadata file stored inside this wheel archive.

    The wheel is opened from ``os.path.join(self.dirname, self.filename)``
    and the metadata member is looked up in the archive's
    ``<name>-<version>.dist-info`` directory.  Wheels whose
    ``Wheel-Version`` is lower than 1.1 use the ``METADATA`` member; all
    others use ``METADATA_FILENAME``.

    :returns: the ``Metadata`` object built from the UTF-8 decoded member.
    :raises ValueError: if a ``KeyError`` is raised while opening or
        parsing the member (e.g. it is absent from the archive).
    """
    wheel_path = os.path.join(self.dirname, self.filename)
    dist_info = '%s-%s.dist-info' % (self.name, self.version)
    utf8_reader = codecs.getreader('utf-8')
    with ZipFile(wheel_path, 'r') as archive:
        wheel_info = self.get_wheel_metadata(archive)
        version_parts = wheel_info['Wheel-Version'].split('.', 1)
        wheel_version = tuple(int(part) for part in version_parts)
        # The metadata member name depends on the wheel format version.
        fn = 'METADATA' if wheel_version < (1, 1) else METADATA_FILENAME
        try:
            with archive.open(posixpath.join(dist_info, fn)) as raw:
                result = Metadata(fileobj=utf8_reader(raw))
        except KeyError:
            raise ValueError('Invalid wheel, because %s is missing' % fn)
    return result
def metadata(self):
    """Read and parse the metadata file stored inside this wheel archive.

    The wheel is opened from ``os.path.join(self.dirname, self.filename)``
    and the metadata member is looked up in the archive's
    ``<name>-<version>.dist-info`` directory.  Wheels whose
    ``Wheel-Version`` is lower than 1.1 use the ``METADATA`` member; all
    others use ``METADATA_FILENAME``.

    :returns: the ``Metadata`` object built from the UTF-8 decoded member.
    :raises ValueError: if a ``KeyError`` is raised while opening or
        parsing the member (e.g. it is absent from the archive).
    """
    wheel_path = os.path.join(self.dirname, self.filename)
    dist_info = '%s-%s.dist-info' % (self.name, self.version)
    utf8_reader = codecs.getreader('utf-8')
    with ZipFile(wheel_path, 'r') as archive:
        wheel_info = self.get_wheel_metadata(archive)
        version_parts = wheel_info['Wheel-Version'].split('.', 1)
        wheel_version = tuple(int(part) for part in version_parts)
        # The metadata member name depends on the wheel format version.
        fn = 'METADATA' if wheel_version < (1, 1) else METADATA_FILENAME
        try:
            with archive.open(posixpath.join(dist_info, fn)) as raw:
                result = Metadata(fileobj=utf8_reader(raw))
        except KeyError:
            raise ValueError('Invalid wheel, because %s is missing' % fn)
    return result
def metadata(self):
    """Read and parse the metadata file stored inside this wheel archive.

    The wheel is opened from ``os.path.join(self.dirname, self.filename)``
    and the metadata member is looked up in the archive's
    ``<name>-<version>.dist-info`` directory.  Wheels whose
    ``Wheel-Version`` is lower than 1.1 use the ``METADATA`` member; all
    others use ``METADATA_FILENAME``.

    :returns: the ``Metadata`` object built from the UTF-8 decoded member.
    :raises ValueError: if a ``KeyError`` is raised while opening or
        parsing the member (e.g. it is absent from the archive).
    """
    wheel_path = os.path.join(self.dirname, self.filename)
    dist_info = '%s-%s.dist-info' % (self.name, self.version)
    utf8_reader = codecs.getreader('utf-8')
    with ZipFile(wheel_path, 'r') as archive:
        wheel_info = self.get_wheel_metadata(archive)
        version_parts = wheel_info['Wheel-Version'].split('.', 1)
        wheel_version = tuple(int(part) for part in version_parts)
        # The metadata member name depends on the wheel format version.
        fn = 'METADATA' if wheel_version < (1, 1) else METADATA_FILENAME
        try:
            with archive.open(posixpath.join(dist_info, fn)) as raw:
                result = Metadata(fileobj=utf8_reader(raw))
        except KeyError:
            raise ValueError('Invalid wheel, because %s is missing' % fn)
    return result
def metadata(self):
    """Read and parse the metadata file stored inside this wheel archive.

    The wheel is opened from ``os.path.join(self.dirname, self.filename)``
    and the metadata member is looked up in the archive's
    ``<name>-<version>.dist-info`` directory.  Wheels whose
    ``Wheel-Version`` is lower than 1.1 use the ``METADATA`` member; all
    others use ``METADATA_FILENAME``.

    :returns: the ``Metadata`` object built from the UTF-8 decoded member.
    :raises ValueError: if a ``KeyError`` is raised while opening or
        parsing the member (e.g. it is absent from the archive).
    """
    wheel_path = os.path.join(self.dirname, self.filename)
    dist_info = '%s-%s.dist-info' % (self.name, self.version)
    utf8_reader = codecs.getreader('utf-8')
    with ZipFile(wheel_path, 'r') as archive:
        wheel_info = self.get_wheel_metadata(archive)
        version_parts = wheel_info['Wheel-Version'].split('.', 1)
        wheel_version = tuple(int(part) for part in version_parts)
        # The metadata member name depends on the wheel format version.
        fn = 'METADATA' if wheel_version < (1, 1) else METADATA_FILENAME
        try:
            with archive.open(posixpath.join(dist_info, fn)) as raw:
                result = Metadata(fileobj=utf8_reader(raw))
        except KeyError:
            raise ValueError('Invalid wheel, because %s is missing' % fn)
    return result
def metadata(self):
    """Read and parse the metadata file stored inside this wheel archive.

    The wheel is opened from ``os.path.join(self.dirname, self.filename)``
    and the metadata member is looked up in the archive's
    ``<name>-<version>.dist-info`` directory.  Wheels whose
    ``Wheel-Version`` is lower than 1.1 use the ``METADATA`` member; all
    others use ``METADATA_FILENAME``.

    :returns: the ``Metadata`` object built from the UTF-8 decoded member.
    :raises ValueError: if a ``KeyError`` is raised while opening or
        parsing the member (e.g. it is absent from the archive).
    """
    wheel_path = os.path.join(self.dirname, self.filename)
    dist_info = '%s-%s.dist-info' % (self.name, self.version)
    utf8_reader = codecs.getreader('utf-8')
    with ZipFile(wheel_path, 'r') as archive:
        wheel_info = self.get_wheel_metadata(archive)
        version_parts = wheel_info['Wheel-Version'].split('.', 1)
        wheel_version = tuple(int(part) for part in version_parts)
        # The metadata member name depends on the wheel format version.
        fn = 'METADATA' if wheel_version < (1, 1) else METADATA_FILENAME
        try:
            with archive.open(posixpath.join(dist_info, fn)) as raw:
                result = Metadata(fileobj=utf8_reader(raw))
        except KeyError:
            raise ValueError('Invalid wheel, because %s is missing' % fn)
    return result
def _get_external_data(url):
result = {}
try:
# urlopen might fail if it runs into redirections,
# because of Python issue #13696. Fixed in locators
# using a custom redirect handler.
resp = urlopen(url)
headers = resp.info()
if headers.get('Content-Type') != 'application/json':
logger.debug('Unexpected response for JSON request')
else:
reader = codecs.getreader('utf-8')(resp)
#data = reader.read().decode('utf-8')
#result = json.loads(data)
result = json.load(reader)
except Exception as e:
logger.exception('Failed to get external data for %s: %s', url, e)
return result
def _open_asset_path(path, encoding=None):
"""
:param asset_path:
string containing absolute path to file,
or package-relative path using format
``"python.module:relative/file/path"``.
:returns:
filehandle opened in 'rb' mode
(unless encoding explicitly specified)
"""
if encoding:
return codecs.getreader(encoding)(_open_asset_path(path))
if os.path.isabs(path):
return open(path, "rb")
package, sep, subpath = path.partition(":")
if not sep:
raise ValueError("asset path must be absolute file path "
"or use 'pkg.name:sub/path' format: %r" % (path,))
return pkg_resources.resource_stream(package, subpath)
#: type aliases
def metadata(self):
    """Read and parse the metadata file stored inside this wheel archive.

    The wheel is opened from ``os.path.join(self.dirname, self.filename)``
    and the metadata member is looked up in the archive's
    ``<name>-<version>.dist-info`` directory.  Wheels whose
    ``Wheel-Version`` is lower than 1.1 use the ``METADATA`` member; all
    others use ``METADATA_FILENAME``.

    :returns: the ``Metadata`` object built from the UTF-8 decoded member.
    :raises ValueError: if a ``KeyError`` is raised while opening or
        parsing the member (e.g. it is absent from the archive).
    """
    wheel_path = os.path.join(self.dirname, self.filename)
    dist_info = '%s-%s.dist-info' % (self.name, self.version)
    utf8_reader = codecs.getreader('utf-8')
    with ZipFile(wheel_path, 'r') as archive:
        wheel_info = self.get_wheel_metadata(archive)
        version_parts = wheel_info['Wheel-Version'].split('.', 1)
        wheel_version = tuple(int(part) for part in version_parts)
        # The metadata member name depends on the wheel format version.
        fn = 'METADATA' if wheel_version < (1, 1) else METADATA_FILENAME
        try:
            with archive.open(posixpath.join(dist_info, fn)) as raw:
                result = Metadata(fileobj=utf8_reader(raw))
        except KeyError:
            raise ValueError('Invalid wheel, because %s is missing' % fn)
    return result
def metadata(self):
    """Read and parse the metadata file stored inside this wheel archive.

    The wheel is opened from ``os.path.join(self.dirname, self.filename)``
    and the metadata member is looked up in the archive's
    ``<name>-<version>.dist-info`` directory.  Wheels whose
    ``Wheel-Version`` is lower than 1.1 use the ``METADATA`` member; all
    others use ``METADATA_FILENAME``.

    :returns: the ``Metadata`` object built from the UTF-8 decoded member.
    :raises ValueError: if a ``KeyError`` is raised while opening or
        parsing the member (e.g. it is absent from the archive).
    """
    wheel_path = os.path.join(self.dirname, self.filename)
    dist_info = '%s-%s.dist-info' % (self.name, self.version)
    utf8_reader = codecs.getreader('utf-8')
    with ZipFile(wheel_path, 'r') as archive:
        wheel_info = self.get_wheel_metadata(archive)
        version_parts = wheel_info['Wheel-Version'].split('.', 1)
        wheel_version = tuple(int(part) for part in version_parts)
        # The metadata member name depends on the wheel format version.
        fn = 'METADATA' if wheel_version < (1, 1) else METADATA_FILENAME
        try:
            with archive.open(posixpath.join(dist_info, fn)) as raw:
                result = Metadata(fileobj=utf8_reader(raw))
        except KeyError:
            raise ValueError('Invalid wheel, because %s is missing' % fn)
    return result
def _get_external_data(url):
result = {}
try:
# urlopen might fail if it runs into redirections,
# because of Python issue #13696. Fixed in locators
# using a custom redirect handler.
resp = urlopen(url)
headers = resp.info()
ct = headers.get('Content-Type')
if not ct.startswith('application/json'):
logger.debug('Unexpected response for JSON request: %s', ct)
else:
reader = codecs.getreader('utf-8')(resp)
#data = reader.read().decode('utf-8')
#result = json.loads(data)
result = json.load(reader)
except Exception as e:
logger.exception('Failed to get external data for %s: %s', url, e)
return result
def metadata(self):
    """Read and parse the metadata file stored inside this wheel archive.

    The wheel is opened from ``os.path.join(self.dirname, self.filename)``
    and the metadata member is looked up in the archive's
    ``<name>-<version>.dist-info`` directory.  Wheels whose
    ``Wheel-Version`` is lower than 1.1 use the ``METADATA`` member; all
    others use ``METADATA_FILENAME``.

    :returns: the ``Metadata`` object built from the UTF-8 decoded member.
    :raises ValueError: if a ``KeyError`` is raised while opening or
        parsing the member (e.g. it is absent from the archive).
    """
    wheel_path = os.path.join(self.dirname, self.filename)
    dist_info = '%s-%s.dist-info' % (self.name, self.version)
    utf8_reader = codecs.getreader('utf-8')
    with ZipFile(wheel_path, 'r') as archive:
        wheel_info = self.get_wheel_metadata(archive)
        version_parts = wheel_info['Wheel-Version'].split('.', 1)
        wheel_version = tuple(int(part) for part in version_parts)
        # The metadata member name depends on the wheel format version.
        fn = 'METADATA' if wheel_version < (1, 1) else METADATA_FILENAME
        try:
            with archive.open(posixpath.join(dist_info, fn)) as raw:
                result = Metadata(fileobj=utf8_reader(raw))
        except KeyError:
            raise ValueError('Invalid wheel, because %s is missing' % fn)
    return result
def _resolve_version(version):
    """
    Resolve LATEST version

    Any *version* other than the ``LATEST`` sentinel is returned unchanged.
    For ``LATEST``, the setuptools JSON document is fetched from
    ``/pypi/setuptools/json`` relative to ``DEFAULT_URL`` and the value of
    its ``info.version`` field is returned as a string.
    """
    if version is not LATEST:
        return version

    meta_url = urljoin(DEFAULT_URL, '/pypi/setuptools/json')
    resp = urlopen(meta_url)
    with contextlib.closing(resp):
        try:
            charset = resp.info().get_content_charset()
        except Exception:
            # Python 2 compat; assume UTF-8
            charset = None
        # get_content_charset() returns None when the Content-Type header
        # has no charset parameter, and codecs.getreader(None) would raise
        # TypeError, so fall back to UTF-8 in that case as well.
        reader = codecs.getreader(charset or 'UTF-8')
        doc = json.load(reader(resp))

    return str(doc['info']['version'])
def metadata(self):
    """Read and parse the metadata file stored inside this wheel archive.

    The wheel is opened from ``os.path.join(self.dirname, self.filename)``
    and the metadata member is looked up in the archive's
    ``<name>-<version>.dist-info`` directory.  Wheels whose
    ``Wheel-Version`` is lower than 1.1 use the ``METADATA`` member; all
    others use ``METADATA_FILENAME``.

    :returns: the ``Metadata`` object built from the UTF-8 decoded member.
    :raises ValueError: if a ``KeyError`` is raised while opening or
        parsing the member (e.g. it is absent from the archive).
    """
    wheel_path = os.path.join(self.dirname, self.filename)
    dist_info = '%s-%s.dist-info' % (self.name, self.version)
    utf8_reader = codecs.getreader('utf-8')
    with ZipFile(wheel_path, 'r') as archive:
        wheel_info = self.get_wheel_metadata(archive)
        version_parts = wheel_info['Wheel-Version'].split('.', 1)
        wheel_version = tuple(int(part) for part in version_parts)
        # The metadata member name depends on the wheel format version.
        fn = 'METADATA' if wheel_version < (1, 1) else METADATA_FILENAME
        try:
            with archive.open(posixpath.join(dist_info, fn)) as raw:
                result = Metadata(fileobj=utf8_reader(raw))
        except KeyError:
            raise ValueError('Invalid wheel, because %s is missing' % fn)
    return result
def metadata(self):
    """Read and parse the metadata file stored inside this wheel archive.

    The wheel is opened from ``os.path.join(self.dirname, self.filename)``
    and the metadata member is looked up in the archive's
    ``<name>-<version>.dist-info`` directory.  Wheels whose
    ``Wheel-Version`` is lower than 1.1 use the ``METADATA`` member; all
    others use ``METADATA_FILENAME``.

    :returns: the ``Metadata`` object built from the UTF-8 decoded member.
    :raises ValueError: if a ``KeyError`` is raised while opening or
        parsing the member (e.g. it is absent from the archive).
    """
    wheel_path = os.path.join(self.dirname, self.filename)
    dist_info = '%s-%s.dist-info' % (self.name, self.version)
    utf8_reader = codecs.getreader('utf-8')
    with ZipFile(wheel_path, 'r') as archive:
        wheel_info = self.get_wheel_metadata(archive)
        version_parts = wheel_info['Wheel-Version'].split('.', 1)
        wheel_version = tuple(int(part) for part in version_parts)
        # The metadata member name depends on the wheel format version.
        fn = 'METADATA' if wheel_version < (1, 1) else METADATA_FILENAME
        try:
            with archive.open(posixpath.join(dist_info, fn)) as raw:
                result = Metadata(fileobj=utf8_reader(raw))
        except KeyError:
            raise ValueError('Invalid wheel, because %s is missing' % fn)
    return result
def metadata(self):
    """Read and parse the metadata file stored inside this wheel archive.

    The wheel is opened from ``os.path.join(self.dirname, self.filename)``
    and the metadata member is looked up in the archive's
    ``<name>-<version>.dist-info`` directory.  Wheels whose
    ``Wheel-Version`` is lower than 1.1 use the ``METADATA`` member; all
    others use ``METADATA_FILENAME``.

    :returns: the ``Metadata`` object built from the UTF-8 decoded member.
    :raises ValueError: if a ``KeyError`` is raised while opening or
        parsing the member (e.g. it is absent from the archive).
    """
    wheel_path = os.path.join(self.dirname, self.filename)
    dist_info = '%s-%s.dist-info' % (self.name, self.version)
    utf8_reader = codecs.getreader('utf-8')
    with ZipFile(wheel_path, 'r') as archive:
        wheel_info = self.get_wheel_metadata(archive)
        version_parts = wheel_info['Wheel-Version'].split('.', 1)
        wheel_version = tuple(int(part) for part in version_parts)
        # The metadata member name depends on the wheel format version.
        fn = 'METADATA' if wheel_version < (1, 1) else METADATA_FILENAME
        try:
            with archive.open(posixpath.join(dist_info, fn)) as raw:
                result = Metadata(fileobj=utf8_reader(raw))
        except KeyError:
            raise ValueError('Invalid wheel, because %s is missing' % fn)
    return result
def metadata(self):
    """Read and parse the metadata file stored inside this wheel archive.

    The wheel is opened from ``os.path.join(self.dirname, self.filename)``
    and the metadata member is looked up in the archive's
    ``<name>-<version>.dist-info`` directory.  Wheels whose
    ``Wheel-Version`` is lower than 1.1 use the ``METADATA`` member; all
    others use ``METADATA_FILENAME``.

    :returns: the ``Metadata`` object built from the UTF-8 decoded member.
    :raises ValueError: if a ``KeyError`` is raised while opening or
        parsing the member (e.g. it is absent from the archive).
    """
    wheel_path = os.path.join(self.dirname, self.filename)
    dist_info = '%s-%s.dist-info' % (self.name, self.version)
    utf8_reader = codecs.getreader('utf-8')
    with ZipFile(wheel_path, 'r') as archive:
        wheel_info = self.get_wheel_metadata(archive)
        version_parts = wheel_info['Wheel-Version'].split('.', 1)
        wheel_version = tuple(int(part) for part in version_parts)
        # The metadata member name depends on the wheel format version.
        fn = 'METADATA' if wheel_version < (1, 1) else METADATA_FILENAME
        try:
            with archive.open(posixpath.join(dist_info, fn)) as raw:
                result = Metadata(fileobj=utf8_reader(raw))
        except KeyError:
            raise ValueError('Invalid wheel, because %s is missing' % fn)
    return result
def openshift_main():
    """Download an OpenShift OpenAPI spec, post-process it and save it.

    ``sys.argv[1]`` is substituted into the raw.githubusercontent.com URL of
    the ``openshift/origin`` repository and ``sys.argv[2]`` is the output
    path.  The downloaded JSON is decoded as UTF-8 with key order preserved,
    run through ``process_openshift_swagger`` and ``process_swagger``,
    handed to ``update_codegen_ignore`` and finally written to the output
    path as indented JSON with sorted keys.

    :returns: ``1`` if the download does not answer with HTTP 200,
        otherwise ``0``.
    """
    import sys
    import json
    import codecs
    import urllib3
    from collections import OrderedDict

    http_pool = urllib3.PoolManager()
    utf8_reader = codecs.getreader('utf-8')
    origin_ref = sys.argv[1]
    spec_url = ('https://raw.githubusercontent.com/openshift/origin/'
                '%s/api/swagger-spec/openshift-openapi-spec.json' % origin_ref)
    output_path = sys.argv[2]
    print("writing to {}".format(output_path))

    # NOTE(review): the flattened original does not show where this ``with``
    # block ends; the post-processing is kept inside it -- confirm.
    with http_pool.request('GET', spec_url, preload_content=False) as response:
        if response.status != 200:
            print("Error downloading spec file. Reason: %s" % response.reason)
            return 1
        in_spec = json.load(utf8_reader(response),
                            object_pairs_hook=OrderedDict)
        openshift_spec = process_openshift_swagger(in_spec, output_path)
        out_spec = process_swagger(openshift_spec)
        update_codegen_ignore(out_spec, output_path)
        with open(output_path, 'w') as out:
            json.dump(out_spec, out, sort_keys=True, indent=2,
                      separators=(',', ': '), ensure_ascii=True)
    return 0
def _resolve_version(version):
    """
    Resolve LATEST version

    Any *version* other than the ``LATEST`` sentinel is returned unchanged.
    For ``LATEST``, the setuptools JSON document is fetched from
    ``/pypi/setuptools/json`` relative to ``DEFAULT_URL`` and the value of
    its ``info.version`` field is returned as a string.
    """
    if version is not LATEST:
        return version

    meta_url = urljoin(DEFAULT_URL, '/pypi/setuptools/json')
    resp = urlopen(meta_url)
    with contextlib.closing(resp):
        try:
            charset = resp.info().get_content_charset()
        except Exception:
            # Python 2 compat; assume UTF-8
            charset = None
        # get_content_charset() returns None when the Content-Type header
        # has no charset parameter, and codecs.getreader(None) would raise
        # TypeError, so fall back to UTF-8 in that case as well.
        reader = codecs.getreader(charset or 'UTF-8')
        doc = json.load(reader(resp))

    return str(doc['info']['version'])