def check_expression_support(self, expression):
    """Reject aggregates that sqlite3 cannot compute on date/time fields.

    Raises NotImplementedError when ``expression`` is a Sum, Avg, Variance
    or StdDev aggregate and one of its source expressions has a date,
    datetime or time output field. Returns None otherwise.
    """
    temporal_fields = (fields.DateField, fields.DateTimeField, fields.TimeField)
    unsupported_aggregates = (
        aggregates.Sum,
        aggregates.Avg,
        aggregates.Variance,
        aggregates.StdDev,
    )
    if not isinstance(expression, unsupported_aggregates):
        return
    for source in expression.get_source_expressions():
        try:
            resolved_field = source.output_field
        except FieldError:
            # A subexpression without a resolvable output_field is harmless.
            continue
        if isinstance(resolved_field, temporal_fields):
            raise NotImplementedError(
                'You cannot use Sum, Avg, StdDev, and Variance '
                'aggregations on date/time fields in sqlite3 '
                'since date/time is saved as text.'
            )
Python 类 Avg() 的实例源码
def check_expression_support(self, expression):
    """Raise NotImplementedError for Sum/Avg/Variance/StdDev over date/time fields.

    Each source expression of a matching aggregate is inspected; source
    expressions whose output_field lookup raises FieldError are ignored.
    """
    date_like = (fields.DateField, fields.DateTimeField, fields.TimeField)
    blocked = (aggregates.Sum, aggregates.Avg, aggregates.Variance, aggregates.StdDev)
    if isinstance(expression, blocked):
        for child in expression.get_source_expressions():
            try:
                child_field = child.output_field
            except FieldError:
                # Not every subexpression exposes an output_field; that is
                # fine to ignore.
                pass
            else:
                if isinstance(child_field, date_like):
                    raise NotImplementedError(
                        'You cannot use Sum, Avg, StdDev, and Variance '
                        'aggregations on date/time fields in sqlite3 '
                        'since date/time is saved as text.'
                    )
def check_expression_support(self, expression):
    """Check that an aggregate does not operate on a date/time field.

    Only Sum, Avg, Variance and StdDev aggregates are examined. If any of
    their source expressions has a DateField, DateTimeField or TimeField
    as output_field, NotImplementedError is raised.
    """
    time_based_fields = (fields.DateField, fields.DateTimeField, fields.TimeField)
    numeric_only_aggregates = (
        aggregates.Sum, aggregates.Avg, aggregates.Variance, aggregates.StdDev,
    )
    if not isinstance(expression, numeric_only_aggregates):
        return
    for sub_expression in expression.get_source_expressions():
        try:
            is_time_based = isinstance(sub_expression.output_field, time_based_fields)
        except FieldError:
            # The subexpression has no output_field; nothing to validate.
            is_time_based = False
        if is_time_based:
            raise NotImplementedError(
                'You cannot use Sum, Avg, StdDev, and Variance '
                'aggregations on date/time fields in sqlite3 '
                'since date/time is saved as text.'
            )
def check_expression_support(self, expression):
    """Refuse Sum, Avg, Variance and StdDev aggregates over date/time fields.

    Raises:
        NotImplementedError: a source expression of the aggregate has a
            date, datetime or time output field.
    """
    rejected_field_types = (fields.DateField, fields.DateTimeField, fields.TimeField)
    rejected_aggregate_types = (
        aggregates.Sum, aggregates.Avg, aggregates.Variance, aggregates.StdDev,
    )
    if not isinstance(expression, rejected_aggregate_types):
        return
    error_message = (
        'You cannot use Sum, Avg, StdDev, and Variance '
        'aggregations on date/time fields in sqlite3 '
        'since date/time is saved as text.'
    )
    for operand in expression.get_source_expressions():
        try:
            operand_field = operand.output_field
        except FieldError:
            # Operands without an output_field are skipped on purpose.
            continue
        if isinstance(operand_field, rejected_field_types):
            raise NotImplementedError(error_message)
def range_statistics(start, end):
    """ Returns the aggregated day statistics for days from start (inclusive) up to end (exclusive). """
    days_in_range = DayStatistics.objects.filter(day__gte=start, day__lt=end)

    # Combined expressions covering both electricity meters/tariffs.
    merged_usage = models.F('electricity1') + models.F('electricity2')
    merged_cost = models.F('electricity1_cost') + models.F('electricity2_cost')
    merged_returned = models.F('electricity1_returned') + models.F('electricity2_returned')

    totals = {
        'total_cost': Sum('total_cost'),
        'electricity1': Sum('electricity1'),
        'electricity1_cost': Sum('electricity1_cost'),
        'electricity1_returned': Sum('electricity1_returned'),
        'electricity2': Sum('electricity2'),
        'electricity2_cost': Sum('electricity2_cost'),
        'electricity2_returned': Sum('electricity2_returned'),
        'electricity_merged': Sum(merged_usage),
        'electricity_cost_merged': Sum(merged_cost),
        'electricity_returned_merged': Sum(merged_returned),
        'gas': Sum('gas'),
        'gas_cost': Sum('gas_cost'),
        'temperature_min': Min('lowest_temperature'),
        'temperature_max': Max('highest_temperature'),
        'temperature_avg': Avg('average_temperature'),
    }
    return days_in_range.aggregate(**totals)
