def test_subtransformation():
    """Run a transformation that embeds another one and check the root tag.

    The embedded transformation applies ``lib.set_localname('pablo')`` through
    a ``'*'`` rule; the test asserts that the returned element's tag is
    ``'pablo'``.
    """
    inner = Transformation(
        Rule('*', lib.set_localname('pablo'))
    )

    # Steps are listed in the exact order they are handed to Transformation.
    steps = [
        lib.f(id, Ref('root')),
        lib.put_variable('source_id'),
        inner,
        lib.f(id, Ref('root')),
        lib.put_variable('result_id'),
        lib.debug_symbols('source_id', 'result_id'),
        # NOTE(review): presumably this rule only fires when the two stored
        # ids differ -- confirm against the Rule/If semantics.
        Rule(
            Not(If(Ref('source_id'), operator.eq, Ref('result_id'))),
            (
                lib.debug_message('NO!'),
                lib.debug_symbols('root'),
                lib.set_localname('neruda'),
                AbortRule,
            ),
        ),
    ]
    transformation = Transformation(*steps)

    root_element = etree.fromstring('<augustus />')
    assert etree.QName(root_element).text == 'augustus'

    transformed = transformation(root_element)
    assert transformed.tag == 'pablo'
# Example source code for Python's eq()
def visit_binary(self, binary, **kwargs):
    """Compile a binary expression, keeping a lone bind parameter on the right.

    If the operator is ``operator.eq`` and only the left operand is a
    ``BindParameter``, a new ``BinaryExpression`` with the operands swapped is
    passed to ``self.process``. Every other expression is delegated to the
    parent class's ``visit_binary``.
    """
    bind_only_on_left = (
        isinstance(binary.left, expression.BindParameter)
        and binary.operator == operator.eq
        and not isinstance(binary.right, expression.BindParameter)
    )
    if not bind_only_on_left:
        return super(MSSQLCompiler, self).visit_binary(binary, **kwargs)

    swapped = expression.BinaryExpression(
        binary.right, binary.left, binary.operator
    )
    return self.process(swapped, **kwargs)
def test_richcompare_crash(self):
    """Regression test for gh-4613.

    Compares a NumPy array against an object whose ``__array__`` raises.
    On Python 3 the ordering comparisons must raise ``TypeError``; on
    Python 2 they are simply called. ``eq`` must be falsy and ``ne`` truthy.
    """
    import operator as op

    class Foo(object):
        # Dummy class whose __array__ always throws.
        __array_priority__ = 1002

        def __array__(self, *args, **kwargs):
            raise Exception()

    right = Foo()
    left = np.array(1)
    on_python3 = sys.version_info[0] >= 3

    for compare in (op.lt, op.le, op.gt, op.ge):
        if on_python3:
            assert_raises(TypeError, compare, left, right)
        else:
            compare(left, right)

    assert_(not op.eq(left, right))
    assert_(op.ne(left, right))
def visit_binary(self, binary, **kwargs):
    """Compile a binary expression, keeping a lone bind parameter on the right.

    If the operator is ``operator.eq`` and only the left operand is a
    ``BindParameter``, a new ``BinaryExpression`` with the operands swapped is
    passed to ``self.process``. Every other expression is delegated to the
    parent class's ``visit_binary``.
    """
    bind_only_on_left = (
        isinstance(binary.left, expression.BindParameter)
        and binary.operator == operator.eq
        and not isinstance(binary.right, expression.BindParameter)
    )
    if not bind_only_on_left:
        return super(MSSQLCompiler, self).visit_binary(binary, **kwargs)

    swapped = expression.BinaryExpression(
        binary.right, binary.left, binary.operator
    )
    return self.process(swapped, **kwargs)
