def test_generating_cubefile_works(wfn_viewer):
    """Check that ``_grid_to_cube`` returns ``unicode`` for a computed grid.

    Sets ``numpoints`` to 64 on the viewer, computes the grid and values for
    the canonical orbital at index 1, and converts them with
    ``_grid_to_cube``.
    """
    wfn_viewer.numpoints = 64
    orbital = wfn_viewer.mol.wfn.orbitals.canonical[1]
    orbital_grid, orbital_values = wfn_viewer._calc_orb_grid(orbital)
    cube_text = wfn_viewer._grid_to_cube(orbital_grid, orbital_values)
    assert isinstance(cube_text, unicode)
python类unicode()的实例源码
test_nbmolviz.py 文件源码
项目:notebook-molecular-visualization
作者: Autodesk
项目源码
文件源码
阅读 14
收藏 0
点赞 0
评论 0
def get_conn(self):
    """Log in to Cloudant and return a handle to the configured database.

    The connection identified by ``self.cloudant_conn_id`` supplies the
    settings: ``host`` is used as the Cloudant account name, ``login`` (or
    ``host`` when no login is set) as the username, ``password`` as the
    password and ``schema`` as the database name.

    :return: the result of ``account.database(...)`` for the ``schema``
        database.
    :raises AirflowException: if ``host``, ``password`` or ``schema`` is
        missing or empty on the connection.
    """
    def _str(s, sensitive=False):
        # cloudant-python doesn't support unicode.
        if isinstance(s, unicode):
            log = LoggingMixin().log
            # This helper is also applied to the password, so a sensitive
            # value is masked instead of being written to the debug log.
            log.debug(
                'cloudant-python does not support unicode. Encoding %s as ascii using "ignore".',
                '***' if sensitive else s
            )
            return s.encode('ascii', 'ignore')
        return s

    conn = self.get_connection(self.cloudant_conn_id)

    for conn_param in ['host', 'password', 'schema']:
        if not hasattr(conn, conn_param) or not getattr(conn, conn_param):
            raise AirflowException(
                'missing connection parameter {0}'.format(conn_param)
            )

    # In the connection form:
    # - 'host' is renamed to 'Account'
    # - 'login' is renamed 'Username (or API Key)'
    # - 'schema' is renamed to 'Database'
    #
    # So, use the 'host' attribute as the account name, and, if login is
    # defined, use that as the username.
    account = cloudant.Account(_str(conn.host))

    username = _str(conn.login or conn.host)

    # raise_for_status() is invoked on the login response so that a failed
    # login surfaces here -- presumably as an HTTP error; confirm against
    # the cloudant-python client.
    account.login(
        username,
        _str(conn.password, sensitive=True)).raise_for_status()

    return account.database(_str(conn.schema))
def log(self, session=None):
    """Render the log page for the task instance named in the request.

    Reads ``dag_id``, ``task_id`` and ``execution_date`` from
    ``request.args``, looks up the matching ``TaskInstance`` through
    ``session`` and asks the ``airflow.task`` log handler whose name equals
    the ``core.task_log_reader`` setting to read its logs.  When no task
    instance is found, or the read raises ``AttributeError``, a single
    explanatory message is shown instead.

    :param session: database session used for the task instance query.
    :return: the rendered ``airflow/ti_log.html`` template.
    """
    args = request.args
    dag_id = args.get('dag_id')
    task_id = args.get('task_id')
    execution_date = args.get('execution_date')

    dttm = pendulum.parse(execution_date)
    form = DateTimeForm(data={'execution_date': dttm})
    dag = dagbag.get_dag(dag_id)

    ti_model = models.TaskInstance
    ti = session.query(ti_model).filter(
        ti_model.dag_id == dag_id,
        ti_model.task_id == task_id,
        ti_model.execution_date == dttm).first()

    if ti is not None:
        logger = logging.getLogger('airflow.task')
        task_log_reader = conf.get('core', 'task_log_reader')

        # First handler whose name matches the configured reader, else None.
        handler = None
        for candidate in logger.handlers:
            if candidate.name == task_log_reader:
                handler = candidate
                break

        try:
            ti.task = dag.get_task(ti.task_id)
            logs = handler.read(ti)
        except AttributeError as e:
            # Also reached when no handler matched (handler is None).
            message = "Task log handler {} does not support read logs.\n{}\n".format(
                task_log_reader, str(e))
            logs = [message]
    else:
        logs = ["*** Task instance did not exist in the DB\n"]

    # On Python 2, byte-string entries are decoded in place to unicode.
    for idx, entry in enumerate(logs):
        if not PY2 or isinstance(entry, unicode):
            continue
        logs[idx] = entry.decode('utf-8')

    return self.render(
        'airflow/ti_log.html',
        logs=logs,
        dag=dag,
        title="Log by attempts",
        task_id=task_id,
        execution_date=execution_date,
        form=form)
