def isEmpty():
"""
Return True if no non-whitespace characters remain in standard
input. Otherwise return False.
"""
global _buffer
while _buffer.strip() == '':
line = sys.stdin.readline()
if sys.hexversion < 0x03000000:
line = line.decode('utf-8')
if line == '':
return True
_buffer += line
return False
#-----------------------------------------------------------------------
python类hexversion()的实例源码
def hasNextLine():
"""
Return True if standard input has a next line. Otherwise return
False.
"""
global _buffer
if _buffer != '':
return True
else:
_buffer = sys.stdin.readline()
if sys.hexversion < 0x03000000:
_buffer = _buffer.decode('utf-8')
if _buffer == '':
return False
return True
#-----------------------------------------------------------------------
def __init__(self, file):
self.file = file
if file == '':
self.infile = sys.stdin
elif file.lower().startswith('http://') or file.lower().startswith('https://'):
try:
if sys.hexversion >= 0x020601F0:
self.infile = urllib23.urlopen(file, timeout=5)
else:
self.infile = urllib23.urlopen(file)
except urllib23.HTTPError:
print('Error accessing URL %s' % file)
print(sys.exc_info()[1])
sys.exit()
elif file.lower().endswith('.zip'):
try:
self.zipfile = zipfile.ZipFile(file, 'r')
self.infile = self.zipfile.open(self.zipfile.infolist()[0], 'r', C2BIP3('infected'))
except:
print('Error opening file %s' % file)
print(sys.exc_info()[1])
sys.exit()
else:
try:
self.infile = open(file, 'rb')
except:
print('Error opening file %s' % file)
print(sys.exc_info()[1])
sys.exit()
self.ungetted = []
def check_invalid_constraints(self):
feat = set([])
for x in list(TaskGen.feats.values()):
feat.union(set(x))
for (x, y) in TaskGen.task_gen.prec.items():
feat.add(x)
feat.union(set(y))
ext = set([])
for x in TaskGen.task_gen.mappings.values():
ext.add(x.__name__)
invalid = ext & feat
if invalid:
Logs.error('The methods %r have invalid annotations: @extension <-> @feature/@before_method/@after_method' % list(invalid))
# the build scripts have been read, so we can check for invalid after/before attributes on task classes
for cls in list(Task.classes.values()):
if sys.hexversion > 0x3000000 and issubclass(cls, Task.Task) and isinstance(cls.hcode, str):
raise Errors.WafError('Class %r has hcode value %r of type <str>, expecting <bytes> (use Utils.h_cmd() ?)' % (cls, cls.hcode))
for x in ('before', 'after'):
for y in Utils.to_list(getattr(cls, x, [])):
if not Task.classes.get(y, None):
Logs.error('Erroneous order constraint %r=%r on task class %r' % (x, y, cls.__name__))
if getattr(cls, 'rule', None):
Logs.error('Erroneous attribute "rule" on task class %r (rename to "run_str")' % cls.__name__)
def read_header(conn):
cnt = 0
buf = []
while cnt < HEADER_SIZE:
data = conn.recv(HEADER_SIZE - cnt)
if not data:
#import traceback
#traceback.print_stack()
raise ValueError('connection ended when reading a header %r' % buf)
buf.append(data)
cnt += len(data)
if sys.hexversion > 0x3000000:
ret = ''.encode('iso8859-1').join(buf)
ret = ret.decode('iso8859-1')
else:
ret = ''.join(buf)
return ret
def execute(self):
if hasattr(Context.g_module, 'publish'):
Context.Context.execute(self)
mod = Context.g_module
rfile = getattr(self, 'rfile', send_package_name())
if not os.path.isfile(rfile):
self.fatal('Create the release file with "waf release" first! %r' % rfile)
fdata = Utils.readf(rfile, m='rb')
data = safe_urlencode([('pkgdata', fdata), ('pkgname', mod.APPNAME), ('pkgver', mod.VERSION)])
req = Request(get_upload_url(), data)
response = urlopen(req, timeout=TIMEOUT)
data = response.read().strip()
if sys.hexversion>0x300000f:
data = data.decode('utf-8')
if data != 'ok':
self.fatal('Could not publish the package %r' % data)
def process_command(conn):
query = conn.recv(HEADER_SIZE)
if not query:
return None
#print(len(query))
assert(len(query) == HEADER_SIZE)
if sys.hexversion > 0x3000000:
query = query.decode('iso8859-1')
#print "%r" % query
if not re_valid_query.match(query):
send_response(conn, -1, '', '', 'Invalid query %r' % query)
raise ValueError('Invalid query %r' % query)
query = query.strip().split(',')
if query[0] == REQ:
run_command(conn, query[1:])
elif query[0] == BYE:
raise ValueError('Exit')
else:
raise ValueError('Invalid query %r' % query)
return 'ok'
def writef_win32(f, data, m='w', encoding='ISO8859-1'):
if sys.hexversion > 0x3000000 and not 'b' in m:
data = data.encode(encoding)
m += 'b'
flags = os.O_CREAT | os.O_TRUNC | os.O_WRONLY | os.O_NOINHERIT
if 'b' in m:
flags |= os.O_BINARY
if '+' in m:
flags |= os.O_RDWR
try:
fd = os.open(f, flags)
except OSError:
raise IOError('Cannot write to %r' % f)
f = os.fdopen(fd, m)
try:
f.write(data)
finally:
f.close()
def h_cmd(ins):
"""
Task command hashes are calculated by calling this function. The inputs can be
strings, functions, tuples/lists containing strings/functions
"""
# this function is not meant to be particularly fast
if isinstance(ins, str):
# a command is either a string
ret = ins
elif isinstance(ins, list) or isinstance(ins, tuple):
# or a list of functions/strings
ret = str([h_cmd(x) for x in ins])
else:
# or just a python function
ret = str(h_fun(ins))
if sys.hexversion > 0x3000000:
ret = ret.encode('iso8859-1', 'xmlcharrefreplace')
return ret
def check_invalid_constraints(self):
feat = set([])
for x in list(TaskGen.feats.values()):
feat.union(set(x))
for (x, y) in TaskGen.task_gen.prec.items():
feat.add(x)
feat.union(set(y))
ext = set([])
for x in TaskGen.task_gen.mappings.values():
ext.add(x.__name__)
invalid = ext & feat
if invalid:
Logs.error('The methods %r have invalid annotations: @extension <-> @feature/@before_method/@after_method' % list(invalid))
# the build scripts have been read, so we can check for invalid after/before attributes on task classes
for cls in list(Task.classes.values()):
if sys.hexversion > 0x3000000 and issubclass(cls, Task.Task) and isinstance(cls.hcode, str):
raise Errors.WafError('Class %r has hcode value %r of type <str>, expecting <bytes> (use Utils.h_cmd() ?)' % (cls, cls.hcode))
for x in ('before', 'after'):
for y in Utils.to_list(getattr(cls, x, [])):
if not Task.classes.get(y, None):
Logs.error('Erroneous order constraint %r=%r on task class %r' % (x, y, cls.__name__))
if getattr(cls, 'rule', None):
Logs.error('Erroneous attribute "rule" on task class %r (rename to "run_str")' % cls.__name__)
def read_header(conn):
cnt = 0
buf = []
while cnt < HEADER_SIZE:
data = conn.recv(HEADER_SIZE - cnt)
if not data:
#import traceback
#traceback.print_stack()
raise ValueError('connection ended when reading a header %r' % buf)
buf.append(data)
cnt += len(data)
if sys.hexversion > 0x3000000:
ret = ''.encode('iso8859-1').join(buf)
ret = ret.decode('iso8859-1')
else:
ret = ''.join(buf)
return ret
def execute(self):
if hasattr(Context.g_module, 'publish'):
Context.Context.execute(self)
mod = Context.g_module
rfile = getattr(self, 'rfile', send_package_name())
if not os.path.isfile(rfile):
self.fatal('Create the release file with "waf release" first! %r' % rfile)
fdata = Utils.readf(rfile, m='rb')
data = safe_urlencode([('pkgdata', fdata), ('pkgname', mod.APPNAME), ('pkgver', mod.VERSION)])
req = Request(get_upload_url(), data)
response = urlopen(req, timeout=TIMEOUT)
data = response.read().strip()
if sys.hexversion>0x300000f:
data = data.decode('utf-8')
if data != 'ok':
self.fatal('Could not publish the package %r' % data)
def writef_win32(f, data, m='w', encoding='ISO8859-1'):
if sys.hexversion > 0x3000000 and not 'b' in m:
data = data.encode(encoding)
m += 'b'
flags = os.O_CREAT | os.O_TRUNC | os.O_WRONLY | os.O_NOINHERIT
if 'b' in m:
flags |= os.O_BINARY
if '+' in m:
flags |= os.O_RDWR
try:
fd = os.open(f, flags)
except OSError:
raise IOError('Cannot write to %r' % f)
f = os.fdopen(fd, m)
try:
f.write(data)
finally:
f.close()
def h_cmd(ins):
"""
Task command hashes are calculated by calling this function. The inputs can be
strings, functions, tuples/lists containing strings/functions
"""
# this function is not meant to be particularly fast
if isinstance(ins, str):
# a command is either a string
ret = ins
elif isinstance(ins, list) or isinstance(ins, tuple):
# or a list of functions/strings
ret = str([h_cmd(x) for x in ins])
else:
# or just a python function
ret = str(h_fun(ins))
if sys.hexversion > 0x3000000:
ret = ret.encode('iso8859-1', 'xmlcharrefreplace')
return ret
def read_header(conn):
cnt = 0
buf = []
while cnt < HEADER_SIZE:
data = conn.recv(HEADER_SIZE - cnt)
if not data:
#import traceback
#traceback.print_stack()
raise ValueError('connection ended when reading a header %r' % buf)
buf.append(data)
cnt += len(data)
if sys.hexversion > 0x3000000:
ret = ''.encode('iso8859-1').join(buf)
ret = ret.decode('iso8859-1')
else:
ret = ''.join(buf)
return ret
def execute(self):
if hasattr(Context.g_module, 'publish'):
Context.Context.execute(self)
mod = Context.g_module
rfile = getattr(self, 'rfile', send_package_name())
if not os.path.isfile(rfile):
self.fatal('Create the release file with "waf release" first! %r' % rfile)
fdata = Utils.readf(rfile, m='rb')
data = safe_urlencode([('pkgdata', fdata), ('pkgname', mod.APPNAME), ('pkgver', mod.VERSION)])
req = Request(get_upload_url(), data)
response = urlopen(req, timeout=TIMEOUT)
data = response.read().strip()
if sys.hexversion>0x300000f:
data = data.decode('utf-8')
if data != 'ok':
self.fatal('Could not publish the package %r' % data)
def process_command(conn):
query = conn.recv(HEADER_SIZE)
if not query:
return None
#print(len(query))
assert(len(query) == HEADER_SIZE)
if sys.hexversion > 0x3000000:
query = query.decode('iso8859-1')
#print "%r" % query
if not re_valid_query.match(query):
send_response(conn, -1, '', '', 'Invalid query %r' % query)
raise ValueError('Invalid query %r' % query)
query = query.strip().split(',')
if query[0] == REQ:
run_command(conn, query[1:])
elif query[0] == BYE:
raise ValueError('Exit')
else:
raise ValueError('Invalid query %r' % query)
return 'ok'
def writef_win32(f, data, m='w', encoding='ISO8859-1'):
if sys.hexversion > 0x3000000 and not 'b' in m:
data = data.encode(encoding)
m += 'b'
flags = os.O_CREAT | os.O_TRUNC | os.O_WRONLY | os.O_NOINHERIT
if 'b' in m:
flags |= os.O_BINARY
if '+' in m:
flags |= os.O_RDWR
try:
fd = os.open(f, flags)
except OSError:
raise IOError('Cannot write to %r' % f)
f = os.fdopen(fd, m)
try:
f.write(data)
finally:
f.close()
def get_algorithm(algorithm):
"""Returns the wire format string and the hash module to use for the
specified TSIG algorithm
@rtype: (string, hash constructor)
@raises NotImplementedError: I{algorithm} is not supported
"""
global _hashes
if _hashes is None:
_setup_hashes()
if isinstance(algorithm, (str, unicode)):
algorithm = dns.name.from_text(algorithm)
if sys.hexversion < 0x02050200 and \
(algorithm == HMAC_SHA384 or algorithm == HMAC_SHA512):
raise NotImplementedError("TSIG algorithm " + str(algorithm) +
" requires Python 2.5.2 or later")
try:
return (algorithm.to_digestable(), _hashes[algorithm])
except KeyError:
raise NotImplementedError("TSIG algorithm " + str(algorithm) +
" is not supported")
def get_algorithm(algorithm):
"""Returns the wire format string and the hash module to use for the
specified TSIG algorithm
@rtype: (string, hash constructor)
@raises NotImplementedError: I{algorithm} is not supported
"""
global _hashes
if _hashes is None:
_setup_hashes()
if isinstance(algorithm, (str, unicode)):
algorithm = dns.name.from_text(algorithm)
if sys.hexversion < 0x02050200 and \
(algorithm == HMAC_SHA384 or algorithm == HMAC_SHA512):
raise NotImplementedError("TSIG algorithm " + str(algorithm) +
" requires Python 2.5.2 or later")
try:
return (algorithm.to_digestable(), _hashes[algorithm])
except KeyError:
raise NotImplementedError("TSIG algorithm " + str(algorithm) +
" is not supported")
def main():
"""
Create the ec2rl instance and run it. Provide the user with messages relevant to their subcommand, if applicable.
Returns:
(int): 0 if no errors detected,
201 if Python < 2.7,
"""
if sys.hexversion < 0x2070000:
print("ec2rl requires Python 2.7+, but running version is {0}.".format(
platform.python_version()))
return 201
import ec2rlcore.main
ec2rl = ec2rlcore.main.Main()
ec2rl()
return 0
def _get_python_version(self):
venv_python = self.get_python_program()
# FIXME: use a get_output() function
code = 'import sys; print(sys.hexversion)'
exitcode, stdout = self.get_output_nocheck(venv_python, '-c', code)
if exitcode:
print("ERROR: failed to get the Python version")
sys.exit(exitcode)
hexversion = int(stdout.rstrip())
print("Python hexversion: %x" % hexversion)
# On Python: 3.5a0 <= version < 3.5.0 (final), install pip 7.1.2,
# the last version working on Python 3.5a0:
# https://sourceforge.net/p/pyparsing/bugs/100/
self._force_old_pip = (0x30500a0 <= hexversion < 0x30500f0)
def get_algorithm(algorithm):
"""Returns the wire format string and the hash module to use for the
specified TSIG algorithm
@rtype: (string, hash constructor)
@raises NotImplementedError: I{algorithm} is not supported
"""
global _hashes
if _hashes is None:
_setup_hashes()
if isinstance(algorithm, (str, unicode)):
algorithm = dns.name.from_text(algorithm)
if sys.hexversion < 0x02050200 and \
(algorithm == HMAC_SHA384 or algorithm == HMAC_SHA512):
raise NotImplementedError("TSIG algorithm " + str(algorithm) +
" requires Python 2.5.2 or later")
try:
return (algorithm.to_digestable(), _hashes[algorithm])
except KeyError:
raise NotImplementedError("TSIG algorithm " + str(algorithm) +
" is not supported")
def finalize_options(self):
self.set_undefined_options('install_lib',
('install_dir', 'install_dir'))
self.set_undefined_options('install',('install_layout','install_layout'))
if sys.hexversion > 0x2060000:
self.set_undefined_options('install',('prefix_option','prefix_option'))
ei_cmd = self.get_finalized_command("egg_info")
basename = pkg_resources.Distribution(
None, None, ei_cmd.egg_name, ei_cmd.egg_version
).egg_name() + '.egg-info'
if self.install_layout:
if not self.install_layout.lower() in ['deb']:
raise DistutilsOptionError(
"unknown value for --install-layout")
basename = basename.replace('-py%s' % pkg_resources.PY_MAJOR, '')
elif self.prefix_option or 'real_prefix' in sys.__dict__:
# don't modify for virtualenv
pass
else:
basename = basename.replace('-py%s' % pkg_resources.PY_MAJOR, '')
self.source = ei_cmd.egg_info
self.target = os.path.join(self.install_dir, basename)
self.outputs = [self.target]
def python_version_string(version=None):
"""
Convert a numeric Python version (such as ``sys.hexversion``) to a
printable string.
:Parameters:
version : int
Python integer version
:rtype: str
:return: The stringified version
"""
major, minor, micro, release_level, serial = split_python_version(version)
s = '%d.%d' % (major, minor)
if micro > 0:
s += '.%d' % micro
if release_level != 'final':
s += release_level[0]
s += '%s' % serial
return s
def _stop_trace(self):
if self.pydev_do_not_trace:
disable_tracing = True
if pydevd_vm_type.get_vm_type() == pydevd_vm_type.PydevdVmType.JYTHON and sys.hexversion <= 0x020201f0:
# don't run untraced threads if we're in jython 2.2.1 or lower
# jython bug: if we start a thread and another thread changes the tracing facility
# it affects other threads (it's not set only for the thread but globally)
# Bug: http://sourceforge.net/tracker/index.php?func=detail&aid=1870039&group_id=12867&atid=112867
disable_tracing = False
if disable_tracing:
pydevd_tracing.SetTrace(None) # no debugging on this thread
#=======================================================================================================================
# ReaderThread
#=======================================================================================================================
def test_implementation(self):
# This test applies to all implementations equally.
levels = {'alpha': 0xA, 'beta': 0xB, 'candidate': 0xC, 'final': 0xF}
self.assertTrue(hasattr(sys.implementation, 'name'))
self.assertTrue(hasattr(sys.implementation, 'version'))
self.assertTrue(hasattr(sys.implementation, 'hexversion'))
self.assertTrue(hasattr(sys.implementation, 'cache_tag'))
version = sys.implementation.version
self.assertEqual(version[:2], (version.major, version.minor))
hexversion = (version.major << 24 | version.minor << 16 |
version.micro << 8 | levels[version.releaselevel] << 4 |
version.serial << 0)
self.assertEqual(sys.implementation.hexversion, hexversion)
# PEP 421 requires that .name be lower case.
self.assertEqual(sys.implementation.name,
sys.implementation.name.lower())
def finalize_options(self):
self.set_undefined_options('install_lib',
('install_dir', 'install_dir'))
self.set_undefined_options('install',('install_layout','install_layout'))
if sys.hexversion > 0x2060000:
self.set_undefined_options('install',('prefix_option','prefix_option'))
ei_cmd = self.get_finalized_command("egg_info")
basename = pkg_resources.Distribution(
None, None, ei_cmd.egg_name, ei_cmd.egg_version
).egg_name() + '.egg-info'
if self.install_layout:
if not self.install_layout.lower() in ['deb']:
raise DistutilsOptionError(
"unknown value for --install-layout")
basename = basename.replace('-py%s' % pkg_resources.PY_MAJOR, '')
elif self.prefix_option or 'real_prefix' in sys.__dict__:
# don't modify for virtualenv
pass
else:
basename = basename.replace('-py%s' % pkg_resources.PY_MAJOR, '')
self.source = ei_cmd.egg_info
self.target = os.path.join(self.install_dir, basename)
self.outputs = [self.target]
def __get_download_headers__(self):
if self.url.startswith("https"):
try:
conn = urllib2.urlopen(urllib2.Request(self.url.replace("https", "http"), headers=self._headers))
conn.fp._sock.close()
self.url = self.url.replace("https", "http")
except:
pass
for x in range(3):
try:
if not sys.hexversion > 0x0204FFFF:
conn = urllib2.urlopen(urllib2.Request(self.url, headers=self._headers))
conn.fp._sock.close()
else:
conn = urllib2.urlopen(urllib2.Request(self.url, headers=self._headers), timeout=5)
except:
self.response_headers = dict()
self._state = self.states.error
else:
self.response_headers = conn.headers.dict
self._state = self.states.stopped
break
def _generate(self):
""" Generate the Python code. """
needs_close = False
if sys.hexversion >= 0x03000000:
if self._opts.output == '-':
from io import TextIOWrapper
pyfile = TextIOWrapper(sys.stdout.buffer, encoding='utf8')
else:
pyfile = open(self._opts.output, 'wt', encoding='utf8')
needs_close = True
else:
if self._opts.output == '-':
pyfile = sys.stdout
else:
pyfile = open(self._opts.output, 'wt')
needs_close = True
compileUi(self._ui_file, pyfile, self._opts.execute, self._opts.indent,
self._opts.from_imports, self._opts.resource_suffix)
if needs_close:
pyfile.close()