def _do_read(self, query, raw=False):
    """Send ``query`` to a healthy read node and store the decoded JSON reply.

    Looks up the services registered for ``self.database`` (via the
    ``segment`` index) in the ``trough_configuration`` database, keeps the
    entries whose role is ``trough-read`` and whose last heartbeat is
    younger than their ``ttl``, and POSTs ``query`` to the URL of the first
    such entry. When ``self.proxy`` is set the request goes out over a
    SOCKS socket configured from ``self.proxy_type``, ``self.proxy`` and
    ``self.proxy_port``. The parsed JSON response is stored in
    ``self._last_results``.

    Args:
        query: Body of the POST request.
        raw: Kept for interface compatibility; not used by this method.

    Raises:
        Exception: If no healthy ``trough-read`` node is registered for
            the segment.
    """
    rethinker = doublethink.Rethinker(db="trough_configuration", servers=self.rethinkdb)
    # Materialize the result set before issuing the next query.
    services = list(rethinker.table('services').get_all(self.database, index='segment').run())
    # Fetch the server time once so every heartbeat is compared against the
    # same instant and we avoid one round trip per candidate row.
    now = rethinker.now().run()
    healthy_databases = [
        db for db in services
        if db['role'] == 'trough-read'
        # total_seconds() covers the whole interval; ``.seconds`` is only the
        # sub-day component and would make a heartbeat older than a day
        # look fresh again.
        and (now - db['last_heartbeat']).total_seconds() < db['ttl']
    ]
    # Explicit check instead of ``assert``: assertions are stripped under -O.
    if not healthy_databases:
        raise Exception('No healthy node found for segment %s' % self.database)

    url = urlparse(healthy_databases[0].get('url'))
    if self.proxy:
        conn = HTTPConnection(self.proxy, self.proxy_port)
    else:
        conn = HTTPConnection(url.netloc)
    try:
        if self.proxy:
            conn.set_tunnel(url.netloc, url.port)
            # Supply a pre-connected SOCKS socket so the HTTP connection
            # does not open its own.
            conn.sock = socks.socksocket()
            conn.sock.set_proxy(self.proxy_type, self.proxy, self.proxy_port)
            conn.sock.connect((url.netloc.split(":")[0], url.port))
        request_path = "%s?%s" % (url.path, url.query)
        conn.request("POST", request_path, query)
        response = conn.getresponse()
        results = json.loads(response.read())
    finally:
        # Always release the socket, including when the request or the JSON
        # decoding fails.
        conn.close()
    self._last_results = results
# Stray web-page navigation text ("comment list", "article contents"); not part of the code.