def get_input(self):
self._user_input_label_win.addstr(0, 0, 'input:')
self._user_input_label_win.refresh()
curses.echo()
inputstr = self._user_input_win.getstr(
0,
0,
self.width - self._user_input_win_x).decode(code)
curses.noecho()
if platform.python_version_tuple()[0] == '2':
inputstr = to_unicode(inputstr)
self._user_input_win.clear()
if inputstr == self.panic:
inputstr = ''
self._env._task_time = float('inf')
return inputstr
python类python_version_tuple()的实例源码
def get_input(self):
print("_"*self._total_msg_length)
print(self._env_output + ' reward:{:7}'.format(self.info['reward']))
print(self._learner_input + ' time:{:9}'.format(self.info['time']))
if self._byte_channels:
print(self._reward)
_ver = sys.version_info
if _ver[0] == 2:
input_str = raw_input()
else:
input_str = input()
if platform.python_version_tuple()[0] == '2':
input_str = to_unicode(input_str)
if input_str == self.panic:
input_str = ''
self._env._task_time = float('inf')
elif input_str == self.quit:
sys.exit()
return input_str
def test_sftp_read(self):
self.assertEqual(self._auth(), 0)
sftp = self.session.sftp_init()
self.assertTrue(sftp is not None)
if int(platform.python_version_tuple()[0]) >= 3:
test_file_data = b'test' + bytes(os.linesep, 'utf-8')
else:
test_file_data = b'test' + os.linesep
remote_filename = os.sep.join([os.path.dirname(__file__),
'remote_test_file'])
with open(remote_filename, 'wb') as test_fh:
test_fh.write(test_file_data)
with sftp.open(remote_filename, 0, 0) as remote_fh:
try:
self.assertTrue(remote_fh is not None)
remote_data = b""
for rc, data in remote_fh:
remote_data += data
self.assertEqual(remote_fh.close(), 0)
self.assertEqual(remote_data, test_file_data)
finally:
os.unlink(remote_filename)
def provenance(dirty=False):
"""Return provenance data about the execution environment.
:param dirty: if False, will exit with an error if the git repository
is dirty or absent.
"""
return {'python' : {'implementation': platform.python_implementation(),
'version' : platform.python_version_tuple(),
'compiler': platform.python_compiler(),
'branch' : platform.python_branch(),
'revision': platform.python_revision()},
'platform' : platform.platform(),
'packages' : list(pip.commands.freeze.freeze()), # list of installed packages
'git_info' : git_info(dirty_allowed=dirty),
'timestamp' : datetime.utcnow().isoformat()+'Z', # Z stands for UTC
}
def consume(self, input_char):
'''
Takes a byte into the channel
'''
if ord(input_char) >= 256:
raise Exception("Input char out of range")
if platform.python_version_tuple()[0] == '2':
encoded_char = to_unicode(input_char, 'utf-8')
else:
encoded_char = input_char
self._buffer += encoded_char
self.message_updated(self._buffer)
self.sequence_updated(self._buffer)
def save_path(self):
try:
save_path = self._save_path
model_path = self._model_path
except AttributeError:
save_path = 'saves/%s' % self.model_identifier
if platform.python_version_tuple()[0] is '2':
if not os.path.exists(save_path):
os.makedirs(save_path)
else:
os.makedirs(save_path, exist_ok=True)
model_path = os.path.join(save_path, 'model.chkpt')
self._save_path = save_path
self._model_path = model_path
return save_path, model_path
def logs_path(self):
try:
logs_path = self._logs_path
except AttributeError:
logs_path = 'logs/%s' % self.model_identifier
if self.renew_logs:
shutil.rmtree(logs_path, ignore_errors=True)
if platform.python_version_tuple()[0] is '2':
if not os.path.exists(logs_path):
os.makedirs(logs_path)
else:
os.makedirs(logs_path, exist_ok=True)
self._logs_path = logs_path
return logs_path
def get_machine_info():
"""
Return information about the machine, including host,
system, and the python version.
"""
return {'host':platform.node(),
'system': platform.system(),
'python_version': ".".join(platform.python_version_tuple())}
def prompt(default, msg):
py3 = int(platform.python_version_tuple()[0]) > 2
value = ''
if py3:
value = input(msg + " ['" + default + "']: ")
else:
value = raw_input(msg + " [" + default + "]: ")
return value if value != '' else default
def properties(self):
"""Returns less volatile node data suitable for host validation.
If the fetched property is expensive to compute, it should be cached / updated less frequently.
"""
zfs_not_installed, stdout, stderr = AgentShell.run_old(['which', 'zfs'])
return {'zfs_installed': not zfs_not_installed,
'distro': platform.linux_distribution()[0],
'distro_version': float('.'.join(platform.linux_distribution()[1].split('.')[:2])),
'python_version_major_minor': float("%s.%s" % (platform.python_version_tuple()[0],
platform.python_version_tuple()[1])),
'python_patchlevel': int(platform.python_version_tuple()[2]),
'kernel_version': platform.release()}
def get_python_version_tuple(self):
'''Returns the Python version as tuple (major, minor, patchlevel) of strings'''
return platform.python_version_tuple()
def fix_shebang(file_path, python_version):
"""Rewrite the shebang for the used major python version of the install,
use simply sed from OS"""
print("fix python version")
# accept array of python version from platform.python_version_tuple()
# fix shebang only with major python version
sed = "sed -i 's/#!.*\/usr\/bin\/.*python.*$/#!\/usr\/bin\/env python{}/'"\
.format(python_version[0], python_version[1])
# bring command together with file path
cmd = ' '.join([sed, file_path])
# execute the sed command and replace in place
os.system(cmd)
# Get current Python version
def requirements():
with open('requirements.txt', 'r') as fileobj:
requirements = [line.strip() for line in fileobj]
version = python_version_tuple()
if version[0] == 2 and version[1] == 6:
requirements.append("argparse==1.4.0")
return requirements
def test_class_constructor_only_accepts_py_modules_not_pyc(self):
# Create a module with both *.py and *.pyc.
self.py_with_pyc('foo.py')
# Create another with a *.pyc but no *.py behind it.
os.unlink(self.py_with_pyc('bar.py'))
# Now: *.py takes precedence over *.pyc ...
def get(name):
return os.path.basename(importlib.import_module(name).__file__)
self.assertTrue(get('foo'), 'foo.py')
try:
# ... and while *.pyc is importable ...
self.assertTrue(get('bar'), 'bar.pyc')
except ImportError:
try:
# (except on PyPy)
# http://doc.pypy.org/en/latest/config/objspace.lonepycfiles.html
self.assertEqual(platform.python_implementation(), 'PyPy')
except AssertionError:
# (... aaaaaand Python 3)
self.assertEqual(platform.python_version_tuple()[0], '3')
else:
# ... we refuse it.
with self.assertRaises(ValueError) as raised:
PythonPackageArchive('bar')
msg = raised.exception.args[0]
self.assertTrue(msg.startswith('Could not find a *.py source file'))
self.assertTrue(msg.endswith('bar.pyc'))
# We readily ignore a *.pyc if a *.py exists.
archive = PythonPackageArchive('foo')
archive.close()
self.assertEqual(archive.get_filenames(), ['foo.py'])
with archive.get_reader() as reader:
self.assertEqual(b'42', reader.read('foo.py'))
def test_log_unicode(self):
msg = "abc?÷"
filename_send = tempfile.mktemp()
filename_read = tempfile.mktemp()
p = pexpect.spawnu('cat')
if platform.python_version_tuple() < ('3', '0', '0'):
import codecs
def open(fname, mode, **kwargs):
if 'newline' in kwargs:
del kwargs['newline']
return codecs.open(fname, mode, **kwargs)
else:
import io
open = io.open
p.logfile_send = open(filename_send, 'w', encoding='utf-8')
p.logfile_read = open(filename_read, 'w', encoding='utf-8')
p.sendline(msg)
p.sendeof()
p.expect(pexpect.EOF)
p.close()
p.logfile_send.close()
p.logfile_read.close()
# ensure the 'send' log is correct,
with open(filename_send, 'r', encoding='utf-8') as f:
self.assertEqual(f.read(), msg + '\n\x04')
# ensure the 'read' log is correct,
with open(filename_read, 'r', encoding='utf-8', newline='') as f:
output = f.read().replace(_CAT_EOF, '')
self.assertEqual(output, (msg + '\r\n')*2 )
def calc_checksum(self, data, checksum=0):
'''
Calculate the checksum for a given block of data, can also be used to
update a checksum.
>>> csum = modem.calc_checksum('hello')
>>> csum = modem.calc_checksum('world', csum)
>>> hex(csum)
'0x3c'
'''
if platform.python_version_tuple() >= ('3', '0', '0'):
return (sum(data) + checksum) % 256
else:
return (sum(map(ord, data)) + checksum) % 256
def __init__(self):
_this_path = os.path.abspath(os.path.dirname(__file__))
self.root_path = os.path.abspath(os.path.join(_this_path, '..', '..', '..'))
self.build_path = os.path.abspath(os.path.join(_this_path, '..', '..'))
self.builder_path = os.path.join(self.build_path, 'builder')
self.win32_tools_path = os.path.join(self.build_path, 'tools', 'win32')
self.is_py2 = sys.version_info[0] == 2
self.is_py3 = sys.version_info[0] == 3
self.py_ver = platform.python_version_tuple()
self.py_ver_str = '%s%s' % (self.py_ver[0], self.py_ver[1])
self.py_exec = sys.executable
self.bits = self.BITS_32
self.bits_str = 'x86'
_bits = platform.architecture()[0]
if _bits == '64bit':
self.bits = self.BITS_64
self.bits_str = 'x64'
self.is_win = False
self.is_win_x64 = False
self.is_linux = False
self.is_macos = False
_os = platform.system().lower()
self.plat = ''
if _os == 'windows':
self.is_win = True
self.plat = 'windows'
self.is_win_x64 = 'PROGRAMFILES(X86)' in os.environ
elif _os == 'linux':
self.is_linux = True
self.plat = 'linux'
elif _os == 'darwin':
self.is_macos = True
self.plat = 'macos'
def test_encoding_latin1(self):
db = copy.copy(self.database)
db["charset"] = "latin1"
self.connect_conn_control(db)
if platform.python_version_tuple()[0] == "2":
string = unichr(233)
else:
string = "\u00e9"
create_query = "CREATE TABLE test (test CHAR(12)) CHARACTER SET latin1 COLLATE latin1_bin;"
insert_query = b"INSERT INTO test VALUES('" + string.encode('latin-1') + b"');"
event = self.create_and_insert_value(create_query, insert_query)
self.assertEqual(event.rows[0]["values"]["test"], string)
def test_encoding_utf8(self):
if platform.python_version_tuple()[0] == "2":
string = unichr(0x20ac)
else:
string = "\u20ac"
create_query = "CREATE TABLE test (test CHAR(12)) CHARACTER SET utf8 COLLATE utf8_bin;"
insert_query = b"INSERT INTO test VALUES('" + string.encode('utf-8') + b"')"
event = self.create_and_insert_value(create_query, insert_query)
self.assertMultiLineEqual(event.rows[0]["values"]["test"], string)
def save_path(self):
try:
save_path = self._save_path
model_path = self._model_path
except AttributeError:
save_path = 'saves/%s' % self.model_identifier
if platform.python_version_tuple()[0] is '2':
if not os.path.exists(save_path):
os.makedirs(save_path)
else:
os.makedirs(save_path, exist_ok=True)
model_path = os.path.join(save_path, 'model.chkpt')
self._save_path = save_path
self._model_path = model_path
return save_path, model_path
def logs_path(self):
try:
logs_path = self._logs_path
except AttributeError:
logs_path = 'logs/%s' % self.model_identifier
if self.renew_logs:
shutil.rmtree(logs_path, ignore_errors=True)
if platform.python_version_tuple()[0] is '2':
if not os.path.exists(logs_path):
os.makedirs(logs_path)
else:
os.makedirs(logs_path, exist_ok=True)
self._logs_path = logs_path
return logs_path
def get_entry_points():
major = int(platform.python_version_tuple()[0])
name = 'itango3' if major == 3 else 'itango'
return {
"console_scripts": ["{0} = itango:run".format(name)],
"gui_scripts": ["{0}-qt = itango:run_qt".format(name)]}
def test_log_unicode(self):
msg = "abc?÷"
filename_send = tempfile.mktemp()
filename_read = tempfile.mktemp()
p = pexpect.spawnu('cat')
if platform.python_version_tuple() < ('3', '0', '0'):
import codecs
def open(fname, mode, **kwargs):
if 'newline' in kwargs:
del kwargs['newline']
return codecs.open(fname, mode, **kwargs)
else:
import io
open = io.open
p.logfile_send = open(filename_send, 'w', encoding='utf-8')
p.logfile_read = open(filename_read, 'w', encoding='utf-8')
p.sendline(msg)
p.sendeof()
p.expect(pexpect.EOF)
p.close()
p.logfile_send.close()
p.logfile_read.close()
# ensure the 'send' log is correct,
with open(filename_send, 'r', encoding='utf-8') as f:
self.assertEqual(f.read(), msg + '\n\x04')
# ensure the 'read' log is correct,
with open(filename_read, 'r', encoding='utf-8', newline='') as f:
output = f.read().replace(_CAT_EOF, '')
self.assertEqual(output, (msg + '\r\n')*2 )
def platform_label():
major_version, _, __ = platform.python_version_tuple()
implementation = platform.python_implementation()
return '{implementation}{major_version}'.format(implementation=implementation.lower(),
major_version=major_version)
def __init__(self):
# self.dist_linux = ['ubuntu', 'centos', 'redhat']
# self.dist_all = self.dist_linux + ['windows', 'macos']
self.dist_all = ['windows', 'linux', 'macos']
self.is_py2 = sys.version_info[0] == 2
self.is_py3 = sys.version_info[0] == 3
self.target = TARGET_RELEASE
self.target_path = 'release'
_py_ver = platform.python_version_tuple()
self.py_ver = '%s%s' % (_py_ver[0], _py_ver[1])
self.bits = BITS_32
self.bits_path = 'x86'
_bits = platform.architecture()[0]
if _bits == '64bit':
self.bits = BITS_64
self.bits_path = 'x64'
_os = platform.system().lower()
self.dist = ''
if _os == 'windows':
self.dist = 'windows'
elif _os == 'linux':
self.dist = 'linux'
# (dist, ver, sys_id) = platform.dist()
# dist = dist.lower()
# if dist in self.dist_linux:
# self.dist = dist
# else:
# raise RuntimeError('unsupported linux dist: %s' % dist)
elif _os == 'darwin':
self.dist = 'macos'
self.host_os = self.dist
if self.host_os == 'windows':
self.host_os_is_win_x64 = 'PROGRAMFILES(X86)' in os.environ
self.make_dist_path()