def _can_handle_key(self, k):
    """Return True when ``k`` is a ``str`` or ``unicode`` instance."""
    accepted_types = (str, unicode)
    return isinstance(k, accepted_types)
def _can_handle_val(self, v):
    """Return True when ``v`` has one of the supported value types.

    Supported types are ``str``, ``unicode``, ``bool``, ``int``, ``long``
    and ``float``; anything else yields False.
    """
    supported_types = (str, unicode, bool, int, long, float)
    return isinstance(v, supported_types)
def _handle_attr(self, layer, feature, props):
    """Encode the properties of one feature into the layer's key/value tables.

    For every ``(k, v)`` pair of ``props`` accepted by
    ``self._can_handle_attr``, the key and the value are each registered
    once (a key in ``layer.keys``, a value through ``layer.values.add()``)
    and their indexes are appended, key first, to ``feature.tags``.

    :param layer: object exposing ``keys`` (appendable) and ``values``
        (with an ``add()`` method returning a value message).
    :param feature: object whose ``tags`` list receives the indexes.
    :param props: mapping of property names to property values.
    """
    for k, v in props.items():
        if not self._can_handle_attr(k, v):
            continue

        # On Python 2, byte-string keys are decoded so keys are stored as text.
        if not PY3 and isinstance(k, str):
            k = k.decode('utf-8')

        if k not in self.seen_keys_idx:
            layer.keys.append(k)
            self.seen_keys_idx[k] = self.key_idx
            self.key_idx += 1

        feature.tags.append(self.seen_keys_idx[k])

        # True == 1 == 1.0 and all three hash alike, so indexing on ``v``
        # alone lets a bool, an int and a float of equal value share a
        # single entry; whichever came second would then be emitted with
        # the first one's type.  bool and float entries are therefore keyed
        # together with their type.  Tuples are never property values here
        # (they are neither str, bool, int nor float), so these keys cannot
        # collide with a plain value.
        # NOTE(review): assumes seen_values_idx is only consulted by this
        # method -- confirm nothing else reads its keys.
        if isinstance(v, bool):
            value_key = (bool, v)
        elif isinstance(v, float):
            value_key = (float, v)
        else:
            value_key = v

        if value_key not in self.seen_values_idx:
            self.seen_values_idx[value_key] = self.val_idx
            self.val_idx += 1

            val = layer.values.add()
            # bool is tested before int because bool is a subclass of int.
            if isinstance(v, bool):
                val.bool_value = v
            elif isinstance(v, str):
                if PY3:
                    val.string_value = v
                else:
                    val.string_value = unicode(v, 'utf-8')
            elif isinstance(v, unicode):
                val.string_value = v
            elif isinstance(v, (int, long)):
                val.int_value = v
            elif isinstance(v, float):
                val.double_value = v

        feature.tags.append(self.seen_values_idx[value_key])
def test_source_coding_utf8(self):
    """
    The source coding line must survive conversion untouched and stay the
    first line of the file (ahead of any __future__ imports); the unicode
    characters written in that encoding must be parsed correctly and left
    alone.
    """
    # NOTE(review): this snippet is only assigned, never checked -- no
    # assertion follows.  The icon literals also look like '?' placeholders
    # for non-ASCII characters; confirm against the upstream test.
    source = """
# -*- coding: utf-8 -*-
icons = [u"?", u"?", u"?", u"?"]
"""
def test_literal_prefixes_are_not_stripped(self):
    """
    The futurize script must keep the u'' and b'' prefixes on unicode and
    byte strings. Stripping them on Py3.3+ is unnecessary and discards
    information: the author explicitly marked each string as unicode or
    bytes, which is more than an automated tool's guess about its type.
    """
    source = '''
s = u'unicode string'
b = b'byte string'
'''
    self.unchanged(source)
def test_Py2_StringIO_module(self):
    """
    Without unicode_literals, the argument handed to io.StringIO has to be
    turned into an explicit unicode string.
    Ideally a fixer would take care of this. For now:
    TODO: document the Py3 equivalent, and restore a test covering the
    unicode_literals case.
    """
    py2_source = """
import cStringIO
import StringIO
s1 = cStringIO.StringIO('my string')
s2 = StringIO.StringIO('my other string')
assert isinstance(s1, cStringIO.InputType)
"""

    # Python 3 has no io.InputType, so futurize should produce something
    # like the code below. Note that io.StringIO needs a unicode string as
    # input on both Py2 and Py3.
    expected = """
import io
import io
s1 = io.StringIO(u'my string')
s2 = io.StringIO(u'my other string')
assert isinstance(s1, io.StringIO)
"""
    self.convert_check(py2_source, expected)
