def set_integer(instance, field, min_value=-2**31, max_value=2**31 - 1):
    """Assign a random integer to ``instance`` under the attribute ``field.name``.

    When ``get_field_choices(field)`` yields any values, one of them is picked
    at random and nothing else is considered.  Otherwise the value is drawn
    with ``random.randint`` from ``[min_value, max_value]``, after narrowing
    that range with the limits of every ``MinValueValidator`` /
    ``MaxValueValidator`` found in ``field.validators``.
    """
    choices = list(get_field_choices(field))
    if choices:
        setattr(instance, field.name, random.choice(choices))
        return

    lower, upper = min_value, max_value
    for validator in field.validators:
        # Validators may only tighten the range, never widen it.
        if isinstance(validator, MinValueValidator):
            lower = max(lower, validator.limit_value)
        if isinstance(validator, MaxValueValidator):
            upper = min(upper, validator.limit_value)

    setattr(instance, field.name, random.randint(lower, upper))
python类IntegerField()的实例源码
def get_django_field_map(self):
    """Return the ordered list of ``(Django field class, target field)`` pairs.

    Each entry pairs a class from ``django.db.models.fields`` with the field
    class (or ``partial`` factory) it maps to.  The list order is preserved
    exactly; note that ``DateTimeField`` is listed ahead of ``DateField``.
    """
    from django.db.models import fields as djf

    # NullBooleanField maps to a BooleanField pre-configured with null=True.
    nullable_boolean = partial(BooleanField, null=True)

    field_map = [
        (djf.AutoField, PrimaryKeyField),
        (djf.BigIntegerField, BigIntegerField),
        # (djf.BinaryField, BlobField),
        (djf.BooleanField, BooleanField),
        (djf.CharField, CharField),
        (djf.DateTimeField, DateTimeField),  # Extends DateField.
        (djf.DateField, DateField),
        (djf.DecimalField, DecimalField),
        (djf.FilePathField, CharField),
        (djf.FloatField, FloatField),
        (djf.IntegerField, IntegerField),
        (djf.NullBooleanField, nullable_boolean),
        (djf.TextField, TextField),
        (djf.TimeField, TimeField),
        (djf.related.ForeignKey, ForeignKeyField),
    ]
    return field_map
def get_django_field_map(self):
    """Return the ordered list of ``(Django field class, target field)`` pairs.

    Each entry pairs a class from ``django.db.models.fields`` with the field
    class (or ``partial`` factory) it maps to.  The list order is preserved
    exactly; note that ``DateTimeField`` is listed ahead of ``DateField``.
    """
    from django.db.models import fields as djf

    # NullBooleanField maps to a BooleanField pre-configured with null=True.
    nullable_boolean = partial(BooleanField, null=True)

    field_map = [
        (djf.AutoField, PrimaryKeyField),
        (djf.BigIntegerField, BigIntegerField),
        # (djf.BinaryField, BlobField),
        (djf.BooleanField, BooleanField),
        (djf.CharField, CharField),
        (djf.DateTimeField, DateTimeField),  # Extends DateField.
        (djf.DateField, DateField),
        (djf.DecimalField, DecimalField),
        (djf.FilePathField, CharField),
        (djf.FloatField, FloatField),
        (djf.IntegerField, IntegerField),
        (djf.NullBooleanField, nullable_boolean),
        (djf.TextField, TextField),
        (djf.TimeField, TimeField),
        (djf.related.ForeignKey, ForeignKeyField),
    ]
    return field_map
def get_django_field_map(self):
    """Return the ordered list of ``(Django field class, target field)`` pairs.

    Each entry pairs a class from ``django.db.models.fields`` with the field
    class (or ``partial`` factory) it maps to.  The list order is preserved
    exactly; note that ``DateTimeField`` is listed ahead of ``DateField``.
    """
    from django.db.models import fields as djf

    # NullBooleanField maps to a BooleanField pre-configured with null=True.
    nullable_boolean = partial(BooleanField, null=True)

    field_map = [
        (djf.AutoField, PrimaryKeyField),
        (djf.BigIntegerField, BigIntegerField),
        # (djf.BinaryField, BlobField),
        (djf.BooleanField, BooleanField),
        (djf.CharField, CharField),
        (djf.DateTimeField, DateTimeField),  # Extends DateField.
        (djf.DateField, DateField),
        (djf.DecimalField, DecimalField),
        (djf.FilePathField, CharField),
        (djf.FloatField, FloatField),
        (djf.IntegerField, IntegerField),
        (djf.NullBooleanField, nullable_boolean),
        (djf.TextField, TextField),
        (djf.TimeField, TimeField),
        (djf.related.ForeignKey, ForeignKeyField),
    ]
    return field_map
