python类format()的实例源码

ddquery.py 文件源码 项目:ddquery 作者: elinaldosoft 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def format(self, record):
    """Render the SQL statement carried by a log record.

    The parent formatter is invoked first and its result is discarded.
    ``record.sql`` is then stripped and, depending on this formatter's
    ``parse`` / ``highlight`` settings, reformatted with ``sqlparse`` and
    passed through ``highlight``.

    :param record: log record exposing ``sql`` and, optionally, ``duration``.
    :return: the processed SQL text.
    """
    super(SqlFormatter, self).format(record)
    statement = record.sql.strip()

    if self.parse:
        statement = sqlparse.format(
            statement,
            reindent=self.reindent,
            keyword_case=self.keyword_case,
        )
        # The duration prefix is only added on the parsing path.
        if hasattr(record, 'duration'):
            statement = "({0:.3f}ms) {1}".format(record.duration, statement)

    if not self.highlight:
        return statement
    return highlight(statement, self._lexer, self._formatter)
operations.py 文件源码 项目:CodingDojo 作者: ComputerSocietyUNB 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def prepare_sql_script(self, sql):
    """
    Split a (possibly multi-statement) SQL script into single statements.

    Each non-empty statement produced by ``sqlparse.split()`` is passed
    through ``sqlparse.format(..., strip_comments=True)``; the resulting
    list is meant to be fed to successive ``cursor.execute()`` calls.

    Raises ``ImproperlyConfigured`` when ``sqlparse`` cannot be imported.
    """
    try:
        import sqlparse
    except ImportError:
        raise ImproperlyConfigured(
            "sqlparse is required if you don't split your SQL "
            "statements manually."
        )

    statements = []
    for chunk in sqlparse.split(sql):
        if chunk:
            statements.append(sqlparse.format(chunk, strip_comments=True))
    return statements
etl.py 文件源码 项目:bit 作者: codesmart-co 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def create_table(self):
    """Create the table and its indexes unless the table already exists.

    Falls back to ``DB_ETL_SCHEMA`` when no schema is set on the instance.

    :return: ``False`` if ``create_schema()`` reports failure, ``True`` if
        the table already exists, otherwise the result of executing the
        ``CreateTable`` construct on the local engine.
    """
    if not self.schema:
        self.schema = DB_ETL_SCHEMA

    schema_ready = self.create_schema()
    if not schema_ready:
        return False

    logger.info(
        'try to create table {} in {}'.format(self.sql_table_name, self.schema)
    )

    if self.exist_table():
        return True

    table = self.get_sql_table_object(need_columns=True)
    creation_result = self.local_engine.execute(CreateTable(table))

    # Indexes are created one by one after the table itself.
    for table_index in table.indexes:
        self.local_engine.execute(CreateIndex(table_index))

    return creation_result
etl.py 文件源码 项目:bit 作者: codesmart-co 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def locale_sql_count(self):
    """Build the ``SELECT COUNT(*)`` statement for this object's table.

    :return: a schema-qualified query when both ``schema`` and
        ``sql_table_name`` are set, an unqualified query when only the
        table name is set, and an empty string otherwise (or when
        formatting fails, in which case the error is logged).
    """
    query = ''
    if self.schema and self.sql_table_name:
        # Use schema and table name
        try:
            names = dict(schema=self.schema, table=self.sql_table_name)
            query = (
                'SELECT COUNT(*) AS rows_count FROM {schema}.{table};'
            ).format(**names)
        except Exception as e:
            logger.exception(str(e))
    elif self.sql_table_name:
        # Use table name
        try:
            names = dict(table=self.sql_table_name)
            query = 'SELECT COUNT(*) AS rows_count FROM {table};'.format(**names)
        except Exception as e:
            logger.exception(str(e))

    return query