def test_open(self):
    """
    Conservative mode: futurize would not import io.open, since doing so
    switches the default return type from bytes to text.
    """
    original_source = """
filename = 'temp_file_open.test'
contents = 'Temporary file contents. Delete me.'
with open(filename, 'w') as f:
f.write(contents)
with open(filename, 'r') as f:
data = f.read()
assert isinstance(data, str)
assert data == contents
"""
    expected_source = """
from past.builtins import open, str as oldbytes, unicode
filename = oldbytes(b'temp_file_open.test')
contents = oldbytes(b'Temporary file contents. Delete me.')
with open(filename, oldbytes(b'w')) as f:
f.write(contents)
with open(filename, oldbytes(b'r')) as f:
data = f.read()
assert isinstance(data, oldbytes)
assert data == contents
assert isinstance(oldbytes(b'hello'), basestring)
assert isinstance(unicode(u'hello'), basestring)
assert isinstance(oldbytes(b'hello'), basestring)
"""
    self.convert_check(original_source, expected_source, conservative=True)
def test_import_builtin_types(self):
    """Import a module of literals and check the types of three of them.

    The snippet is written out and imported through
    ``self.write_and_import``; ``s1`` and ``b1`` must be ``oldstr``
    instances and ``s2`` a ``unicode`` instance.
    """
    source = """
s1 = 'abcd'
s2 = u'abcd'
b1 = b'abcd'
b2 = s2.encode('utf-8')
d1 = {}
d2 = dict((i, i**2) for i in range(10))
i1 = 1923482349324234L
i2 = 1923482349324234
"""
    imported = self.write_and_import(source, 'test_builtin_types')
    expectations = (
        ('s1', oldstr),
        ('s2', unicode),
        ('b1', oldstr),
    )
    for attr_name, expected_type in expectations:
        self.assertTrue(isinstance(getattr(imported, attr_name), expected_type))
def get_conn(self):
    """Log in to Cloudant and return a handle to the configured database.

    The connection identified by ``self.cloudant_conn_id`` supplies the
    settings: ``host`` is used as the Cloudant account name, ``login`` (or
    ``host`` when no login is set) as the username, ``password`` as the
    password and ``schema`` as the database name.

    :return: the result of ``account.database(...)`` for the ``schema``
        database.
    :raises AirflowException: if ``host``, ``password`` or ``schema`` is
        missing or empty on the connection.
    """
    def _str(s, sensitive=False):
        # cloudant-python doesn't support unicode.
        if isinstance(s, unicode):
            # This helper is also applied to the password, so a sensitive
            # value is masked instead of being written to the debug log.
            logging.debug(('cloudant-python does not support unicode. '
                           'Encoding %s as ascii using "ignore".'),
                          '***' if sensitive else s)
            return s.encode('ascii', 'ignore')
        return s

    conn = self.get_connection(self.cloudant_conn_id)

    for conn_param in ['host', 'password', 'schema']:
        if not hasattr(conn, conn_param) or not getattr(conn, conn_param):
            raise AirflowException(
                'missing connection parameter {0}'.format(conn_param)
            )

    # In the connection form:
    # - 'host' is renamed to 'Account'
    # - 'login' is renamed 'Username (or API Key)'
    # - 'schema' is renamed to 'Database'
    #
    # So, use the 'host' attribute as the account name, and, if login is
    # defined, use that as the username.
    account = cloudant.Account(_str(conn.host))

    username = _str(conn.login or conn.host)

    # raise_for_status() is invoked on the login response so that a failed
    # login surfaces here -- presumably as an HTTP error; confirm against
    # the cloudant-python client.
    account.login(
        username,
        _str(conn.password, sensitive=True)).raise_for_status()

    return account.database(_str(conn.schema))
def _can_handle_key(self, k):
    """Return True when ``k`` is a ``str`` or ``unicode`` instance."""
    accepted_types = (str, unicode)
    return isinstance(k, accepted_types)
def _can_handle_val(self, v):
    """Return True when ``v`` has one of the supported value types.

    Supported types are ``str``, ``unicode``, ``bool``, ``int``, ``long``
    and ``float``; anything else yields False.
    """
    supported_types = (str, unicode, bool, int, long, float)
    return isinstance(v, supported_types)