def initialize_side_positioning(side: BattleSide):
    """Assign starting positions to every contubernium and unit of a side.

    Only the LINE formation is handled; any other formation raises
    Exception. Each contubernium receives its offset within the formation
    and a starting position that is mirrored depending on ``side.z``. Each
    unit then gets the floored average of its contubernia's starting
    positions before set_contubernia_starting_pos is called for it.
    """
    formation_settings = side.get_formation()
    if formation_settings.formation != BattleFormation.LINE:
        raise Exception(
            "Formation {} not known".format(formation_settings.formation)
        )

    formation = LineFormation(side, formation_settings)
    formation.make_formation()

    for coords, contub in formation.output_formation():
        contub.x_offset_to_formation = coords.x
        contub.z_offset_to_formation = coords.z
        if side.z:
            contub.starting_x_pos = coords.x
            contub.starting_z_pos = coords.z + 10
        else:
            # The other side is mirrored on both axes.
            contub.starting_x_pos = -coords.x
            contub.starting_z_pos = -coords.z - 10
        contub.save()

    units_with_averages = side.battleunit_set.all().annotate(
        avg_x=Avg('battlecontubernium__starting_x_pos')
    ).annotate(
        avg_z=Avg('battlecontubernium__starting_z_pos')
    )
    for unit in units_with_averages:
        unit.starting_x_pos = math.floor(unit.avg_x)
        unit.starting_z_pos = math.floor(unit.avg_z)
        unit.save()
        set_contubernia_starting_pos(unit)
def check_expression_support(self, expression):
    """Validate that a Sum/Avg/Variance/StdDev aggregate avoids date/time fields.

    NotImplementedError is raised as soon as one source expression of the
    aggregate is found to have a date, datetime or time output field.
    Other expression types pass through unchecked.
    """
    forbidden_fields = (fields.DateField, fields.DateTimeField, fields.TimeField)
    forbidden_aggregates = (
        aggregates.Sum,
        aggregates.Avg,
        aggregates.Variance,
        aggregates.StdDev,
    )
    if isinstance(expression, forbidden_aggregates):
        for inner in expression.get_source_expressions():
            try:
                hits_forbidden_field = isinstance(inner.output_field, forbidden_fields)
            except FieldError:
                # No output_field on this subexpression: fine to ignore.
                hits_forbidden_field = False
            if hits_forbidden_field:
                raise NotImplementedError(
                    'You cannot use Sum, Avg, StdDev, and Variance '
                    'aggregations on date/time fields in sqlite3 '
                    'since date/time is saved as text.'
                )
def check_expression_support(self, expression):
    """Guard against aggregating date/time fields with Sum, Avg, Variance or StdDev.

    Raises NotImplementedError if a source expression of such an aggregate
    has a DateField, DateTimeField or TimeField output field; the error
    message explains that date/time is saved as text in sqlite3.
    """
    text_stored_fields = (fields.DateField, fields.DateTimeField, fields.TimeField)
    arithmetic_aggregates = (
        aggregates.Sum, aggregates.Avg, aggregates.Variance, aggregates.StdDev,
    )
    if not isinstance(expression, arithmetic_aggregates):
        return
    for term in expression.get_source_expressions():
        try:
            term_field = term.output_field
        except FieldError:
            # Terms that cannot report an output_field are left alone.
            pass
        else:
            if isinstance(term_field, text_stored_fields):
                raise NotImplementedError(
                    'You cannot use Sum, Avg, StdDev, and Variance '
                    'aggregations on date/time fields in sqlite3 '
                    'since date/time is saved as text.'
                )
def check_expression_support(self, expression):
    """Reject Sum, Avg, Variance and StdDev aggregates whose input is a date/time field.

    The output field of ``expression.input_field`` is inspected; a
    FieldError raised while resolving it is ignored. Raises
    NotImplementedError for DateField, DateTimeField and TimeField inputs.
    """
    temporal_fields = (fields.DateField, fields.DateTimeField, fields.TimeField)
    unsupported_aggregates = (
        aggregates.Sum, aggregates.Avg, aggregates.Variance, aggregates.StdDev,
    )
    if not isinstance(expression, unsupported_aggregates):
        return
    try:
        input_output_field = expression.input_field.output_field
    except FieldError:
        # Not every sub-expression has an output_field, which is fine to
        # ignore.
        return
    if isinstance(input_output_field, temporal_fields):
        raise NotImplementedError(
            'You cannot use Sum, Avg, StdDev and Variance aggregations '
            'on date/time fields in sqlite3 '
            'since date/time is saved as text.')
