python类sql()的实例源码

sqlparser.py 文件源码 项目:etl_schedule 作者: yxl040840219 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def parse_sql_tables(sql):
        """Collect table names that follow the first FROM keyword of ``sql``.

        Only the first statement returned by ``sqlparse.parse`` is inspected.
        Once a ``FROM`` keyword has been seen, every later top-level
        ``Identifier`` (and every member of an ``IdentifierList``) is passed
        to ``SQLParser.get_table_name`` and the result is collected. Keyword
        tokens and all other token kinds are skipped.

        :param sql: SQL text to analyse
        :return: list of the values returned by ``SQLParser.get_table_name``;
            empty when ``sql`` is empty/blank or yields no statement
        """
        # Blank input contains no statement; without this guard the
        # first-statement lookup below fails on an empty parse result.
        if not sql or not sql.strip():
            return []
        parsed = sqlparse.parse(sql)
        if not parsed:
            return []

        tables = []
        from_seen = False
        for token in parsed[0].tokens:
            if from_seen and token.ttype is not Keyword:
                if isinstance(token, IdentifierList):
                    for identifier in token.get_identifiers():
                        tables.append(SQLParser.get_table_name(identifier))
                elif isinstance(token, Identifier):
                    tables.append(SQLParser.get_table_name(token))
            # Evaluated after the collection step, so collection only starts
            # with the token that follows FROM.
            if token.ttype is Keyword and token.value.upper() == "FROM":
                from_seen = True
        return tables
test_parse.py 文件源码 项目:codenn 作者: sriniiyer 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_placeholder(self):
        """Each placeholder spelling is tokenized as ``Name.Placeholder``.

        For every spelling, the last child of the statement's last top-level
        token must carry the placeholder ttype and the placeholder text as
        its value.
        """
        def _get_tokens(sql):
            # Children of the last top-level token of the first statement.
            return sqlparse.parse(sql)[0].tokens[-1].tokens

        for placeholder in ('?', ':1', ':name', '%s', '$a'):
            t = _get_tokens('select * from foo where user = ' + placeholder)
            # assertTrue replaces the deprecated ``assert_`` alias.
            self.assertTrue(t[-1].ttype is sqlparse.tokens.Name.Placeholder)
            self.assertEqual(t[-1].value, placeholder)
extract_table_names.py 文件源码 项目:codenn 作者: sriniiyer 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def extract_tables():
    """Run the first parsed statement of ``sql`` through the extractors.

    ``sql`` is not a parameter; it is looked up in the enclosing scope. The
    statement is passed to ``extract_from_part`` and that result to
    ``extract_table_identifiers``, whose output is returned as a list.
    """
    statement = sqlparse.parse(sql)[0]
    from_part = extract_from_part(statement)
    identifiers = extract_table_identifiers(from_part)
    return list(identifiers)
helpers.py 文件源码 项目:clickhouse-cli 作者: hatarist 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _allow_join_condition(statement):
    """
    Tests if a join condition should be suggested

    We need this to avoid bad suggestions when entering e.g.
        select * from tbl1 a join tbl2 b on a.id = <cursor>
    So check that the preceding token is a ON, AND, or OR keyword, instead of
    e.g. an equals sign.

    :param statement: an sqlparse.sql.Statement
    :return: boolean
    """

    if not statement or not statement.tokens:
        return False

    # token_prev returns a pair whose second item is the token; that item is
    # None when no preceding token is found, so guard before reading .value
    last_tok = statement.token_prev(len(statement.tokens))[1]
    if last_tok is None:
        return False
    return last_tok.value.lower() in ('on', 'and', 'or')
helpers.py 文件源码 项目:clickhouse-cli 作者: hatarist 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def _allow_join(statement):
    """
    Tests if a join should be suggested

    We need this to avoid bad suggestions when entering e.g.
        select * from tbl1 a join tbl2 b <cursor>
    So check that the preceding token is a JOIN keyword

    :param statement: an sqlparse.sql.Statement
    :return: boolean
    """

    if not statement or not statement.tokens:
        return False

    # token_prev returns a pair whose second item is the token; that item is
    # None when no preceding token is found, so guard before reading .value
    last_tok = statement.token_prev(len(statement.tokens))[1]
    if last_tok is None:
        return False

    value = last_tok.value.lower()
    # Any "... join" qualifies except the two variants listed here
    return value.endswith('join') and value not in (
        'cross join', 'natural join')
