def format(self, record):
    """Build the display string for a log record that carries a SQL query.

    The parent ``format`` is invoked first and its return value is
    discarded. The stripped ``record.sql`` is then:

    * passed through ``sqlparse.format`` (using ``self.reindent`` and
      ``self.keyword_case``) when ``self.parse`` is truthy;
    * prefixed with ``(<duration>ms)`` when the record has a ``duration``
      attribute;
    * passed through ``highlight`` with ``self._lexer`` and
      ``self._formatter`` when ``self.highlight`` is truthy.

    Returns the resulting string.
    """
    super(SqlFormatter, self).format(record)
    statement = record.sql.strip()

    if self.parse:
        statement = sqlparse.format(
            statement,
            reindent=self.reindent,
            keyword_case=self.keyword_case,
        )

    if hasattr(record, 'duration'):
        statement = "({0:.3f}ms) {1}".format(record.duration, statement)

    if not self.highlight:
        return statement
    return highlight(statement, self._lexer, self._formatter)
# Example source code for Python usages of format()
def prepare_sql_script(self, sql):
    """
    Split an SQL script into a list of individual statements.

    The script may contain several statements; each returned string is
    meant for its own cursor.execute() call. Few databases can run a raw
    multi-statement script in a single cursor.execute() call and PEP 249
    does not cover that use case, so this default implementation is
    conservative: it splits the script with sqlparse, skips empty pieces
    and strips comments from every remaining statement.

    Raises ImproperlyConfigured when sqlparse cannot be imported.
    """
    try:
        import sqlparse
    except ImportError:
        raise ImproperlyConfigured(
            "sqlparse is required if you don't split your SQL "
            "statements manually."
        )

    statements = []
    for chunk in sqlparse.split(sql):
        if chunk:
            statements.append(sqlparse.format(chunk, strip_comments=True))
    return statements
def create_table(self):
    """Create the table and its indexes unless the table already exists.

    ``self.schema`` falls back to ``DB_ETL_SCHEMA`` when it is unset.

    Returns ``False`` when ``create_schema()`` returns a falsy value,
    ``True`` when ``exist_table()`` reports the table is already present,
    and otherwise the result of executing the ``CreateTable`` construct.
    """
    if not self.schema:
        self.schema = DB_ETL_SCHEMA

    schema_ready = self.create_schema()
    if not schema_ready:
        return False

    message = 'try to create table {} in {}'.format(
        self.sql_table_name, self.schema)
    logger.info(message)

    if self.exist_table():
        return True

    table = self.get_sql_table_object(need_columns=True)
    created = self.local_engine.execute(CreateTable(table))
    # Indexes are issued one by one after the table itself.
    for idx in table.indexes:
        self.local_engine.execute(CreateIndex(idx))
    return created
def locale_sql_count(self):
    """Return a ``SELECT COUNT(*)`` statement for this object's table.

    The table name is schema-qualified when ``self.schema`` is truthy.
    An empty string is returned when ``self.sql_table_name`` is falsy, or
    when building the statement raised (the error is logged instead of
    propagated).
    """
    query = ''

    if self.schema and self.sql_table_name:
        # Use schema and table name.
        try:
            query = 'SELECT COUNT(*) AS rows_count FROM {schema}.{table};'.format(
                schema=self.schema,
                table=self.sql_table_name,
            )
        except Exception as err:
            logger.exception(str(err))
    elif self.sql_table_name:
        # Use table name only.
        try:
            query = 'SELECT COUNT(*) AS rows_count FROM {table};'.format(
                table=self.sql_table_name,
            )
        except Exception as err:
            logger.exception(str(err))

    return query
