def prepare_sql_script(self, sql):
"""
Takes an SQL script that may contain multiple lines and returns a list
of statements to feed to successive cursor.execute() calls.
Since few databases are able to process raw SQL scripts in a single
cursor.execute() call and PEP 249 doesn't talk about this use case,
the default implementation is conservative.
"""
try:
import sqlparse
except ImportError:
raise ImproperlyConfigured(
"sqlparse is required if you don't split your SQL "
"statements manually."
)
else:
return [sqlparse.format(statement, strip_comments=True)
for statement in sqlparse.split(sql) if statement]
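A minimal sketch of how such a statement list is typically consumed, one cursor.execute() call per statement (the table name and the in-memory SQLite connection are only for illustration):

import sqlite3
import sqlparse

script = """
-- schema bootstrap
CREATE TABLE demo (id INTEGER PRIMARY KEY, name TEXT);
INSERT INTO demo (name) VALUES ('a');
INSERT INTO demo (name) VALUES ('b');
"""

conn = sqlite3.connect(":memory:")
cursor = conn.cursor()
# Mirror what prepare_sql_script() returns: split the script, strip comments,
# and execute each remaining statement separately.
for statement in sqlparse.split(script):
    statement = sqlparse.format(statement, strip_comments=True)
    if statement.strip():
        cursor.execute(statement)
conn.commit()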
def execute_sql_file(filename):
""" Since mysql can't rollback any CREATE/ALTER/DROP instruction,
do a full backup before starting the migration and if it fails, and reapply
it if it fails
"""
with database.Database() as db, open(filename) as fd:
if not db.transaction():
print("Failed to start the migration")
sys.exit(1)
cursor = QtSql.QSqlQuery(db)
for statement in sqlparse.split(fd.read()):
if not statement:
continue
if not cursor.exec_(statement):
db.rollback()
print(cursor.lastError().text())
break
else:
db.commit()
def test_psql_quotation_marks(): # issue83
# regression: make sure plain $$ work
t = sqlparse.split("""
CREATE OR REPLACE FUNCTION testfunc1(integer) RETURNS integer AS $$
....
$$ LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION testfunc2(integer) RETURNS integer AS $$
....
$$ LANGUAGE plpgsql;""")
assert len(t) == 2
# make sure $SOMETHING$ works too
t = sqlparse.split("""
CREATE OR REPLACE FUNCTION testfunc1(integer) RETURNS integer AS $PROC_1$
....
$PROC_1$ LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION testfunc2(integer) RETURNS integer AS $PROC_2$
....
$PROC_2$ LANGUAGE plpgsql;""")
assert len(t) == 2
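For contrast, a naive split on ';' would cut inside the dollar-quoted bodies that the test above exercises; a small sketch with a made-up plpgsql function:

import sqlparse

sql = """
CREATE OR REPLACE FUNCTION bump(i integer) RETURNS integer AS $$
BEGIN
    RETURN i + 1;  -- semicolons inside the body must not split the statement
END;
$$ LANGUAGE plpgsql;
SELECT bump(41);
"""

naive = [piece for piece in sql.split(';') if piece.strip()]
proper = sqlparse.split(sql)

print(len(naive))   # 4 -- the function body is chopped at its inner semicolons
print(len(proper))  # 2 -- CREATE FUNCTION ... and SELECT bump(41)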
def tokenize_content(file_path, content):
"""
Take a SQL file with multiple statements,
returns a list of blocks.
"""
sql_blocks = sqlparse.split(content)
sql_blocks = [block for block in sql_blocks if block]  # list (not a filter object) so the emptiness check below also works on Python 3
if not sql_blocks:
return
def builder(content, has_body=False):
return Block(file_path, has_body, content)
for block in sql_blocks:
# would have used `yield from' below but py2...
for r in _extract_parts(builder, block.split('\n')):
yield r
def is_single_statement(sql):
'''Returns True if the received SQL string contains at most one statement'''
return len(sqlparse.split(sql)) <= 1
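A quick illustration of the same check on two hypothetical inputs:

import sqlparse

print(len(sqlparse.split("SELECT 1;")) <= 1)            # True  -> single statement
print(len(sqlparse.split("SELECT 1; SELECT 2;")) <= 1)  # False -> would be rejected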
def parse(self,s):
# str: 1d,2h,3m,4s
query = s.split(',')
for elm in query:
print(elm)
if not self.elm_pattern.match(elm):
raise ValueError('Cannot parse query: {}'.format(elm))
for e in ['d', 'h', 'm', 's']:
if e in elm:
setattr(self, e, int(elm.split(e)[0]))
def timestamp_from_string(str):
print(str)
if str.startswith('last ', 0, 5):
# sample queries: 1m; 1m,2s; 1d,2h,3m,4s
query = Timeseries_query(str.split('last ')[1])
diff = datetime.timedelta(seconds=query.s, minutes=query.m, hours=query.h, days=query.d)
return utcnow() - diff
return iso8601.parse_date(str)
def apply_migrations():
# Get the latest applied migration
with database.Cursor() as cursor:
cursor.prepare("SELECT version FROM __migrations ORDER BY version DESC LIMIT 1")
cursor.exec_()
last_applied = -1
if cursor.next():
last_applied = cursor.value("version")
migration_file, migration_name = tempfile.mkstemp()
should_apply = False
nb = 0
for migration in sorted(os.listdir("../migrations")):
nb = int(migration.split("_")[0])
if nb <= last_applied:
continue
should_apply = True
print(f"Applying {migration}")
with open(os.path.join("../migrations", migration, "up.sql")) as fd:
os.write(migration_file, fd.read().encode())
if not should_apply:
print("Nothing to do")
sys.exit(1)
os.write(migration_file, f"INSERT INTO __migrations(version) VALUES({nb});\n".encode())
# Execute the generated file.
execute_sql_file(migration_name)
# Since mkstemp doesn't delete the temporary file automatically, do it
# ourselves.
os.remove(migration_name)
def rollback_migrations():
lower = -1
upper = -1
with database.Cursor() as cursor:
cursor.prepare("SELECT version FROM __migrations ORDER BY version DESC LIMIT 2")
cursor.exec_()
if cursor.next():
upper = int(cursor.value("version"))
if cursor.next():
lower = int(cursor.value("version"))
should_apply = False
migration_file, migration_name = tempfile.mkstemp()
for migration in sorted(os.listdir("../migrations"), reverse=True):
nb = int(migration.split("_")[0])
if lower < nb <= upper:
should_apply = True
print(f"Rollback {migration}")
with open(os.path.join("../migrations", migration, "down.sql")) as fd:
os.write(migration_file, fd.read().encode())
os.write(migration_file, f"DELETE FROM __migrations WHERE version={upper};\n".encode())
if should_apply:
execute_sql_file(migration_name)
else:
print("Nothing to do")
os.remove(migration_name)
def prepare_sql_script(self, sql, _allow_fallback=False):
"""
Takes a SQL script that may contain multiple lines and returns a list
of statements to feed to successive cursor.execute() calls.
Since few databases are able to process raw SQL scripts in a single
cursor.execute() call and PEP 249 doesn't talk about this use case,
the default implementation is conservative.
"""
# Remove _allow_fallback and keep only 'return ...' in Django 1.9.
try:
# This import must stay inside the method because it's optional.
import sqlparse
except ImportError:
if _allow_fallback:
# Without sqlparse, fall back to the legacy (and buggy) logic.
warnings.warn(
"Providing initial SQL data on a %s database will require "
"sqlparse in Django 1.9." % self.connection.vendor,
RemovedInDjango19Warning)
from django.core.management.sql import _split_statements
return _split_statements(sql)
else:
raise
else:
return [sqlparse.format(statement, strip_comments=True)
for statement in sqlparse.split(sql) if statement]
def test_casewhen(self):
sql = ('SELECT case when val = 1 then 2 else null end as foo;\n'
'comment on table actor is \'The actor table.\';')
stmts = sqlparse.split(sql)
self.assertEqual(len(stmts), 2)
def test_cursor_declare(self):
sql = ('DECLARE CURSOR "foo" AS SELECT 1;\n'
'SELECT 2;')
stmts = sqlparse.split(sql)
self.assertEqual(len(stmts), 2)
def test_if_function(self): # see issue 33
# don't let IF as a function confuse the splitter
sql = ('CREATE TEMPORARY TABLE tmp '
'SELECT IF(a=1, a, b) AS o FROM one; '
'SELECT t FROM two')
stmts = sqlparse.split(sql)
self.assertEqual(len(stmts), 2)
def test_split_simple():
stmts = sqlparse.split('select * from foo; select * from bar;')
assert len(stmts) == 2
assert stmts[0] == 'select * from foo;'
assert stmts[1] == 'select * from bar;'
def execute_query(self, segment, query):
'''Returns a cursor.'''
logging.info('Servicing request: {query}'.format(query=query))
# if the user sent more than one query, or the query is not a SELECT, raise an exception.
if len(sqlparse.split(query)) != 1 or sqlparse.parse(query)[0].get_type() != 'SELECT':
raise Exception('Exactly one SELECT query per request, please.')
assert os.path.isfile(segment.local_path())
logging.info("Connecting to sqlite database: {segment}".format(segment=segment.local_path()))
connection = sqlite3.connect(segment.local_path())
trough.sync.setup_connection(connection)
cursor = connection.cursor()
cursor.execute(query.decode('utf-8'))
return cursor
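The SELECT-only guard in execute_query() can be exercised on its own; a small sketch with example query strings (the helper name is hypothetical):

import sqlparse

def is_single_select(query):
    # Mirrors the guard above: exactly one statement, and it must parse as a SELECT.
    return (len(sqlparse.split(query)) == 1
            and sqlparse.parse(query)[0].get_type() == 'SELECT')

print(is_single_select("SELECT * FROM hosts"))   # True
print(is_single_select("DELETE FROM hosts"))     # False -> not a SELECT
print(is_single_select("SELECT 1; SELECT 2;"))   # False -> more than one statement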
# uwsgi endpoint
def __call__(self, env, start_response):
try:
query_dict = urllib.parse.parse_qs(env['QUERY_STRING'])
# use the ?segment= query string variable or the host string to figure out which sqlite database to talk to.
segment_id = query_dict.get('segment', env.get('HTTP_HOST', "").split("."))[0]
logging.info('Connecting to Rethinkdb on: %s' % settings['RETHINKDB_HOSTS'])
segment = trough.sync.Segment(segment_id=segment_id, size=0, rethinker=self.rethinker, services=self.services, registry=self.registry)
content_length = int(env.get('CONTENT_LENGTH', 0))
query = env.get('wsgi.input').read(content_length)
write_lock = segment.retrieve_write_lock()
if write_lock and write_lock['node'] != settings['HOSTNAME']:
logging.info('Found write lock for {segment}. Proxying {query} to {host}'.format(segment=segment.id, query=query, host=write_lock['node']))
return self.proxy_for_write_host(write_lock['node'], segment, query, start_response)
## # enforce that we are querying the correct database, send an explicit hostname.
## write_url = "http://{node}:{port}/?segment={segment}".format(node=node, segment=segment.id, port=settings['READ_PORT'])
## with requests.post(write_url, stream=True, data=query) as r:
## status_line = '{status_code} {reason}'.format(status_code=r.status_code, reason=r.reason)
## headers = [("Content-Type", r.headers['Content-Type'],)]
## start_response(status_line, headers)
## return r.iter_content()
cursor = self.execute_query(segment, query)
start_response('200 OK', [('Content-Type','application/json')])
return self.sql_result_json_iter(cursor)
except Exception as e:
logging.error('500 Server Error due to exception', exc_info=True)
start_response('500 Server Error', [('Content-Type', 'text/plain')])
return [('500 Server Error: %s\n' % str(e)).encode('utf-8')]
def handle_input(self, input_data, verbose=True, refresh_metadata=True):
force_pager = False
if input_data.endswith(r'\p' if isinstance(input_data, str) else rb'\p'):
input_data = input_data[:-2]
force_pager = True
# FIXME: A dirty dirty hack to make multiple queries (per one paste) work.
self.query_ids = []
for query in sqlparse.split(input_data):
query_id = str(uuid4())
self.query_ids.append(query_id)
self.handle_query(query, verbose=verbose, query_id=query_id, force_pager=force_pager)
if refresh_metadata and input_data:
self.cli.application.buffer.completer.refresh_metadata()
def statements_split(cls, statements):
for statement in sqlparse.split(statements):
statement = statement.strip()
if statement.endswith(';'):
statement = statement[:-1].strip()
if statement: # remove empty statements
yield statement
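A standalone copy of the generator above (the original is a classmethod on a class not shown here), run on a hypothetical two-statement script:

import sqlparse

def split_statements(script):
    # Yield each non-empty statement with its trailing semicolon removed.
    for statement in sqlparse.split(script):
        statement = statement.strip()
        if statement.endswith(';'):
            statement = statement[:-1].strip()
        if statement:
            yield statement

print(list(split_statements("SELECT 1; SELECT 2;")))
# ['SELECT 1', 'SELECT 2']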
def execute_sql_file(db, fp):
log.debug("executing sql file: `%s'", fp)
with open(fp, 'r') as f:
lines = f.readlines()
txt = ''.join(lines)  # readlines() keeps the newlines; joining with '\n' would double them
parts = sqlparse.split(txt)
# TODO: use transaction
for i, part in enumerate(parts):
log.debug("executing statement %s, `%s...'", i, part[:20].replace('\n', ''))
db.execute(text(part))
def convert_to_sqlalchemy_statement(raw_sql_script):
"""Convert raw SQL into SQLAlchemy statement."""
# remove comments and trailing whitespace
formatted_sql_script = sqlparse.format(
raw_sql_script.strip(), strip_comments=True)
return sqlparse.split(formatted_sql_script)
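A hedged sketch of how the returned list might be fed to SQLAlchemy, assuming an in-memory engine and a throwaway script (both only for illustration):

import sqlparse
from sqlalchemy import create_engine, text

raw_sql_script = """
-- comments are stripped before splitting
CREATE TABLE t (x INTEGER);
INSERT INTO t (x) VALUES (1);
"""

# Same two steps as convert_to_sqlalchemy_statement() above.
statements = sqlparse.split(
    sqlparse.format(raw_sql_script.strip(), strip_comments=True))

engine = create_engine("sqlite://")  # throwaway in-memory engine
with engine.begin() as conn:
    for statement in statements:
        if statement.strip():
            conn.execute(text(statement))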
def column_windows(session, w_column, w_size, fb_kw=None, f_expr=None):
"""Return a series of WHERE clauses against a given column that break it
into windows.
Parameters
----------
session : object
An instance of SQLAlchemy Session.
w_column : object
Column object that is used to split into windows, should be an
integer column.
w_size : int
Size of the window
fb_kw : dict
The filter_by keywords, used by query.filter_by().
f_expr : list
The filter expressions, used by query.filter().
Returns
-------
iterable
Each element of the iterable is a whereclause expression, which
specify the range of the window over the column `w_col`.
Example
-------
for whereclause in column_windows(q.session, w_column, w_size):
for row in q.filter(whereclause).order_by(w_column):
yield row
"""
def int_for_range(start_id, end_id):
"""Internal function to build range."""
if end_id:
return and_(w_column >= start_id, w_column < end_id)
else:
return w_column >= start_id
q = session.query(
w_column, func.row_number().over(order_by=w_column).label('w_row_num'))
if fb_kw:
q = q.filter_by(**fb_kw)
if f_expr:
q = q.filter(*f_expr)
q = q.from_self(w_column)
if w_size > 1:
q = q.filter(sqlalchemy.text("w_row_num % {}=1".format(w_size)))
intervals = [id for id, in q]
while intervals:
start = intervals.pop(0)
if intervals:
end = intervals[0]
else:
end = None
yield int_for_range(start, end)
def run(self, engine, step=None):
"""Runs SQL script through raw dbapi execute call"""
text = self.source()
# Don't rely on SA's autocommit here
# (SA uses .startswith to check if a commit is needed. What if script
# starts with a comment?)
conn = engine.connect()
try:
trans = conn.begin()
try:
# ignore transaction management statements that are
# redundant in SQL script context and result in
# operational error being returned.
#
# Note: we don't ignore ROLLBACK in migration scripts
# since its usage would be insane anyway, and it's
# better to fail on its occurrence instead of ignoring it
# (and committing transaction, which is contradictory to
# the whole idea of ROLLBACK)
ignored_statements = ('BEGIN', 'END', 'COMMIT')
ignored_regex = re.compile(r'^\s*(%s).*;?$' % '|'.join(ignored_statements),
re.IGNORECASE)
# NOTE(ihrachys): script may contain multiple statements, and
# not all drivers reliably handle multistatement queries or
# commands passed to .execute(), so split them and execute one
# by one
text = sqlparse.format(text, strip_comments=True, strip_whitespace=True)
for statement in sqlparse.split(text):
if statement:
if re.match(ignored_regex, statement):
log.warning('"%s" found in SQL script; ignoring' % statement)
else:
conn.execute(statement)
trans.commit()
except Exception as e:
log.error("SQL script %s failed: %s", self.path, e)
trans.rollback()
raise
finally:
conn.close()
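A quick check of the transaction-statement filter used above, with the regex written as a raw string:

import re

ignored_statements = ('BEGIN', 'END', 'COMMIT')
ignored_regex = re.compile(r'^\s*(%s).*;?$' % '|'.join(ignored_statements),
                           re.IGNORECASE)

print(bool(ignored_regex.match("BEGIN;")))                   # True  -> skipped
print(bool(ignored_regex.match("  commit")))                 # True  -> skipped
print(bool(ignored_regex.match("CREATE TABLE t (x int);")))  # False -> executed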
def connect(self):
self.scheme = 'http'
if '://' in self.host:
u = urlparse(self.host, allow_fragments = False)
self.host = u.hostname
self.port = u.port or self.port
self.scheme = u.scheme
self.url = '{scheme}://{host}:{port}/'.format(scheme=self.scheme, host=self.host, port=self.port)
self.client = Client(
self.url,
self.user,
self.password,
self.database,
self.settings,
self.stacktrace,
self.conn_timeout,
self.conn_timeout_retry,
self.conn_timeout_retry_delay,
)
self.echo.print("Connecting to {host}:{port}".format(
host=self.host, port=self.port)
)
try:
response = self.client.query('SELECT version();', fmt='TabSeparated')
except TimeoutError:
self.echo.error("Error: Connection timeout.")
return False
except ConnectionError:
self.echo.error("Error: Failed to connect.")
return False
except DBException as e:
self.echo.error("Error:")
self.echo.error(e.error)
if self.stacktrace and e.stacktrace:
self.echo.print("Stack trace:")
self.echo.print(e.stacktrace)
return False
if not response.data.endswith('\n'):
self.echo.error("Error: Request failed: `SELECT version();` query failed.")
return False
version = response.data.strip().split('.')
self.server_version = (int(version[0]), int(version[1]), int(version[2]))
self.echo.success(
"Connected to ClickHouse server v{0}.{1}.{2}.\n".format(
*self.server_version
)
)
return True