formatters.py 文件源码 项目:djangolab 作者: DhiaTN 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def format(self, record):
    """Pretty-print ``record.sql`` into ``record.statement`` and format it.

    The stripped SQL is re-indented when ``sqlparse`` is truthy and
    highlighted with the ``monokai`` terminal style when ``pygments`` is
    truthy (presumably both are optional imports -- confirm at the import
    site). The parent formatter then produces the final log line.

    :param record: log record exposing a ``sql`` attribute.
    :return: whatever the parent formatter returns for the record.
    """
    statement = record.sql.strip()

    # Indent the SQL query
    if sqlparse:
        statement = sqlparse.format(statement, reindent=True)

    # Highlight the SQL query
    if pygments:
        statement = pygments.highlight(
            statement, SqlLexer(), Terminal256Formatter(style='monokai'))

    record.statement = statement
    return super(SQLFormatter, self).format(record)
test_format.py 文件源码 项目:codenn 作者: sriniiyer 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def test_strip_comments_multi(self):
    """Block comments are removed when ``strip_comments=True`` is given."""
    # (input SQL, expected output) pairs, checked in order.
    cases = (
        ('/* sql starts here */\nselect', 'select'),
        ('/* sql starts here */ select', 'select'),
        ('/*\n * sql starts here\n */\nselect', 'select'),
        ('select (/* sql starts here */ select 2)', 'select (select 2)'),
        ('select (/* sql /* starts here */ select 2)', 'select (select 2)'),
    )
    for sql, expected in cases:
        res = sqlparse.format(sql, strip_comments=True)
        self.ndiffAssertEqual(res, expected)
test_format.py 文件源码 项目:codenn 作者: sriniiyer 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_notransform_of_quoted_crlf(self):
    """CR / CR+LF characters inside string literals survive formatting."""
    def fmt(sql):
        return sqlparse.format(sql)

    # (input, expected) pairs: the carriage return inside the quoted value
    # is kept as-is, while line endings outside the literal are expected
    # to come back as a plain line feed.
    cases = [
        ("SELECT some_column LIKE 'value\r'",
         "SELECT some_column LIKE 'value\r'"),
        ("SELECT some_column LIKE 'value\r'\r\nWHERE id = 1\n",
         "SELECT some_column LIKE 'value\r'\nWHERE id = 1\n"),
        ("SELECT some_column LIKE 'value\\'\r' WHERE id = 1\r",
         "SELECT some_column LIKE 'value\\'\r' WHERE id = 1\n"),
        ("SELECT some_column LIKE 'value\\\\\\'\r' WHERE id = 1\r\n",
         "SELECT some_column LIKE 'value\\\\\\'\r' WHERE id = 1\n"),
    ]
    for sql, expected in cases:
        self.ndiffAssertEqual(fmt(sql), expected)
test_format.py 文件源码 项目:codenn 作者: sriniiyer 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_join(self):
    """Each JOIN variant starts its own line when re-indenting."""
    def reindent(sql):
        return sqlparse.format(sql, reindent=True)

    # (input SQL, expected output lines) pairs, checked in order.
    cases = [
        ('select * from foo join bar on 1 = 2',
         ['select *', 'from foo', 'join bar on 1 = 2']),
        ('select * from foo inner join bar on 1 = 2',
         ['select *', 'from foo', 'inner join bar on 1 = 2']),
        ('select * from foo left outer join bar on 1 = 2',
         ['select *', 'from foo', 'left outer join bar on 1 = 2']),
        ('select * from foo straight_join bar on 1 = 2',
         ['select *', 'from foo', 'straight_join bar on 1 = 2']),
    ]
    for s, expected_lines in cases:
        self.ndiffAssertEqual(reindent(s), '\n'.join(expected_lines))
test_format.py 文件源码 项目:codenn 作者: sriniiyer 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def test_duplicate_linebreaks(self):  # issue3
    """Re-indenting must not emit duplicate line breaks."""
    def reindent(sql):
        return sqlparse.format(sql, reindent=True)

    # A trailing line comment stays attached to its line.
    s = 'select c1 -- column1\nfrom foo'
    expected = '\n'.join(['select c1 -- column1', 'from foo'])
    self.ndiffAssertEqual(reindent(s), expected)

    # Same input, this time with the comment stripped as well.
    s = 'select c1 -- column1\nfrom foo'
    r = sqlparse.format(s, reindent=True, strip_comments=True)
    expected = '\n'.join(['select c1', 'from foo'])
    self.ndiffAssertEqual(r, expected)

    # Input that is already split across lines.
    s = 'select c1\nfrom foo\norder by c1'
    expected = '\n'.join(['select c1', 'from foo', 'order by c1'])
    self.ndiffAssertEqual(reindent(s), expected)

    # Single-line input with a parenthesised WHERE condition.
    s = 'select c1 from t1 where (c1 = 1) order by c1'
    expected = '\n'.join(
        ['select c1', 'from t1', 'where (c1 = 1)', 'order by c1'])
    self.ndiffAssertEqual(reindent(s), expected)