def format(self, record):
    """Attach a prettified copy of ``record.sql`` to the record, then format it.

    The stripped query is reindented when the ``sqlparse`` name is truthy
    and highlighted when the ``pygments`` name is truthy.
    NOTE(review): presumably both names are optional imports bound to a
    falsy value when the package is missing -- confirm at the import site.

    The result is stored on ``record.statement`` before delegating to the
    parent formatter, whose return value is returned.
    """
    statement = record.sql.strip()

    if sqlparse:
        # Indent the SQL query.
        statement = sqlparse.format(statement, reindent=True)

    if pygments:
        # Highlight the SQL query.
        statement = pygments.highlight(
            statement, SqlLexer(), Terminal256Formatter(style='monokai'))

    record.statement = statement
    return super(SQLFormatter, self).format(record)
def test_strip_comments_multi(self):
    """``strip_comments=True`` removes ``/* ... */`` comments in several positions."""
    cases = (
        ('/* sql starts here */\nselect', 'select'),
        ('/* sql starts here */ select', 'select'),
        ('/*\n * sql starts here\n */\nselect', 'select'),
        ('select (/* sql starts here */ select 2)', 'select (select 2)'),
        ('select (/* sql /* starts here */ select 2)', 'select (select 2)'),
    )
    for sql, expected in cases:
        res = sqlparse.format(sql, strip_comments=True)
        self.ndiffAssertEqual(res, expected)
def test_notransform_of_quoted_crlf(self):
    """CR / CR+LF characters inside string literals must survive formatting.

    Every input is run through ``sqlparse.format`` without options. In the
    expected text only the line endings outside the quotes differ from the
    input.
    """
    cases = (
        ("SELECT some_column LIKE 'value\r'",
         "SELECT some_column LIKE 'value\r'"),
        ("SELECT some_column LIKE 'value\r'\r\nWHERE id = 1\n",
         "SELECT some_column LIKE 'value\r'\nWHERE id = 1\n"),
        ("SELECT some_column LIKE 'value\\'\r' WHERE id = 1\r",
         "SELECT some_column LIKE 'value\\'\r' WHERE id = 1\n"),
        ("SELECT some_column LIKE 'value\\\\\\'\r' WHERE id = 1\r\n",
         "SELECT some_column LIKE 'value\\\\\\'\r' WHERE id = 1\n"),
    )
    for raw, expected in cases:
        self.ndiffAssertEqual(sqlparse.format(raw), expected)
def test_join(self):
    """With ``reindent=True`` each JOIN variant starts on its own line."""
    cases = (
        ('select * from foo join bar on 1 = 2',
         ['select *', 'from foo', 'join bar on 1 = 2']),
        ('select * from foo inner join bar on 1 = 2',
         ['select *', 'from foo', 'inner join bar on 1 = 2']),
        ('select * from foo left outer join bar on 1 = 2',
         ['select *', 'from foo', 'left outer join bar on 1 = 2']),
        ('select * from foo straight_join bar on 1 = 2',
         ['select *', 'from foo', 'straight_join bar on 1 = 2']),
    )
    for sql, expected_lines in cases:
        formatted = sqlparse.format(sql, reindent=True)
        self.ndiffAssertEqual(formatted, '\n'.join(expected_lines))
def test_duplicate_linebreaks(self):  # issue3
    """Reindenting must not emit extra line breaks (issue3).

    Each case is ``(sql, extra format options, expected lines)``;
    ``reindent=True`` is always passed.
    """
    cases = (
        ('select c1 -- column1\nfrom foo', {},
         ['select c1 -- column1', 'from foo']),
        ('select c1 -- column1\nfrom foo', {'strip_comments': True},
         ['select c1', 'from foo']),
        ('select c1\nfrom foo\norder by c1', {},
         ['select c1', 'from foo', 'order by c1']),
        ('select c1 from t1 where (c1 = 1) order by c1', {},
         ['select c1', 'from t1', 'where (c1 = 1)', 'order by c1']),
    )
    for sql, extra_options, expected_lines in cases:
        formatted = sqlparse.format(sql, reindent=True, **extra_options)
        self.ndiffAssertEqual(formatted, '\n'.join(expected_lines))