sqlparser.py 文件源码 项目:etl_schedule 作者: yxl040840219 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def parse_sql_columns(sql):
        """Collect the real names of identifiers that precede the first keyword.

        Only the first statement returned by ``sqlparse.parse`` is inspected.
        Top-level tokens are scanned in order: each ``Identifier`` (and each
        member of an ``IdentifierList``) contributes ``get_real_name()``. The
        scan ends at the first token whose ttype is exactly ``Keyword``.

        :param sql: SQL text to analyse
        :return: list of real names; empty when ``sql`` is empty/blank or
            yields no statement
        """
        # Blank input contains no statement; without this guard the
        # first-statement lookup below fails on an empty parse result.
        if not sql or not sql.strip():
            return []
        parsed = sqlparse.parse(sql)
        if not parsed:
            return []

        columns = []
        for token in parsed[0].tokens:
            if isinstance(token, IdentifierList):
                columns.extend(
                    identifier.get_real_name()
                    for identifier in token.get_identifiers())
            elif isinstance(token, Identifier):
                columns.append(token.get_real_name())
            # NOTE(review): the original comment says "from", but any plain
            # Keyword ends the scan, not only FROM -- presumably a keyword
            # placed before the column list would end it early; confirm.
            if token.ttype is Keyword:
                break
        return columns
test_parse.py 文件源码 项目:codenn 作者: sriniiyer 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_tokenize(self):
        """One statement parses to a single element that round-trips via str()."""
        source = 'select * from foo;'
        parsed = sqlparse.parse(source)
        self.assertEqual(len(parsed), 1)
        first = parsed[0]
        self.assertEqual(str(first), source)
test_parse.py 文件源码 项目:codenn 作者: sriniiyer 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_newlines(self):
        """Statements with \\n, \\r\\n and \\r line breaks round-trip unchanged."""
        samples = (
            u'select\n*from foo;',
            u'select\r\n*from foo',
            u'select\r*from foo',
            u'select\r\n*from foo\n',
        )
        for text in samples:
            statement = sqlparse.parse(text)[0]
            self.assertEqual(unicode(statement), text)
test_parse.py 文件源码 项目:codenn 作者: sriniiyer 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_within(self):
        """The nested ``col1`` token reports being within a ``Function``."""
        sql = 'foo(col1, col2)'
        p = sqlparse.parse(sql)[0]
        col1 = p.tokens[0].tokens[1].tokens[1].tokens[0]
        # assertTrue replaces the deprecated ``assert_`` alias.
        self.assertTrue(col1.within(sqlparse.sql.Function))
test_parse.py 文件源码 项目:codenn 作者: sriniiyer 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_child_of(self):
        """``is_child_of`` is true for a direct parent and false for a sibling."""
        sql = '(col1, col2)'
        p = sqlparse.parse(sql)[0]
        # assertTrue/assertFalse replace the deprecated ``assert_`` alias.
        self.assertTrue(p.tokens[0].tokens[1].is_child_of(p.tokens[0]))
        sql = 'select foo'
        p = sqlparse.parse(sql)[0]
        # Top-level tokens are children of the statement, not of each other.
        self.assertFalse(p.tokens[2].is_child_of(p.tokens[0]))
        self.assertTrue(p.tokens[2].is_child_of(p))
test_parse.py 文件源码 项目:codenn 作者: sriniiyer 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_access_symbol(self):  # see issue27
        """A bracketed, aliased member access parses as one ``Identifier``.

        The last token must expose the alias as its name, the bracketed part
        as its real name and the qualifier as its parent name.
        """
        t = sqlparse.parse('select a.[foo bar] as foo')[0].tokens
        # assertTrue replaces the deprecated ``assert_`` alias.
        self.assertTrue(isinstance(t[-1], sqlparse.sql.Identifier))
        self.assertEqual(t[-1].get_name(), 'foo')
        self.assertEqual(t[-1].get_real_name(), '[foo bar]')
        self.assertEqual(t[-1].get_parent_name(), 'a')
test_parse.py 文件源码 项目:codenn 作者: sriniiyer 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_square_brackets_notation_isnt_too_greedy(self):  # see issue153
        """Two bracketed names separated by a comma stay separate identifiers.

        The statement must consist of one ``IdentifierList`` with four child
        tokens whose first and last children are ``[foo]`` and ``[bar]``.
        """
        t = sqlparse.parse('[foo], [bar]')[0].tokens
        # assertTrue replaces the deprecated ``assert_`` alias.
        self.assertTrue(isinstance(t[0], sqlparse.sql.IdentifierList))
        self.assertEqual(len(t[0].tokens), 4)
        self.assertEqual(t[0].tokens[0].get_real_name(), '[foo]')
        self.assertEqual(t[0].tokens[-1].get_real_name(), '[bar]')
test_parse.py 文件源码 项目:codenn 作者: sriniiyer 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_keyword_like_identifier(self):  # see issue47
        """``foo.key`` parses as a single ``Identifier`` token."""
        t = sqlparse.parse('foo.key')[0].tokens
        self.assertEqual(len(t), 1)
        # assertTrue replaces the deprecated ``assert_`` alias.
        self.assertTrue(isinstance(t[0], sqlparse.sql.Identifier))