def _handle_attr(self, layer, feature, props):
    """Encode the properties of one feature into the layer's key/value tables.

    For every ``(k, v)`` pair of ``props`` accepted by
    ``self._can_handle_attr``, the key and the value are each registered
    once (a key in ``layer.keys``, a value through ``layer.values.add()``)
    and their indexes are appended, key first, to ``feature.tags``.

    :param layer: object exposing ``keys`` (appendable) and ``values``
        (with an ``add()`` method returning a value message).
    :param feature: object whose ``tags`` list receives the indexes.
    :param props: mapping of property names to property values.
    """
    for k, v in props.items():
        if not self._can_handle_attr(k, v):
            continue

        # On Python 2, byte-string keys are decoded so keys are stored as text.
        if not PY3 and isinstance(k, str):
            k = k.decode('utf-8')

        if k not in self.seen_keys_idx:
            layer.keys.append(k)
            self.seen_keys_idx[k] = self.key_idx
            self.key_idx += 1

        feature.tags.append(self.seen_keys_idx[k])

        # True == 1 == 1.0 and all three hash alike, so indexing on ``v``
        # alone lets a bool, an int and a float of equal value share a
        # single entry; whichever came second would then be emitted with
        # the first one's type.  bool and float entries are therefore keyed
        # together with their type.  Tuples are never property values here
        # (they are neither str, bool, int nor float), so these keys cannot
        # collide with a plain value.
        # NOTE(review): assumes seen_values_idx is only consulted by this
        # method -- confirm nothing else reads its keys.
        if isinstance(v, bool):
            value_key = (bool, v)
        elif isinstance(v, float):
            value_key = (float, v)
        else:
            value_key = v

        if value_key not in self.seen_values_idx:
            self.seen_values_idx[value_key] = self.val_idx
            self.val_idx += 1

            val = layer.values.add()
            # bool is tested before int because bool is a subclass of int.
            if isinstance(v, bool):
                val.bool_value = v
            elif isinstance(v, str):
                if PY3:
                    val.string_value = v
                else:
                    val.string_value = unicode(v, 'utf-8')
            elif isinstance(v, unicode):
                val.string_value = v
            elif isinstance(v, (int, long)):
                val.int_value = v
            elif isinstance(v, float):
                val.double_value = v

        feature.tags.append(self.seen_values_idx[value_key])
def test_nestedInteractions(self, values):
"""
Nested interactions operate independently of parent interactions.

Each interaction (the top-level list and every nested list) draws one
entry from the failures sequence to decide whether it is failed, and
only the nodes resolved directly in that interaction are expected to be
reported with it. At the end, the per-node success/failure counts held
by the disco failure policy must equal the tallies built up here.

:param values: a two-tuple composed of:
- a recursive list of unicode and other recursive lists - list start
means begin interaction, string means node resolve, list end means
finish interaction.
- list of False/True; True means failed interaction
"""
requested_interactions, failures = values
failures = iter(failures)
# Discard generated inputs whose top level is a bare string instead of a
# list (assume() is presumably hypothesis' -- confirm against the imports).
assume(not isinstance(requested_interactions, unicode))
self.init()
ws_actor = self.connector.expectSocket()
self.connector.connect(ws_actor)
# NOTE(review): failures was already wrapped in iter() above; this second
# call looks redundant.
failures = iter(failures)
# Nodes created so far, keyed by the service-name string that produced them.
created_services = {}
# Per-node tallies of how often each node is expected to be reported as
# succeeded / failed.
expected_success_nodes = Counter()
expected_failed_nodes = Counter()
# Drives one interaction: unicode children are resolved as nodes, any other
# child is handled by a recursive call as a nested interaction.
def run_interaction(children):
# One failures entry is consumed per interaction.
should_fail = next(failures)
# Nodes resolved directly in this interaction, split by expected outcome.
failed = []
succeeded = []
self.session.start_interaction()
for child in children:
if isinstance(child, unicode):
# Make sure disco knows about the node:
if child in created_services:
node = created_services[child]
else:
node = create_node(child, child)
created_services[child] = node
self.disco.onMessage(None, NodeActive(node))
# Make sure the child Node is resolved in the interaction
self.session.resolve(node.service, "1.0")
if should_fail:
expected_failed_nodes[node] += 1
failed.append(node)
else:
expected_success_nodes[node] += 1
succeeded.append(node)
else:
run_interaction(child)
if should_fail:
self.session.fail_interaction("OHNO")
self.session.finish_interaction()
self.connector.advance_time(5.0) # Make sure interaction is sent
ws_actor.swallowLogMessages()
self.connector.expectInteraction(
self, ws_actor, self.session, failed, succeeded)
run_interaction(requested_interactions)
# Every node that was resolved at least once must show exactly the expected
# counts in its failure policy.
for node in set(expected_failed_nodes) | set(expected_success_nodes):
policy = self.disco.failurePolicy(node)
self.assertEqual((policy.successes, policy.failures),
(expected_success_nodes[node],
expected_failed_nodes[node]))