def get_by_user_id_type( cls, user_id, type ):
return cls.query( ndb.AND( cls.user_id == user_id, cls.type == type )).get( )
python类AND的实例源码
def count_by_user_id_type( cls, user_id, type ):
return cls.query( ndb.AND( cls.user_id == user_id, cls.type == type )).count()
def fetch_by_user_id_type( cls, user_id, type ):
return cls.query( ndb.AND( cls.user_id == user_id, cls.type == type )).order( -cls.time_created ).fetch()
def fetch_keys_by_user_id_type( cls, user_id, type ):
return cls.query( ndb.AND( cls.user_id == user_id, cls.type == type )).fetch( keys_only = True )
def fetch_keys_by_user_id_except_type( cls, user_id, type ):
return cls.query( ndb.AND( cls.user_id == user_id, cls.type != type )).fetch( keys_only = True )
def fetch_keys_by_email_type( cls, email, type ):
return cls.query( ndb.AND( cls.email == email, cls.type == type )).fetch( keys_only = True )
def exist_by_user_id_token( cls, user_id, token ):
count = cls.query( ndb.AND( cls.user_id == user_id, cls.token == token )).count( 1 )
return count > 0
def fetch_keys_old_tokens_by_types( cls, days_old, types ):
return cls.query( ndb.AND( cls.type.IN( types ) , cls.time_created <= ( datetime.datetime.now( ) - datetime.timedelta( days = days_old )))).fetch( keys_only = True )
#=== UTILITIES ================================================================
def get_by_token_type( cls, token, type, retry = 0 ):
entity = cls.query(ndb.AND(cls.token == token, cls.type == type)).get()
if retry and not entity:
timeout = cls.TIMEOUT_S
for i in range(retry):
entity = cls.query(ndb.AND(cls.token == token, cls.type == type)).get()
if entity:
break
else:
time.sleep(timeout)
timeout *= 2
return entity
def delete_by_user_id_token( cls, user_id, token ):
key = cls.query(ndb.AND(cls.user_id == user_id, cls.token == token)).fetch(keys_only = True)
if key:
key[ 0 ].delete()
return True
return False
def get_by_user_id_token_valid_age( cls, user_id, token ):
return cls.query( ndb.AND( cls.user_id == user_id, cls.token == token,
cls.time_created > ( datetime.datetime.now() - datetime.timedelta( minutes = cls.MAX_AGE )))).get()
def get_previous_meetings(subscription, cooldown=None):
if cooldown is None:
cooldown = get_config()['meeting_cooldown_weeks']
meetings = defaultdict(list)
# get all meeting specs from x weeks ago til now
time_threshold_for_meetings = datetime.now() - timedelta(weeks=cooldown)
meeting_spec_keys = [
spec.key for spec in MeetingSpec.query(
ndb.AND(MeetingSpec.datetime > time_threshold_for_meetings,
MeetingSpec.meeting_subscription == subscription)
).fetch()
]
logging.info('Previous Meeting History: ')
logging.info([meeting.get().datetime.strftime("%Y-%m-%d %H:%M") for meeting in meeting_spec_keys])
if meeting_spec_keys == []:
return set([])
# get all meetings from meeting specs
meeting_keys = [meeting.key for meeting in Meeting.query().filter(
Meeting.meeting_spec.IN(meeting_spec_keys)).fetch()]
if meeting_keys == []:
return set([])
# get all participants from meetings
participants = MeetingParticipant.query().filter(
MeetingParticipant.meeting.IN(meeting_keys)
).fetch()
if participants == []:
return set([])
# group by meeting Id
for participant in participants:
meetings[participant.meeting.id()].append(participant.user)
# ids are sorted, all matches should be in increasing order by id for the matching algorithm to work
disallowed_meetings = set([tuple(sorted(meeting, key=lambda Key: Key.id())) for meeting in meetings.values()])
logging.info('Past Meetings')
logging.info([tuple([meeting.get().get_username() for meeting in meeting]) for meeting in disallowed_meetings])
disallowed_meetings = {tuple([meeting.id() for meeting in meeting]) for meeting in disallowed_meetings}
return disallowed_meetings