def __init__(self, expression, distinct=False, **extra):
    """Initialise the count aggregate.

    A literal ``'*'`` expression is replaced by a ``Star()`` instance.  The
    ``distinct`` flag is forwarded to the parent initialiser as the string
    ``'DISTINCT '`` (truthy) or ``''`` (falsy), and the output field is always
    a new ``IntegerField``.  Remaining keyword arguments pass through as-is.
    """
    target = Star() if expression == '*' else expression
    distinct_sql = 'DISTINCT ' if distinct else ''
    super(Count, self).__init__(
        target, distinct=distinct_sql, output_field=IntegerField(), **extra)
def _ordinal_aggregate_field(self):
    """Return a newly constructed ``IntegerField`` instance."""
    ordinal_field = IntegerField()
    return ordinal_field
def filter_fields_to_type(klass, query_dict):
    """Return a copy of ``query_dict`` with values coerced to model field types.

    ``klass`` must expose ``_meta.fields``; each field is looked up by its
    ``column`` name.  The reserved keys ``order_by``, ``format``, ``limit`` and
    ``offset`` are dropped from the result.  For the remaining keys:

    * a column whose field type is exactly ``AutoField`` or ``IntegerField``
      has its value converted with ``int``;
    * a column whose field type is exactly ``BooleanField`` becomes ``True``
      when the value, lower-cased, equals ``'true'`` and ``False`` otherwise;
    * keys that do not name a known column are kept unchanged.

    The input mapping itself is never modified.  ``int`` may raise
    ``ValueError``/``TypeError`` for values it cannot convert; that error is
    not caught here.
    """
    reserved_fields = ('order_by', 'format', 'limit', 'offset')

    # QuerySet(klass).model is klass itself, so the model metadata is read
    # directly instead of building a throwaway QuerySet.
    fields = {field.column: field for field in klass._meta.fields}

    query = dict(query_dict)

    # Remove the reserved fields we know about.  pop() avoids deleting from
    # the dict while iterating over it, which raises RuntimeError on Python 3.
    for name in reserved_fields:
        query.pop(name, None)

    # Iterate over a snapshot of the keys; only values are reassigned below.
    for name in list(query):
        try:
            field_type = type(fields[name])
        except KeyError:
            # Not a model column: leave the value as it was supplied.
            continue

        value = query[name]
        # Exact type matches only; subclasses of these fields are not coerced.
        if field_type in (django_fields.AutoField, django_fields.IntegerField):
            value = int(value)
        elif field_type == django_fields.BooleanField:
            value = (value.lower() == 'true')
        query[name] = value

    return query
# monkey-patch ResourceOptions to have a default-empty readonly list
def _resolve_output_field(self):
    """Pick the output field, using ``FloatField`` for integer/decimal sources.

    Only the first entry of ``self.get_source_fields()`` is inspected.  If it
    is an ``IntegerField`` or ``DecimalField`` instance, ``self._output_field``
    is set to a new ``FloatField``.  The parent implementation is then called
    in every case.
    """
    first_source = self.get_source_fields()[0]
    float_result_sources = (IntegerField, DecimalField)
    if isinstance(first_source, float_result_sources):
        self._output_field = FloatField()
    super(Avg, self)._resolve_output_field()
def __init__(self, expression, distinct=False, **extra):
    """Initialise the count aggregate.

    A literal ``'*'`` expression is replaced by a ``Star()`` instance.  The
    ``distinct`` flag is forwarded to the parent initialiser as the string
    ``'DISTINCT '`` (truthy) or ``''`` (falsy), and the output field is always
    a new ``IntegerField``.  Remaining keyword arguments pass through as-is.
    """
    target = Star() if expression == '*' else expression
    distinct_sql = 'DISTINCT ' if distinct else ''
    super(Count, self).__init__(
        target, distinct=distinct_sql, output_field=IntegerField(), **extra)
def __init__(self, expression, **extra):
    """Initialise the length function with an integer output field by default.

    An ``output_field`` supplied by the caller in ``extra`` takes precedence;
    otherwise a new ``fields.IntegerField`` is used.  The default field object
    is constructed whether or not the caller supplied one.
    """
    default_field = fields.IntegerField()
    chosen_field = extra.pop('output_field', default_field)
    super(Length, self).__init__(expression, output_field=chosen_field, **extra)