def check_expression_support(self, expression):
    """Raise NotImplementedError for Sum/Avg/Variance/StdDev over a date/time input.

    Only the aggregate's ``input_field`` is examined. Expressions of any
    other type, and inputs whose output field lookup raises FieldError,
    are accepted silently.
    """
    date_like = (fields.DateField, fields.DateTimeField, fields.TimeField)
    blocked = (aggregates.Sum, aggregates.Avg, aggregates.Variance, aggregates.StdDev)
    if isinstance(expression, blocked):
        try:
            targets_date_like = isinstance(
                expression.input_field.output_field, date_like
            )
        except FieldError:
            # The input has no output_field; there is nothing to reject.
            targets_date_like = False
        if targets_date_like:
            raise NotImplementedError(
                'You cannot use Sum, Avg, StdDev and Variance aggregations '
                'on date/time fields in sqlite3 '
                'since date/time is saved as text.')
def average_consumption_by_hour(max_weeks_ago):
    """ Calculates the average consumption by hour. Measured over all consumption data of the past X weeks.

    :param max_weeks_ago: Only hour statistics starting within this many weeks before now are included.
    :return: A list of dicts, one per hour value, containing 'hour_start' and the averaged readings.
    :raises KeyError: When the database vendor is not one of postgresql, sqlite or mysql.
    """
    sql_extra = {
        # Ugly engine check, but still better than iterating over a hundred thousand items in code.
        'postgresql': "date_part('hour', hour_start)",
        'sqlite': "strftime('%H', hour_start)",
        'mysql': "extract(hour from hour_start)",
    }[connection.vendor]

    # Only PostgreSQL supports this builtin.
    set_time_zone_sql = connection.ops.set_time_zone_sql()

    if set_time_zone_sql:
        connection.connection.cursor().execute(set_time_zone_sql, [settings.TIME_ZONE])  # pragma: no cover

    try:
        hour_statistics = HourStatistics.objects.filter(
            # This greatly helps reducing the queryset size, but also makes it more relevant.
            hour_start__gt=timezone.now() - timezone.timedelta(weeks=max_weeks_ago)
        ).extra({
            'hour_start': sql_extra
        }).values('hour_start').order_by('hour_start').annotate(
            avg_electricity1=Avg('electricity1'),
            avg_electricity2=Avg('electricity2'),
            avg_electricity1_returned=Avg('electricity1_returned'),
            avg_electricity2_returned=Avg('electricity2_returned'),
            avg_electricity_merged=Avg(models.F('electricity1') + models.F('electricity2')),
            avg_electricity_returned_merged=Avg(models.F('electricity1_returned') + models.F('electricity2_returned')),
            avg_gas=Avg('gas'),
        )

        # Force evaluation, as we want to reset timezone in cursor below.
        hour_statistics = list(hour_statistics)
    finally:
        # Always restore UTC, even when the query fails, so the connection is never left in the local time zone.
        if set_time_zone_sql:
            # Prevents "database connection isn't set to UTC" error.
            connection.connection.cursor().execute(set_time_zone_sql, ['UTC'])  # pragma: no cover

    return hour_statistics
def update_pos(self):
    """Store the floored average x/z position of the related contubernia-in-turn and save."""
    contubernia_in_turn = self.battlecontuberniuminturn_set.all()
    averages = contubernia_in_turn.aggregate(
        avg_x=Avg('x_pos'),
        avg_z=Avg('z_pos'),
    )
    mean_x = averages['avg_x']
    self.x_pos = math.floor(mean_x)
    mean_z = averages['avg_z']
    self.z_pos = math.floor(mean_z)
    self.save()
def get_satisfaction(self, obj):
    """Return the developer's average task satisfaction as a percentage string.

    Non-developers yield None. The average is taken over the user's
    accepted participations in closed tasks; a falsy average is returned
    unchanged, otherwise it is multiplied by 10 and formatted with a
    trailing percent sign.
    """
    if obj.type != USER_TYPE_DEVELOPER:
        return None
    accepted_closed = obj.participation_set.filter(
        task__closed=True, status=STATUS_ACCEPTED
    )
    average = accepted_closed.aggregate(satisfaction=Avg('task__satisfaction'))['satisfaction']
    if not average:
        return average
    return '{:0,.0f}%'.format(average*10)
