def and_test(cls, nodelist):
# MUST NOT short-circuit evaluation, or invalid syntax can be skipped!
items = [
cls.interpret(nodelist[i])
for i in range(1, len(nodelist), 2)
]
return functools.reduce(operator.and_, items)
python类and_()的实例源码
def and_test(cls, nodelist):
# MUST NOT short-circuit evaluation, or invalid syntax can be skipped!
items = [
cls.interpret(nodelist[i])
for i in range(1, len(nodelist), 2)
]
return functools.reduce(operator.and_, items)
def match(self, props):
"""
Check whether the given props match all the conditions.
"""
def match_condition(props, cond):
"""
Check whether the given props match the given condition.
"""
try:
prop, op, ref = props[cond[0]], cond[1], cond[2]
except KeyError:
return False
if op == "~":
return fnmatch.fnmatchcase(prop.lower(), ref.lower())
elif op == "=":
return prop == ref
elif op == ">":
return prop > ref
elif op == "<":
return prop < ref
else:
return False
#
return functools.reduce(operator.and_,
[ match_condition(props, cond) \
for cond in self.conditions ],
True)
def __eq__(self, other):
expressions = [(self.model_class._meta.fields[field] == value)
for field, value in zip(self.field_names, other)]
return reduce(operator.and_, expressions)
def _add_query_clauses(self, initial, expressions, conjunction=None):
reduced = reduce(operator.and_, expressions)
if initial is None:
return reduced
conjunction = conjunction or operator.and_
return conjunction(initial, reduced)
def filter(self, *args, **kwargs):
# normalize args and kwargs into a new expression
dq_node = Node()
if args:
dq_node &= reduce(operator.and_, [a.clone() for a in args])
if kwargs:
dq_node &= DQ(**kwargs)
# dq_node should now be an Expression, lhs = Node(), rhs = ...
q = deque([dq_node])
dq_joins = set()
while q:
curr = q.popleft()
if not isinstance(curr, Expression):
continue
for side, piece in (('lhs', curr.lhs), ('rhs', curr.rhs)):
if isinstance(piece, DQ):
query, joins = self.convert_dict_to_node(piece.query)
dq_joins.update(joins)
expression = reduce(operator.and_, query)
# Apply values from the DQ object.
expression._negated = piece._negated
expression._alias = piece._alias
setattr(curr, side, expression)
else:
q.append(piece)
dq_node = dq_node.rhs
query = self.clone()
for field in dq_joins:
if isinstance(field, ForeignKeyField):
lm, rm = field.model_class, field.rel_model
field_obj = field
elif isinstance(field, ReverseRelationDescriptor):
lm, rm = field.field.rel_model, field.rel_model
field_obj = field.field
query = query.ensure_join(lm, rm, field_obj)
return query.where(dq_node)
def and_test(cls, nodelist):
# MUST NOT short-circuit evaluation, or invalid syntax can be skipped!
return functools.reduce(operator.and_, [cls.interpret(nodelist[i]) for i in range(1,len(nodelist),2)])
def intersection(dfa1, dfa2):
return product(dfa1, operator.and_, dfa2)
def where(self, *expressions):
"""
Set the where clause
"""
self._where = reduce(operator.and_, expressions)
return self
def test_and_scalar(self):
self.check_array_scalar_op(operator.and_)
def test_rand_scalar(self):
self.check_array_scalar_op(operator.and_, swap=True)
def test_and_scalarzero(self):
self.check_array_scalarzero_op(operator.and_)
def test_rand_scalarzero(self):
self.check_array_scalarzero_op(operator.and_, swap=True)
def test_and_array(self):
self.check_array_array_op(operator.and_)
def index(**kwargs):
""" Get all tags
"""
where = [Q()]
if 'tagType' in kwargs:
where.append(Q(tagType__iexact=kwargs['tagType']))
where = reduce(operator.and_, where)
tags = Tag.objects.filter(where)
return [model_to_dict(tag) for tag in tags]
def get_defendant_to_reject(where=None):
if not where:
where = [Q()]
where = reduce(operator.and_, where)
defendants = Ticket.objects.filter(
where,
status='Open',
priority__in=TODO_TICKET_PRIORITY_FILTERS,
defendant__details__creationDate__lt=datetime.now() - timedelta(days=30)
).values_list('defendant', flat=True)
rejected = set([k for k, v in Counter(defendants).iteritems() if v > 1])
return rejected
def __generate_request_filter(filters):
""" Generates filters from filter query string
"""
where = [Q()]
if 'where' in filters and len(filters['where']):
keys = set(k for k in filters['where'])
if 'like' in keys:
for i in filters['where']['like']:
for key, val in i.iteritems():
field = key + '__icontains'
where.append(reduce(operator.or_, [Q(**{field: val[0]})]))
where = reduce(operator.and_, where)
else:
where = reduce(operator.and_, where)
return where
def __generate_request_filters(filters, user):
""" Generates filters base on filter query string
"""
# Add SearchService results if fulltext search
try:
for field in filters['where']['like']:
for key, value in field.iteritems():
if key == 'fulltext':
if ImplementationFactory.instance.is_implemented('SearchServiceBase'):
_add_search_filters(filters, value[0])
filters['where']['like'].remove({key: value})
break
except KeyError:
pass
# Generates Django query filter
where = [Q()]
if 'where' in filters and len(filters['where']):
keys = set(k for k in filters['where'])
if 'in' in keys:
for i in filters['where']['in']:
for key, val in i.iteritems():
field = reduce(lambda a, kv: a.replace(*kv), REPORT_FILTER_MAPPING, key)
where.append(reduce(operator.or_, [Q(**{field: i}) for i in val]))
if 'like' in keys:
like = []
for i in filters['where']['like']:
for key, val in i.iteritems():
field = reduce(lambda a, kv: a.replace(*kv), REPORT_FILTER_MAPPING, key)
field = field + '__icontains'
like.append(Q(**{field: val[0]}))
if len(like):
where.append(reduce(operator.or_, like))
else:
# All except closed
where.append(~Q(status='Archived'))
allowed = AbusePermission.objects.filter(user=user.id).values_list('category', flat=True)
where.append(Q(category__in=allowed))
where = reduce(operator.and_, where)
return where
def and_test(cls, nodelist):
# MUST NOT short-circuit evaluation, or invalid syntax can be skipped!
items = [
cls.interpret(nodelist[i])
for i in range(1, len(nodelist), 2)
]
return functools.reduce(operator.and_, items)
def and_test(cls, nodelist):
# MUST NOT short-circuit evaluation, or invalid syntax can be skipped!
items = [
cls.interpret(nodelist[i])
for i in range(1, len(nodelist), 2)
]
return functools.reduce(operator.and_, items)