def donations(request):
'''
actual data from the FEC, propublica data, through the django ORM cache
'''
data = {}
data["representatives"] = {d["id"]: d for d in
Representative.objects.all()
.annotate(name=Concat('first_name', Value(" "), 'last_name'))
.values("id", "name", "party")}
data["committees"] = {d["id"]: d for d in
SuperPAC.objects.all().values("id", "name")}
data["donations"] = list(Donation.objects.all()
.annotate(source=F("superpac_id"), destination=F("representative_id"))
.values("source", "destination", "support")
.annotate(amount=Sum("amount")))
return JsonResponse(data)
python类Concat()的实例源码
def _lecturas_del_periodo(anio, mes, quincena=None, region_id=None, funcion=Avg):
muestras = Muestra.objects.filter(anio=anio, mes=mes, aprobada=True)
if quincena:
muestras = muestras.filter(quincena=quincena)
lecturas = Lectura.objects.filter(muestra__in=muestras, precio__gt=0)
if region_id:
lecturas = lecturas.filter(
muestra__planilla_de_relevamiento__zona__jurisdiccion__region__pk=region_id)
lecturas = lecturas.annotate(orden=F("producto_con_marca__producto_generico__id"))\
.annotate(producto=F("producto_con_marca__producto_generico__nombre"))\
.annotate(comercio=F('muestra__planilla_de_relevamiento__comercio__nombre'))\
.values('comercio', 'producto')\
.annotate(valor=funcion('precio'))\
.annotate(c_p=Concat(F('muestra__planilla_de_relevamiento__comercio__nombre'),
F("producto_con_marca__producto_generico__nombre")))\
.order_by('orden', 'comercio')
return lecturas
def update_list_best_practice():
"""
Hits the DB once no matter how many objects.
NB:
- save() will not be called, and the related signals will not be sent.
- does not work with m2m relationships.
>>> UPDATE community SET name = (community.name + ' e.V')
WHERE community.name LIKE 'community%'
"""
communities = Community.objects.filter(name__startswith='community')
communities.update(name=Concat('name', Value(' e.V')))
## Bulk delete
##############
def get_queryset(self):
empty_str = ExpressionWrapper(V(''), output_field=CharField())
future_meeting = models.MeetingHistory.objects.latest('date')
return models.PresentHistory.objects.values(
date=F('meeting__date'),
presentation_type=F('present_type'),
presenter_name=F('presenter__name'),
present_content=F('content'),
).exclude(meeting__date=future_meeting.date).order_by().union(
models.MeetingSkip.objects.all().values(
'date',
presentation_type=Concat(V('Postponed: '), 'reason'),
presenter_name=empty_str,
present_content=empty_str,
).filter(date__lte=date.today()).order_by()
).order_by('-date')
def list_employees_search(request,name):
"""
parameters/returns:
employees: lista de objetos employee a los que tiene acceso el administrador (los que están en su empresa)
template: employee_list.html
"""
# Check that the current user has permissions
lista = get_list_for_role(request)
if name != "all_true":
#little opt just if not empty
lista=lista.annotate(
search_name=Concat('user__first_name', V(' '), 'user__last_name')
).filter(search_name__icontains=name)
employees = lista.filter(user__is_active=True)
return render(request, "employee/employee_search.html",
{"employees": employees})
def search(request):
'''
Searches through all available sources to match the user's query
'''
query = request.GET["query"]
## match the query against a zipcode regex, go a zipcode search if it matches
if re.match("^\d{5}$", query):
## here we call an external api to search for the reps via zipcode
results = SunlightAPI().rep_by_zip(query)
## loop through the results
reps = []
for rep in results["results"]:
## try finding the rep in our database
reps += (Representative.objects.all()
.annotate(name=Concat('first_name', Value(" "), 'last_name'))
.filter(name=rep["first_name"]+" "+rep["last_name"])
.values("id", "name", "party"))
## return the found reps
return JsonResponse({"representatives": reps, "committees": []})
data = {}
data["representatives"] = list(Representative.objects.all()
.annotate(name=Concat('first_name', Value(" "), 'last_name'))
.filter(name__icontains=query)
.values("id", "name", "party"))
data["committees"] = list(SuperPAC.objects.filter(name__icontains=query)
.values("id", "name"))
if "." in query or len(query) > 5:
results = SunlightAPI().search(query)
data["bills"] = results["results"]
return JsonResponse(data)
def get_queryset(self):
queryset = Report.objects.all()
queryset = queryset.annotate(
year=ExtractYear('date'), month=ExtractMonth('date')
)
queryset = queryset.values('year', 'month')
queryset = queryset.annotate(duration=Sum('duration'))
queryset = queryset.annotate(pk=Concat('year', Value('-'), 'month'))
return queryset
def get_queryset(self):
date = self._extract_date()
user = self.request.user
queryset = get_user_model().objects.values('id')
queryset = queryset.annotate(
date=Value(date, DateField()),
)
# last_reported_date filter is set, a date can only be calucated
# for users with either at least one absence or report
if date is None:
users_with_reports = Report.objects.values('user').distinct()
users_with_absences = Absence.objects.values('user').distinct()
active_users = users_with_reports.union(users_with_absences)
queryset = queryset.filter(id__in=active_users)
queryset = queryset.annotate(
pk=Concat(
'id',
Value('_'),
'date',
output_field=CharField()
)
)
if not user.is_superuser:
queryset = queryset.filter(
Q(id=user.id) | Q(supervisors=user)
)
return queryset
def get_queryset(self):
date = self._extract_date()
user = self._extract_user()
queryset = models.AbsenceType.objects.values('id')
queryset = queryset.annotate(
date=Value(date, DateField()),
)
queryset = queryset.annotate(
user=Value(user.id, IntegerField()),
)
queryset = queryset.annotate(
pk=Concat(
'user',
Value('_'),
'id',
Value('_'),
'date',
output_field=CharField()
)
)
# only myself, superuser and supervisors may see by absence balances
current_user = self.request.user
if not current_user.is_superuser:
if current_user.id != user.id:
if not current_user.supervisees.filter(id=user.id).exists():
return models.AbsenceType.objects.none()
return queryset
def get_queryset(self, request):
qs = super(CommentAdmin, self).get_queryset(request)
if django.VERSION >= (1, 8):
# Concat is not available in 1.7 and 1.6
qs = qs.annotate(author_sort=Concat('author_type', 'author_id', output_field=TextField()))
qs = qs.annotate(page_sort=Concat('page_type', 'page_id', output_field=TextField()))
return qs
def update_output(cls, execution_id, stdout, stderr):
"""
Append to stdout & stderr.
Use concatenation to efficiently update the fields.
"""
query = Execution.objects.filter(id=execution_id)
query.update(
stdout=functions.Concat('stdout', models.Value(stdout or '')),
stderr=functions.Concat('stderr', models.Value(stderr or ''))
)
def resolve_search(self, args, context, info):
term = args['q']
people = Person.objects.filter(name__icontains=term)
houses = House.objects.annotate(name=Concat(F('number'), Value(' '), F('street__name'), output_field=models.TextField())).filter(name__icontains=term)
streets = Street.objects.filter(name__icontains=term)
return itertools.chain(people, houses, streets)
candidates.py 文件源码
项目:django-calaccess-processed-data
作者: california-civic-data-coalition
项目源码
文件源码
阅读 29
收藏 0
点赞 0
评论 0
def match_form501s_by_name(self):
"""
Get Form501Filings with names that match the scraped candidate's name.
Returns a Form501Filing QuerySet.
"""
from calaccess_processed.models import Form501Filing
election_data = self.election_proxy.parsed_name
office_data = self.parse_office_name()
q = Form501Filing.objects.filter(
office__iexact=office_data['type'],
district=office_data['district'],
election_year__lte=election_data['year'],
)
# first, try "<last_name>, <first_name> <middle_name>" format
last_first_middle_q = q.annotate(
full_name=Concat(
'last_name',
Value(', '),
'first_name',
Value(' '),
'middle_name',
output_field=CharField(),
),
).filter(full_name=self.name)
# limit scope to election_type if it will yield results
if last_first_middle_q.filter(election_type=election_data['type']).exists():
q = last_first_middle_q.filter(election_type=election_data['type'])
# if not, check if dropping election_type filter yields results
elif last_first_middle_q.exists():
q = last_first_middle_q
# if not, change name format
else:
last_first_q = q.annotate(
full_name=Concat(
'last_name',
Value(', '),
'first_name',
output_field=CharField(),
),
).filter(full_name=self.name)
# again, limit to election_type at first
if last_first_q.filter(election_type=election_data['type']).exists():
q = last_first_q.filter(election_type=election_data['type'])
else:
q = last_first_q
return q
def test_annotation_is_perserved():
class ReporterType(DjangoObjectType):
full_name = String()
def resolve_full_name(instance, info, **args):
return instance.full_name
class Meta:
model = Reporter
interfaces = (Node, )
filter_fields = ()
class Query(ObjectType):
all_reporters = DjangoFilterConnectionField(ReporterType)
def resolve_all_reporters(self, info, **args):
return Reporter.objects.annotate(
full_name=Concat('first_name', Value(' '), 'last_name', output_field=TextField())
)
Reporter.objects.create(
first_name='John',
last_name='Doe',
)
schema = Schema(query=Query)
query = '''
query NodeFilteringQuery {
allReporters(first: 1) {
edges {
node {
fullName
}
}
}
}
'''
expected = {
'allReporters': {
'edges': [{
'node': {
'fullName': 'John Doe',
}
}]
}
}
result = schema.execute(query)
assert not result.errors
assert result.data == expected