def _validate_query_state(self):
    """Reject query state that is not allowed for update()/delete().

    Each entry names an attribute of ``self.query``, the user-facing method
    label used in the error message, the value that means "not set", and the
    comparison used to test for that value.

    Raises ``sa_exc.InvalidRequestError`` for the first attribute that is set.
    """
    checks = (
        ('_limit', 'limit()', None, operator.is_),
        ('_offset', 'offset()', None, operator.is_),
        ('_order_by', 'order_by()', False, operator.is_),
        ('_group_by', 'group_by()', False, operator.is_),
        ('_distinct', 'distinct()', False, operator.is_),
        (
            '_from_obj',
            'join(), outerjoin(), select_from(), or from_self()',
            (),
            operator.eq,
        ),
    )
    for attr_name, method_label, unset_value, is_unset in checks:
        current = getattr(self.query, attr_name)
        if is_unset(current, unset_value):
            continue
        raise sa_exc.InvalidRequestError(
            "Can't call Query.update() or Query.delete() "
            "when %s has been called" % (method_label, )
        )
def test_values(self):
    """Check all operators and all comparison results.

    Calls ``self.checkvalue(name, left, right, expected)`` for each of the six
    comparison names against the operand pairs (0, 0), (0, 1) and (1, 0).
    """
    operator_names = ("lt", "le", "eq", "ne", "gt", "ge")
    # Expected outcomes are listed in the same order as operator_names.
    expectations = (
        ((0, 0), (False, True, True, False, False, True)),
        ((0, 1), (True, True, False, True, False, False)),
        ((1, 0), (False, False, False, True, True, True)),
    )
    for (left, right), outcomes in expectations:
        for name, expected in zip(operator_names, outcomes):
            self.checkvalue(name, left, right, expected)
def test_values(self):
    """Check all operators and all comparison results.

    Calls ``self.checkvalue(name, left, right, expected)`` for each of the six
    comparison names against the operand pairs (0, 0), (0, 1) and (1, 0).
    """
    operator_names = ("lt", "le", "eq", "ne", "gt", "ge")
    # Expected outcomes are listed in the same order as operator_names.
    expectations = (
        ((0, 0), (False, True, True, False, False, True)),
        ((0, 1), (True, True, False, True, False, False)),
        ((1, 0), (False, False, False, True, True, True)),
    )
    for (left, right), outcomes in expectations:
        for name, expected in zip(operator_names, outcomes):
            self.checkvalue(name, left, right, expected)
def _match_subject_name(cert, subject_name, compare_func=operator.eq, alt_names=True):
    """Check that ``subject_name`` matches one of the certificate's names.

    When ``alt_names`` is true, the DNSName values of the subjectAltName
    extension are used if the extension is present. If that yields no names,
    the value of the first commonName attribute of the subject is used.

    ``compare_func(name, subject_name)`` is called for each candidate until
    one call returns a true value.

    Raises ``InvalidCertificate`` when no candidate matches or when the
    certificate provides no candidate names at all.
    """
    candidates = []
    if alt_names:
        try:
            san_extension = cert.extensions.get_extension_for_oid(
                ExtensionOID.SUBJECT_ALTERNATIVE_NAME)
            candidates = san_extension.value.get_values_for_type(x509.DNSName)
        except x509.extensions.ExtensionNotFound:
            # No subjectAltName extension: fall back to the commonName below.
            pass

    if not candidates:
        common_names = cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)
        if common_names:
            candidates = [common_names[0].value]

    for candidate in candidates:
        if compare_func(candidate, subject_name):
            return

    candidate_count = len(candidates)
    if candidate_count > 1:
        listing = ', '.join(repr(candidate) for candidate in candidates)
        raise InvalidCertificate(
            "Subject name %r doesn't match either of %s" % (subject_name, listing))
    if candidate_count == 1:
        raise InvalidCertificate(
            "Subject name %r doesn't match %r" % (subject_name, candidates[0]))
    raise InvalidCertificate(
        "No appropriate commonName or subjectAltName DNSName fields were found")