def test_issue90():
    """Regression test: reindenting a long ``UPDATE ... SET`` assignment list.

    Every assignment after the first is expected on its own line, indented
    four spaces so that it lines up under the first assignment that
    follows ``SET ``. (The expectation previously carried a single leading
    space on those lines, which cannot match the aligned output.)
    """
    sql = ('UPDATE "gallery_photo" SET "owner_id" = 4018, "deleted_at" = NULL,'
           ' "width" = NULL, "height" = NULL, "rating_votes" = 0,'
           ' "rating_score" = 0, "thumbnail_width" = NULL,'
           ' "thumbnail_height" = NULL, "price" = 1, "description" = NULL')
    formatted = sqlparse.format(sql, reindent=True)
    tformatted = '\n'.join([
        'UPDATE "gallery_photo"',
        'SET "owner_id" = 4018,',
        '    "deleted_at" = NULL,',
        '    "width" = NULL,',
        '    "height" = NULL,',
        '    "rating_votes" = 0,',
        '    "rating_score" = 0,',
        '    "thumbnail_width" = NULL,',
        '    "thumbnail_height" = NULL,',
        '    "price" = 1,',
        '    "description" = NULL',
    ])
    assert formatted == tformatted
def prepare_sql_script(self, sql):
    """
    Split an SQL script into a list of individual statements.

    The script may contain several statements; each returned string is
    meant for its own cursor.execute() call. Few databases can run a raw
    multi-statement script in a single cursor.execute() call and PEP 249
    does not cover that use case, so this default implementation is
    conservative: it splits the script with sqlparse, skips empty pieces
    and strips comments from every remaining statement.

    Raises ImproperlyConfigured when sqlparse cannot be imported.
    """
    try:
        import sqlparse
    except ImportError:
        raise ImproperlyConfigured(
            "sqlparse is required if you don't split your SQL "
            "statements manually."
        )

    statements = []
    for chunk in sqlparse.split(sql):
        if chunk:
            statements.append(sqlparse.format(chunk, strip_comments=True))
    return statements
def _is_initialized(cursor):
"""
Check that database is initialized
"""
cursor.execute('SELECT EXISTS(SELECT 1 FROM '
'information_schema.tables '
'WHERE table_schema = %s '
'AND table_name = %s)',
('public', 'schema_version'))
table_exists = cursor.fetchone()[0]
if not table_exists:
return False
cursor.execute('SELECT * from public.schema_version limit 1')
colnames = [desc[0] for desc in cursor.description]
if colnames != REF_COLUMNS:
raise MalformedSchema(
'Table schema_version has unexpected '
'structure: {struct}'.format(struct='|'.join(colnames)))
return True
def _set_baseline(baseline_v, user, cursor):
    """
    Cleanup schema_version and set baseline

    Raises BaselineError when public.schema_version already holds a row
    whose version is greater than or equal to ``baseline_v``. Otherwise
    every row is deleted and a single 'manual' row for ``baseline_v`` is
    inserted, with ``user`` stored as installed_by.
    """
    conflict_query = ('SELECT EXISTS(SELECT 1 FROM public'
                      '.schema_version WHERE version >= %s::bigint)')
    cursor.execute(conflict_query, (baseline_v,))
    already_applied = cursor.fetchone()[0]
    if already_applied:
        raise BaselineError(
            'Unable to baseline, version '
            '{version} already applied'.format(version=text(baseline_v)))

    LOG.info('cleaning up table schema_version')
    cursor.execute('DELETE FROM public.schema_version')
    LOG.info(cursor.statusmessage)

    LOG.info('setting baseline')
    insert_query = ('INSERT INTO public.schema_version '
                    '(version, type, description, installed_by) '
                    'VALUES (%s::bigint, %s, %s, %s)')
    insert_params = (text(baseline_v), 'manual', 'Forced baseline', user)
    cursor.execute(insert_query, insert_params)
    LOG.info(cursor.statusmessage)
