def _index_subjects(self):
    """Query the triplestore for all subject URIs and index them in batches.

    Does nothing when ``self.data_status`` reports the subjects as already
    indexed and ``self.new_subjects`` is falsy.

    Otherwise every ``?s`` that is ``skos:inScheme`` the LOC subjects
    authority is fetched, and the results are processed in slices of
    ``batch_size``. For each slice one thread per subject is started on
    ``self._index_subject_item(iri, position_in_batch, batch_num)``; once
    all of those threads have finished, ``self.batch_data[batch_num]`` is
    converted with ``self.es_worker.make_action_list`` and written with
    ``self.es_worker.bulk_save``, then discarded.

    Side effects: sets ``self.time_start``, resets ``self.batch_data`` (it
    is an empty dict on return) and prints elapsed time twice per batch.

    Returns:
        None
    """
    lg = logging.getLogger("%s.%s" % (self.ln, inspect.stack()[0][3]))
    lg.setLevel(self.log_level)
    # if the subjects have been indexed and there are no new subjects exit
    if self.data_status.get("indexed") and not self.new_subjects:
        return
    # get a list of all the loc_subject URIs
    sparql = """
SELECT ?s
{
?s skos:inScheme <http://id.loc.gov/authorities/subjects> .
}"""
    results = run_sparql_query(sparql=sparql)
    # Start processing through
    self.time_start = datetime.datetime.now()
    batch_size = 12000
    total = len(results)
    self.batch_data = {}
    # range() yields the start offset of every batch and nothing at all for
    # an empty result set, so there is no trailing empty batch and no
    # special case for result sets smaller than one batch.
    batch_starts = range(0, total, batch_size)
    for batch_num, batch_start in enumerate(batch_starts, start=1):
        # Exclusive slice end, clamped so the final (short) batch still
        # includes the very last subject.
        batch_end = min(batch_start + batch_size, total)
        lg.debug("batch %s: %s-%s", batch_num, batch_start, batch_end)
        self.batch_data[batch_num] = []
        workers = []
        for i, subj in enumerate(results[batch_start:batch_end]):
            th = threading.Thread(name=str(batch_start + i + 1),
                                  target=self._index_subject_item,
                                  args=(iri(subj['s']['value']),
                                        i + 1, batch_num,))
            th.start()
            workers.append(th)
        print(datetime.datetime.now() - self.time_start)
        # Join only the threads started above. Joining everything returned
        # by threading.enumerate() would raise RuntimeError when this method
        # runs outside the main thread and would block on unrelated
        # long-lived threads.
        # NOTE(review): assumes _index_subject_item finishes its work on the
        # thread it is called on -- confirm it spawns no threads of its own.
        for th in workers:
            th.join()
        action_list = \
            self.es_worker.make_action_list(self.batch_data[batch_num])
        self.es_worker.bulk_save(action_list)
        # Release the batch payload before the next batch is built.
        del self.batch_data[batch_num]
        print(datetime.datetime.now() - self.time_start)
评论列表
文章目录