def migrate_activity_block_time(apps, schema_editor):
    """
    Re-express activity block times as settings.TIME_ZONE wall-clock time.

    Each stored datetime is converted to settings.TIME_ZONE and the
    resulting wall-clock value is then re-labelled as UTC, so the saved
    value reads the way the local time would.

    :param apps: app registry used to look up the ActivityBlock model
    :param schema_editor: not used by this function
    """
    local_zone = timezone(settings.TIME_ZONE)

    def as_local_wall_clock(moment):
        # Shift into the local zone, then stamp the result as UTC.
        return moment.astimezone(local_zone).replace(tzinfo=utc)

    block_model = apps.get_model('tracking', 'ActivityBlock')
    for entry in block_model.objects.all():
        end = entry.to_datetime
        # Only the end time is checked for None; the start time is always
        # converted. NOTE(review): confirm this nesting against the
        # original indentation.
        if end is not None:
            entry.to_datetime = as_local_wall_clock(end)
        entry.from_datetime = as_local_wall_clock(entry.from_datetime)
        entry.save()
# Example source code using the Python (Django) TIME_ZONE setting
def talk_to_dict(talk):
    """
    Serialize a talk into a plain dictionary.

    The start date, when set, is converted to settings.TIME_ZONE and is
    None otherwise. The category is stringified, the track is stringified
    or '' when unset, and each speaker goes through speaker_to_dict.
    """
    starts_at = None
    if talk.start_date:
        local_zone = pytz.timezone(settings.TIME_ZONE)
        starts_at = talk.start_date.astimezone(tz=local_zone)

    track_label = str(talk.track) if talk.track else ''
    speakers = [speaker_to_dict(speaker) for speaker in talk.speakers.all()]

    return {
        'title': talk.title,
        'description': talk.description,
        'category': str(talk.category),
        'accepted': talk.accepted,
        'confirmed': talk.confirmed,
        'start_date': starts_at,
        'duration': talk.estimated_duration,
        'track': track_label,
        'video': talk.video,
        'speakers': speakers,
    }
def _as_ics(self, citymeo=False):
    """
    Render the talks as an iCalendar document.

    :param citymeo: when true, only talks starting no earlier than five
        minutes ago and no later than 23:59:59 on the day of the first
        such talk are exported, and event descriptions are omitted
    :return: the result of the calendar's to_ical()
    """
    if not self.initialized:
        self._lazy_init()

    calendar = iCalendar()
    calendar_properties = (
        ('prodid', '-//PonyConf.io//PonyConf//FR'),
        ('version', '2.0'),
        ('x-wr-calname', self.conference.name),
        ('x-wr-timezone', settings.TIME_ZONE),
        ('calscale', 'GREGORIAN'),
    )
    for key, value in calendar_properties:
        calendar.add(key, value)

    talks = self.talks
    if citymeo and talks.exists():
        recent_cutoff = now() - timedelta(minutes=5)
        talks = talks.filter(start_date__gte=recent_cutoff)
        if talks.exists():
            # Restrict the export to the day of the first remaining talk.
            first_start = talks.first().start_date
            limit = first_start.replace(hour=23, minute=59, second=59)
            talks = talks.filter(start_date__lte=limit)

    for talk in talks:
        entry = iEvent()
        entry.add('dtstart', talk.start_date)
        if not talk.end_date:
            # Talks without an end date are left out of the calendar.
            continue
        entry.add('dtend', talk.end_date)
        entry.add('dtstamp', talk.updated)
        entry.add('summary', talk.title)
        if talk.room:
            entry.add('location', talk.room)
        if talk.accepted:
            status = 'CONFIRMED'
        else:
            status = 'TENTATIVE'
        entry.add('status', status)
        if not citymeo:
            entry.add('description', talk.description)
        entry.add('uid', '%s/%s' % (self.site.domain, talk.id))
        calendar.add_component(entry)

    return calendar.to_ical()
