def keyword_split(keywords):
"""
Return all the keywords in a keyword string.
Keeps keywords surrounded by quotes together, removing the surrounding quotes:
>>> keyword_split('Hello I\\'m looking for "something special"')
['Hello', "I'm", 'looking', 'for', 'something special']
Nested quoted strings are returned as is:
>>> keyword_split("He said \\"I'm looking for 'something special'\\" so I've given him the 'special item'")
['He', 'said', "I'm looking for 'something special'", 'so', "I've", 'given', 'him', 'the', 'special item']
"""
matches = re.findall(r'"([^"]+)"|\'([^\']+)\'|(\S+)', keywords)
return [match[0] or match[1] or match[2] for match in matches]
python类all()的实例源码
def close_db_connections(func, *args, **kwargs):
"""
Decorator to explicitly close db connections during threaded execution
Note this is necessary to work around:
https://code.djangoproject.com/ticket/22420
"""
def _close_db_connections(*args, **kwargs):
ret = None
try:
ret = func(*args, **kwargs)
finally:
from django.db import connections
for conn in connections.all():
conn.close()
return ret
return _close_db_connections
def _etcd_publish_config(**kwargs):
config = kwargs['instance']
# we purge all existing config when adding the newest instance. This is because
# deis config:unset would remove an existing value, but not delete the
# old config object
try:
_etcd_client.delete('/deis/config/{}'.format(config.app),
prevExist=True, dir=True, recursive=True)
except KeyError:
pass
for k, v in config.values.iteritems():
_etcd_client.write(
'/deis/config/{}/{}'.format(
config.app,
unicode(k).encode('utf-8').lower()),
unicode(v).encode('utf-8'))
def close_db_connections(func, *args, **kwargs):
"""
Decorator to explicitly close db connections during threaded execution
Note this is necessary to work around:
https://code.djangoproject.com/ticket/22420
"""
def _close_db_connections(*args, **kwargs):
ret = None
try:
ret = func(*args, **kwargs)
finally:
from django.db import connections
for conn in connections.all():
conn.close()
return ret
return _close_db_connections
def _etcd_publish_config(**kwargs):
config = kwargs['instance']
# we purge all existing config when adding the newest instance. This is because
# deis config:unset would remove an existing value, but not delete the
# old config object
try:
_etcd_client.delete('/deis/config/{}'.format(config.app),
prevExist=True, dir=True, recursive=True)
except KeyError:
pass
for k, v in config.values.iteritems():
_etcd_client.write(
'/deis/config/{}/{}'.format(
config.app,
unicode(k).encode('utf-8').lower()),
unicode(v).encode('utf-8'))
def update_connections_time_zone(**kwargs):
if kwargs['setting'] == 'TIME_ZONE':
# Reset process time zone
if hasattr(time, 'tzset'):
if kwargs['value']:
os.environ['TZ'] = kwargs['value']
else:
os.environ.pop('TZ', None)
time.tzset()
# Reset local time zone cache
timezone.get_default_timezone.cache_clear()
# Reset the database connections' time zone
if kwargs['setting'] in {'TIME_ZONE', 'USE_TZ'}:
for conn in connections.all():
try:
del conn.timezone
except AttributeError:
pass
try:
del conn.timezone_name
except AttributeError:
pass
conn.ensure_timezone()
def update_connections_time_zone(**kwargs):
if kwargs['setting'] == 'TIME_ZONE':
# Reset process time zone
if hasattr(time, 'tzset'):
if kwargs['value']:
os.environ['TZ'] = kwargs['value']
else:
os.environ.pop('TZ', None)
time.tzset()
# Reset local time zone cache
timezone.get_default_timezone.cache_clear()
# Reset the database connections' time zone
if kwargs['setting'] in {'TIME_ZONE', 'USE_TZ'}:
for conn in connections.all():
try:
del conn.timezone
except AttributeError:
pass
try:
del conn.timezone_name
except AttributeError:
pass
conn.ensure_timezone()
def get_postgresql_connections():
return [connection for connection in connections.all()
if connection.vendor == 'postgresql']
# Reduce any iterable to a single value using a logical OR e.g. (a | b | ...)
def get_descendant_models(model):
"""
Returns all descendants of a model, including the model itself.
"""
descendant_models = {other_model for other_model in apps.get_models()
if issubclass(other_model, model)}
descendant_models.add(model)
return descendant_models
def force_debug_cursor():
for conn in connections.all():
conn._cavalry_old_force_debug_cursor = conn.force_debug_cursor
conn.force_debug_cursor = True
yield
for conn in connections.all():
conn.force_debug_cursor = conn._cavalry_old_force_debug_cursor
def _process(request, get_response):
with force_debug_cursor(), managed(
db_record_stacks=getattr(settings, 'CAVALRY_DB_RECORD_STACKS', True),
) as data:
data['start_time'] = get_time()
response = get_response(request)
if isinstance(response, SimpleTemplateResponse):
response.render()
data['end_time'] = get_time()
data['duration'] = data['end_time'] - data['start_time']
data['databases'] = {}
for conn in connections.all():
queries = conn.queries
data['databases'][conn.alias] = {
'queries': queries,
'n_queries': len(queries),
'time': (sum(q.get('hrtime', 0) * 1000 for q in queries) if queries else 0),
}
inject_stats(request, response, data)
post_stats_kwargs = {'request': request, 'response': response, 'data': data}
if getattr(settings, 'CAVALRY_THREADED_POST', False):
Thread(name='cavalry poster', target=post_stats, kwargs=post_stats_kwargs, daemon=False).start()
else:
post_stats(**post_stats_kwargs)
return response
def make_view_atomic(self, view):
non_atomic_requests = getattr(view, '_non_atomic_requests', set())
for db in connections.all():
if (db.settings_dict['ATOMIC_REQUESTS']
and db.alias not in non_atomic_requests):
view = transaction.atomic(using=db.alias)(view)
return view
def update_connections_time_zone(**kwargs):
if kwargs['setting'] == 'TIME_ZONE':
# Reset process time zone
if hasattr(time, 'tzset'):
if kwargs['value']:
os.environ['TZ'] = kwargs['value']
else:
os.environ.pop('TZ', None)
time.tzset()
# Reset local time zone cache
timezone.get_default_timezone.cache_clear()
# Reset the database connections' time zone
if kwargs['setting'] in {'TIME_ZONE', 'USE_TZ'}:
for conn in connections.all():
try:
del conn.timezone
except AttributeError:
pass
try:
del conn.timezone_name
except AttributeError:
pass
tz_sql = conn.ops.set_time_zone_sql()
if tz_sql and conn.timezone_name:
with conn.cursor() as cursor:
cursor.execute(tz_sql, [conn.timezone_name])
def _databases_names(cls, include_mirrors=True):
# If the test case has a multi_db=True flag, act on all databases,
# including mirrors or not. Otherwise, just on the default DB.
if getattr(cls, 'multi_db', False):
return [alias for alias in connections
if include_mirrors or not connections[alias].settings_dict['TEST']['MIRROR']]
else:
return [DEFAULT_DB_ALIAS]
def _post_teardown(self):
"""Performs any post-test things. This includes:
* Flushing the contents of the database, to leave a clean slate. If
the class has an 'available_apps' attribute, post_migrate isn't fired.
* Force-closing the connection, so the next test gets a clean cursor.
"""
try:
self._fixture_teardown()
super(TransactionTestCase, self)._post_teardown()
if self._should_reload_connections():
# Some DB cursors include SQL statements as part of cursor
# creation. If you have a test that does a rollback, the effect
# of these statements is lost, which can affect the operation of
# tests (e.g., losing a timezone setting causing objects to be
# created with the wrong time). To make sure this doesn't
# happen, get a clean connection at the start of every test.
for conn in connections.all():
conn.close()
finally:
if self.available_apps is not None:
apps.unset_available_apps()
setting_changed.send(sender=settings._wrapped.__class__,
setting='INSTALLED_APPS',
value=settings.INSTALLED_APPS,
enter=False)
def tearDownClass(cls):
if connections_support_transactions():
cls._rollback_atomics(cls.cls_atomics)
for conn in connections.all():
conn.close()
super(TestCase, cls).tearDownClass()
def skipUnlessDBFeature(*features):
"""
Skip a test unless a database has all the named features.
"""
return _deferredSkip(
lambda: not all(getattr(connection.features, feature, False) for feature in features),
"Database doesn't support feature(s): %s" % ", ".join(features)
)
def _tearDownClassInternal(cls):
# There may not be a 'server_thread' attribute if setUpClass() for some
# reasons has raised an exception.
if hasattr(cls, 'server_thread'):
# Terminate the live server's thread
cls.server_thread.terminate()
cls.server_thread.join()
# Restore sqlite in-memory database connections' non-shareability
for conn in connections.all():
if conn.vendor == 'sqlite' and conn.is_in_memory_db(conn.settings_dict['NAME']):
conn.allow_thread_sharing = False
def check_database_backends(*args, **kwargs):
issues = []
for conn in connections.all():
issues.extend(conn.validation.check(**kwargs))
return issues
def make_view_atomic(self, view):
non_atomic_requests = getattr(view, '_non_atomic_requests', set())
for db in connections.all():
if db.settings_dict['ATOMIC_REQUESTS'] and db.alias not in non_atomic_requests:
view = transaction.atomic(using=db.alias)(view)
return view
def check_database_backends(*args, **kwargs):
issues = []
for conn in connections.all():
issues.extend(conn.validation.check(**kwargs))
return issues
def make_view_atomic(self, view):
non_atomic_requests = getattr(view, '_non_atomic_requests', set())
for db in connections.all():
if db.settings_dict['ATOMIC_REQUESTS'] and db.alias not in non_atomic_requests:
view = transaction.atomic(using=db.alias)(view)
return view
def ready(self):
# Connections may already exist before we are called.
for conn in connections.all():
if conn.connection is not None:
register_type_handlers(conn)
connection_created.connect(register_type_handlers)
CharField.register_lookup(Unaccent)
TextField.register_lookup(Unaccent)
CharField.register_lookup(SearchLookup)
TextField.register_lookup(SearchLookup)
CharField.register_lookup(TrigramSimilar)
TextField.register_lookup(TrigramSimilar)
def check_database_backends(*args, **kwargs):
issues = []
for conn in connections.all():
issues.extend(conn.validation.check(**kwargs))
return issues
def make_view_atomic(self, view):
non_atomic_requests = getattr(view, '_non_atomic_requests', set())
for db in connections.all():
if db.settings_dict['ATOMIC_REQUESTS'] and db.alias not in non_atomic_requests:
view = transaction.atomic(using=db.alias)(view)
return view
def ready(self):
# Connections may already exist before we are called.
for conn in connections.all():
if conn.connection is not None:
register_type_handlers(conn)
connection_created.connect(register_type_handlers)
CharField.register_lookup(Unaccent)
TextField.register_lookup(Unaccent)
CharField.register_lookup(SearchLookup)
TextField.register_lookup(SearchLookup)
CharField.register_lookup(TrigramSimilar)
TextField.register_lookup(TrigramSimilar)
def delete(self, *args, **kwargs):
"""Delete this application including all containers"""
try:
# attempt to remove containers from the scheduler
self._destroy_containers([c for c in self.container_set.exclude(type='run')])
except RuntimeError:
pass
self._clean_app_logs()
return super(App, self).delete(*args, **kwargs)
def restart(self, **kwargs):
to_restart = self.container_set.all()
if kwargs.get('type'):
to_restart = to_restart.filter(type=kwargs.get('type'))
if kwargs.get('num'):
to_restart = to_restart.filter(num=kwargs.get('num'))
self._restart_containers(to_restart)
return to_restart
def restart(self, **kwargs):
to_restart = self.container_set.all()
if kwargs.get('type'):
to_restart = to_restart.filter(type=kwargs.get('type'))
if kwargs.get('num'):
to_restart = to_restart.filter(num=kwargs.get('num'))
self._restart_containers(to_restart)
return to_restart
def patch_db(tracer):
for c in connections.all():
patch_conn(tracer, c)