def delete_table(self):
logger.info('try to delete table {} in {}'.format(
self.sql_table_name,
self.schema
))
table = self.get_sql_table_object(need_columns=False)
return self.local_engine.execute(DropTable(table))
python类format()的实例源码
def remote_sql_count(self):
"""SQL QUERY FOR COUNT ROW IN REMOTE SOURCE"""
try:
return 'SELECT COUNT(*) AS rows_count FROM ({sql}) AS CHITER;'. \
format(
sql=self.remote_etl_sql().compile(
compile_kwargs={"literal_binds": True})
)
except AttributeError:
return 'SELECT COUNT(*) AS rows_count FROM ({sql}) AS CHITER;'. \
format(
sql=self.remote_etl_sql()
)
def clear(self):
""" Stop ETL TASK and Clear ETL local table """
dt = datetime.utcnow().replace(
microsecond=0
)
self.status = EtlStatus.STOPPED
self.sync_last = ''
self.sync_last_time = dt
self.sync_periodic = 0
self.is_scheduled = False
# self.chunk_size = 0
self.progress = 0
self.sync_next_time = self.get_next_sync()
db.session.merge(self)
db.session.commit()
table_name = self.sql_table_name
if self.schema and self.sql_table_name:
table_name = '{schema}.{table}'.format(
schema=self.schema,
table=self.sql_table_name
)
sql_truncate = 'TRUNCATE TABLE {} CONTINUE IDENTITY RESTRICT;'.format(
table_name
)
with self.local_engine.connect() as local_conection:
local_conection.execution_options(
autocommit=True
).execute(sql_truncate)
def clear_column_type(cls, column_type=''):
if column_type:
column_type = column_type.split(') ')
l = len(column_type)
column_type = column_type[0]
if l:
if l > 1:
column_type = '{})'.format(column_type)
column_type = 'sa.{}'.format(column_type)
if 'INTEGER' in column_type or 'TINYINT' in column_type\
or 'BIGINT' in column_type:
column_type = 'sa.Integer()'
if 'string' in column_type:
column_type = 'sa.Text()'
if 'OBJECT' in column_type:
column_type = 'sa.Text()'
if 'DATETIME' in column_type or 'TIMESTAMP WITHOUT TIME ZONE' \
in column_type or 'TIMESTAMP WITH TIME ZONE'\
in column_type:
column_type = 'sa.DateTime()'
if 'HSTORE' in column_type:
# column_type = 'postgresql.HSTORE(text_type=sa.Text())'
column_type = 'postgresql.HSTORE'
else:
column_type = 'sa.Text()'
return column_type
def sqlformat(sql):
'''
Format SQL queries.
'''
return sqlparse.format(str(sql), reindent=True, wrap_after=120)
def prepare_sql_script(self, sql, _allow_fallback=False):
"""
Takes a SQL script that may contain multiple lines and returns a list
of statements to feed to successive cursor.execute() calls.
Since few databases are able to process raw SQL scripts in a single
cursor.execute() call and PEP 249 doesn't talk about this use case,
the default implementation is conservative.
"""
# Remove _allow_fallback and keep only 'return ...' in Django 1.9.
try:
# This import must stay inside the method because it's optional.
import sqlparse
except ImportError:
if _allow_fallback:
# Without sqlparse, fall back to the legacy (and buggy) logic.
warnings.warn(
"Providing initial SQL data on a %s database will require "
"sqlparse in Django 1.9." % self.connection.vendor,
RemovedInDjango19Warning)
from django.core.management.sql import _split_statements
return _split_statements(sql)
else:
raise
else:
return [sqlparse.format(statement, strip_comments=True)
for statement in sqlparse.split(sql) if statement]
def return_insert_id(self):
"""
For backends that support returning the last insert ID as part
of an insert query, this method returns the SQL and params to
append to the INSERT query. The returned fragment should
contain a format string to hold the appropriate column.
"""
pass
def get_db_converters(self, expression):
"""Get a list of functions needed to convert field data.
Some field types on some backends do not provide data in the correct
format, this is the hook for coverter functions.
"""
return []
def test_keywordcase(self):
sql = 'select * from bar; -- select foo\n'
res = sqlparse.format(sql, keyword_case='upper')
self.ndiffAssertEqual(res, 'SELECT * FROM bar; -- select foo\n')
res = sqlparse.format(sql, keyword_case='capitalize')
self.ndiffAssertEqual(res, 'Select * From bar; -- select foo\n')
res = sqlparse.format(sql.upper(), keyword_case='lower')
self.ndiffAssertEqual(res, 'select * from BAR; -- SELECT FOO\n')
self.assertRaises(SQLParseError, sqlparse.format, sql,
keyword_case='foo')
def test_identifiercase(self):
sql = 'select * from bar; -- select foo\n'
res = sqlparse.format(sql, identifier_case='upper')
self.ndiffAssertEqual(res, 'select * from BAR; -- select foo\n')
res = sqlparse.format(sql, identifier_case='capitalize')
self.ndiffAssertEqual(res, 'select * from Bar; -- select foo\n')
res = sqlparse.format(sql.upper(), identifier_case='lower')
self.ndiffAssertEqual(res, 'SELECT * FROM bar; -- SELECT FOO\n')
self.assertRaises(SQLParseError, sqlparse.format, sql,
identifier_case='foo')
sql = 'select * from "foo"."bar"'
res = sqlparse.format(sql, identifier_case="upper")
self.ndiffAssertEqual(res, 'select * from "foo"."bar"')
def test_strip_comments_single(self):
sql = 'select *-- statement starts here\nfrom foo'
res = sqlparse.format(sql, strip_comments=True)
self.ndiffAssertEqual(res, 'select * from foo')
sql = 'select * -- statement starts here\nfrom foo'
res = sqlparse.format(sql, strip_comments=True)
self.ndiffAssertEqual(res, 'select * from foo')
sql = 'select-- foo\nfrom -- bar\nwhere'
res = sqlparse.format(sql, strip_comments=True)
self.ndiffAssertEqual(res, 'select from where')
self.assertRaises(SQLParseError, sqlparse.format, sql,
strip_comments=None)
def test_strip_ws(self):
f = lambda sql: sqlparse.format(sql, strip_whitespace=True)
s = 'select\n* from foo\n\twhere ( 1 = 2 )\n'
self.ndiffAssertEqual(f(s), 'select * from foo where (1 = 2)')
s = 'select -- foo\nfrom bar\n'
self.ndiffAssertEqual(f(s), 'select -- foo\nfrom bar')
self.assertRaises(SQLParseError, sqlparse.format, s,
strip_whitespace=None)
def test_outputformat(self):
sql = 'select * from foo;'
self.assertRaises(SQLParseError, sqlparse.format, sql,
output_format='foo')
def test_option(self):
self.assertRaises(SQLParseError, sqlparse.format, 'foo',
reindent=2)
self.assertRaises(SQLParseError, sqlparse.format, 'foo',
indent_tabs=2)
self.assertRaises(SQLParseError, sqlparse.format, 'foo',
reindent=True, indent_width='foo')
self.assertRaises(SQLParseError, sqlparse.format, 'foo',
reindent=True, indent_width=-12)
def test_stmts(self):
f = lambda sql: sqlparse.format(sql, reindent=True)
s = 'select foo; select bar'
self.ndiffAssertEqual(f(s), 'select foo;\n\nselect bar')
s = 'select foo'
self.ndiffAssertEqual(f(s), 'select foo')
s = 'select foo; -- test\n select bar'
self.ndiffAssertEqual(f(s), 'select foo; -- test\n\nselect bar')
def test_keywords(self):
f = lambda sql: sqlparse.format(sql, reindent=True)
s = 'select * from foo union select * from bar;'
self.ndiffAssertEqual(f(s), '\n'.join(['select *',
'from foo',
'union',
'select *',
'from bar;']))
def test_parenthesis(self):
f = lambda sql: sqlparse.format(sql, reindent=True)
s = 'select count(*) from (select * from foo);'
self.ndiffAssertEqual(f(s),
'\n'.join(['select count(*)',
'from',
' (select *',
' from foo);',
])
)
def test_where(self):
f = lambda sql: sqlparse.format(sql, reindent=True)
s = 'select * from foo where bar = 1 and baz = 2 or bzz = 3;'
self.ndiffAssertEqual(f(s), ('select *\nfrom foo\n'
'where bar = 1\n'
' and baz = 2\n'
' or bzz = 3;'))
s = 'select * from foo where bar = 1 and (baz = 2 or bzz = 3);'
self.ndiffAssertEqual(f(s), ('select *\nfrom foo\n'
'where bar = 1\n'
' and (baz = 2\n'
' or bzz = 3);'))
def test_identifier_list(self):
f = lambda sql: sqlparse.format(sql, reindent=True)
s = 'select foo, bar, baz from table1, table2 where 1 = 2'
self.ndiffAssertEqual(f(s), '\n'.join(['select foo,',
' bar,',
' baz',
'from table1,',
' table2',
'where 1 = 2']))
s = 'select a.*, b.id from a, b'
self.ndiffAssertEqual(f(s), '\n'.join(['select a.*,',
' b.id',
'from a,',
' b']))
def test_identifier_list_with_functions(self):
f = lambda sql: sqlparse.format(sql, reindent=True)
s = ("select 'abc' as foo, coalesce(col1, col2)||col3 as bar,"
"col3 from my_table")
self.ndiffAssertEqual(f(s), '\n'.join(
["select 'abc' as foo,",
" coalesce(col1, col2)||col3 as bar,",
" col3",
"from my_table"]))