def __init__(self, parent, func, on_error=None, equal=None,
             initial_value=undefined, *, tracker=None):
    """
    Store the comparison function and initial value, then initialise the
    parent class with ``func`` wrapped by ``self._auto``.

    :param parent: a possible parent computation owning this
    :param func: the function to be computed
    :param on_error: an optional callback that will be called if an
      error is raised during computation
    :param equal: an optional equality comparison function to be used
      instead of the default ``operator.eq``
    :param initial_value: an optional initial value. By default it is a
      marker value called ``undefined``, that will be replaced with the
      first calculated value without generating any item.
    :param tracker: an instance of :class:`~.tracker.Tracker` that is
      managing the computing process
    """
    wrapped_func = functools.partial(self._auto, func)
    # Any falsy ``equal`` (not just None) selects the default comparison.
    self._equal = equal if equal else operator.eq
    self.current_value = initial_value
    self._tracker = tracker
    self._init_next_value_container()
    super().__init__(parent, wrapped_func, on_error, tracker=tracker)
def __setattr__(self, name, new):
    """Set an attribute and notify its dependency when a field value changes.

    Names not listed in ``_fields`` are set through the parent class with no
    extra work. For field names, the previous value is read first (or
    ``undefined`` if the attribute does not exist yet); after the new value is
    stored, ``_deps[name].changed()`` is called if a dependency is registered
    for the name and the previous value was ``undefined`` or differs from the
    new value according to ``_field_eq``.
    """
    base = super()
    if name not in base.__getattribute__('_fields'):
        return base.__setattr__(name, new)

    try:
        previous = base.__getattribute__(name)
    except AttributeError:
        previous = undefined

    outcome = base.__setattr__(name, new)

    dependencies = base.__getattribute__('_deps')
    if name in dependencies:
        same = base.__getattribute__('_field_eq')
        if previous is undefined or not same(previous, new):
            dependencies[name].changed()
    return outcome
def __init__(self, generator=undefined, initial_value=undefined,
             equal=None, always_recompute=False, *, tracker=None):
    """Initialise the value holder.

    ``generator`` is interpreted by type: a callable is stored as the
    generator with ``initial_value`` as the starting value; any other value
    except ``undefined`` is stored as the starting value itself; ``undefined``
    leaves ``initial_value`` as the starting value.

    ``equal`` replaces the default ``operator.eq`` comparison when truthy,
    ``always_recompute`` is stored as given, and ``tracker`` is forwarded to
    the parent initialiser.
    """
    super().__init__(tracker=tracker)
    self._equal = equal if equal else operator.eq
    self._descriptor_initialized = False
    self._single_value_initialized = False
    if callable(generator):
        # suppose it's used as a method decorator
        self._generator, self._value = generator, initial_value
    else:
        self._generator = None
        self._value = initial_value if generator is undefined else generator
    self._comp = None
    self._always_recompute = always_recompute
def number_of_args(fn):
    """Return the number of positional arguments for a function, or None if the number is variable.

    Looks inside any decorated functions by following ``__wrapped__``.

    Only positional-only and positional-or-keyword parameters are counted;
    a ``*args`` parameter makes the result ``None``.

    Raises ``NotImplementedError`` when ``fn`` has no introspectable signature
    and is not one of the built-in operators listed below.
    """
    try:
        if hasattr(fn, '__wrapped__'):
            return number_of_args(fn.__wrapped__)
        # Inspect the signature once and reuse it for both checks.
        params = tuple(signature(fn).parameters.values())
        if any(p.kind == p.VAR_POSITIONAL for p in params):
            return None
        return sum(p.kind in (p.POSITIONAL_ONLY, p.POSITIONAL_OR_KEYWORD) for p in params)
    except ValueError:
        # signatures don't work for built-in operators, so check for a few explicitly
        unary_ops = (len, op.not_, op.truth, op.abs, op.index, op.inv, op.invert, op.neg, op.pos)
        binary_ops = (
            op.lt, op.le, op.gt, op.ge, op.eq, op.ne, op.is_, op.is_not,
            op.add, op.and_, op.floordiv, op.lshift, op.mod, op.mul, op.or_,
            op.pow, op.rshift, op.sub, op.truediv, op.xor, op.concat,
            op.contains, op.countOf, op.delitem, op.getitem, op.indexOf,
        )
        ternary_ops = (op.setitem,)
        for arity, known_ops in ((1, unary_ops), (2, binary_ops), (3, ternary_ops)):
            if fn in known_ops:
                return arity
        raise NotImplementedError("Built-in operator {} not supported".format(fn))
