def test_write(self):
"""Test the tracking of an event."""
tracker = pawprint.Tracker(db=db, table=table, schema={"id": "INT"})
tracker.create_table()
# Check the table's empty
assert pd.io.sql.execute("SELECT COUNT(*) FROM {}".format(table), db).fetchall() == [(0,)]
# Add some data and check if the row count increases by one
tracker.write(id=1337)
assert pd.io.sql.execute("SELECT COUNT(*) FROM {}".format(table), db).fetchall() == [(1,)]
# Pull the data and ensure it's correct
data = pd.read_sql("SELECT * FROM {}".format(table), db)
assert isinstance(data, pd.DataFrame)
assert len(data.columns) == 1
assert data.columns[0] == "id"
assert data.id[0] == 1337
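A minimal alternative sketch for the row-count checks above, assuming the same `db` connection and `table` name: pd.io.sql.execute has been deprecated and removed in newer pandas releases, so the same count can be pulled through pd.read_sql instead.
import pandas as pd

def row_count(db, table):
    # Hedged sketch: assumes `db` is a SQLAlchemy engine/connection and `table`
    # is a trusted table name, not user input.
    return pd.read_sql("SELECT COUNT(*) AS n FROM {}".format(table), db)["n"].iloc[0]

# e.g. inside the test above:
# assert row_count(db, table) == 0   # before tracker.write(id=1337)
# assert row_count(db, table) == 1   # after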
Python read_sql() usage examples (source code)
def issues_with_close(self, repoid):
"""
How long on average each week it takes to close an issue
:param repoid: The id of the project in the projects table. Use repoid() to get this.
:return: DataFrame with each issue's id, creation date, and days to close
"""
issuesSQL = s.sql.text("""
SELECT issues.id as "id",
issues.created_at as "date",
DATEDIFF(closed.created_at, issues.created_at) AS "days_to_close"
FROM issues
JOIN
(SELECT * FROM issue_events
WHERE issue_events.action = "closed") closed
ON issues.id = closed.issue_id
WHERE issues.repo_id = :repoid""")
return pd.read_sql(issuesSQL, self.db, params={"repoid": str(repoid)})
def committer_locations(self, repoid):
"""
Return committers and their locations
@todo: Group by country code instead of users, needs the new schema
:param repoid: The id of the project in the projects table.
:return: DataFrame with users and locations sorted by commits
"""
rawContributionsSQL = s.sql.text("""
SELECT users.login, users.location, COUNT(*) AS "commits"
FROM commits
JOIN project_commits
ON commits.id = project_commits.commit_id
JOIN users
ON users.id = commits.author_id
WHERE project_commits.project_id = :repoid
AND LENGTH(users.location) > 1
GROUP BY users.id
ORDER BY commits DESC
""")
return pd.read_sql(rawContributionsSQL, self.db, params={"repoid": str(repoid)})
def issue_response_time(self, repoid):
"""
How long it takes for issues to be responded to by people who have commits associated with the project
:param repoid: The id of the project in the projects table.
:return: DataFrame with each issue's creation date and the date
it was first responded to
"""
issuesSQL = s.sql.text("""
SELECT issues.created_at AS "created_at",
MIN(issue_comments.created_at) AS "responded_at"
FROM issues
JOIN issue_comments
ON issue_comments.issue_id = issues.id
WHERE issue_comments.user_id IN
(SELECT users.id
FROM users
JOIN commits
WHERE commits.author_id = users.id
AND commits.project_id = :repoid)
AND issues.repo_id = :repoid
GROUP BY issues.id
""")
return pd.read_sql(issuesSQL, self.db, params={"repoid": str(repoid)})
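A hedged usage sketch for the three metric methods above. The wrapper class, DSN, and repo id are illustrative assumptions; the only requirement the queries impose is that self.db is a SQLAlchemy engine.
import sqlalchemy as s

class RepoMetrics(object):
    # Hypothetical stand-in for the class these methods belong to: it only
    # stores the SQLAlchemy engine that the queries above expect on self.db.
    def __init__(self, engine):
        self.db = engine
    issues_with_close = issues_with_close
    committer_locations = committer_locations
    issue_response_time = issue_response_time

engine = s.create_engine("mysql+pymysql://user:password@localhost/ghtorrent")  # assumed DSN
metrics = RepoMetrics(engine)
repo = 42  # assumed project id from the projects table
print(metrics.issues_with_close(repo)["days_to_close"].mean())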
def get_trip_stop_coordinates(self, trip_I):
"""
Get coordinates for a given trip_I
Parameters
----------
trip_I : int
the integer id of the trip
Returns
-------
stop_coords : pandas.DataFrame
with columns "lat" and "lon"
"""
query = """SELECT lat, lon
FROM stop_times
JOIN stops
USING(stop_I)
WHERE trip_I={trip_I}
ORDER BY stop_times.seq""".format(trip_I=trip_I)
stop_coords = pd.read_sql(query, self.conn)
return stop_coords
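Because trip_I is interpolated into the query string above with str.format, a parameterized variant is sketched below under the assumption of the same sqlite3 connection; pd.read_sql forwards params to the driver, which uses ? placeholders for SQLite.
import pandas as pd

def get_trip_stop_coordinates_param(conn, trip_I):
    # Sketch only: the same query with a bound parameter instead of str.format.
    query = """SELECT lat, lon
               FROM stop_times
               JOIN stops USING(stop_I)
               WHERE trip_I = ?
               ORDER BY stop_times.seq"""
    return pd.read_sql(query, conn, params=(trip_I,))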
def remove_dangling_shapes(db_conn):
"""
Remove dangling entries from the shapes directory.
Parameters
----------
db_conn: sqlite3.Connection
connection to the GTFS object
"""
db_conn.execute(DELETE_SHAPES_NOT_REFERENCED_IN_TRIPS_SQL)
SELECT_MIN_MAX_SHAPE_BREAKS_BY_TRIP_I_SQL = \
"SELECT trips.trip_I, shape_id, min(shape_break) as min_shape_break, max(shape_break) as max_shape_break FROM trips, stop_times WHERE trips.trip_I=stop_times.trip_I GROUP BY trips.trip_I"
trip_min_max_shape_seqs = pandas.read_sql(SELECT_MIN_MAX_SHAPE_BREAKS_BY_TRIP_I_SQL, db_conn)
rows = []
for row in trip_min_max_shape_seqs.itertuples():
shape_id, min_shape_break, max_shape_break = row.shape_id, row.min_shape_break, row.max_shape_break
if min_shape_break is None or max_shape_break is None:
min_shape_break = float('-inf')
max_shape_break = float('-inf')
rows.append((shape_id, min_shape_break, max_shape_break))
DELETE_SQL_BASE = "DELETE FROM shapes WHERE shape_id=? AND (seq<? OR seq>?)"
db_conn.executemany(DELETE_SQL_BASE, rows)
remove_dangling_shapes_references(db_conn)
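A small usage sketch for the cleanup above; the database path is an assumption, and DELETE_SHAPES_NOT_REFERENCED_IN_TRIPS_SQL plus remove_dangling_shapes_references are defined elsewhere in the same module.
import sqlite3

conn = sqlite3.connect("gtfs.sqlite")  # assumed path to a GTFS SQLite database
remove_dangling_shapes(conn)           # prune shape rows outside each trip's shape_break range
conn.commit()
conn.close()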
def read_sql(sql, con, filePath, index_col=None, coerce_float=True,
params=None, parse_dates=None, columns=None, chunksize=None):
"""
Read SQL query or database table into a DataFrameModel.
Provide a filePath argument in addition to the *args/**kwargs from
pandas.read_sql and get a DataFrameModel.
NOTE: The chunksize option is overridden to None always (for now).
Reference:
http://pandas.pydata.org/pandas-docs/version/0.18.1/generated/pandas.read_sql.html
pandas.read_sql(sql, con, index_col=None, coerce_float=True,
params=None, parse_dates=None, columns=None, chunksize=None)
:return: DataFrameModel
"""
# TODO: Decide if chunksize is worth keeping and how to handle?
df = pandas.read_sql(sql, con, index_col, coerce_float,
params, parse_dates, columns, chunksize=None)
return DataFrameModel(df, filePath=filePath)
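An illustrative call for the wrapper above; the connection string and query are assumptions, and the point is only that the return value is a DataFrameModel rather than a plain DataFrame.
import sqlalchemy

engine = sqlalchemy.create_engine("sqlite:///example.db")   # assumed database
model = read_sql("SELECT * FROM observations", engine,      # assumed table name
                 filePath="observations_query.sqlite")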
def _limit_and_df(self, query, limit, as_df=False):
"""adds a limit (limit==None := no limit) to any query and allow a return as pandas.DataFrame
:param bool as_df: if is set to True results return as pandas.DataFrame
:param `sqlalchemy.orm.query.Query` query: SQL Alchemy query
:param int limit: maximum number of results
:return: query result of pyctd.manager.models.XY objects
"""
if limit:
query = query.limit(limit)
if as_df:
results = read_sql(query.statement, self.engine)
else:
results = query.all()
return results
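An illustrative companion method for the helper above, assuming it is added to the same manager class (self.session and the model class name are assumptions standing in for pyctd's actual ORM setup):
def _first_ten_as_df(self, model_class):
    # Hypothetical example of how _limit_and_df is meant to be called;
    # model_class stands in for one of the pyctd.manager.models classes.
    query = self.session.query(model_class)
    return self._limit_and_df(query, limit=10, as_df=True)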
def sqlite2observations(filename='observations.db'):
"""
Restore a database of observations.
"""
con = db.connect(filename)
df = pd.read_sql('select * from observations;', con)
blank = empty_observation()
result = df.as_matrix()
final_result = np.empty(result.shape[0], dtype=blank.dtype)
# XXX-ugh, there has to be a better way.
for i, key in enumerate(blank.dtype.names):
final_result[key] = result[:, i+1]
to_convert = ['RA', 'dec', 'alt', 'az', 'rotSkyPos', 'moonAlt', 'sunAlt']
for key in to_convert:
final_result[key] = np.radians(final_result[key])
return final_result
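One possible answer to the "there has to be a better way" comment above, sketched under the assumption that the column names returned by the query match blank.dtype.names exactly (it reuses the same module-level db, pd, np and empty_observation):
def sqlite2observations_by_name(filename='observations.db'):
    # Sketch: fill the structured array by column name instead of by position.
    con = db.connect(filename)
    df = pd.read_sql('select * from observations;', con)
    blank = empty_observation()
    final_result = np.empty(len(df), dtype=blank.dtype)
    for key in blank.dtype.names:
        final_result[key] = df[key].values
    for key in ['RA', 'dec', 'alt', 'az', 'rotSkyPos', 'moonAlt', 'sunAlt']:
        final_result[key] = np.radians(final_result[key])
    return final_result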
def fill(self):
self.init_db(self.engine)
df = pd.read_sql("select * from fundamental", self.engine).sort_values(['report_date', 'quarter'])
df['trade_date'] = df['report_date'] = pd.to_datetime(df['report_date'])
with click.progressbar(df.groupby('code'),
label='writing data',
item_show_func=lambda x: x[0] if x else None) as bar:
bar.is_hidden = False
for stock, group in bar:
group = group.drop_duplicates(subset='trade_date', keep="last").set_index('trade_date')
sessions = pd.date_range(group.index[0], group.index[-1])
d = group.reindex(sessions, copy=False).fillna(method='pad')
d.to_sql('full', self.engine, if_exists='append', index_label='trade_date')
def ReadSqlData(self,name, db):
conn = create_engine(
'mysql://' + config.user + ':' + config.password + '@' + config.ip + '/' + db + '?charset=utf8')
x = 'select * from ' + name + ';'  # SQL statement
return pandas.read_sql(x, con=conn)
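Since the table name is concatenated straight into the SELECT string above, here is a sketch of an alternative that reads the table by name through pandas.read_sql_table (same assumed config fields and charset as above):
def ReadSqlTable(self, name, db):
    # Sketch: read a whole table by name via SQLAlchemy instead of building
    # the SELECT string by hand; assumes `name` is a plain table name.
    conn = create_engine(
        'mysql://' + config.user + ':' + config.password + '@' + config.ip + '/' + db + '?charset=utf8')
    return pandas.read_sql_table(name, con=conn)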
def load_world(self, world_type):
"""For open world validation, we must keep track of which onion service
a trace came from. However for closed world validation, we can select
traces without consideration of which site they belong to.
:returns: a pandas DataFrame df containing the dataset
"""
select_hs_urls = ', t3.hs_url' if world_type == 'open' else ''
labeled_query = ('select t1.*, t3.is_sd {} '
'from features.frontpage_features t1 '
'inner join raw.frontpage_examples t2 '
'on t1.exampleid = t2.exampleid '
'inner join raw.hs_history t3 '
'on t3.hsid = t2.hsid').format(select_hs_urls)
df = pd.read_sql(labeled_query, self.engine)
return df
def get_exampleids(self):
"""Get list of exampleids"""
query = "SELECT DISTINCT exampleid FROM raw.frontpage_traces"
df = pd.read_sql(query, self.engine)
return df.exampleid.values
def get_ordered_trace_cells(self, exampleid):
"""Get trace for a given exampleid"""
df = pd.read_sql("""SELECT ingoing, t_trace FROM raw.frontpage_traces
WHERE exampleid={}
ORDER BY t_trace""".format(exampleid),
self.engine)
return df
def update_live_data(download=False):
def live_summary(live_setting_id):
group_dict = {1: "μ's", 2: 'Aqours'}
attr_dict = {1:'Smile', 2:'Pure', 3:'Cool'}
diff_dict = {1:'Easy', 2:'Normal', 3:'Hard', 4:'Expert', 6:'Master'}
setting = df_live_setting.loc[live_setting_id]
track_info = df_live_track.loc[setting['live_track_id']]
live_info = {
'cover': cover_path(setting['live_icon_asset']),
'name': track_info['name'],
'group': group_dict[track_info['member_category']],
'attr': attr_dict[setting['attribute_icon_id']],
'note_number': int(setting['s_rank_combo']),
'diff_level': diff_dict[setting['difficulty']],
'diff_star': int(setting['stage_level']),
'file_dir': live_path(setting['notes_setting_asset'])
}
return live_info
print('Downloading latest live.db_')
opener = urllib.request.URLopener()
opener.addheader('User-Agent', 'whatever')
opener.retrieve(live_db_download_url, live_db_dir)
print('Generating basic live stats')
conn = sqlite3.connect(live_db_dir)
df_live_track = pd.read_sql('SELECT * FROM live_track_m', con=conn, index_col='live_track_id')
df_live_setting = pd.read_sql('SELECT * FROM live_setting_m', con=conn, index_col='live_setting_id')
# live_data = [live_summary(live_setting_id) for live_setting_id, row in df_live_setting.iterrows() if row['difficulty']!=5]
live_data = [live_summary(live_setting_id) for live_setting_id, row in df_live_setting.iterrows() if row['difficulty']!=5 and live_setting_id != 10779]
with open(live_archive_dir, 'w') as fp:
json.dump(live_data, fp)
print('Basic live data has been saved in', live_archive_dir)
def demand_daily_data(db, rows=[], feature='', function='lag', unique=['ToiletID','Collection_Date'], conditions=None):
"""
A function to generate by day variables for a feature
Args:
DICT DB Connection object (see grab_collections_data)
LIST ROWS List of rows
STR FEATURE A feature name to create daily records for
STR FUNCTION Apply either the LAG or LEAD function (in the future, maybe some other functions)
LIST UNIQUE List of unique identifiers
STR CONDITIONS Apply the conditions string (see grab_collections_data)
Returns:
DF DAILY_DATA Pandas data frame of daily variables
"""
# Reprocess the unique list to account for capitalization
unique = ','.join(['"%s"' %(uu) for uu in unique])
# Construct the sql statement using window functions (e.g., OVER and LAG/LEAVE)
statement = 'SELECT %s' %(unique)
for rr in rows:
statement += ', %s("%s", %i, NULL) OVER(order by %s) as "%s_%s%i" ' %(function,
feature,
rr,
unique,
feature,
function,
rr)
# Complete the statement
statement += "FROM %s.%s %s ORDER BY %s" %(db['database'],
db['table'],
conditions,
unique)
# Execute the statement
daily_data = pd.read_sql(statement,
con=db['connection'],
coerce_float=True,
params=None)
# Return the lagged/lead data
return(daily_data)
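For concreteness, an illustrative call and the statement it assembles; the DSN, schema, table, and feature name are assumptions, not values from the source.
from sqlalchemy import create_engine

engine = create_engine('postgresql://user:password@localhost/sanergy')  # assumed DSN
daily = demand_daily_data(db={'database': 'premodeling', 'table': 'toiletcollection',  # assumed schema/table
                              'connection': engine},
                          rows=[1, 2], feature='Feces_kg_day', function='lag',  # assumed feature name
                          unique=['ToiletID', 'Collection_Date'], conditions='')
# The statement built above then reads roughly:
#   SELECT "ToiletID","Collection_Date"
#        , lag("Feces_kg_day", 1, NULL) OVER(order by "ToiletID","Collection_Date") as "Feces_kg_day_lag1"
#        , lag("Feces_kg_day", 2, NULL) OVER(order by "ToiletID","Collection_Date") as "Feces_kg_day_lag2"
#   FROM premodeling.toiletcollection  ORDER BY "ToiletID","Collection_Date"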
def grab_from_features_and_labels(db, fold, config):
"""
A function that subsets the features and labels stored in Postgres into train and test sets,
based on the fold info (train start, train end, test start, test end).
Args
DICT DB Connection object (see grab_collections_data)
DICT FOLD start and end dates for both train and test sets, in the format {"train": (start, end), "test": (start, end)}
DICT CONFIG experiment configuration with column names and response variable definitions
Returns
df features train
df labels train (feces and urine)
df features test
df labels test (feces and urine)
df toilet routes
"""
RESPONSE_RENAMER = {'response_f':'response', 'response_u':'response'}
dataset = pd.read_sql('select * from modeling.dataset where (("Collection_Date" >= '+"'"+fold['train_start'].strftime('%Y-%m-%d')+"'"+') and ("Collection_Date" <= '+"'"+fold['test_end'].strftime('%Y-%m-%d')+"'"+'))', db['connection'], coerce_float=True, params=None)
toilet_routes = pd.read_sql('select * from modeling.toilet_route', db['connection'], coerce_float=True, params=None)
#TODO: Fix this...
dataset = dataset.fillna(0) #A hack to make it run for now...
#Drop the toilets that do not have contiguous data.
# Note that missing collections are filled with NaN'd rows, so if a toilet is not contiguous, it must mean that it appeared or disappeared during the fold period -> ignore it.
toilet_groups = dataset.groupby(config['cols']['toiletname'])
toilets = dataset[config['cols']['toiletname']].unique()
number_of_days = max(toilet_groups.size())
contiguous_toilets = [t for t in toilets if (toilet_groups.size()[t] == number_of_days)]
dataset = dataset.loc[dataset[config['cols']['toiletname']].isin(contiguous_toilets)]
#Sort for the purposes of later functions...
dataset = dataset.sort_values(by=['Collection_Date','ToiletID'])
features_train = dataset.loc[((dataset['Collection_Date']>=fold["train_start"]) & (dataset['Collection_Date']<=fold["train_end"]))].drop(['response_f','response_u',config['Xy']['response_f']['variable'], config['Xy']['response_u']['variable']],axis=1)
features_test = dataset.loc[((dataset['Collection_Date']>=fold["test_start"]) & (dataset['Collection_Date']<=fold["test_end"]))].drop(['response_f','response_u',config['Xy']['response_f']['variable'], config['Xy']['response_u']['variable']],axis=1)
labels_train_u = dataset.loc[((dataset['Collection_Date']>=fold["train_start"]) & (dataset['Collection_Date']<=fold["train_end"])),['response_u','Collection_Date','ToiletID']].rename(columns=RESPONSE_RENAMER)
labels_train_f = dataset.loc[((dataset['Collection_Date']>=fold["train_start"]) & (dataset['Collection_Date']<=fold["train_end"])),['response_f','Collection_Date','ToiletID']].rename(columns=RESPONSE_RENAMER)
labels_test_f = dataset.loc[((dataset['Collection_Date']>=fold["test_start"]) & (dataset['Collection_Date']<=fold["test_end"])),['response_f','Collection_Date','ToiletID']].rename(columns=RESPONSE_RENAMER)
labels_test_u = dataset.loc[((dataset['Collection_Date']>=fold["test_start"]) & (dataset['Collection_Date']<=fold["test_end"])),['response_u','Collection_Date','ToiletID']].rename(columns=RESPONSE_RENAMER)
return(features_train, labels_train_f, labels_train_u, features_test, labels_test_f, labels_test_u, toilet_routes)
def pickle_vis_data_pandas():
db = MySQLDatabase(DATABASE_HOST, DATABASE_USER, DATABASE_PASSWORD, DATABASE_NAME)
conn = db._create_connection()
df = pd.read_sql('select source_article_id, target_article_id, target_y_coord_1920_1080, target_x_coord_1920_1080, visual_region from link_features', conn)
print len(df)
no_dup = df.sort(['source_article_id','target_y_coord_1920_1080','target_x_coord_1920_1080']).groupby(["source_article_id", "target_article_id"]).first()
print len(no_dup)
feature = no_dup.loc[no_dup['visual_region']=='lead']
print len(feature)
feature.reset_index(inplace=True)
feature = no_dup.loc[no_dup['visual_region']=='infobox']
print len(feature)
feature.reset_index(inplace=True)
feature[['source_article_id','target_article_id']].to_csv('/home/ddimitrov/tmp/infobox.tsv', sep='\t', index=False)
feature = no_dup.loc[no_dup['visual_region']=='navbox']
print len(feature)
feature.reset_index(inplace=True)
feature[['source_article_id','target_article_id']].to_csv('/home/ddimitrov/tmp/navbox.tsv', sep='\t', index=False)
feature = no_dup.loc[no_dup['visual_region']=='left-body']
print len(feature)
feature.reset_index(inplace=True)
feature[['source_article_id','target_article_id']].to_csv('/home/ddimitrov/tmp/left-body.tsv', sep='\t',index=False)
feature = no_dup.loc[no_dup['visual_region']=='body']
print len(feature)
feature.reset_index(inplace=True)
feature[['source_article_id','target_article_id']].to_csv('/home/ddimitrov/tmp/body.tsv', sep='\t',index=False)
def get_redirecsfromXML(self, dump_date):
db = MySQLDatabase(DATABASE_HOST, DATABASE_USER, DATABASE_PASSWORD, DATABASE_NAME)
conn = db._create_connection()
df = pd.read_sql(('select * from redirects'),conn)
return df.set_index('source_article_name')['target_article_name'].to_dict()
def pickle_correlations_zeros():
db = MySQLDatabase(DATABASE_HOST, DATABASE_USER, DATABASE_PASSWORD, DATABASE_NAME)
conn = db._create_connection()
print 'read'
df = pd.read_sql('select source_article_id, target_article_id, IFNULL(counts, 0) as counts from link_features group by source_article_id, target_article_id', conn)
print 'group'
article_counts = df.groupby(by=["target_article_id"])['counts'].sum().reset_index()
print 'write to file'
article_counts[["target_article_id","counts"]].to_csv(TMP+'article_counts.tsv', sep='\t', index=False)
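A memory-friendlier sketch of the same aggregation, assuming the same connection helpers and TMP path: pd.read_sql can stream the result with chunksize, and the per-target sums can be combined chunk by chunk instead of holding the full link_features result in memory.
def pickle_correlations_zeros_chunked():
    db = MySQLDatabase(DATABASE_HOST, DATABASE_USER, DATABASE_PASSWORD, DATABASE_NAME)
    conn = db._create_connection()
    chunks = pd.read_sql('select source_article_id, target_article_id, IFNULL(counts, 0) as counts '
                         'from link_features group by source_article_id, target_article_id',
                         conn, chunksize=100000)
    # Sum counts per target article chunk by chunk.
    article_counts = (pd.concat(chunk.groupby('target_article_id')['counts'].sum() for chunk in chunks)
                      .groupby(level=0).sum().reset_index())
    article_counts[['target_article_id', 'counts']].to_csv(TMP + 'article_counts.tsv', sep='\t', index=False)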