def handle(self, *args, **options):
    """
    Generate a timeline from the most recent window of stored tweets.

    The window ends at the newest Tweet ``datetime`` and starts ``hours``
    before it. All tuning values are read from ``options`` with the
    defaults shown below, echoed to stdout, and passed on to
    generate_timeline together with a tweepy API client built from
    settings.TWITTER_USER_KEYS.
    """
    skip_fields = None  # nothing skipped; e.g. set(['n_tweets', 'diffusion', 'reply'])
    time_bucket_size = options.get('time_bucket', 60)
    size = options.get('size', 100)
    hours = options.get('hours', 2)

    # Run the aggregate once so both ends of the range come from the same
    # query result instead of three separate round-trips to the database.
    latest = Tweet.objects.aggregate(maxdate=Max('datetime'))
    print(latest)
    # NOTE(review): maxdate is presumably None when there are no tweets,
    # which makes the subtraction below fail - confirm whether that case
    # needs explicit handling.
    newest = latest['maxdate']
    time_range = [newest - timedelta(hours=hours), newest]

    turns = options.get('turns', 5)
    time_zone = pytz.timezone(options.get('time_zone', settings.TIME_ZONE))
    exclude_replies = bool(options.get('exclude_replies', True))
    exclude_retweets = bool(options.get('exclude_retweets', False))
    informational_only = bool(options.get('informational_only', True))
    post_to_twitter = bool(options.get('post_timeline', False))
    update_users = bool(options.get('update_users', False))
    popular_only = bool(options.get('popular', False))

    # Echo the effective configuration.
    print(skip_fields)
    print(time_bucket_size)
    print(size)
    print(time_range)
    print(time_zone)
    print(exclude_replies, exclude_retweets)
    print(informational_only)
    print(hours)
    print(post_to_twitter)
    print(update_users)

    api_keys = settings.TWITTER_USER_KEYS
    auth = tweepy.OAuthHandler(api_keys['consumer_key'], api_keys['consumer_secret'])
    auth.set_access_token(api_keys['access_token_key'], api_keys['access_token_secret'])
    api = tweepy.API(auth)

    generate_timeline(
        time_range,
        skip_fields=skip_fields,
        size=size,
        sideline_turns=turns,
        time_bucket_size=time_bucket_size,
        time_zone=time_zone,
        twitter_api=api,
        exclude_replies=exclude_replies,
        exclude_retweets=exclude_retweets,
        informational_only=informational_only,
        update_users=update_users,
        post_to_twitter=post_to_twitter,
        retweeted_only=popular_only,
        n_candidates=options.get('n_candidates', None),
    )
def test_time_zone(self):
    """The deviceinfo endpoint reports settings.TIME_ZONE as the server timezone."""
    response = self.client.get(reverse('deviceinfo'), format="json")
    # assertEqual rather than assertTrue: assertTrue takes its second
    # argument as the failure message and passes for any truthy value.
    self.assertEqual(response.data['server_timezone'], settings.TIME_ZONE)
def get(self, request, format=None):
    """
    Return a Response describing this server.

    The payload holds the version, the server URLs, the database path
    (sqlite3 engines only), device name/id/OS, free content storage
    space, the server time and the configured time zone name.
    """
    info = {'version': kolibri.__version__}

    _status, urls = get_urls()
    if not urls:
        # Nothing is returned when running the debug server, so fall back to the current URL
        urls = [request.build_absolute_uri('/')]

    loopback_markers = ('127.0.0.1', 'localhost')
    external_urls = [
        url for url in urls
        if not any(marker in url for marker in loopback_markers)
    ]
    # Prefer non-loopback URLs; keep the full list when filtering leaves nothing.
    info['urls'] = external_urls or urls

    default_db = settings.DATABASES['default']
    if default_db['ENGINE'].endswith('sqlite3'):
        # Other database backends are not file backed, so there is no path to report
        info['database_path'] = default_db['NAME']

    instance_model = InstanceIDModel.get_or_create_current_instance()[0]
    info['device_name'] = instance_model.hostname
    info['device_id'] = instance_model.id
    info['os'] = instance_model.platform

    info['content_storage_free_space'] = get_free_space()

    # Localized time for the server
    info['server_time'] = local_now()
    # Named timezone for the server (the time above only includes the offset)
    info['server_timezone'] = settings.TIME_ZONE

    return Response(info)