test_regressions.py 文件源码 项目:codenn 作者: sriniiyer 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_issue90():
    """Regression test: an UPDATE with many assignments is re-indented
    with one ``SET`` assignment per line."""
    sql = ('UPDATE "gallery_photo" SET "owner_id" = 4018, "deleted_at" = NULL,'
           ' "width" = NULL, "height" = NULL, "rating_votes" = 0,'
           ' "rating_score" = 0, "thumbnail_width" = NULL,'
           ' "thumbnail_height" = NULL, "price" = 1, "description" = NULL')
    expected_lines = [
        'UPDATE "gallery_photo"',
        'SET "owner_id" = 4018,',
        '    "deleted_at" = NULL,',
        '    "width" = NULL,',
        '    "height" = NULL,',
        '    "rating_votes" = 0,',
        '    "rating_score" = 0,',
        '    "thumbnail_width" = NULL,',
        '    "thumbnail_height" = NULL,',
        '    "price" = 1,',
        '    "description" = NULL',
    ]
    formatted = sqlparse.format(sql, reindent=True)
    assert formatted == '\n'.join(expected_lines)
operations.py 文件源码 项目:lifesoundtrack 作者: MTG 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def prepare_sql_script(self, sql):
    """
    Split a (possibly multi-statement) SQL script into single statements.

    Each non-empty statement produced by ``sqlparse.split()`` is passed
    through ``sqlparse.format(..., strip_comments=True)``; the resulting
    list is meant to be fed to successive ``cursor.execute()`` calls.

    Raises ``ImproperlyConfigured`` when ``sqlparse`` cannot be imported.
    """
    try:
        import sqlparse
    except ImportError:
        raise ImproperlyConfigured(
            "sqlparse is required if you don't split your SQL "
            "statements manually."
        )

    statements = []
    for chunk in sqlparse.split(sql):
        if chunk:
            statements.append(sqlparse.format(chunk, strip_comments=True))
    return statements
pgmigrate.py 文件源码 项目:pgmigrate 作者: yandex 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def _is_initialized(cursor):
    """
    Check that database is initialized
    """
    cursor.execute('SELECT EXISTS(SELECT 1 FROM '
                   'information_schema.tables '
                   'WHERE table_schema = %s '
                   'AND table_name = %s)',
                   ('public', 'schema_version'))
    table_exists = cursor.fetchone()[0]

    if not table_exists:
        return False

    cursor.execute('SELECT * from public.schema_version limit 1')

    colnames = [desc[0] for desc in cursor.description]

    if colnames != REF_COLUMNS:
        raise MalformedSchema(
            'Table schema_version has unexpected '
            'structure: {struct}'.format(struct='|'.join(colnames)))

    return True
pgmigrate.py 文件源码 项目:pgmigrate 作者: yandex 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _set_baseline(baseline_v, user, cursor):
    """
    Empty ``public.schema_version`` and insert a single baseline row.

    Raises ``BaselineError`` when a row with a version greater than or
    equal to ``baseline_v`` is already present; in that case nothing is
    deleted or inserted.
    """
    cursor.execute('SELECT EXISTS(SELECT 1 FROM public'
                   '.schema_version WHERE version >= %s::bigint)',
                   (baseline_v,))
    already_applied = cursor.fetchone()[0]
    if already_applied:
        message = ('Unable to baseline, version '
                   '{version} already applied').format(
                       version=text(baseline_v))
        raise BaselineError(message)

    LOG.info('cleaning up table schema_version')
    cursor.execute('DELETE FROM public.schema_version')
    LOG.info(cursor.statusmessage)

    LOG.info('setting baseline')
    baseline_row = (text(baseline_v), 'manual', 'Forced baseline', user)
    cursor.execute('INSERT INTO public.schema_version '
                   '(version, type, description, installed_by) '
                   'VALUES (%s::bigint, %s, %s, %s)',
                   baseline_row)
    LOG.info(cursor.statusmessage)
