def flavor_get(self, context, flavor_uuid):
query = model_query(context, models.Flavors).filter_by(
uuid=flavor_uuid)
if not context.is_admin:
query = query.filter_by(disabled=False)
the_filter = [models.Flavors.is_public == true()]
the_filter.extend([
models.Flavors.projects.has(project_id=context.project_id)
])
query = query.filter(or_(*the_filter))
try:
return query.one()
except NoResultFound:
raise exception.FlavorNotFound(
flavor_id=flavor_uuid)
python类true()的实例源码
def project_get_networks(context, project_id, associate=True):
# NOTE(tr3buchet): as before this function will associate
# a project with a network if it doesn't have one and
# associate is true
result = model_query(context, models.Network, read_deleted="no").\
filter_by(project_id=project_id).\
all()
if not result:
if not associate:
return []
return [network_associate(context, project_id)]
return result
###################
def flavor_get_all(self, context):
query = model_query(context, models.Flavors)
if not context.is_admin:
query = query.filter_by(disabled=False)
the_filter = [models.Flavors.is_public == true()]
the_filter.extend([
models.Flavors.projects.has(project_id=context.project_id)
])
query = query.filter(or_(*the_filter))
return query.all()
def has_property(self, prop):
property_granted_select = select(
[null()],
from_obj=[
Property.__table__,
PropertyGroup.__table__,
Membership.__table__
]
).where(
and_(
Property.name == prop,
Property.property_group_id == PropertyGroup.id,
PropertyGroup.id == Membership.group_id,
Membership.user_id == self.id,
Membership.active
)
)
#.cte("property_granted_select")
return and_(
not_(exists(
property_granted_select.where(
Property.granted == false())
)),
exists(
property_granted_select.where(
Property.granted == true()
)
)
)
def test_true_false(self):
self.assertEqual(self.compile(sql.false()), '0')
self.assertEqual(self.compile(sql.true()), '1')
def fixed_ip_disassociate_all_by_timeout(context, host, time):
# NOTE(vish): only update fixed ips that "belong" to this
# host; i.e. the network host or the instance
# host matches. Two queries necessary because
# join with update doesn't work.
host_filter = or_(and_(models.Instance.host == host,
models.Network.multi_host == true()),
models.Network.host == host)
result = model_query(context, models.FixedIp, (models.FixedIp.id,),
read_deleted="no").\
filter(models.FixedIp.allocated == false()).\
filter(models.FixedIp.updated_at < time).\
join((models.Network,
models.Network.id == models.FixedIp.network_id)).\
join((models.Instance,
models.Instance.uuid == models.FixedIp.instance_uuid)).\
filter(host_filter).\
all()
fixed_ip_ids = [fip[0] for fip in result]
if not fixed_ip_ids:
return 0
result = model_query(context, models.FixedIp).\
filter(models.FixedIp.id.in_(fixed_ip_ids)).\
update({'instance_uuid': None,
'leased': False,
'updated_at': timeutils.utcnow()},
synchronize_session='fetch')
return result
def _flavor_get_query(context, read_deleted=None):
query = model_query(context, models.InstanceTypes,
read_deleted=read_deleted).\
options(joinedload('extra_specs'))
if not context.is_admin:
the_filter = [models.InstanceTypes.is_public == true()]
the_filter.extend([
models.InstanceTypes.projects.any(project_id=context.project_id)
])
query = query.filter(or_(*the_filter))
return query
def _flavor_get_query_from_db(context):
query = context.session.query(api_models.Flavors).\
options(joinedload('extra_specs'))
if not context.is_admin:
the_filter = [api_models.Flavors.is_public == true()]
the_filter.extend([
api_models.Flavors.projects.any(project_id=context.project_id)
])
query = query.filter(or_(*the_filter))
return query