def get_changes(self):
    """Serve a changelog entry to a replica (master-only endpoint).

    Every replica server calls this view.  The ``serial`` route segment
    is either the literal ``nop`` (case-insensitive, answered with an
    empty body) or an integer serial whose raw changelog entry is
    returned.  Per the original notes, a request for the "next" serial
    blocks until that serial is committed, and after
    MAX_REPLICA_BLOCK_TIME a 202 Accepted tells the replica to try
    again -- presumably implemented in ``_wait_for_entry``, which is not
    visible here.  That timeout has two benefits:

    - nginx' own timeout would otherwise answer 504 (Gateway Timeout)
    - a replica that stopped waiting would otherwise leave this request
      blocked forever, piling up threads while no commits happen.

    Raises HTTPForbidden when this instance is not the master,
    HTTPBadRequest when the H_EXPECTED_MASTER_ID header is missing or
    names a different master, and HTTPNotFound when the serial is
    neither ``nop`` nor an integer.  The 200 response carries the
    current keyfs serial in the X-DEVPI-SERIAL header.
    """
    if not self.xom.is_master():
        raise HTTPForbidden("Replication protocol disabled")

    replica_claim = self.request.headers.get(H_EXPECTED_MASTER_ID, None)
    own_uuid = self.xom.config.get_master_uuid()

    # The header itself is mandatory, but an empty value is accepted
    # (the replica sends it empty during initialization).
    if replica_claim is None:
        complaint = "replica sent no %s header" % H_EXPECTED_MASTER_ID
        threadlog.error(complaint)
        raise HTTPBadRequest(complaint)

    # Only a non-empty value is compared against our own uuid.
    uuid_conflict = bool(replica_claim) and replica_claim != own_uuid
    if uuid_conflict:
        threadlog.error(
            "expected %r as master_uuid, replica sent %r",
            own_uuid, replica_claim)
        raise HTTPBadRequest(
            "expected %s as master_uuid, replica sent %s" % (
                own_uuid, replica_claim))

    requested = self.request.matchdict["serial"]
    with self.update_replica_status(requested):
        keyfs = self.xom.keyfs
        if requested.lower() != "nop":
            try:
                wanted = int(requested)
            except ValueError:
                raise HTTPNotFound("serial needs to be int")
            payload = self._wait_for_entry(wanted)
        else:
            # "nop" request: no entry is fetched, only the headers matter.
            payload = b""

        current_serial = keyfs.get_current_serial()
        reply_headers = {
            str("Content-Type"): str("application/octet-stream"),
            str("X-DEVPI-SERIAL"): str(current_serial),
        }
        return Response(body=payload, status=200, headers=reply_headers)
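# NOTE: web-page navigation residue from extraction ("Comment list",
# "Table of contents"); not part of the module's code.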