pgmigrate.py 文件源码 项目:pgmigrate 作者: yandex 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _get_statements(path):
    """
    Yield the non-empty statements of a SQL file as UTF-8 encoded strings.

    The file is read as UTF-8. Unless it contains the marker
    ``/* pgmigrate-encoding: utf-8 */``, its content must be pure ASCII,
    otherwise ``MalformedStatement`` is raised. Comments are stripped
    before the content is split into statements.
    """
    with codecs.open(path, encoding='utf-8') as source:
        data = source.read()

    declares_utf8 = u'/* pgmigrate-encoding: utf-8 */' in data
    if not declares_utf8:
        try:
            data.encode('ascii')
        except UnicodeError as exc:
            raise MalformedStatement(
                'Non ascii symbols in file: {0}, {1}'.format(
                    path, text(exc)))

    data = sqlparse.format(data, strip_comments=True)
    for statement in sqlparse.parsestream(data, encoding='utf-8'):
        encoded = text(statement).strip().encode('utf-8')
        if encoded:
            yield encoded
pgmigrate.py 文件源码 项目:pgmigrate 作者: yandex 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def _parse_dict_callbacks(callbacks, ret, base_dir):
    for i in callbacks:
        if i in ret._fields:
            for j in callbacks[i]:
                path = os.path.join(base_dir, j)
                if not os.path.exists(path):
                    raise ConfigurationError(
                        'Path unavailable: {path}'.format(path=text(path)))
                if os.path.isdir(path):
                    for fname in sorted(os.listdir(path)):
                        getattr(ret, i).append(os.path.join(path, fname))
                else:
                    getattr(ret, i).append(path)
        else:
            raise ConfigurationError(
                'Unexpected callback type: {type}'.format(type=text(i)))

    return ret
develop.py 文件源码 项目:jatumba-backend 作者: YetAnotherTeam 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def process_response(request, response):
    """Wrap non-HTML responses in an HTML page when ``?debug`` is given.

    Only requests whose ``debug`` query parameter is present and empty are
    affected. Binary (``application/octet-stream``) responses are replaced
    by a short page stating the content length; any other response whose
    ``Content-Type`` is not exactly ``text/html`` is shown inside ``<pre>``,
    pretty-printed when its content parses as JSON.

    :return: the original response, or a new ``HttpResponse`` wrapping it.
    """
    if request.GET.get('debug') != '':
        return response

    content_type = response['Content-Type']

    if content_type == 'application/octet-stream':
        size = len(response.content)
        page = '<html><body>Binary Data, Length: {}</body></html>'.format(size)
        return HttpResponse(page)

    if content_type == 'text/html':
        return response

    content = response.content
    try:
        content = json.dumps(json.loads(content), sort_keys=True, indent=2)
    except ValueError:
        # Not JSON: show the raw content unchanged.
        pass
    return HttpResponse('<html><body><pre>{}</pre></body></html>'.format(content))
operations.py 文件源码 项目:liberator 作者: libscie 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def prepare_sql_script(self, sql):
    """
    Split a (possibly multi-statement) SQL script into single statements.

    Each non-empty statement produced by ``sqlparse.split()`` is passed
    through ``sqlparse.format(..., strip_comments=True)``; the resulting
    list is meant to be fed to successive ``cursor.execute()`` calls.

    Raises ``ImproperlyConfigured`` when ``sqlparse`` cannot be imported.
    """
    try:
        import sqlparse
    except ImportError:
        raise ImproperlyConfigured(
            "sqlparse is required if you don't split your SQL "
            "statements manually."
        )

    statements = []
    for chunk in sqlparse.split(sql):
        if chunk:
            statements.append(sqlparse.format(chunk, strip_comments=True))
    return statements