def get_op(cls, op):
    """Return the callable registered for a grammar symbol or comparison token.

    Grammar symbols from the ``symbol`` module map to the matching handlers on
    ``cls``; comparison tokens given as strings map to plain two-argument
    callables. ``symbol.or_test`` is only registered when the ``symbol``
    module defines it, and it shares the ``cls.test`` handler.

    Raises ``KeyError`` when ``op`` is not registered.
    """
    handlers = {
        symbol.test: cls.test,
        symbol.and_test: cls.and_test,
        symbol.atom: cls.atom,
        symbol.comparison: cls.comparison,
    }
    handlers.update({
        'not in': lambda x, y: x not in y,
        'in': lambda x, y: x in y,
        '==': operator.eq,
        '!=': operator.ne,
        '<': operator.lt,
        '>': operator.gt,
        '<=': operator.le,
        '>=': operator.ge,
    })
    if hasattr(symbol, 'or_test'):
        handlers[symbol.or_test] = cls.test
    return handlers[op]
def visit_binary(self, binary, **kwargs):
    """Compile a binary expression, keeping a lone bind parameter on the right.

    If the operator is ``operator.eq`` and only the left operand is a
    ``BindParameter``, a new ``BinaryExpression`` with the operands swapped is
    passed to ``self.process``. Every other expression is delegated to the
    parent class's ``visit_binary``.
    """
    bind_only_on_left = (
        isinstance(binary.left, expression.BindParameter)
        and binary.operator == operator.eq
        and not isinstance(binary.right, expression.BindParameter)
    )
    if not bind_only_on_left:
        return super(MSSQLCompiler, self).visit_binary(binary, **kwargs)

    swapped = expression.BinaryExpression(
        binary.right, binary.left, binary.operator
    )
    return self.process(swapped, **kwargs)
def _validate_query_state(self):
    """Reject query state that is not allowed for update()/delete().

    Each entry names an attribute of ``self.query``, the user-facing method
    label used in the error message, the value that means "not set", and the
    comparison used to test for that value.

    Raises ``sa_exc.InvalidRequestError`` for the first attribute that is set.
    """
    checks = (
        ('_limit', 'limit()', None, operator.is_),
        ('_offset', 'offset()', None, operator.is_),
        ('_order_by', 'order_by()', False, operator.is_),
        ('_group_by', 'group_by()', False, operator.is_),
        ('_distinct', 'distinct()', False, operator.is_),
        (
            '_from_obj',
            'join(), outerjoin(), select_from(), or from_self()',
            (),
            operator.eq,
        ),
    )
    for attr_name, method_label, unset_value, is_unset in checks:
        current = getattr(self.query, attr_name)
        if is_unset(current, unset_value):
            continue
        raise sa_exc.InvalidRequestError(
            "Can't call Query.update() or Query.delete() "
            "when %s has been called" % (method_label, )
        )
def visit_binary(self, binary, **kwargs):
    """Compile a binary expression, keeping a lone bind parameter on the right.

    If the operator is ``operator.eq`` and only the left operand is a
    ``BindParameter``, a new ``BinaryExpression`` with the operands swapped is
    passed to ``self.process``. Every other expression is delegated to the
    parent class's ``visit_binary``.
    """
    bind_only_on_left = (
        isinstance(binary.left, expression.BindParameter)
        and binary.operator == operator.eq
        and not isinstance(binary.right, expression.BindParameter)
    )
    if not bind_only_on_left:
        return super(MSSQLCompiler, self).visit_binary(binary, **kwargs)

    swapped = expression.BinaryExpression(
        binary.right, binary.left, binary.operator
    )
    return self.process(swapped, **kwargs)
