def insert_result(self, document):
    """Insert *document* as a new row of the result table.

    Columns are created on demand: keys absent from the table add new
    columns, and columns absent from *document* are stored as None.
    """
    gc.collect()
    connection = dataset.connect(self.url)
    table = connection[self.result_table_name]
    return table.insert(document)
# NOTE(review): stray scraped page caption ("Python connect() example sources") — made a comment so the file parses.
def update_result(self, filter, values):
    """Update or extend the rows of the result table matched by *filter*.

    Args:
        filter: Mapping identifying which rows to update.
        values: Mapping of column values to set or add on those rows.
    """
    gc.collect()
    # Capture the match keys before merging, so only the original
    # filter columns select the rows to update.
    row = dict(filter)
    match_keys = list(row.keys())
    row.update(values)
    connection = dataset.connect(self.url)
    return connection[self.result_table_name].update(row, match_keys)
def all_complementary(self):
    """Return every row of the complementary information table as a list.

    The ordering of the returned list is undefined.
    """
    gc.collect()
    connection = dataset.connect(self.url)
    table = connection[self.complementary_table_name]
    return list(table.all())
def insert_complementary(self, document):
    """Insert *document* as a new row of the complementary information table."""
    gc.collect()
    connection = dataset.connect(self.url)
    table = connection[self.complementary_table_name]
    return table.insert(document)
def find_complementary(self, filter):
    """Return the first complementary-information row matching *filter*."""
    gc.collect()
    connection = dataset.connect(self.url)
    table = connection[self.complementary_table_name]
    return table.find_one(**filter)
def get_space(self):
    """Return the unpickled space stored for previous experiments.

    Returns:
        The stored space, or None when the space table is empty.

    Raises:
        AssertionError: If the space table holds more than one entry.
    """
    gc.collect()
    connection = dataset.connect(self.url)
    table = connection[self.space_table_name]
    count = table.count()
    if count == 0:
        return None
    assert count == 1, "Space table unexpectedly contains more than one space."
    # NOTE(review): pickle.loads on stored bytes — safe only if the
    # database contents are trusted; confirm no untrusted writers.
    return pickle.loads(table.find_one()["space"])
def insert_space(self, space):
    """Pickle *space* and store it as the single row of the space table.

    Raises:
        AssertionError: If the database already contains a space.
    """
    gc.collect()
    connection = dataset.connect(self.url)
    table = connection[self.space_table_name]
    assert table.count() == 0, ("Space table cannot contain more than one space, "
                                "clear table first.")
    return table.insert({"space": pickle.dumps(space)})
def run(consumer_key, consumer_secret, access_key, access_secret,
        connection_string):
    """Fetch and store profiles for users that have none collected yet.

    Profiles are looked up in batches of 100 (presumably the API's
    per-call limit — confirm against get_profiles), each batch upserted
    into the database, with a short sleep between batches.

    Args:
        consumer_key, consumer_secret, access_key, access_secret:
            API credentials forwarded to get_api.
        connection_string: Database URL understood by dataset.connect.
    """
    db = dataset.connect(connection_string)
    api = get_api(consumer_key, consumer_secret, access_key, access_secret)
    user_table = db['user']
    users = list(user_table.find(user_table.table.columns.user_id != 0,
                                 profile_collected=0))
    if not users:
        print('No users without profiles')
        return None
    ids_to_lookup = []
    for user in users:
        ids_to_lookup.append(user['user_id'])
        if len(ids_to_lookup) == 100:
            print('Getting profiles')
            profiles = get_profiles(api, user_ids=ids_to_lookup)
            print('Updating 100 profiles')
            upsert_profiles(db, profiles)
            ids_to_lookup = []
            print('Sleeping, timestamp: ' + str(datetime.now()))
            time.sleep(5)
    # Flush the final partial batch only when it is non-empty. The
    # original unconditionally issued one more lookup here, which was a
    # wasted (and potentially failing) API call whenever the user count
    # was an exact multiple of 100.
    if ids_to_lookup:
        print('Getting profiles')
        profiles = get_profiles(api, user_ids=ids_to_lookup)
        print('Updating ' + str(len(ids_to_lookup)) + ' profiles')
        upsert_profiles(db, profiles)
    print('Finished getting profiles')