operations.py 文件源码 项目:djanoDoc 作者: JustinChavez 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def prepare_sql_script(self, sql):
    """
    Split a (possibly multi-statement) SQL script into single statements.

    Each non-empty statement produced by ``sqlparse.split()`` is passed
    through ``sqlparse.format(..., strip_comments=True)``; the resulting
    list is meant to be fed to successive ``cursor.execute()`` calls.

    Raises ``ImproperlyConfigured`` when ``sqlparse`` cannot be imported.
    """
    try:
        import sqlparse
    except ImportError:
        raise ImproperlyConfigured(
            "sqlparse is required if you don't split your SQL "
            "statements manually."
        )

    statements = []
    for chunk in sqlparse.split(sql):
        if chunk:
            statements.append(sqlparse.format(chunk, strip_comments=True))
    return statements
operations.py 文件源码 项目:django-next-train 作者: bitpixdigital 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def prepare_sql_script(self, sql):
    """
    Split a (possibly multi-statement) SQL script into single statements.

    Each non-empty statement produced by ``sqlparse.split()`` is passed
    through ``sqlparse.format(..., strip_comments=True)``; the resulting
    list is meant to be fed to successive ``cursor.execute()`` calls.

    Raises ``ImproperlyConfigured`` when ``sqlparse`` cannot be imported.
    """
    try:
        import sqlparse
    except ImportError:
        raise ImproperlyConfigured(
            "sqlparse is required if you don't split your SQL "
            "statements manually."
        )

    statements = []
    for chunk in sqlparse.split(sql):
        if chunk:
            statements.append(sqlparse.format(chunk, strip_comments=True))
    return statements
operations.py 文件源码 项目:LatinSounds_AppEnviaMail 作者: G3ek-aR 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def prepare_sql_script(self, sql):
    """
    Split a (possibly multi-statement) SQL script into single statements.

    Each non-empty statement produced by ``sqlparse.split()`` is passed
    through ``sqlparse.format(..., strip_comments=True)``; the resulting
    list is meant to be fed to successive ``cursor.execute()`` calls.

    Raises ``ImproperlyConfigured`` when ``sqlparse`` cannot be imported.
    """
    try:
        import sqlparse
    except ImportError:
        raise ImproperlyConfigured(
            "sqlparse is required if you don't split your SQL "
            "statements manually."
        )

    statements = []
    for chunk in sqlparse.split(sql):
        if chunk:
            statements.append(sqlparse.format(chunk, strip_comments=True))
    return statements
__init__.py 文件源码 项目:dustbunny 作者: Teamworksapp 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def render_sql(db, q, inline=False):
    """
    Return the re-indented SQL of a query, compiled for the Postgres dialect.

    :param db: object whose ``session`` provides the connection used for
        inlining (only touched when ``inline`` is true)
    :param q (query): an SQLAlchemy query object
    :param inline (bool): when true, bind the parameters into the SQL text
        with the cursor's ``mogrify``; when false, append them on a
        trailing ``parameters:`` line (omitted if there are none)
    :return: str
    """
    compiled = q.statement.compile(dialect=postgresql.dialect())
    pretty = sql_format(str(compiled), reindent=True)

    if not inline:
        if compiled.params:
            return pretty + "\nparameters: {}".format(str(compiled.params))
        return pretty + ''

    raw_connection = db.session.connection().connection.connection
    with raw_connection.cursor() as cur:
        return cur.mogrify(pretty, compiled.params).decode('utf-8')
ddquery_test.py 文件源码 项目:ddquery 作者: elinaldosoft 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_sqlformatter(self):
    """``SqlFormatter.format`` output equals the hand-built pipeline:
    sqlparse re-indent/upper-case, duration prefix, then highlighting."""
    actual = SqlFormatter().format(self.model)

    expected = sqlparse.format(
        'SELECT "django_migrations"."app","django_migrations"."name" FROM "django_migrations"',
        reindent=True,
        keyword_case='upper',
    )
    expected = "(0.005ms) {0}".format(expected)
    expected = highlight(
        expected, SqlLexer(), Terminal256Formatter(style='default'))

    self.assertEqual(expected, actual)