test_parse.py 文件源码 项目:codenn 作者: sriniiyer 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_function_parameter(self):  # see issue94
        """``get_parameters()`` on ``abs(some_col)`` yields one ``Identifier``."""
        t = sqlparse.parse('abs(some_col)')[0].tokens[0].get_parameters()
        self.assertEqual(len(t), 1)
        # assertTrue replaces the deprecated ``assert_`` alias.
        self.assertTrue(isinstance(t[0], sqlparse.sql.Identifier))
test_parse.py 文件源码 项目:codenn 作者: sriniiyer 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_quoted_identifier():
    """A double-quoted alias is reported without its quotes.

    The token at index 2 must be an ``Identifier`` named ``z`` whose real
    name is ``y``.
    """
    tokens = sqlparse.parse('select x.y as "z" from foo')[0].tokens
    aliased = tokens[2]
    assert isinstance(aliased, sqlparse.sql.Identifier)
    assert aliased.get_name() == 'z'
    assert aliased.get_real_name() == 'y'
test_parse.py 文件源码 项目:codenn 作者: sriniiyer 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_valid_identifier_names(name):  # issue175
    """The given ``name`` must parse to an ``Identifier`` as its first token.

    ``name`` is supplied by the caller (presumably a test parametrization
    that is not visible here -- confirm).
    """
    first_token = sqlparse.parse(name)[0].tokens[0]
    assert isinstance(first_token, sqlparse.sql.Identifier)
test_parse.py 文件源码 项目:codenn 作者: sriniiyer 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_double_precision_is_builtin():
    """``DOUBLE PRECISION`` is a single token of type ``Name.Builtin``."""
    text = 'DOUBLE PRECISION'
    tokens = sqlparse.parse(text)[0].tokens
    # Checked one condition at a time, in the same order as a chained `and`.
    assert len(tokens) == 1
    assert tokens[0].ttype == sqlparse.tokens.Name.Builtin
    assert tokens[0].value == 'DOUBLE PRECISION'
test_parse.py 文件源码 项目:codenn 作者: sriniiyer 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_double_quotes_are_identifiers():
    """A double-quoted name parses as exactly one ``Identifier`` token."""
    tokens = sqlparse.parse('"foo"')[0].tokens
    assert len(tokens) == 1
    only = tokens[0]
    assert isinstance(only, sqlparse.sql.Identifier)
test_parse.py 文件源码 项目:codenn 作者: sriniiyer 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_sqlite_identifiers():
    """Square-bracket (sqlite style) escapes are still parsed as names."""
    # A comma-separated pair becomes one IdentifierList of two names.
    tokens = sqlparse.parse('[col1],[col2]')[0].tokens
    assert len(tokens) == 1
    assert isinstance(tokens[0], sqlparse.sql.IdentifierList)
    names = [ident.get_name() for ident in tokens[0].get_identifiers()]
    assert names == ['[col1]', '[col2]']

    # Joined by an operator, the flattened stream is name, operator, name.
    statement = sqlparse.parse('[col1]+[col2]')[0]
    flat_types = []
    for leaf in statement.flatten():
        flat_types.append(leaf.ttype)
    assert flat_types == [T.Name, T.Operator, T.Name]
test_parse.py 文件源码 项目:codenn 作者: sriniiyer 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_single_line_comments(sql):
    """The given ``sql`` parses to five tokens ending in a single-line comment.

    ``sql`` is supplied by the caller (presumably a test parametrization
    that is not visible here -- confirm).
    """
    statement = sqlparse.parse(sql)[0]
    tokens = statement.tokens
    assert len(tokens) == 5
    assert tokens[-1].ttype == T.Comment.Single
blFunction.py 文件源码 项目:db_platform 作者: speedocjx 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def __init__(self, sql=None):
        """Store the SQL text on the instance.

        :param sql: value kept as ``self.sql``; defaults to ``None``
        """
        self.sql = sql
blFunction.py 文件源码 项目:db_platform 作者: speedocjx 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def extract_tables(self):
        """Run the first parsed statement of ``self.sql`` through the extractors.

        The statement is passed to ``self.extract_from_part`` and that result
        to ``self.extract_table_identifiers``, whose output is returned as a
        list.
        """
        statement = sqlparse.parse(self.sql)[0]
        from_part = self.extract_from_part(statement)
        identifiers = self.extract_table_identifiers(from_part)
        return list(identifiers)

    # Blacklist / whitelist check