def _resolve_output_field(self):
    """Pick the output field, using ``FloatField`` for integer/decimal sources.

    Only the first entry of ``self.get_source_fields()`` is inspected.  If it
    is an ``IntegerField`` or ``DecimalField`` instance, ``self._output_field``
    is set to a new ``FloatField``.  The parent implementation is then called
    in every case.
    """
    first_source = self.get_source_fields()[0]
    float_result_sources = (IntegerField, DecimalField)
    if isinstance(first_source, float_result_sources):
        self._output_field = FloatField()
    super(Avg, self)._resolve_output_field()
def __init__(self, expression, distinct=False, **extra):
    """Initialise the count aggregate.

    A literal ``'*'`` expression is replaced by a ``Star()`` instance.  The
    ``distinct`` flag is forwarded to the parent initialiser as the string
    ``'DISTINCT '`` (truthy) or ``''`` (falsy), and the output field is always
    a new ``IntegerField``.  Remaining keyword arguments pass through as-is.
    """
    target = Star() if expression == '*' else expression
    distinct_sql = 'DISTINCT ' if distinct else ''
    super(Count, self).__init__(
        target, distinct=distinct_sql, output_field=IntegerField(), **extra)
def __init__(self, expression, **extra):
    """Initialise the length function with an integer output field by default.

    An ``output_field`` supplied by the caller in ``extra`` takes precedence;
    otherwise a new ``fields.IntegerField`` is used.  The default field object
    is constructed whether or not the caller supplied one.
    """
    default_field = fields.IntegerField()
    chosen_field = extra.pop('output_field', default_field)
    super(Length, self).__init__(expression, output_field=chosen_field, **extra)
def __init__(self, expression, distinct=False, **extra):
    """Initialise the count aggregate.

    A literal ``'*'`` expression is replaced by a ``Star()`` instance.  The
    ``distinct`` flag is forwarded to the parent initialiser as the string
    ``'DISTINCT '`` (truthy) or ``''`` (falsy), and the output field is always
    a new ``IntegerField``.  Remaining keyword arguments pass through as-is.
    """
    target = Star() if expression == '*' else expression
    distinct_sql = 'DISTINCT ' if distinct else ''
    super(Count, self).__init__(
        target, distinct=distinct_sql, output_field=IntegerField(), **extra)
def _ordinal_aggregate_field(self):
    """Return a newly constructed ``IntegerField`` instance."""
    ordinal_field = IntegerField()
    return ordinal_field
def __init__(self, expression, distinct=False, **extra):
    """Initialise the count aggregate.

    A literal ``'*'`` expression is replaced by a ``Star()`` instance.  The
    ``distinct`` flag is forwarded to the parent initialiser as the string
    ``'DISTINCT '`` (truthy) or ``''`` (falsy), and the output field is always
    a new ``IntegerField``.  Remaining keyword arguments pass through as-is.
    """
    target = Star() if expression == '*' else expression
    distinct_sql = 'DISTINCT ' if distinct else ''
    super(Count, self).__init__(
        target, distinct=distinct_sql, output_field=IntegerField(), **extra)
def _ordinal_aggregate_field(self):
    """Return a newly constructed ``IntegerField`` instance."""
    ordinal_field = IntegerField()
    return ordinal_field
def _resolve_output_field(self):
    """Pick the output field, using ``FloatField`` for integer/decimal sources.

    Only the first entry of ``self.get_source_fields()`` is inspected.  If it
    is an ``IntegerField`` or ``DecimalField`` instance, ``self._output_field``
    is set to a new ``FloatField``.  The parent implementation is then called
    in every case.
    """
    first_source = self.get_source_fields()[0]
    float_result_sources = (IntegerField, DecimalField)
    if isinstance(first_source, float_result_sources):
        self._output_field = FloatField()
    super(Avg, self)._resolve_output_field()
def __init__(self, expression, distinct=False, **extra):
    """Initialise the count aggregate.

    A literal ``'*'`` expression is replaced by a ``Star()`` instance.  The
    ``distinct`` flag is forwarded to the parent initialiser as the string
    ``'DISTINCT '`` (truthy) or ``''`` (falsy), and the output field is always
    a new ``IntegerField``.  Remaining keyword arguments pass through as-is.
    """
    target = Star() if expression == '*' else expression
    distinct_sql = 'DISTINCT ' if distinct else ''
    super(Count, self).__init__(
        target, distinct=distinct_sql, output_field=IntegerField(), **extra)