postgresql.py 文件源码 项目:talisker 作者: canonical-ols 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def prettify_sql(sql):
    """Return ``sql`` reformatted by ``sqlparse`` for readable output.

    Keywords are upper-cased, identifiers lower-cased, comments kept, and
    the statement is re-indented using spaces rather than tabs.

    :param sql: SQL text, or ``None``.
    :return: the formatted SQL, or ``None`` when ``sql`` is ``None``.
    """
    if sql is None:
        return None
    return sqlparse.format(
        sql,
        keyword_case="upper",
        # Was misspelled "identfier_case"; sqlparse ignores unknown options,
        # so identifier lower-casing never took effect.
        identifier_case="lower",
        strip_comments=False,
        reindent=True,
        indent_tabs=False)
postgresql.py 文件源码 项目:talisker 作者: canonical-ols 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_safe_connection_string(conn):
    """Describe a connection as ``user@host:port/dbname``.

    Only those four parameters are emitted, so other DSN parameters never
    appear in the result.

    :param conn: connection exposing ``get_dsn_parameters()`` or, failing
        that, a ``dsn`` string of space-separated ``key=value`` pairs.
    :return: the formatted connection description.
    :raises KeyError: if user, host, port or dbname is missing.
    """
    try:
        # 2.7+
        params = conn.get_dsn_parameters()
    except AttributeError:
        # Split on the first '=' only: a value may itself contain '='
        # (e.g. options='-c search_path=public'), which would otherwise
        # make dict() fail on a three-element item.
        params = dict(i.split('=', 1) for i in shlex.split(conn.dsn))
    return '{user}@{host}:{port}/{dbname}'.format(**params)
postgresql.py 文件源码 项目:talisker 作者: canonical-ols 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def callproc(self, procname, vars=None):
    """Call a stored procedure and record how long the call took.

    The duration in milliseconds is recorded on the connection whether the
    call succeeds or raises. Only the procedure name is recorded.

    :return: the result of the parent cursor's ``callproc``.
    """
    started = time.time()
    try:
        return super(TaliskerCursor, self).callproc(procname, vars)
    finally:
        elapsed_ms = (time.time() - started) * 1000
        # no query parameters, cannot safely record
        self.connection._record(
            'stored proc: {}'.format(procname), None, elapsed_ms)
operations.py 文件源码 项目:CodingDojo 作者: ComputerSocietyUNB 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def return_insert_id(self):
    """
    Hook for backends that can return the last insert ID as part of an
    INSERT query: such backends return the SQL and params to append to
    the INSERT, with a format string holding the appropriate column.

    This implementation does nothing and returns ``None``.
    """
    return None
operations.py 文件源码 项目:CodingDojo 作者: ComputerSocietyUNB 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def get_db_converters(self, expression):
    """
    Return the functions needed to convert field data for ``expression``.

    This is the hook for backends whose field types do not provide data
    in the correct format. This implementation returns a new empty list.
    """
    converters = []
    return converters
etl.py 文件源码 项目:bit 作者: codesmart-co 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def get_perm(self):
    """Return the permission label ``[<sql_table_name>](id:<id>)``."""
    template = '[{obj.sql_table_name}](id:{obj.id})'
    return template.format(obj=self)
etl.py 文件源码 项目:bit 作者: codesmart-co 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def sql_table_name(self):
    """Return the table name: ``DB_ETL_PREFIX``, an underscore, ``self.name``."""
    template = '{}_{}'
    return template.format(DB_ETL_PREFIX, self.name)
etl.py 文件源码 项目:bit 作者: codesmart-co 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def create_schema(self):
    """Create this object's schema unless it already exists.

    :return: ``True`` when the schema already exists, otherwise the result
        of executing the ``CreateSchema`` construct on the local engine.
    """
    logger.info('try to create schema {}'.format(self.schema))
    if not self.exist_schema():
        return self.local_engine.execute(CreateSchema(self.schema))
    return True


问题


面经


文章

微信
公众号

扫码关注公众号