# Shared imports assumed by the sqlparse snippets below; names such as
# lexer, LOG, SQLParser, S, and M come from the snippets' home projects.
import sqlparse
from collections import defaultdict
from sqlparse.sql import Function, Identifier, IdentifierList
from sqlparse.tokens import (DML, Error, Keyword, Name, Number,
                             Punctuation, Token)

def parse_sql_tables(sql):
    """Return the table names listed in the FROM clause of `sql`."""
    tables = []
    parsed = sqlparse.parse(sql)
    stmt = parsed[0]
    from_seen = False
    for token in stmt.tokens:
        if from_seen:
            if token.ttype is Keyword:
                # Skip keywords that follow the table list (WHERE, GROUP, ...)
                continue
            elif isinstance(token, IdentifierList):
                for identifier in token.get_identifiers():
                    tables.append(SQLParser.get_table_name(identifier))
            elif isinstance(token, Identifier):
                tables.append(SQLParser.get_table_name(token))
        if token.ttype is Keyword and token.value.upper() == "FROM":
            from_seen = True
    return tables
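# Usage sketch; assumes SQLParser.get_table_name (from the surrounding
# project) returns the plain table name of each Identifier:
#
#     parse_sql_tables("SELECT a, b FROM users, orders")
#     # -> ['users', 'orders']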
def test_simple(self):
    from io import StringIO  # stands in for Python 2's cStringIO
    stream = StringIO("SELECT 1; SELECT 2;")
    lex = lexer.Lexer()
    tokens = lex.get_tokens(stream)
    self.assertEqual(len(list(tokens)), 9)
    stream.seek(0)
    lex.bufsize = 4
    tokens = list(lex.get_tokens(stream))
    self.assertEqual(len(tokens), 9)
    stream.seek(0)
    lex.bufsize = len(stream.getvalue())
    tokens = list(lex.get_tokens(stream))
    self.assertEqual(len(tokens), 9)
def _gather_sql_placeholders(body):
    """Return the sorted, de-duplicated names of :name-style placeholders."""
    p = sqlparse.parse(body)
    assert len(p) == 1
    p = p[0]
    tokens = list(p.flatten())
    # value[1:] strips the leading ':' from each placeholder, e.g. ':x' -> 'x'
    names = [x.value[1:] for x in tokens if x.ttype == Token.Name.Placeholder]
    return sorted(set(names))
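# Usage sketch:
#
#     _gather_sql_placeholders("SELECT * FROM t WHERE a = :x AND b = :y OR c = :x")
#     # -> ['x', 'y']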
def parse_sql_columns(sql):
    columns = []
    parsed = sqlparse.parse(sql)
    stmt = parsed[0]
    for token in stmt.tokens:
        if isinstance(token, IdentifierList):
            for identifier in token.get_identifiers():
                columns.append(identifier.get_real_name())
        if isinstance(token, Identifier):
            columns.append(token.get_real_name())
        if token.ttype is Keyword:  # stop at the first bare keyword (FROM)
            break
    return columns
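# Usage sketch:
#
#     parse_sql_columns("SELECT a, b FROM t")  # -> ['a', 'b']
#     parse_sql_columns("SELECT a FROM t")     # -> ['a']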
def is_subselect(parsed):
    if not parsed.is_group:  # a property (not a method) in sqlparse >= 0.2
return False
for item in parsed.tokens:
if item.ttype is DML and item.value.upper() == 'SELECT':
return True
return False
def extract_from_part(parsed):
    from_seen = False
    for item in parsed.tokens:
        if from_seen:
            if is_subselect(item):
                yield from extract_from_part(item)
            elif item.ttype is Keyword:
                # End the generator with return; raising StopIteration inside
                # a generator is a RuntimeError under PEP 479 (Python 3.7+).
                return
            else:
                yield item
        elif item.ttype is Keyword and item.value.upper() == 'FROM':
            from_seen = True
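# Usage sketch (sqlparse >= 0.2): the generator yields the raw tokens that
# make up the FROM clause.
#
#     stmt = sqlparse.parse("SELECT * FROM foo, bar")[0]
#     [t.value for t in extract_from_part(stmt) if not t.is_whitespace]
#     # -> ['foo, bar']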
def test_simple(self):
s = 'select * from foo;'
stream = lexer.tokenize(s)
    self.assertTrue(isinstance(stream, types.GeneratorType))
tokens = list(stream)
self.assertEqual(len(tokens), 8)
self.assertEqual(len(tokens[0]), 2)
self.assertEqual(tokens[0], (Keyword.DML, u'select'))
self.assertEqual(tokens[-1], (Punctuation, u';'))
def test_backticks(self):
s = '`foo`.`bar`'
tokens = list(lexer.tokenize(s))
self.assertEqual(len(tokens), 3)
self.assertEqual(tokens[0], (Name, u'`foo`'))
def test_linebreaks(self): # issue1
s = 'foo\nbar\n'
tokens = lexer.tokenize(s)
self.assertEqual(''.join(str(x[1]) for x in tokens), s)
s = 'foo\rbar\r'
tokens = lexer.tokenize(s)
self.assertEqual(''.join(str(x[1]) for x in tokens), s)
s = 'foo\r\nbar\r\n'
tokens = lexer.tokenize(s)
self.assertEqual(''.join(str(x[1]) for x in tokens), s)
s = 'foo\r\nbar\n'
tokens = lexer.tokenize(s)
self.assertEqual(''.join(str(x[1]) for x in tokens), s)
def test_negative_numbers(self):
s = "values(-1)"
tokens = list(lexer.tokenize(s))
self.assertEqual(len(tokens), 4)
self.assertEqual(tokens[2][0], Number.Integer)
self.assertEqual(tokens[2][1], '-1')
# Somehow this test fails on Python 3.2
def test_tab_expansion(self):
s = "\t"
lex = lexer.Lexer()
lex.tabsize = 5
tokens = list(lex.get_tokens(s))
self.assertEqual(tokens[0][1], " " * 5)
def test_repr(self):
p = sqlparse.parse('foo, bar, baz')[0]
tst = "<IdentifierList 'foo, b...' at 0x"
self.assertEqual(repr(p.tokens[0])[:len(tst)], tst)
def test_error(self):
    from io import StringIO  # stands in for Python 2's cStringIO
    stream = StringIO("FOOBAR{")
    lex = lexer.Lexer()
    lex.bufsize = 4
    tokens = list(lex.get_tokens(stream))
    self.assertEqual(len(tokens), 2)
    self.assertEqual(tokens[1][0], Error)
def test_parse_endifloop():
    # Whitespace variants between END and IF/LOOP should still group into a
    # single keyword token. (The doubled spaces below are an assumption; the
    # upstream regression test exercises differing whitespace in each parse.)
    p = sqlparse.parse('END IF')[0]
    assert len(p.tokens) == 1
    assert p.tokens[0].ttype is Keyword
    p = sqlparse.parse('END  IF')[0]
    assert len(p.tokens) == 1
    p = sqlparse.parse('END\t\nIF')[0]
    assert len(p.tokens) == 1
    assert p.tokens[0].ttype is Keyword
    p = sqlparse.parse('END LOOP')[0]
    assert len(p.tokens) == 1
    assert p.tokens[0].ttype is Keyword
    p = sqlparse.parse('END  LOOP')[0]
    assert len(p.tokens) == 1
    assert p.tokens[0].ttype is Keyword
def is_subselect(self, parsed):
if not parsed.is_group:
return False
for item in parsed.tokens:
if item.ttype is DML and item.value.upper() == 'SELECT':
return True
return False
def extract_from_part(self, parsed):
    from_seen = False
    for item in parsed.tokens:
        if from_seen:
            if self.is_subselect(item):
                yield from self.extract_from_part(item)
            elif item.ttype is Keyword:
                # return, not raise StopIteration (a RuntimeError under PEP 479)
                return
            else:
                yield item
        elif item.ttype is Keyword and item.value.upper() == 'FROM':
            from_seen = True
def debug_tokens(tokens):
for t in tokens:
LOG.debug(' %r %s', t, type(t))
def debug(f):
def wrapped(*args, **kwargs):
debug_args = []
for a in args:
if is_tokens(a):
debug_args.append("[<%s tokens>]" % len(a))
else:
debug_args.append("%r" % a)
args_str = " ".join(str(a) for a in debug_args)
        kwargs_str = " ".join("%s=%s" % (k, v) for k, v in kwargs.items())
        LOG.debug("%s %s %s", f.__name__, args_str, kwargs_str)
# try to find tokens
if 'tokens' in kwargs:
if is_tokens(kwargs['tokens']):
debug_tokens(kwargs['tokens'])
for a in args:
if is_tokens(a):
debug_tokens(a)
result = f(*args, **kwargs)
if result is not None:
LOG.debug("%s returned %r", f.__name__, result)
return result
return wrapped
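# Usage sketch; LOG and is_tokens come from the surrounding module:
#
#     @debug
#     def tokens_to_sqla(tokens):
#         ...
#
# Each call then logs the function name, its arguments (token lists are
# summarized as "[<N tokens>]"), and the value it returns.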
def remove_whitespace(tokens):
return [x for x in tokens if not x.is_whitespace]
def to_sqla(sql):
tokens = sqlparse.parse(sql)[0].tokens
tokens = remove_whitespace(tokens)
return tokens_to_sqla(tokens).render()
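# Usage sketch, following the sqlitis README (tokens_to_sqla and .render()
# come from that project; the output shown is illustrative):
#
#     to_sqla("select foo.a, b.name from foo join b on foo.id = b.foo_id")
#     # -> "select([foo.c.a, b.c.name]).select_from(
#     #        foo.join(b, foo.c.id == b.c.foo_id))"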
def build_comparison(tok):
    assert type(tok) is S.Comparison
    m = M.Comparison()
    # Iterate under a distinct name so the Comparison `tok` is not shadowed.
    for child in remove_whitespace(tok.tokens):
        LOG.debug("  %s %s", child, type(child))
        m = sql_literal_to_model(child, m)
        if not m:
            raise Exception("[BUG] Failed to convert %s to model" % child)
    return m
def group_tokens(parsed):
grouped = defaultdict(list)
identifiers = []
for token in parsed.tokens:
if token.ttype:
grouped[token.ttype].append(token.value)
elif token.get_name():
identifiers.append(token)
return grouped, identifiers
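# Usage sketch:
#
#     grouped, identifiers = group_tokens(sqlparse.parse("SELECT a FROM t")[0])
#     grouped[DML]                          # -> ['SELECT']
#     [i.get_name() for i in identifiers]   # -> ['a', 't']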
def find_prev_keyword(sql, n_skip=0):
"""
Find the last sql keyword in an SQL statement.
Returns the value of the last keyword, and the text of the query with
everything after the last keyword stripped.
"""
if not sql.strip():
return None, ''
parsed = sqlparse.parse(sql)[0]
flattened = list(parsed.flatten())
flattened = flattened[:len(flattened) - n_skip]
logical_operators = ('AND', 'OR', 'NOT', 'BETWEEN')
for t in reversed(flattened):
if t.value == '(' or (t.is_keyword and (
t.value.upper() not in logical_operators)):
            # Find the location of token t in the original parsed statement.
            # We can't use parsed.token_index(t) because t may be a child token
            # inside a TokenList, in which case token_index throws an error.
            # Minimal example:
            #   p = sqlparse.parse('select * from foo where bar')[0]
            #   t = list(p.flatten())[-3]  # The "Where" token
            #   p.token_index(t)  # Throws ValueError: not in list
idx = flattened.index(t)
# Combine the string values of all tokens in the original list
# up to and including the target keyword token t, to produce a
# query string with everything after the keyword token removed
text = ''.join(tok.value for tok in flattened[:idx + 1])
return t, text
return None, ''
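# Usage sketch:
#
#     tok, text = find_prev_keyword('SELECT * FROM ')
#     (tok.value, text)  # -> ('FROM', 'SELECT * FROM')
#     tok, text = find_prev_keyword('SELECT * FROM tbl WHERE id = 1 AND ')
#     (tok.value, text)  # -> ('WHERE', 'SELECT * FROM tbl WHERE')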
# Postgresql dollar quote signs look like `$$` or `$tag$`
def parse_partial_identifier(word):
"""
Attempt to parse a (partially typed) word as an identifier.
word may include a schema qualification, like `schema_name.partial_name`
or `schema_name.` There may also be unclosed quotation marks, like
`"schema` or `schema."partial_name`.
:param word: string representing a (partially complete) identifier
:return: sqlparse.sql.Identifier, or None
"""
p = sqlparse.parse(word)[0]
n_tok = len(p.tokens)
if n_tok == 1 and isinstance(p.tokens[0], Identifier):
return p.tokens[0]
    elif p.token_next_by(m=(Error, '"')):  # returns a matching token or None
# An unmatched double quote, e.g. '"foo', 'foo."', or 'foo."bar'
# Close the double quote, then reparse
return parse_partial_identifier(word + '"')
else:
return None
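# Usage sketch:
#
#     ident = parse_partial_identifier('schema."pa')
#     (ident.get_parent_name(), ident.get_real_name())  # -> ('schema', 'pa')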
def is_subselect(parsed):
if not parsed.is_group:
return False
for item in parsed.tokens:
if item.ttype is DML and item.value.upper() in ('SELECT', 'INSERT', 'CREATE'):
return True
return False
def _identifier_is_function(identifier):
return any(isinstance(t, Function) for t in identifier.tokens)
def extract_from_part(parsed, stop_at_punctuation=True):
tbl_prefix_seen = False
for item in parsed.tokens:
if tbl_prefix_seen:
            if is_subselect(item):
                yield from extract_from_part(item, stop_at_punctuation)
            elif stop_at_punctuation and item.ttype is Punctuation:
                # End the generator with return; raising StopIteration inside
                # a generator is a RuntimeError under PEP 479 (Python 3.7+).
                return
            # An incomplete nested select won't be recognized correctly as a
            # sub-select. eg: 'SELECT * FROM (SELECT id FROM user'. This causes
            # the second FROM to trigger this elif condition, ending extraction
            # too early. So we need to ignore the keyword if it is FROM.
            # Also 'SELECT * FROM abc JOIN def' will trigger this elif
            # condition. So we need to ignore the keyword JOIN and its variants
            # INNER JOIN, FULL OUTER JOIN, etc.
            elif item.ttype is Keyword and (
                    not item.value.upper() == 'FROM') and (
                    not item.value.upper().endswith('JOIN')):
                tbl_prefix_seen = False
            else:
                yield item
elif item.ttype is Keyword or item.ttype is Keyword.DML:
item_val = item.value.upper()
if (item_val in ('COPY', 'FROM', 'INTO', 'UPDATE', 'TABLE') or
item_val.endswith('JOIN')):
tbl_prefix_seen = True
# 'SELECT a, FROM abc' will detect FROM as part of the column list.
# So this check here is necessary.
elif isinstance(item, IdentifierList):
for identifier in item.get_identifiers():
if (identifier.ttype is Keyword and
identifier.value.upper() == 'FROM'):
tbl_prefix_seen = True
break
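# Usage sketch:
#
#     stmt = sqlparse.parse("SELECT * FROM abc a JOIN def d ON a.id = d.num")[0]
#     [t.value for t in extract_from_part(stmt) if isinstance(t, Identifier)]
#     # -> ['abc a', 'def d']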
def discoverComponents(sql):
"""Search for installable/removable components within a string containing
    one or more SQL statements"""
components = []
parsed = sqlparse.parse(sql)
for statement in parsed:
for token in statement.tokens:
name = None
typ = None
# remove newlines, extra spaces for regex
stmt = str(statement).replace("\n", " ")
stmt = " ".join(stmt.split())
for comp in sqlComponents:
if token.match(Keyword, comp.typ):
name = comp.match(stmt)
typ = comp.typ
if name is not None:
component = AppComponent(name, typ)
if component not in components:
components.append(component)
# sort alphabetically, should fix drop issues when 'rule on table'
# is dropped before 'table'
return sorted(components, key=lambda x: x.typ)
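# Usage sketch; sqlComponents and AppComponent are project-specific. Each
# entry in sqlComponents is assumed to pair a keyword type (e.g. 'TABLE')
# with a regex that extracts the component name from a normalized statement:
#
#     discoverComponents("CREATE TABLE foo (id int); DROP TABLE bar;")
#     # -> AppComponent instances for ('foo', 'TABLE') and ('bar', 'TABLE'),
#     #    sorted by component type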