def get_for_closing(self, date=None):
    """
    Select the events that can be closed as of a given day.

    :param date: cut-off day as a 'YYYY-MM-DD' string, interpreted in
        settings.TIME_ZONE; when falsy, arrow.now() is used instead
    :return: events whose ``end`` is before the start of that day, whose
        ``paid`` is at least ``total`` and whose status is not 'closed'
    :rtype: queryset
    """
    if date:
        moment = arrow.Arrow.strptime(date, '%Y-%m-%d', settings.TIME_ZONE)
    else:
        moment = arrow.now()

    # Start of the day, expressed in UTC for the query.
    cutoff = moment.floor('day').to('UTC').datetime

    fully_paid = self.get_queryset().filter(
        end__lt=cutoff, paid__gte=F('total'))
    return fully_paid.exclude(status='closed')
def __init__(self, *args, **kwargs):
    """
    Initialise the response and set the OpenRosa headers.

    The Date header is the current time in settings.TIME_ZONE.
    """
    super(BaseOpenRosaResponse, self).__init__(*args, **kwargs)
    self[OPEN_ROSA_VERSION_HEADER] = OPEN_ROSA_VERSION

    local_zone = pytz.timezone(settings.TIME_ZONE)
    timestamp = datetime.now(local_zone).strftime('%a, %d %b %Y %H:%M:%S %Z')

    remaining_headers = (
        ('Date', timestamp),
        ('X-OpenRosa-Accept-Content-Length', DEFAULT_CONTENT_LENGTH),
        ('Content-Type', DEFAULT_CONTENT_TYPE),
    )
    for header, value in remaining_headers:
        self[header] = value
def get_openrosa_headers(self):
    """
    Build the OpenRosa headers as a dict.

    The Date value is the current time in settings.TIME_ZONE.
    """
    now_local = datetime.now(pytz.timezone(settings.TIME_ZONE))
    headers = {}
    headers['Date'] = now_local.strftime('%a, %d %b %Y %H:%M:%S %Z')
    headers['X-OpenRosa-Version'] = '1.0'
    headers['X-OpenRosa-Accept-Content-Length'] = DEFAULT_CONTENT_LENGTH
    return headers
def get_openrosa_headers(self, request, location=True):
    """
    Build the OpenRosa headers as a dict.

    :param request: request used to derive the absolute Location URL
    :param location: when true, add a Location header holding the
        absolute URI of the request path
    :return: dict of header names to values
    """
    local_zone = pytz.timezone(settings.TIME_ZONE)
    headers = {
        'Date': datetime.now(local_zone).strftime('%a, %d %b %Y %H:%M:%S %Z'),
        'X-OpenRosa-Version': '1.0',
        'X-OpenRosa-Accept-Content-Length': DEFAULT_CONTENT_LENGTH
    }
    if not location:
        return headers

    headers['Location'] = request.build_absolute_uri(request.path)
    return headers
def test_get_not_utc(self):
    """Re-run test_get after checking that the configured time zone is America/Toronto."""
    configured_zone = settings.TIME_ZONE
    self.assertEqual(configured_zone, 'America/Toronto')
    self.test_get()
def process_request(self, request):
    """
    Activate the time zone stored in the session for this request.

    Falls back to settings.TIME_ZONE when the session holds no (or an
    empty) value under settings.TIMEZONE_SESSION_KEY.
    """
    session_zone = request.session.get(settings.TIMEZONE_SESSION_KEY)
    timezone.activate(session_zone or settings.TIME_ZONE)
def time_settings(self, is_dst):
    """
    Return ``(is_dst, timezone_name)``.

    The name is settings.TIME_ZONE when settings.USE_TZ is enabled and
    'Europe/Helsinki' otherwise; ``is_dst`` is passed through unchanged.
    """
    if not settings.USE_TZ:
        return is_dst, 'Europe/Helsinki'
    return is_dst, settings.TIME_ZONE
def timezone_name(self):
    """
    Name of the time zone of the database connection.

    Without USE_TZ this is settings.TIME_ZONE; with USE_TZ it is the
    connection's own TIME_ZONE setting, or 'UTC' when that is None.
    """
    if settings.USE_TZ:
        connection_zone = self.settings_dict['TIME_ZONE']
        return 'UTC' if connection_zone is None else connection_zone
    return settings.TIME_ZONE