def run(consumer_key, consumer_secret, access_key, access_secret,
        connection_string, threshold=5000, seed_only=True):
    """Collect friend ids for users whose friends are not yet collected.

    Only users with fewer than *threshold* friends are processed. One
    minute of sleep follows each user because the endpoint allows only
    15 calls per 15-minute window (see comment below).

    Args:
        consumer_key, consumer_secret, access_key, access_secret:
            API credentials forwarded to get_api.
        connection_string: Database URL understood by dataset.connect.
        threshold: Skip users with at least this many friends.
        seed_only: When True, restrict processing to seed users.
    """
    db = dataset.connect(connection_string)
    api = get_api(consumer_key, consumer_secret, access_key, access_secret)
    is_seed = 1 if seed_only else 0
    user_table = db['user']
    users = list(user_table.find(
        user_table.table.columns.friends_count < threshold,
        friends_collected=0, is_seed=is_seed))
    remaining = len(users)
    for u in users:
        try:
            print('Getting friend ids for ' + u['screen_name'])
            # Renamed from `next`/`prev`: `next` shadowed the builtin.
            next_cursor, prev_cursor, friend_ids = get_friend_ids(
                api, screen_name=u['screen_name'])
            print('Adding ' + str(len(friend_ids)) + ' user ids to db')
            insert_if_missing(db, user_ids=friend_ids)
            print('Creating relationships for ' + str(u['user_id']))
            create_connections(db, u['user_id'], friend_ids=friend_ids)
            user_table.update(dict(id=u['id'], friends_collected=1), ['id'])
            # Can only make 15 calls in a 15 minute window to this endpoint
            remaining -= 1
            time_left = remaining / 60.0
            print(str(time_left) + ' hours to go')
            print('Sleeping for 1 minute, timestamp: ' + str(datetime.now()))
            time.sleep(60)
        except Exception as exc:
            # The original bare `except:` silently swallowed every error,
            # including KeyboardInterrupt; report the failure and move on.
            print('Error processing user: ' + str(exc))
            continue
def _initDb(self):
    """Connect to the configured log database and return the handle."""
    connection = dataset.connect(config.log_db)
    l.info('{} db backend connected.'.format(connection.engine.name))
    return connection
def write_hyperparams(log_dir, params, mode='FILE'):
    """Persist hyperparameters to disk, to the runs database, or both.

    Args:
        log_dir: Directory to create; hyperparams.json is written inside it.
        params: Mapping of hyperparameter names to values.
        mode: One of 'FILE', 'DATABASE' or 'BOTH'.

    Raises:
        ValueError: If *mode* is not one of the recognized values.
    """
    if mode in ('FILE', 'BOTH'):
        os.makedirs(log_dir)
        target = os.path.join(log_dir, 'hyperparams.json')
        with open(target, 'w') as f:
            f.write(json.dumps(params))
    if mode in ('DATABASE', 'BOTH'):
        db = dt.connect(constants.DATABASE_CONNECTION_STRING)
        db['runs'].insert(params)
    if mode not in ('FILE', 'DATABASE', 'BOTH'):
        raise ValueError('{} mode not recognized. Try with FILE, DATABASE or BOTH'.format(mode))
def update_in_db(datadict):
    """Update the runs-table row matching *datadict* on its 'hash' key."""
    db = dt.connect(constants.DATABASE_CONNECTION_STRING)
    db['runs'].update(datadict, keys=['hash'])
def cli(argv):
    """Resolve a collector module by name and run its collect function.

    argv[1] names the module under ``collectors``; the remaining
    arguments are forwarded positionally to ``collect``.
    """
    # Configuration: every upper-case variable from the config module.
    conf = helpers.get_variables(config, str.isupper)
    # Shared connections handed to the collector.
    conn = {'warehouse': dataset.connect(config.WAREHOUSE_URL)}
    module = importlib.import_module('collectors.%s' % argv[1])
    module.collect(conf, conn, *argv[2:])
def open_spider(self, spider):
    """Initialize pipeline conf/conn, preferring those carried by *spider*."""
    if spider.conf and spider.conn:
        self.__conf = spider.conf
        self.__conn = spider.conn
        return
    # Fallback for runs triggered directly by the scrapy CLI utility.
    self.__conf = helpers.get_variables(config, str.isupper)
    self.__conn = {'warehouse': dataset.connect(config.WAREHOUSE_URL)}
def conn():
    """Return a connections dict whose warehouse tables have been emptied."""
    warehouse = dataset.connect(config.WAREHOUSE_URL)
    for table_name in warehouse.tables:
        warehouse[table_name].delete()
    return {'warehouse': warehouse}
def clear_db():
    """Delete all users and promos by dropping both tables."""
    database = dataset.connect(database_url)
    for table_name in ('subscriptions', 'promo'):
        database[table_name].drop()
def get_all_users():
    """Return the id_user value of every row in the subscriptions table."""
    database = dataset.connect(database_url)
    return [row['id_user'] for row in database['subscriptions']]