def get_classified_songs(self, telegram_id):
    conn = sqlite3.connect(self._DATABASE)
    sql = """
        SELECT
            danceability, energy, loudness, speechiness, acousticness,
            instrumentalness, liveness, valence, tempo, activity
        FROM songs s, users u, song_user su
        WHERE
            activity IS NOT NULL AND
            s.id = su.song_id AND
            su.user_id = u.id AND
            u.telegram_user_id = {}
    """.format(telegram_id)

    resp = pd.read_sql_query(sql, conn)
    conn.close()
    return resp
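The snippet above interpolates telegram_id into the SQL string with .format(), which is open to SQL injection. A minimal sketch of the same query with a bound parameter instead (the function name and db_path argument are illustrative, not part of the original):

from contextlib import closing
import sqlite3
import pandas as pd

def get_classified_songs_safe(db_path, telegram_id):
    # Same query as above, but telegram_id is bound via a "?" placeholder
    sql = """
        SELECT danceability, energy, loudness, speechiness, acousticness,
               instrumentalness, liveness, valence, tempo, activity
        FROM songs s, users u, song_user su
        WHERE activity IS NOT NULL
          AND s.id = su.song_id
          AND su.user_id = u.id
          AND u.telegram_user_id = ?
    """
    with closing(sqlite3.connect(db_path)) as conn:
        return pd.read_sql_query(sql, conn, params=(telegram_id,))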
Python read_sql_query() examples (source code)
def build_df(table: str = 'articles',
             start_date: Optional[datetime] = None,
             end_date: Optional[datetime] = None) -> pd.DataFrame:
    """Build dataframe with derived fields."""
    with closing(sqlite3.connect(DB_FILE_NAME)) as conn:
        articles = pd.read_sql_query(f'select * from {table}', conn)

    articles['date'] = pd.to_datetime(articles['publish_date'])
    if start_date:
        articles = articles.loc[articles['date'] >= start_date]
    if end_date:
        articles = articles.loc[articles['date'] <= end_date]
    articles = articles.replace([None], [''], regex=True)
    articles['base_url'] = articles.apply(get_url_base, axis=1)
    articles['word_count'] = articles.apply(count_words, axis=1)
    return articles
def SensitivityQuery(self, table, data_set):
    # Returns the number of times an analyte is found at each concentration and the
    # number of repetitions in a particular data set.
    sql_statement = "SELECT COUNT(%s.id) AS Count, %s.Concentration_pg AS Conc_pg, \
                     DataSetConcentrations.Repetitions AS Repetitions \
                     FROM \
                         Sample \
                         INNER JOIN %s ON \
                             %s.id = Sample.%s_foreignkey \
                         INNER JOIN DataSetConcentrations ON \
                             DataSetConcentrations.id = Sample.DataSetConcentrations_foreignkey \
                     WHERE \
                         Sample.DataSetName = '%s' \
                     GROUP BY \
                         Conc_pg \
                     ORDER BY \
                         Conc_pg;" % (table, table, table, table, table, data_set)
    return pd.read_sql_query(sql_statement, self.conn)
def GetRepsAtEachConcentration(self, analyte_table_lst, data_set):
    df = pd.DataFrame()
    for table in analyte_table_lst:
        sql_statement = "SELECT \
                         %s.Concentration_pg AS Conc, COUNT(%s.Concentration_pg) AS %s \
                         FROM \
                             Sample \
                             INNER JOIN %s ON \
                                 %s.id = Sample.%s_foreignkey \
                         WHERE \
                             DataSetName = '%s' \
                         GROUP BY 1 \
                         ORDER BY 1 ASC;" % (table, table, table, table, table, table, data_set)
        df1 = pd.read_sql_query(sql_statement, self.conn)
        df1.set_index('Conc', inplace=True)
        df = pd.concat([df, df1], axis=1)
    return df
def import_data_from_psql(user_id):
    """Import data from psql; clean & merge dataframes."""
    library = pd.read_sql_table(
        'library',
        con='postgres:///nextbook',
        columns=['book_id', 'title', 'author', 'pub_year', 'original_pub_year', 'pages'])
    book_subjects = pd.read_sql_table(
        'book_subjects',
        con='postgres:///nextbook')
    subjects = pd.read_sql_table(
        'subjects', con='postgres:///nextbook',
        columns=['subject_id', 'subject'])
    user_ratings = pd.read_sql_query(
        sql=('SELECT book_id, user_id, status, rating FROM user_books WHERE user_id=%s' % user_id),
        con='postgres:///nextbook')

    library = library.merge(user_ratings, how='left', on='book_id')
    library['pages'].fillna(0, inplace=True)

    # merge subject names into book_subjects; drop uninteresting subjects from book_subjects table
    book_subjects = book_subjects.merge(subjects, how='left', on='subject_id')
    delete_values = ["protected daisy", "accessible book", "in library", "overdrive",
                     "large type books", 'ficci\xc3\xb3n juvenil', 'ficci\xc3\xb3n',
                     'lending library']
    book_subjects = book_subjects[~book_subjects['subject'].isin(delete_values)]

    return [library, book_subjects, subjects]
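The user_ratings query above splices user_id into the SQL with %-formatting. A hedged alternative is to bind it as a named parameter through SQLAlchemy; this is only a sketch, and the connection URL is an assumption rather than the project's actual setting:

import pandas as pd
import sqlalchemy

engine = sqlalchemy.create_engine('postgresql:///nextbook')  # assumed URL

def load_user_ratings(user_id):
    # Bind user_id as a named parameter instead of %-formatting it into the SQL
    query = sqlalchemy.text(
        'SELECT book_id, user_id, status, rating '
        'FROM user_books WHERE user_id = :user_id')
    return pd.read_sql_query(query, con=engine, params={'user_id': user_id})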
Source: test_sql.py (project: PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda, author: SignalMedia)
def test_sql_open_close(self):
    # Test that the database IO still works if the connection is closed
    # between the writing and reading (as in many real situations).
    with tm.ensure_clean() as name:
        conn = self.connect(name)
        sql.to_sql(self.test_frame3, "test_frame3_legacy", conn,
                   flavor="sqlite", index=False)
        conn.close()

        conn = self.connect(name)
        result = sql.read_sql_query("SELECT * FROM test_frame3_legacy;",
                                    conn)
        conn.close()

    tm.assert_frame_equal(self.test_frame3, result)
Source: test_sql.py (project: PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda, author: SignalMedia)
def test_datetime(self):
    df = DataFrame({'A': date_range('2013-01-01 09:00:00', periods=3),
                    'B': np.arange(3.0)})
    df.to_sql('test_datetime', self.conn)

    # with read_table -> type information from schema used
    result = sql.read_sql_table('test_datetime', self.conn)
    result = result.drop('index', axis=1)
    tm.assert_frame_equal(result, df)

    # with read_sql -> no type information -> sqlite has no native datetime type
    result = sql.read_sql_query('SELECT * FROM test_datetime', self.conn)
    result = result.drop('index', axis=1)
    if self.flavor == 'sqlite':
        self.assertTrue(isinstance(result.loc[0, 'A'], string_types))
        result['A'] = to_datetime(result['A'])
        tm.assert_frame_equal(result, df)
    else:
        tm.assert_frame_equal(result, df)
Source: test_sql.py (project: PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda, author: SignalMedia)
def test_datetime_NaT(self):
    df = DataFrame({'A': date_range('2013-01-01 09:00:00', periods=3),
                    'B': np.arange(3.0)})
    df.loc[1, 'A'] = np.nan
    df.to_sql('test_datetime', self.conn, index=False)

    # with read_table -> type information from schema used
    result = sql.read_sql_table('test_datetime', self.conn)
    tm.assert_frame_equal(result, df)

    # with read_sql -> no type information -> sqlite has no native datetime type
    result = sql.read_sql_query('SELECT * FROM test_datetime', self.conn)
    if self.flavor == 'sqlite':
        self.assertTrue(isinstance(result.loc[0, 'A'], string_types))
        result['A'] = to_datetime(result['A'], errors='coerce')
        tm.assert_frame_equal(result, df)
    else:
        tm.assert_frame_equal(result, df)
Source: test_sql.py (project: PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda, author: SignalMedia)
def test_datetime_time(self):
    # test support for datetime.time
    df = DataFrame([time(9, 0, 0), time(9, 1, 30)], columns=["a"])
    df.to_sql('test_time', self.conn, index=False)
    res = read_sql_table('test_time', self.conn)
    tm.assert_frame_equal(res, df)

    # GH8341
    # first, use the fallback to have the sqlite adapter put in place
    sqlite_conn = TestSQLiteFallback.connect()
    sql.to_sql(df, "test_time2", sqlite_conn, index=False)
    res = sql.read_sql_query("SELECT * FROM test_time2", sqlite_conn)
    ref = df.applymap(lambda _: _.strftime("%H:%M:%S.%f"))
    tm.assert_frame_equal(ref, res)  # check if adapter is in place

    # then test if sqlalchemy is unaffected by the sqlite adapter
    sql.to_sql(df, "test_time3", self.conn, index=False)
    if self.flavor == 'sqlite':
        res = sql.read_sql_query("SELECT * FROM test_time3", self.conn)
        ref = df.applymap(lambda _: _.strftime("%H:%M:%S.%f"))
        tm.assert_frame_equal(ref, res)
    res = sql.read_sql_table("test_time3", self.conn)
    tm.assert_frame_equal(df, res)
Source: test_sql.py (project: PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda, author: SignalMedia)
def test_connectable_issue_example(self):
    # This tests the example raised in issue
    # https://github.com/pydata/pandas/issues/10104

    def foo(connection):
        query = 'SELECT test_foo_data FROM test_foo_data'
        return sql.read_sql_query(query, con=connection)

    def bar(connection, data):
        data.to_sql(name='test_foo_data',
                    con=connection, if_exists='append')

    def main(connectable):
        with connectable.connect() as conn:
            with conn.begin():
                foo_data = conn.run_callable(foo)
                conn.run_callable(bar, foo_data)

    DataFrame({'test_foo_data': [0, 1, 2]}).to_sql(
        'test_foo_data', self.conn)
    main(self.conn)
Source: test_sql.py (project: PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda, author: SignalMedia)
def test_temporary_table(self):
    test_data = u'Hello, World!'
    expected = DataFrame({'spam': [test_data]})
    Base = declarative.declarative_base()

    class Temporary(Base):
        __tablename__ = 'temp_test'
        __table_args__ = {'prefixes': ['TEMPORARY']}
        id = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True)
        spam = sqlalchemy.Column(sqlalchemy.Unicode(30), nullable=False)

    Session = sa_session.sessionmaker(bind=self.conn)
    session = Session()
    with session.transaction:
        conn = session.connection()
        Temporary.__table__.create(conn)
        session.add(Temporary(spam=test_data))
        session.flush()
        df = sql.read_sql_query(
            sql=sqlalchemy.select([Temporary.spam]),
            con=conn,
        )
    tm.assert_frame_equal(df, expected)
Source: test_sql.py (project: PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda, author: SignalMedia)
def _get_index_columns(self, tbl_name):
    ixs = sql.read_sql_query(
        "SHOW INDEX IN %s" % tbl_name, self.conn)
    ix_cols = {}
    for ix_name, ix_col in zip(ixs.Key_name, ixs.Column_name):
        if ix_name not in ix_cols:
            ix_cols[ix_name] = []
        ix_cols[ix_name].append(ix_col)
    return list(ix_cols.values())

# TODO: cruft?
# def test_to_sql_save_index(self):
#     self._to_sql_save_index()
#
#     for ix_name, ix_col in zip(ixs.Key_name, ixs.Column_name):
#         if ix_name not in ix_cols:
#             ix_cols[ix_name] = []
#         ix_cols[ix_name].append(ix_col)
#     return ix_cols.values()
def get_data(query):
    """
    Pulls data from the db based on the query

    Input
    -----
    query: str
        SQL query to run against the database

    Output
    ------
    data: DataFrame
        Dump of the query results into a DataFrame
    """
    import setup_environment
    from setup_environment import db_dict

    with setup_environment.connect_to_syracuse_db(**db_dict) as conn:
        data = pd.read_sql_query(query, conn)
    return data
def show_hexbin(self, query):
    """shows hexbin plot over map

    Args:
        query: SQL query string to run
    """
    self.load()
    data = pd.read_sql_query(con=self.con, sql=query)
    points = self.gen_points(data, self.data_map)
    hx = self.base_map.hexbin(
        np.array([geom.x for geom in points]),
        np.array([geom.y for geom in points]),
        gridsize=275,
        bins='log',
        mincnt=1,
        edgecolor='none',
        alpha=1.,
        lw=0.2,
        cmap=plt.get_cmap('afmhot'))
    plt.tight_layout()
    plt.show()
def show_scatter(self, query, color='blue'):
    """shows scatter plot over map

    Args:
        query: SQL query string to run
    """
    self.load()
    data = pd.read_sql_query(con=self.con, sql=query)
    points = self.gen_points(data, self.data_map)
    plot = self.base_map.scatter(
        [point.x for point in points],
        [point.y for point in points],
        10, marker='o', lw=.25,
        facecolor=color, edgecolor='w',
        alpha=0.9, antialiased=True,
        zorder=3)
    plt.show()
def get_upstream_stops_ratio(self, target, trough_stops, ratio):
    """
    Selects the stops for which at least the given ratio of trips to the target
    passes through the given set of stops.

    :param target: target of trips
    :param trough_stops: stops that the selected trips must pass through
    :param ratio: threshold for inclusion
    :return:
    """
    if isinstance(trough_stops, list):
        trough_stops = ",".join(trough_stops)
    query = """SELECT stops.* FROM other.stops,
                (SELECT q2.from_stop_I AS stop_I FROM
                 (SELECT journeys.from_stop_I, count(*) AS n_total FROM journeys
                  WHERE journeys.to_stop_I = {target}
                  GROUP BY from_stop_I) q1,
                 (SELECT journeys.from_stop_I, count(*) AS n_trough FROM journeys, legs
                  WHERE journeys.journey_id=legs.journey_id AND legs.from_stop_I IN ({trough_stops}) AND journeys.to_stop_I = {target}
                  GROUP BY journeys.from_stop_I) q2
                 WHERE q1.from_stop_I = q2.from_stop_I AND n_trough/(n_total*1.0) >= {ratio}) q1
                WHERE stops.stop_I = q1.stop_I""".format(target=target, trough_stops=trough_stops, ratio=ratio)
    df = read_sql_query(query, self.conn)
    return df
def get_directly_accessible_stops_within_distance(self, stop, distance):
    """
    Returns stops that are accessible without transfer from the stops that are within a specific walking distance
    :param stop: int
    :param distance: int
    :return:
    """
    query = """SELECT stop.* FROM
                (SELECT st2.* FROM
                 (SELECT * FROM stop_distances
                  WHERE from_stop_I = %s) sd,
                 (SELECT * FROM stop_times) st1,
                 (SELECT * FROM stop_times) st2
                 WHERE sd.d < %s AND sd.to_stop_I = st1.stop_I AND st1.trip_I = st2.trip_I
                 GROUP BY st2.stop_I) sq,
                (SELECT * FROM stops) stop
                WHERE sq.stop_I = stop.stop_I""" % (stop, distance)
    return pd.read_sql_query(query, self.conn)
def setUp(self):
    self.data = {
        "DC_PEC": '''
import pandas as pd
from sqlalchemy import create_engine
from urllib.request import urlretrieve; urlretrieve('https://s3.amazonaws.com/assets.datacamp.com/production/course_998/datasets/Chinook.sqlite', 'Chinook.sqlite')
engine = create_engine('sqlite:///Chinook.sqlite')
''',
        "DC_CODE": '''
# Execute query and store records in dataframe: df
df = pd.read_sql_query("ELECT * FROM PlaylistTrack INNER JOIN Track on PlaylistTrack.TrackId = Track.TrackId WHERE Milliseconds < 250000", engine)

# Print head of dataframe
print(df.head())
''',
        "DC_SOLUTION": '''
# Execute query and store records in dataframe: df
df = pd.read_sql_query("SELECT * FROM PlaylistTrack INNER JOIN Track on PlaylistTrack.TrackId = Track.TrackId WHERE Milliseconds < 250000", engine)

# Print head of dataframe
print(df.head())
'''
    }
def test_Pass(self):
    self.data["DC_SCT"] = '''
# Test: call to read_sql_query() and 'df' variable
test_correct(
    lambda: test_object("df"),
    lambda: test_function("pandas.read_sql_query", do_eval = False)
)

# Test: Predefined code
predef_msg = "You don't have to change any of the predefined code."
test_function("print", incorrect_msg = predef_msg)

success_msg("Great work!")
'''
    sct_payload = helper.run(self.data)
    self.assertFalse(sct_payload['correct'])
def run(x, string):
    print("Processing chunk: {}".format(string))
    conn = sqlite3.connect(':memory:')
    c = conn.cursor()
    try:
        c.execute("""CREATE TABLE war (nb_trick int)""")
        conn.commit()
    except sqlite3.OperationalError:
        pass
    for i in range(x):
        b = Cython_War_Trick.Battle()
        result = b.trick()
        c.execute("""INSERT INTO war VALUES (?)""", [result])
        conn.commit()
    chunk = pd.read_sql_query("""SELECT nb_trick FROM war""", conn)
    f = chunk['nb_trick'].value_counts()
    return f
def run(self):
    result_list = []
    sql = None
    while True:
        self.lock.acquire()
        if not self.queue.empty():
            sql = self.queue.get()
            self.lock.release()
        else:
            self.lock.release()
            break
        stock_data = pd.read_sql_query(sql, con=self.db)
        stock_data = stock_data.set_index('datetime')
        result_list.append(stock_data)
        print("A stock has finished reading and {} stocks left".format(self.queue.qsize()))
    self.lock.acquire()
    self.parent_list.extend(result_list)
    self.lock.release()
    self.db.close()
def get_nr_particles_per_population(self) -> pd.Series:
    """
    Returns
    -------
    nr_particles_per_population: pd.Series
        A pandas Series containing the number
        of particles for each population
    """
    query = (self._session.query(Population.t)
             .join(ABCSMC)
             .join(Model)
             .join(Particle)
             .filter(ABCSMC.id == self.id))
    df = pd.read_sql_query(query.statement, self._engine)
    nr_particles_per_population = df.t.value_counts().sort_index()
    return nr_particles_per_population
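The same pattern, handing an ORM query's compiled statement to read_sql_query, works for any SQLAlchemy model. A minimal, self-contained sketch (the Measurement model and its columns are hypothetical, not part of the project above):

import pandas as pd
import sqlalchemy
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

Base = declarative_base()

class Measurement(Base):  # hypothetical model, for illustration only
    __tablename__ = 'measurements'
    id = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True)
    t = sqlalchemy.Column(sqlalchemy.Integer)
    value = sqlalchemy.Column(sqlalchemy.Float)

engine = sqlalchemy.create_engine('sqlite://')
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()

query = session.query(Measurement.t, Measurement.value).filter(Measurement.t >= 0)
# query.statement is the underlying SELECT, which read_sql_query can execute directly
df = pd.read_sql_query(query.statement, engine)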
def get_predict_acc2(debug=False):
    db = Db()
    engine = db._get_engine()
    sql_stocklist = "select * from acc1"
    if debug:
        pass
    df = pd.read_sql_query(sql_stocklist, engine)
    acc2 = df.sort_values('c_yearmonthday', ascending=0)
    acc2 = acc2.head(2)
    acc2 = acc2.groupby('c_yearmonthday').sum()
    acc2_final = pd.DataFrame()
    acc2_final['h_p_acc'] = [df['acc'].sum() / float(df['acc'].count())]
    acc2_final['h_p_change'] = [df['p_change'].sum() / 2.0]
    acc2_final['p_acc'] = [acc2['acc'].sum() / 2.0]
    acc2_final['p_change'] = [acc2['p_change'].sum() / 2.0]
    return acc2_final
def get_data(self, labels_=None, data=None):
    print('Loading CleanText from DataBase from...')
    conn = connect('/home/gondin/metis/project/clinton-email-download/hrcemail3.sqlite')
    sql = """SELECT Keywords, Polarity, Subjectivity, "from", cluster_labels, pdf_path as "Email" FROM document;"""
    self.data = pd.read_sql_query(sql, conn)
    self.data['Similarity'] = self.similarity[:, 0]
    conn.close()
    #self.data = self.data.sample(1000)
    self.data = self.data.sample(15000, random_state=44)
    # labels_ = self.labels_ == self.label_
    labels_ = self.labels_
    print(self.data.shape)
    print(labels_.shape)
    self.data.Polarity = self.data.Polarity.apply(lambda x: round(x, 2))
    # .loc replaces the deprecated .ix indexer; selection is unchanged
    return (self.data.loc[labels_ & (self.data.cluster_labels > 0),
                          ['Keywords', 'Similarity', 'Polarity', 'Subjectivity', "from", "Email"]]
            .sort_values('Similarity'))
    #return (self.data.ix[labels_, ['Keywords','dist', "Email"]].sort_values('dist'))
def query_wikidata_mysql(query):
    with SSHTunnelForwarder((ssh_host, ssh_port), ssh_username=ssh_user, ssh_pkey=mypkey,
                            remote_bind_address=(sql_hostname, sql_port)) as tunnel:
        conn = pymysql.connect(host='127.0.0.1', user=sql_user, password=sql_pass,
                               db=sql_main_database, port=tunnel.local_bind_port)
        df = pd.read_sql_query(query, conn)
        conn.close()
    return df
def tweet_dates(self):
    conn = sqlite3.connect(self._db, detect_types=sqlite3.PARSE_COLNAMES)
    df = pd.read_sql_query(
        'SELECT created_at FROM tweets', conn, parse_dates=['created_at'],
        index_col=['created_at']
    )
    return df
def all(self):
    conn = sqlite3.connect(self._db, detect_types=sqlite3.PARSE_DECLTYPES)
    df = pd.read_sql_query(
        'SELECT * FROM tweets', conn, parse_dates=['created_at']
    )
    return df
def tweets_since(self, dt):
    """
    Retrieves all tweets since a particular datetime, wrapped in a
    :class:`TweetBin` spanning ``dt`` to now.

    :param dt: The starting datetime to query from.
    """
    conn = sqlite3.connect(self._db, detect_types=sqlite3.PARSE_DECLTYPES)
    df = pd.read_sql_query(
        'SELECT * FROM tweets WHERE created_at > ?', conn, params=(dt,),
        parse_dates=['created_at']
    )
    return TweetBin(df, dt, datetime.now())
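For large tweet tables, read_sql_query can also stream results instead of loading everything at once: passing chunksize makes it return an iterator of DataFrames. A small sketch (the database path and table mirror the snippets above but are assumptions):

import sqlite3
import pandas as pd

conn = sqlite3.connect('tweets.db')  # assumed path

total = 0
for chunk in pd.read_sql_query('SELECT * FROM tweets', conn,
                               parse_dates=['created_at'], chunksize=10000):
    # Each chunk is a DataFrame of at most 10000 rows
    total += len(chunk)

conn.close()
print(total)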