def parse_sql_tables(sql):
tables = []
parsed = sqlparse.parse(sql)
stmt = parsed[0]
from_seen = False
for token in stmt.tokens:
if from_seen:
if token.ttype is Keyword:
continue
else:
if isinstance(token, IdentifierList):
for identifier in token.get_identifiers():
tables.append(SQLParser.get_table_name(identifier))
elif isinstance(token, Identifier):
tables.append(SQLParser.get_table_name(token))
else:
pass
if token.ttype is Keyword and token.value.upper() == "FROM":
from_seen = True
return tables
python类sql()的实例源码
def test_placeholder(self):
def _get_tokens(sql):
return sqlparse.parse(sql)[0].tokens[-1].tokens
t = _get_tokens('select * from foo where user = ?')
self.assert_(t[-1].ttype is sqlparse.tokens.Name.Placeholder)
self.assertEqual(t[-1].value, '?')
t = _get_tokens('select * from foo where user = :1')
self.assert_(t[-1].ttype is sqlparse.tokens.Name.Placeholder)
self.assertEqual(t[-1].value, ':1')
t = _get_tokens('select * from foo where user = :name')
self.assert_(t[-1].ttype is sqlparse.tokens.Name.Placeholder)
self.assertEqual(t[-1].value, ':name')
t = _get_tokens('select * from foo where user = %s')
self.assert_(t[-1].ttype is sqlparse.tokens.Name.Placeholder)
self.assertEqual(t[-1].value, '%s')
t = _get_tokens('select * from foo where user = $a')
self.assert_(t[-1].ttype is sqlparse.tokens.Name.Placeholder)
self.assertEqual(t[-1].value, '$a')
def extract_tables():
stream = extract_from_part(sqlparse.parse(sql)[0])
return list(extract_table_identifiers(stream))
def _allow_join_condition(statement):
"""
Tests if a join condition should be suggested
We need this to avoid bad suggestions when entering e.g.
select * from tbl1 a join tbl2 b on a.id = <cursor>
So check that the preceding token is a ON, AND, or OR keyword, instead of
e.g. an equals sign.
:param statement: an sqlparse.sql.Statement
:return: boolean
"""
if not statement or not statement.tokens:
return False
last_tok = statement.token_prev(len(statement.tokens))[1]
return last_tok.value.lower() in ('on', 'and', 'or')
def _allow_join(statement):
"""
Tests if a join should be suggested
We need this to avoid bad suggestions when entering e.g.
select * from tbl1 a join tbl2 b <cursor>
So check that the preceding token is a JOIN keyword
:param statement: an sqlparse.sql.Statement
:return: boolean
"""
if not statement or not statement.tokens:
return False
last_tok = statement.token_prev(len(statement.tokens))[1]
return (
last_tok.value.lower().endswith('join') and last_tok.value.lower() not in(
'cross join', 'natural join'))
def parse_sql_columns(sql):
columns = []
parsed = sqlparse.parse(sql)
stmt = parsed[0]
for token in stmt.tokens:
if isinstance(token, IdentifierList):
for identifier in token.get_identifiers():
columns.append(identifier.get_real_name())
if isinstance(token, Identifier):
columns.append(token.get_real_name())
if token.ttype is Keyword: # from
break
return columns
def test_tokenize(self):
sql = 'select * from foo;'
stmts = sqlparse.parse(sql)
self.assertEqual(len(stmts), 1)
self.assertEqual(str(stmts[0]), sql)
def test_newlines(self):
sql = u'select\n*from foo;'
p = sqlparse.parse(sql)[0]
self.assertEqual(unicode(p), sql)
sql = u'select\r\n*from foo'
p = sqlparse.parse(sql)[0]
self.assertEqual(unicode(p), sql)
sql = u'select\r*from foo'
p = sqlparse.parse(sql)[0]
self.assertEqual(unicode(p), sql)
sql = u'select\r\n*from foo\n'
p = sqlparse.parse(sql)[0]
self.assertEqual(unicode(p), sql)
def test_within(self):
sql = 'foo(col1, col2)'
p = sqlparse.parse(sql)[0]
col1 = p.tokens[0].tokens[1].tokens[1].tokens[0]
self.assert_(col1.within(sqlparse.sql.Function))
def test_child_of(self):
sql = '(col1, col2)'
p = sqlparse.parse(sql)[0]
self.assert_(p.tokens[0].tokens[1].is_child_of(p.tokens[0]))
sql = 'select foo'
p = sqlparse.parse(sql)[0]
self.assert_(not p.tokens[2].is_child_of(p.tokens[0]))
self.assert_(p.tokens[2].is_child_of(p))
def test_access_symbol(self): # see issue27
t = sqlparse.parse('select a.[foo bar] as foo')[0].tokens
self.assert_(isinstance(t[-1], sqlparse.sql.Identifier))
self.assertEqual(t[-1].get_name(), 'foo')
self.assertEqual(t[-1].get_real_name(), '[foo bar]')
self.assertEqual(t[-1].get_parent_name(), 'a')
def test_square_brackets_notation_isnt_too_greedy(self): # see issue153
t = sqlparse.parse('[foo], [bar]')[0].tokens
self.assert_(isinstance(t[0], sqlparse.sql.IdentifierList))
self.assertEqual(len(t[0].tokens), 4)
self.assertEqual(t[0].tokens[0].get_real_name(), '[foo]')
self.assertEqual(t[0].tokens[-1].get_real_name(), '[bar]')
def test_keyword_like_identifier(self): # see issue47
t = sqlparse.parse('foo.key')[0].tokens
self.assertEqual(len(t), 1)
self.assert_(isinstance(t[0], sqlparse.sql.Identifier))
def test_function_parameter(self): # see issue94
t = sqlparse.parse('abs(some_col)')[0].tokens[0].get_parameters()
self.assertEqual(len(t), 1)
self.assert_(isinstance(t[0], sqlparse.sql.Identifier))
def test_quoted_identifier():
t = sqlparse.parse('select x.y as "z" from foo')[0].tokens
assert isinstance(t[2], sqlparse.sql.Identifier)
assert t[2].get_name() == 'z'
assert t[2].get_real_name() == 'y'
def test_valid_identifier_names(name): # issue175
t = sqlparse.parse(name)[0].tokens
assert isinstance(t[0], sqlparse.sql.Identifier)
def test_double_precision_is_builtin():
sql = 'DOUBLE PRECISION'
t = sqlparse.parse(sql)[0].tokens
assert (len(t) == 1
and t[0].ttype == sqlparse.tokens.Name.Builtin
and t[0].value == 'DOUBLE PRECISION')
def test_double_quotes_are_identifiers():
p = sqlparse.parse('"foo"')[0].tokens
assert len(p) == 1
assert isinstance(p[0], sqlparse.sql.Identifier)
def test_sqlite_identifiers():
# Make sure we still parse sqlite style escapes
p = sqlparse.parse('[col1],[col2]')[0].tokens
assert (len(p) == 1
and isinstance(p[0], sqlparse.sql.IdentifierList)
and [id.get_name() for id in p[0].get_identifiers()]
== ['[col1]', '[col2]'])
p = sqlparse.parse('[col1]+[col2]')[0]
types = [tok.ttype for tok in p.flatten()]
assert types == [T.Name, T.Operator, T.Name]
def test_single_line_comments(sql):
p = sqlparse.parse(sql)[0]
assert len(p.tokens) == 5
assert p.tokens[-1].ttype == T.Comment.Single
def __init__(self,sql=None):
self.sql = sql
def extract_tables(self):
# print sqlparse.parse(self.sql)[0]
stream = self.extract_from_part(sqlparse.parse(self.sql)[0])
# print stream
return list(self.extract_table_identifiers(stream))
#black white list check
def check_query_table(self,dbtag,username):
list = Tb_blacklist.objects.filter(dbtag=dbtag)
if list :
#user in white list
if User.objects.get(username=username) in list[0].user_permit.all():
existTb=[]
else :
#if table in black list
blackTblist = list[0].tbname.split(',')
parser = Sqlparse(self.sql)
tblist = parser.extract_tables()
existTb = [val for val in blackTblist if val in tblist]
if existTb:
return True,existTb
return False,[]
def to_sqla(sql):
tokens = sqlparse.parse(sql)[0].tokens
tokens = remove_whitespace(tokens)
return tokens_to_sqla(tokens).render()
def find_prev_keyword(sql, n_skip=0):
"""
Find the last sql keyword in an SQL statement.
Returns the value of the last keyword, and the text of the query with
everything after the last keyword stripped.
"""
if not sql.strip():
return None, ''
parsed = sqlparse.parse(sql)[0]
flattened = list(parsed.flatten())
flattened = flattened[:len(flattened) - n_skip]
logical_operators = ('AND', 'OR', 'NOT', 'BETWEEN')
for t in reversed(flattened):
if t.value == '(' or (t.is_keyword and (
t.value.upper() not in logical_operators)):
# Find the location of token t in the original parsed statement
# We can't use parsed.token_index(t) because t may be a child token
# inside a TokenList, in which case token_index thows an error
# Minimal example:
# p = sqlparse.parse('select * from foo where bar')
# t = list(p.flatten())[-3] # The "Where" token
# p.token_index(t) # Throws ValueError: not in list
idx = flattened.index(t)
# Combine the string values of all tokens in the original list
# up to and including the target keyword token t, to produce a
# query string with everything after the keyword token removed
text = ''.join(tok.value for tok in flattened[:idx + 1])
return t, text
return None, ''
# Postgresql dollar quote signs look like `$$` or `$tag$`
def is_open_quote(sql):
"""Returns true if the query contains an unclosed quote."""
# parsed can contain one or more semi-colon separated commands
parsed = sqlparse.parse(sql)
return any(_parsed_is_open_quote(p) for p in parsed)
def parse_partial_identifier(word):
"""
Attempt to parse a (partially typed) word as an identifier.
word may include a schema qualification, like `schema_name.partial_name`
or `schema_name.` There may also be unclosed quotation marks, like
`"schema` or `schema."partial_name`.
:param word: string representing a (partially complete) identifier
:return: sqlparse.sql.Identifier, or None
"""
p = sqlparse.parse(word)[0]
n_tok = len(p.tokens)
if n_tok == 1 and isinstance(p.tokens[0], Identifier):
return p.tokens[0]
elif p.token_next_by(m=(Error, '"'))[1]:
# An unmatched double quote, e.g. '"foo', 'foo."', or 'foo."bar'
# Close the double quote, then reparse
return parse_partial_identifier(word + '"')
else:
return None