def _do_write(self, query):
# send provision query to server if not self._write_url.
# after send provision query, set self._write_url.
# send query to server, return JSON
rethinker = doublethink.Rethinker(db="trough_configuration", servers=self.rethinkdb)
services = doublethink.ServiceRegistry(rethinker)
master_node = services.unique_service('trough-sync-master')
logging.info('master_node=%r', master_node)
if not master_node:
raise Exception('no healthy trough-sync-master in service registry')
if not self._write_url:
buffer = BytesIO()
c = pycurl.Curl()
c.setopt(c.URL, master_node.get('url'))
c.setopt(c.POSTFIELDS, self.database)
if self.proxy:
c.setopt(pycurl.PROXY, self.proxy)
c.setopt(pycurl.PROXYPORT, int(self.proxy_port))
c.setopt(pycurl.PROXYTYPE, self.proxy_type)
c.setopt(c.WRITEDATA, buffer)
c.perform()
c.close()
self._write_url = buffer.getvalue()
logging.info('self._write_url=%r', self._write_url)
buffer = BytesIO()
c = pycurl.Curl()
c.setopt(c.URL, self._write_url)
c.setopt(c.POSTFIELDS, query)
if self.proxy:
c.setopt(pycurl.PROXY, self.proxy)
c.setopt(pycurl.PROXYPORT, int(self.proxy_port))
c.setopt(pycurl.PROXYTYPE, self.proxy_type)
c.setopt(c.WRITEDATA, buffer)
c.perform()
c.close()
response = buffer.getvalue()
if response.strip() != b'OK':
raise Exception('Trough Query Failed: Database: %r Response: %r Query: %.200r' % (self.database, response, query))
self._last_results = None
评论列表
文章目录