def _get_statements(path):
    """
    Get statements from file

    Reads ``path`` as UTF-8. Unless the file contains the marker
    ``/* pgmigrate-encoding: utf-8 */`` its content must be pure ASCII,
    otherwise MalformedStatement is raised. Comments are stripped with
    sqlparse, and every non-empty statement is yielded after being
    stripped and encoded as UTF-8.
    """
    with codecs.open(path, encoding='utf-8') as source:
        data = source.read()

    utf8_allowed = u'/* pgmigrate-encoding: utf-8 */' in data
    if not utf8_allowed:
        try:
            data.encode('ascii')
        except UnicodeError as exc:
            raise MalformedStatement(
                'Non ascii symbols in file: {0}, {1}'.format(
                    path, text(exc)))

    data = sqlparse.format(data, strip_comments=True)
    for statement in sqlparse.parsestream(data, encoding='utf-8'):
        encoded = text(statement).strip().encode('utf-8')
        if not encoded:
            continue
        yield encoded
def _parse_dict_callbacks(callbacks, ret, base_dir):
for i in callbacks:
if i in ret._fields:
for j in callbacks[i]:
path = os.path.join(base_dir, j)
if not os.path.exists(path):
raise ConfigurationError(
'Path unavailable: {path}'.format(path=text(path)))
if os.path.isdir(path):
for fname in sorted(os.listdir(path)):
getattr(ret, i).append(os.path.join(path, fname))
else:
getattr(ret, i).append(path)
else:
raise ConfigurationError(
'Unexpected callback type: {type}'.format(type=text(i)))
return ret
def process_response(request, response):
    """Wrap a non-HTML response in a minimal HTML page when ``?debug`` is set.

    The response is returned untouched unless the request's GET data has
    a ``debug`` key whose value is the empty string. Otherwise:

    * ``application/octet-stream`` responses are replaced by a page that
      only reports the length of the binary content;
    * ``text/html`` responses are returned unchanged;
    * every other response has its content placed inside ``<pre>``,
      pretty-printed first when it parses as JSON.

    Only the media type of the ``Content-Type`` header is compared, so a
    header such as ``text/html; charset=utf-8`` is recognised as HTML.
    """
    if request.GET.get('debug') != '':
        return response

    # Drop header parameters ("; charset=...") before comparing; an exact
    # match against the full header value misclassified HTML responses.
    media_type = response['Content-Type'].split(';', 1)[0].strip().lower()

    if media_type == 'application/octet-stream':
        new_content = ('<html><body>Binary Data, Length: {}</body></html>'
                       .format(len(response.content)))
        return HttpResponse(new_content)

    if media_type == 'text/html':
        return response

    content = response.content
    if not isinstance(content, str):
        # Bytes content would otherwise be rendered as its b'...' repr.
        charset = getattr(response, 'charset', None) or 'utf-8'
        content = content.decode(charset, 'replace')
    try:
        content = json.dumps(json.loads(content), sort_keys=True, indent=2)
    except ValueError:
        # Not JSON: show the content as it is.
        pass
    return HttpResponse(
        '<html><body><pre>{}</pre></body></html>'.format(content))
def prepare_sql_script(self, sql):
    """
    Split an SQL script into a list of individual statements.

    The script may contain several statements; each returned string is
    meant for its own cursor.execute() call. Few databases can run a raw
    multi-statement script in a single cursor.execute() call and PEP 249
    does not cover that use case, so this default implementation is
    conservative: it splits the script with sqlparse, skips empty pieces
    and strips comments from every remaining statement.

    Raises ImproperlyConfigured when sqlparse cannot be imported.
    """
    try:
        import sqlparse
    except ImportError:
        raise ImproperlyConfigured(
            "sqlparse is required if you don't split your SQL "
            "statements manually."
        )

    statements = []
    for chunk in sqlparse.split(sql):
        if chunk:
            statements.append(sqlparse.format(chunk, strip_comments=True))
    return statements