def _validate_query_state(self):
    """Reject query state that is not allowed for update()/delete().

    Each entry names an attribute of ``self.query``, the user-facing method
    label used in the error message, the value that means "not set", and the
    comparison used to test for that value.

    Raises ``sa_exc.InvalidRequestError`` for the first attribute that is set.
    """
    checks = (
        ('_limit', 'limit()', None, operator.is_),
        ('_offset', 'offset()', None, operator.is_),
        ('_order_by', 'order_by()', False, operator.is_),
        ('_group_by', 'group_by()', False, operator.is_),
        ('_distinct', 'distinct()', False, operator.is_),
        (
            '_from_obj',
            'join(), outerjoin(), select_from(), or from_self()',
            (),
            operator.eq,
        ),
    )
    for attr_name, method_label, unset_value, is_unset in checks:
        current = getattr(self.query, attr_name)
        if is_unset(current, unset_value):
            continue
        raise sa_exc.InvalidRequestError(
            "Can't call Query.update() or Query.delete() "
            "when %s has been called" % (method_label, )
        )
def visit_binary(self, binary, **kwargs):
    """Compile a binary expression, keeping a lone bind parameter on the right.

    If the operator is ``operator.eq`` and only the left operand is a
    ``BindParameter``, a new ``BinaryExpression`` with the operands swapped is
    passed to ``self.process``. Every other expression is delegated to the
    parent class's ``visit_binary``.
    """
    bind_only_on_left = (
        isinstance(binary.left, expression.BindParameter)
        and binary.operator == operator.eq
        and not isinstance(binary.right, expression.BindParameter)
    )
    if not bind_only_on_left:
        return super(MSSQLCompiler, self).visit_binary(binary, **kwargs)

    swapped = expression.BinaryExpression(
        binary.right, binary.left, binary.operator
    )
    return self.process(swapped, **kwargs)
def _validate_query_state(self):
    """Reject query state that is not allowed for update()/delete().

    Each entry names an attribute of ``self.query``, the user-facing method
    label used in the error message, the value that means "not set", and the
    comparison used to test for that value.

    Raises ``sa_exc.InvalidRequestError`` for the first attribute that is set.
    """
    checks = (
        ('_limit', 'limit()', None, operator.is_),
        ('_offset', 'offset()', None, operator.is_),
        ('_order_by', 'order_by()', False, operator.is_),
        ('_group_by', 'group_by()', False, operator.is_),
        ('_distinct', 'distinct()', False, operator.is_),
        (
            '_from_obj',
            'join(), outerjoin(), select_from(), or from_self()',
            (),
            operator.eq,
        ),
    )
    for attr_name, method_label, unset_value, is_unset in checks:
        current = getattr(self.query, attr_name)
        if is_unset(current, unset_value):
            continue
        raise sa_exc.InvalidRequestError(
            "Can't call Query.update() or Query.delete() "
            "when %s has been called" % (method_label, )
        )
def get_op(cls, op):
    """Return the callable registered for a grammar symbol or comparison token.

    Grammar symbols from the ``symbol`` module map to the matching handlers on
    ``cls``; comparison tokens given as strings map to plain two-argument
    callables. ``symbol.or_test`` is only registered when the ``symbol``
    module defines it, and it shares the ``cls.test`` handler.

    Raises ``KeyError`` when ``op`` is not registered.
    """
    handlers = {
        symbol.test: cls.test,
        symbol.and_test: cls.and_test,
        symbol.atom: cls.atom,
        symbol.comparison: cls.comparison,
    }
    handlers.update({
        'not in': lambda x, y: x not in y,
        'in': lambda x, y: x in y,
        '==': operator.eq,
        '!=': operator.ne,
        '<': operator.lt,
        '>': operator.gt,
        '<=': operator.le,
        '>=': operator.ge,
    })
    if hasattr(symbol, 'or_test'):
        handlers[symbol.or_test] = cls.test
    return handlers[op]