def __init__(self, expression, **extra):
    """Initialise the length function with an integer output field by default.

    An ``output_field`` supplied by the caller in ``extra`` takes precedence;
    otherwise a new ``fields.IntegerField`` is used.  The default field object
    is constructed whether or not the caller supplied one.
    """
    default_field = fields.IntegerField()
    chosen_field = extra.pop('output_field', default_field)
    super(Length, self).__init__(expression, output_field=chosen_field, **extra)
def __init__(self, expression, distinct=False, **extra):
    """Initialise the count aggregate.

    A literal ``'*'`` expression is wrapped in ``Value``.  The ``distinct``
    flag is forwarded to the parent initialiser as the string ``'DISTINCT '``
    (truthy) or ``''`` (falsy), and the output field is always a new
    ``IntegerField``.  Remaining keyword arguments pass through as-is.
    """
    target = Value(expression) if expression == '*' else expression
    distinct_sql = 'DISTINCT ' if distinct else ''
    super(Count, self).__init__(
        target, distinct=distinct_sql, output_field=IntegerField(), **extra)
def _ordinal_aggregate_field(self):
    """Return a newly constructed ``IntegerField`` instance."""
    ordinal_field = IntegerField()
    return ordinal_field
def handle(self, *args, **options):
    """
    Email customers about unread messages in their support channels.

    Support channels without a creator whose content type is ``Inquirer`` are
    annotated with the number of qualifying SEND/UPLOAD actions and with the
    highest related action id.  For every such channel with at least one new
    message, the customer (if they have an email address) is mailed the
    pending SEND messages; when ``send_mail`` returns a truthy value the
    channel's ``last_read`` is advanced to that highest action id and saved.
    """
    # command to run: python manage.py tunga_send_customer_emails

    # Only actions whose timestamp is at least one minute old are counted.
    min_date = datetime.datetime.utcnow() - relativedelta(minutes=1)

    # An action counts as a new message when its actor content type differs
    # from the channel's, it is newer than last_read, old enough, and is a
    # SEND or UPLOAD.
    is_new_message = (
        ~Q(action_targets__actor_content_type=F('content_type')) &
        Q(action_targets__gt=F('last_read')) &
        Q(action_targets__timestamp__lte=min_date) &
        Q(action_targets__verb__in=[verbs.SEND, verbs.UPLOAD])
    )
    new_message_count = Sum(
        Case(
            When(is_new_message, then=1),
            default=0,
            output_field=IntegerField()
        )
    )

    support_channels = Channel.objects.filter(
        type=CHANNEL_TYPE_SUPPORT,
        created_by__isnull=True,
        content_type=ContentType.objects.get_for_model(Inquirer)
    )
    customer_channels = support_channels.annotate(
        new_messages=new_message_count,
        latest_message=Max('action_targets__id')
    ).filter(new_messages__gt=0)

    for channel in customer_channels:
        customer = channel.content_object
        if not customer.email:
            continue

        unread_actions = Action.objects.filter(
            channels=channel, id__gt=channel.last_read, verb__in=[verbs.SEND]
        ).order_by('id')
        messages = [action.action_object for action in unread_actions]
        if not messages:
            continue

        recipients = [customer.email]
        subject = "[Tunga Support] Help"
        ctx = {
            'customer': customer,
            'count': channel.new_messages,
            'messages': messages,
            'channel_url': '%s/customer/help/%s/' % (TUNGA_URL, channel.id)
        }

        sent = send_mail(subject, 'tunga/email/unread_help_messages', recipients, ctx)
        if sent:
            # Mark everything up to the latest action as read.
            channel.last_read = channel.latest_message
            channel.save()