def prepare_sql_script(self, sql):
    """
    Split an SQL script into a list of individual statements.

    The script may contain several statements; each returned string is
    meant for its own cursor.execute() call. Few databases can run a raw
    multi-statement script in a single cursor.execute() call and PEP 249
    does not cover that use case, so this default implementation is
    conservative: it splits the script with sqlparse, skips empty pieces
    and strips comments from every remaining statement.

    Raises ImproperlyConfigured when sqlparse cannot be imported.
    """
    try:
        import sqlparse
    except ImportError:
        raise ImproperlyConfigured(
            "sqlparse is required if you don't split your SQL "
            "statements manually."
        )

    statements = []
    for chunk in sqlparse.split(sql):
        if chunk:
            statements.append(sqlparse.format(chunk, strip_comments=True))
    return statements
def prepare_sql_script(self, sql):
    """
    Split an SQL script into a list of individual statements.

    The script may contain several statements; each returned string is
    meant for its own cursor.execute() call. Few databases can run a raw
    multi-statement script in a single cursor.execute() call and PEP 249
    does not cover that use case, so this default implementation is
    conservative: it splits the script with sqlparse, skips empty pieces
    and strips comments from every remaining statement.

    Raises ImproperlyConfigured when sqlparse cannot be imported.
    """
    try:
        import sqlparse
    except ImportError:
        raise ImproperlyConfigured(
            "sqlparse is required if you don't split your SQL "
            "statements manually."
        )

    statements = []
    for chunk in sqlparse.split(sql):
        if chunk:
            statements.append(sqlparse.format(chunk, strip_comments=True))
    return statements
def prepare_sql_script(self, sql):
    """
    Split an SQL script into a list of individual statements.

    The script may contain several statements; each returned string is
    meant for its own cursor.execute() call. Few databases can run a raw
    multi-statement script in a single cursor.execute() call and PEP 249
    does not cover that use case, so this default implementation is
    conservative: it splits the script with sqlparse, skips empty pieces
    and strips comments from every remaining statement.

    Raises ImproperlyConfigured when sqlparse cannot be imported.
    """
    try:
        import sqlparse
    except ImportError:
        raise ImproperlyConfigured(
            "sqlparse is required if you don't split your SQL "
            "statements manually."
        )

    statements = []
    for chunk in sqlparse.split(sql):
        if chunk:
            statements.append(sqlparse.format(chunk, strip_comments=True))
    return statements
def render_sql(db, q, inline=False):
    """
    Render the sql used by a query (only works for Postgres)

    The query's statement is compiled with the PostgreSQL dialect and
    reindented. With ``inline`` the parameters are substituted through
    the cursor's ``mogrify`` and the decoded result is returned; without
    it the statement is returned, followed by a ``parameters:`` line when
    the compiled statement has parameters.

    :param db: object whose ``session`` provides the connection used for
        ``mogrify`` (only touched when ``inline`` is true)
    :param q (query): an SQLAlchemy query object
    :param inline (bool): inline parameters?
    :return: str
    """
    compiled = q.statement.compile(dialect=postgresql.dialect())
    pretty = sql_format(str(compiled), reindent=True)

    if not inline:
        if not compiled.params:
            return pretty + ''
        return pretty + "\nparameters: {}".format(str(compiled.params))

    # Walk down to the underlying driver connection to obtain a cursor.
    raw_connection = db.session.connection().connection.connection
    with raw_connection.cursor() as cur:
        rendered = cur.mogrify(pretty, compiled.params)
        return rendered.decode('utf-8')