def get_ratings(self, obj):
    """Return rating averages for a developer, overall and per criteria.

    Non-developers yield None. Otherwise the result is a dict with the
    overall average ('avg'), its percentage display ('display_avg', None
    when the average is falsy) and the per-criteria rows ('details'), each
    extended with 'display_criteria' and 'display_avg'.
    """
    if obj.type != USER_TYPE_DEVELOPER:
        return None

    ratings = Rating.objects.filter(
        tasks__closed=True, tasks__participants=obj, tasks__participation__status=STATUS_ACCEPTED
    ).order_by('criteria')
    per_criteria = ratings.values('criteria').annotate(avg=Avg('score'))
    criteria_labels = dict(Rating._meta.get_field('criteria').flatchoices)

    for row in per_criteria:
        row_avg = row['avg']
        row['display_criteria'] = criteria_labels[row['criteria']]
        # A falsy average is kept as-is instead of being formatted.
        row['display_avg'] = '{:0,.0f}%'.format(row_avg*10) if row_avg else row_avg

    overall = ratings.aggregate(avg=Avg('score'))['avg']
    overall_display = '{:0,.0f}%'.format(overall*10) if overall else None
    return {'avg': overall, 'display_avg': overall_display, 'details': per_criteria}
def check_expression_support(self, expression):
    """Check that a Sum, Avg, Variance or StdDev aggregate is not fed a date/time field.

    Raises:
        NotImplementedError: the output field of ``expression.input_field``
            is a DateField, DateTimeField or TimeField.
    """
    rejected_field_types = (fields.DateField, fields.DateTimeField, fields.TimeField)
    rejected_aggregate_types = (
        aggregates.Sum,
        aggregates.Avg,
        aggregates.Variance,
        aggregates.StdDev,
    )
    if not isinstance(expression, rejected_aggregate_types):
        return
    try:
        source_field = expression.input_field.output_field
    except FieldError:
        # Not every sub-expression has an output_field, which is fine to
        # ignore.
        pass
    else:
        if isinstance(source_field, rejected_field_types):
            raise NotImplementedError(
                'You cannot use Sum, Avg, StdDev and Variance aggregations '
                'on date/time fields in sqlite3 '
                'since date/time is saved as text.')
def add_unit_to_battle_in_progress(
        battle_organization: BattleOrganization,
        world_unit: WorldUnit
):
    """Create a BattleUnit for ``world_unit`` and place it in an ongoing battle.

    When the world unit has an owner character, a BattleCharacter (and its
    BattleCharacterInTurn for the latest turn) is fetched or created first.
    Each contubernium of the new unit takes the next generated position for
    which the latest turn reports no contubernium; starting positions are
    mirrored depending on the side's ``z`` value. The unit's own starting
    position is the floored average of its contubernia's starting positions.
    """
    side = battle_organization.side
    battle = side.battle
    owner_character = world_unit.owner_character

    battle_character = None
    if owner_character:
        battle_character, _ = BattleCharacter.objects.get_or_create(
            battle_organization=battle_organization,
            character=owner_character,
            present_in_battle=(
                owner_character.location.tile == world_unit.location.tile
            )
        )
        # Only the get-or-create side effect is needed; the result is unused.
        BattleCharacterInTurn.objects.get_or_create(
            battle_character=battle_character,
            battle_turn=battle.get_latest_turn()
        )

    battle_unit = BattleUnit.objects.create(
        battle_organization=battle_organization,
        owner=battle_character,
        world_unit=world_unit,
        starting_manpower=world_unit.get_fighting_soldiers().count(),
        battle_side=side,
        name=world_unit.name,
        type=world_unit.type
    )
    create_contubernia(battle_unit)

    position_generator = joining_contubernium_position_generator()
    for contub in battle_unit.battlecontubernium_set.all():
        # Draw positions until one is found that the latest turn reports as free.
        while True:
            coords = next(position_generator)
            if battle.get_latest_turn().get_contubernium_in_position(coords) is None:
                break

        contub.x_offset_to_formation = coords.x
        contub.z_offset_to_formation = coords.z
        if side.z:
            contub.starting_x_pos = coords.x
            contub.starting_z_pos = coords.z + 10
        else:
            contub.starting_x_pos = -coords.x
            contub.starting_z_pos = -coords.z - 10
        contub.save()

    x_average = battle_unit.battlecontubernium_set.all().aggregate(
        Avg('starting_x_pos')
    )['starting_x_pos__avg']
    battle_unit.starting_x_pos = math.floor(x_average)
    z_average = battle_unit.battlecontubernium_set.all().aggregate(
        Avg('starting_z_pos')
    )['starting_z_pos__avg']
    battle_unit.starting_z_pos = math.floor(z_average)
    battle_unit.save()
    set_contubernia_starting_pos(battle_unit)