def check_settings(self):
    """
    Validate the connection's TIME_ZONE setting.

    Does nothing when the connection's TIME_ZONE is None. Otherwise
    raises ImproperlyConfigured when USE_TZ is False, when the engine's
    features report supports_timezones, or when pytz is None - checked
    in that order.
    """
    if self.settings_dict['TIME_ZONE'] is None:
        return

    if not settings.USE_TZ:
        raise ImproperlyConfigured(
            "Connection '%s' cannot set TIME_ZONE because USE_TZ is "
            "False." % self.alias)

    if self.features.supports_timezones:
        raise ImproperlyConfigured(
            "Connection '%s' cannot set TIME_ZONE because its engine "
            "handles time zones conversions natively." % self.alias)

    if pytz is None:
        raise ImproperlyConfigured(
            "Connection '%s' cannot set TIME_ZONE because pytz isn't "
            "installed." % self.alias)
def get_default_timezone():
    """
    Return the default time zone as a tzinfo instance.

    This is the time zone named by settings.TIME_ZONE: a pytz zone when
    the setting is a string and pytz is available, a LocalTimezone
    otherwise.
    """
    zone_name = settings.TIME_ZONE
    if pytz is None or not isinstance(zone_name, six.string_types):
        # This relies on os.environ['TZ'] being set to settings.TIME_ZONE.
        return LocalTimezone()
    return pytz.timezone(zone_name)
# This function exists for consistency with get_current_timezone_name
def deactivate():
    """
    Unset the time zone for the current thread.

    Django will then use the time zone defined by settings.TIME_ZONE.
    """
    try:
        del _active.value
    except AttributeError:
        # No time zone was active; nothing to unset.
        pass
def datetime_to_utc(date):
    """
    Interpret ``date`` as settings.TIME_ZONE local time and convert it to UTC.

    ``date`` is localized with ``is_dst=None``, which per the pytz
    documentation makes ambiguous or non-existent local times raise
    instead of being guessed.
    """
    zone = pytz.timezone(settings.TIME_ZONE)
    return zone.localize(date, is_dst=None).astimezone(pytz.utc)
def check_tg(self, options):
    """
    Check that two talk groups' transmissions started close together.

    Reads ``talkgroup1`` and ``talkgroup2`` (talk group slugs) and
    ``minutes`` (maximum allowed gap) from ``options``, then compares the
    start times of the first Transmission returned for each talk group.

    Exits the process with status 2 or 3 when the first or second talk
    group does not exist, and with status 1 when the gap between the two
    transmissions exceeds ``minutes``. Writes 'Good' otherwise.
    """
    # Resolve both slugs up front; each lookup has its own exit status.
    lookups = ((options['talkgroup1'], 2), (options['talkgroup2'], 3))
    talkgroups = []
    for slug, exit_status in lookups:
        try:
            talkgroups.append(TalkGroup.objects.get(slug=slug))
        except TalkGroup.DoesNotExist:
            self.stdout.write(self.style.ERROR('Talk Group {} does not exist, make sure you are using the slug name'.format(slug)))
            sys.exit(exit_status)
    tg1, tg2 = talkgroups

    # NOTE(review): "[0]" presumably yields the latest transmission via the
    # model's default ordering, and raises IndexError when a talk group has
    # no transmissions - confirm both against the Transmission model.
    last_tg1 = Transmission.objects.filter(talkgroup_info=tg1)[0]
    last_tg2 = Transmission.objects.filter(talkgroup_info=tg2)[0]

    # Local time is used so the reported timestamps are readable.
    local_tz = pytz.timezone(settings.TIME_ZONE)
    time1 = last_tg1.start_datetime.astimezone(local_tz)
    time2 = last_tg2.start_datetime.astimezone(local_tz)

    time_diff = abs(time1 - time2)
    max_diff = timezone.timedelta(minutes=options['minutes'])

    self.stdout.write('Comparing Last Transmission {} {} to {} {} ({})'.format(tg1, time1, tg2, time2, time_diff))
    if time_diff > max_diff:
        self.stdout.write(self.style.ERROR('Too long between simulcast transmissions {}'.format(time_diff)))
        sys.exit(1)
    else:
        self.stdout.write(self.style.SUCCESS('Good'))
def timezone_name(self):
    """
    Name of the time zone of the database connection.

    Returns settings.TIME_ZONE when USE_TZ is off. With USE_TZ on, returns
    the connection's TIME_ZONE setting, defaulting to 'UTC' when it is None.
    """
    if not settings.USE_TZ:
        return settings.TIME_ZONE

    connection_zone = self.settings_dict['TIME_ZONE']
    if connection_zone is None:
        return 'UTC'
    return connection_zone