def test_sqlformatter(self):
    """``SqlFormatter().format`` output equals the manually built pipeline.

    The expected value is built by reindenting and upper-casing keywords
    with sqlparse, prefixing the duration and highlighting with the
    ``default`` terminal style.
    """
    actual = SqlFormatter().format(self.model)

    raw_sql = 'SELECT "django_migrations"."app","django_migrations"."name" FROM "django_migrations"'
    reindented = sqlparse.format(raw_sql, reindent=True, keyword_case='upper')
    with_duration = "(0.005ms) {0}".format(reindented)
    expected = highlight(
        with_duration, SqlLexer(), Terminal256Formatter(style='default'))

    self.assertEqual(expected, actual)
def prettify_sql(sql):
    """Return ``sql`` reformatted by sqlparse, or ``None`` when given ``None``.

    Keywords are upper-cased, identifiers lower-cased, comments kept, and
    the statement is reindented using spaces rather than tabs.

    The identifier option used to be misspelled (``identfier_case``), so
    the intended lower-casing of identifiers was never requested; it is
    now passed under its proper name ``identifier_case``.
    """
    if sql is None:
        return None
    return sqlparse.format(
        sql,
        keyword_case="upper",
        identifier_case="lower",
        strip_comments=False,
        reindent=True,
        indent_tabs=False)
def get_safe_connection_string(conn):
    """Describe a connection as ``user@host:port/dbname``.

    Only those four fields are emitted; any other DSN parameter is left
    out of the result.

    The parameters come from ``conn.get_dsn_parameters()`` when the
    connection offers it. Otherwise ``conn.dsn`` is parsed as
    whitespace-separated ``key=value`` pairs.

    Raises KeyError when one of the four fields is missing.
    """
    try:
        # 2.7+ (NOTE(review): presumably the driver version that added
        # get_dsn_parameters -- confirm)
        params = conn.get_dsn_parameters()
    except AttributeError:
        # Split on the first '=' only: a value may itself contain '='
        # (e.g. options='-c search_path=x'), which previously made dict()
        # fail with ValueError.
        params = dict(
            item.split('=', 1) for item in shlex.split(conn.dsn))
    return '{user}@{host}:{port}/{dbname}'.format(**params)
def callproc(self, procname, vars=None):
    """Call the stored procedure through the parent cursor and record its duration.

    The elapsed wall-clock time in milliseconds is passed to
    ``self.connection._record`` whether the call returns or raises. The
    parent's return value is returned unchanged.
    """
    started = time.time()
    try:
        return super(TaliskerCursor, self).callproc(procname, vars)
    finally:
        elapsed_ms = (time.time() - started) * 1000
        label = 'stored proc: {}'.format(procname)
        # no query parameters, cannot safely record
        self.connection._record(label, None, elapsed_ms)
def return_insert_id(self):
    """
    Hook for backends that can return the last insert ID from the INSERT
    query itself.

    Such a backend returns the SQL and params to append to the INSERT
    query; the returned fragment should contain a format string to hold
    the appropriate column. This implementation returns None.
    """
    return None
def get_db_converters(self, expression):
    """
    Return the list of functions needed to convert field data.

    Some field types on some backends do not provide data in the correct
    format; this is the hook for converter functions. This implementation
    returns a new empty list and does not look at ``expression``.
    """
    converters = list()
    return converters
def get_perm(self):
    """Return the permission label ``[<sql_table_name>](id:<id>)`` for this object."""
    template = '[{obj.sql_table_name}](id:{obj.id})'
    return template.format(obj=self)
def sql_table_name(self):
    """Return ``self.name`` prefixed with ``DB_ETL_PREFIX`` and an underscore."""
    parts = (DB_ETL_PREFIX, self.name)
    return '{}_{}'.format(*parts)
def create_schema(self):
    """Create ``self.schema`` unless ``exist_schema()`` reports it exists.

    Returns ``True`` when the schema already exists, otherwise the result
    of executing the ``CreateSchema`` construct on ``self.local_engine``.
    """
    logger.info('try to create schema {}'.format(self.schema))
    if not self.exist_schema():
        return self.local_engine.execute(CreateSchema(self.schema))
    return True