def handle(self, *args, **options):
    """
    Email channel members about unread conversation messages.

    ``ChannelUser`` rows that were never emailed, or were last emailed more
    than three hours ago, are annotated with the number of qualifying
    SEND/UPLOAD actions in their channel.  Each row with at least one new
    message gets a notification email; when ``send_mail`` returns a truthy
    value the row's ``last_email_at`` is set to the current UTC time and saved.
    """
    # command to run: python manage.py tunga_send_message_emails
    utc_now = datetime.datetime.utcnow()
    min_date = utc_now - relativedelta(minutes=15)  # 15 minute window to read new messages
    min_last_email_date = utc_now - relativedelta(hours=3)  # Limit to 1 email every 3 hours per channel
    commission_date = parse('2016-08-08 00:00:00')  # Don't notify about events before the commissioning date

    user_channels = ChannelUser.objects.filter(
        Q(last_email_at__isnull=True) |
        Q(last_email_at__lt=min_last_email_date)
    ).annotate(new_messages=Sum(
        Case(
            When(
                # Not authored by this user, newer than last_read, inside the
                # [commission_date, min_date] window, newer than the previous
                # email (if any), and a SEND or UPLOAD.
                ~Q(channel__action_targets__actor_object_id=F('user_id')) &
                Q(channel__action_targets__gt=F('last_read')) &
                Q(channel__action_targets__timestamp__lte=min_date) &
                Q(channel__action_targets__timestamp__gte=commission_date) &
                (Q(last_email_at__isnull=True) | Q(channel__action_targets__timestamp__gt=F('last_email_at'))) &
                Q(channel__action_targets__verb__in=[verbs.SEND, verbs.UPLOAD]),
                then=1
            ),
            default=0,
            output_field=IntegerField()
        )
    )).filter(new_messages__gt=0)

    for user_channel in user_channels:
        channel_name = user_channel.channel.get_channel_display_name(user_channel.user)

        to = [user_channel.user.email]
        if user_channel.channel.type == CHANNEL_TYPE_DIRECT:
            # A conditional expression is required here: the older
            # ``cond and '' or 's'`` form always produced 's' because the
            # empty string is falsy.
            plural_suffix = '' if user_channel.new_messages == 1 else 's'
            conversation_subject = "New message{} from {}".format(
                plural_suffix,
                channel_name
            )
        else:
            conversation_subject = "Conversation: {}".format(channel_name)

        subject = conversation_subject
        ctx = {
            'receiver': user_channel.user,
            'new_messages': user_channel.new_messages,
            'channel_name': channel_name,
            'channel': user_channel.channel,
            'channel_url': '%s/conversation/%s/' % (TUNGA_URL, user_channel.channel.id)
        }

        if send_mail(subject, 'tunga/email/unread_channel_messages', to, ctx):
            user_channel.last_email_at = datetime.datetime.utcnow()
            user_channel.save()
def overview(request, event_url_name):
    """Render the statistics overview page for one event.

    The event is looked up by ``url_name`` (404 if missing).  Users for whom
    ``event.is_admin`` is false get the ``nopermission`` response.  Otherwise
    the view collects helper, coordinator and vegetarian counts, total and
    empty shift slots, the summed shift duration, and a cumulative per-day
    timeline of non-coordinator helpers, and renders them with
    ``statistic/overview.html``.
    """
    event = get_object_or_404(Event, url_name=event_url_name)

    # permission
    if not event.is_admin(request.user):
        return nopermission(request)

    num_helpers = event.helper_set.count()

    # Count coordinators; every other helper is tallied under the day of
    # its timestamp.
    num_coordinators = 0
    helpers_per_day = {}
    for helper in event.helper_set.all():
        if helper.is_coordinator:
            num_coordinators += 1
            continue
        day = helper.timestamp.strftime('%Y-%m-%d')
        helpers_per_day[day] = helpers_per_day.get(day, 0) + 1

    num_vegetarians = event.helper_set.filter(vegetarian=True).count()

    event_shifts = Shift.objects.filter(job__event=event)

    slot_totals = event_shifts.aggregate(Sum('number'))
    num_shift_slots = slot_totals['number__sum']

    # Empty slots per shift = requested number minus helpers on that shift.
    empty_slots_expr = ExpressionWrapper(F('number') - F('num_helpers'),
                                         output_field=fields.IntegerField())
    empty_totals = event_shifts \
        .annotate(num_helpers=Count('helper')) \
        .annotate(empty_slots=empty_slots_expr) \
        .aggregate(Sum('empty_slots'))
    num_empty_shift_slots = empty_totals['empty_slots__sum']

    # Duration of a shift multiplied by its number of slots.
    total_duration = ExpressionWrapper((F('end') - F('begin')) * F('number'),
                                       output_field=fields.DurationField())
    duration_totals = event_shifts \
        .annotate(duration=total_duration) \
        .aggregate(Sum('duration'))
    hours_total = duration_totals['duration__sum']

    # sum up timeline: running total over the days in ascending order
    timeline_sum = OrderedDict()
    running_total = 0
    for day, count in sorted(helpers_per_day.items()):
        running_total += count
        timeline_sum[day] = running_total

    # render
    context = {
        'event': event,
        'num_helpers': num_helpers,
        'num_coordinators': num_coordinators,
        'num_vegetarians': num_vegetarians,
        'num_shift_slots': num_shift_slots,
        'num_empty_shift_slots': num_empty_shift_slots,
        'hours_total': hours_total,
        'timeline': timeline_sum,
    }
    return render(request, 'statistic/overview.html', context)