blFunction.py 文件源码 项目:db_platform 作者: speedocjx 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def check_query_table(self, dbtag, username):
        """Check whether ``self.sql`` touches a blacklisted table for ``dbtag``.

        The first ``Tb_blacklist`` row matching ``dbtag`` is used. If the
        user identified by ``username`` is in that row's ``user_permit`` set,
        nothing is reported. Otherwise the comma-separated ``tbname`` field
        is compared with the tables extracted from ``self.sql``.

        :param dbtag: value used to filter ``Tb_blacklist`` rows
        :param username: username looked up via ``User.objects.get``
        :return: ``(True, matching_tables)`` when at least one blacklisted
            table is referenced, otherwise ``(False, [])``
        """
        # Named ``blacklists`` so the builtin ``list`` is not shadowed.
        blacklists = Tb_blacklist.objects.filter(dbtag=dbtag)
        if not blacklists:
            return False, []

        entry = blacklists[0]
        # Users on the permit list skip the table check entirely.
        if User.objects.get(username=username) in entry.user_permit.all():
            return False, []

        black_tables = entry.tbname.split(',')
        queried_tables = Sqlparse(self.sql).extract_tables()
        hits = [name for name in black_tables if name in queried_tables]
        if hits:
            return True, hits
        return False, []
convert.py 文件源码 项目:sqlitis 作者: pglass 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def to_sqla(sql):
    """Convert ``sql`` by rendering the result of ``tokens_to_sqla``.

    The tokens of the first parsed statement are passed through
    ``remove_whitespace`` and then ``tokens_to_sqla``; the return value is
    that object's ``render()`` output.
    """
    statement = sqlparse.parse(sql)[0]
    stripped = remove_whitespace(statement.tokens)
    converted = tokens_to_sqla(stripped)
    return converted.render()
utils.py 文件源码 项目:clickhouse-cli 作者: hatarist 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def find_prev_keyword(sql, n_skip=0):
    """
    Locate the last keyword (or opening parenthesis) in an SQL statement.

    The final ``n_skip`` flattened tokens are dropped before searching, and
    the keywords AND, OR, NOT and BETWEEN are passed over.

    Returns a pair: the matching token, and the query text truncated right
    after that token. Returns ``(None, '')`` for blank input or when nothing
    matches.
    """
    if not sql.strip():
        return None, ''

    statement = sqlparse.parse(sql)[0]
    leaves = list(statement.flatten())
    leaves = leaves[:len(leaves) - n_skip]

    skipped_keywords = ('AND', 'OR', 'NOT', 'BETWEEN')

    for tok in reversed(leaves):
        if tok.value != '(':
            # Not a parenthesis: it must be a keyword outside the skip list
            if not tok.is_keyword or tok.value.upper() in skipped_keywords:
                continue

        # The position is looked up in the flattened list rather than via
        # statement.token_index(tok): tok may be a child token nested inside
        # a TokenList, in which case token_index throws ValueError, e.g.
        #   p = sqlparse.parse('select * from foo where bar')
        #   t = list(p.flatten())[-3]  # The "Where" token
        #   p.token_index(t)  # Throws ValueError: not in list
        position = leaves.index(tok)

        # Join the values of every token up to and including the match to
        # get the query text with everything after the keyword removed
        prefix = leaves[:position + 1]
        return tok, ''.join(piece.value for piece in prefix)

    return None, ''


# Postgresql dollar quote signs look like `$$` or `$tag$`
utils.py 文件源码 项目:clickhouse-cli 作者: hatarist 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def is_open_quote(sql):
    """Returns true if the query contains an unclosed quote."""

    # The parse result can hold one or more semi-colon separated commands;
    # a single command flagged by _parsed_is_open_quote is enough
    for command in sqlparse.parse(sql):
        if _parsed_is_open_quote(command):
            return True
    return False
utils.py 文件源码 项目:clickhouse-cli 作者: hatarist 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def parse_partial_identifier(word):
    """
    Attempt to parse a (partially typed) word as an identifier.

    word may include a schema qualification, like `schema_name.partial_name`
    or `schema_name.` There may also be unclosed quotation marks, like
    `"schema` or `schema."partial_name`.

    :param word: string representing a (partially complete) identifier
    :return: sqlparse.sql.Identifier, or None
    """

    # Nothing typed yet: there is no statement to index into, so honour the
    # documented "or None" contract instead of raising IndexError
    if not word:
        return None
    parsed = sqlparse.parse(word)
    if not parsed:
        return None

    p = parsed[0]
    n_tok = len(p.tokens)
    if n_tok == 1 and isinstance(p.tokens[0], Identifier):
        return p.tokens[0]
    elif p.token_next_by(m=(Error, '"'))[1]:
        # An unmatched double quote, e.g. '"foo', 'foo."', or 'foo."bar'
        # Close the double quote, then reparse
        return parse_partial_identifier(word + '"')
    else:
        return None


问题


面经


文章

微信
公众号

扫码关注公众号