def run(self):
if not self.enabled():
logging.info("Oplog tailer is disabled, skipping")
return
logging.info("Starting oplog tailers on all replica sets (options: compression=%s, status_secs=%i)" % (self.compression(), self.status_secs))
self.timer.start(self.timer_name)
for shard in self.replsets:
tail_stop = Event()
secondary = self.replsets[shard].find_secondary()
mongo_uri = secondary['uri']
shard_name = mongo_uri.replset
oplog_file = self.prepare_oplog_files(shard_name)
oplog_state = OplogState(self.manager, mongo_uri, oplog_file)
thread = TailThread(
self.backup_stop,
tail_stop,
mongo_uri,
self.config,
self.timer,
oplog_file,
oplog_state,
self.do_gzip()
)
self.shards[shard] = {
'stop': tail_stop,
'thread': thread,
'state': oplog_state
}
self.shards[shard]['thread'].start()
while not oplog_state.get('running'):
if self.shards[shard]['thread'].exitcode:
raise OperationError("Oplog tailer for %s failed with exit code %i!" % (mongo_uri, self.shards[shard]['thread'].exitcode))
sleep(0.5)
评论列表
文章目录