def _queue_submitting(self):
    """Consume batches from ``self.queue`` and insert them into MySQL forever.

    Each batch taken from the queue is passed, together with the current
    cursor, to ``self.insert_function`` and then committed.  On success the
    row count returned by ``self.insert_function`` is added to
    ``self.count.value``.  ``self.queue.task_done()`` is called exactly once
    for every batch obtained, whether or not it was stored; a failed batch
    is not re-queued.

    Error handling:

    * A failure while reading from the queue is logged and retried after
      6 seconds.
    * ``MySQLdb.ProgrammingError`` raised by the insert is logged and the
      current connection is kept.
    * Any other ``Exception`` raised by the insert, or by the commit, is
      logged and the connection/cursor pair is replaced through
      ``self.re_connect``.

    The loop has no exit condition: it ends only when an exception escapes
    it.  ``KeyboardInterrupt`` and ``SystemExit`` are deliberately not
    caught, so the worker can actually be interrupted or shut down.
    """
    log.info("MysqlFastInsert thread:{} start".format(threading.current_thread()))
    conn, cur = self._get_connection()

    while True:
        try:
            lines = self.queue.get()
        except Exception:
            # Catch Exception rather than everything: a bare ``except`` here
            # would swallow KeyboardInterrupt/SystemExit and spin forever.
            log.error("mysql-inserter unable to get queue", exc_info=True)
            time.sleep(6)
            continue

        start_time = time.time()
        try:
            row_count = self.insert_function(cur, lines)
        except MySQLdb.ProgrammingError:
            # Log only; the existing connection and cursor are reused for the
            # next batch (no reconnect on this path).
            # NOTE(review): nothing is rolled back here, so rows written
            # before the error may be committed with the next successful
            # batch -- confirm whether that is intended.
            log.error(
                "mysql???? MySQLdb.ProgrammingError! process:{} cursor:{}".format(
                    self._multiprocessing.current_process(),
                    cur),
                exc_info=True)
        except Exception:
            # Any other failure: log it and replace the connection/cursor.
            log.error(
                "mysql insert error! process:{} cursor:{}".format(
                    self._multiprocessing.current_process(),
                    cur),
                exc_info=True)
            conn, cur = self.re_connect(conn)
        else:
            try:
                conn.commit()
            except Exception:
                log.error("commit error!", exc_info=True)
                conn, cur = self.re_connect(conn)
            else:
                log.debug("mysql successfully inserted: {} rows in {}ms".format(
                    row_count, round((time.time() - start_time) * 1000, 2)))
                # Add this batch to the running total of inserted rows.
                # NOTE(review): if ``self.count`` is shared between workers,
                # ``+=`` is a read-modify-write and may need a lock -- confirm.
                self.count.value += row_count
        finally:
            # Mark the batch as processed even when it failed, so that a
            # ``queue.join()`` caller is not blocked by a bad batch.
            self.queue.task_done()
评论列表
文章目录