def test_aquire_lease_redis_available_lose_secondary(self,
mock_settings,
mock_time,
mock_get_secondary_cache_source,
mock_get_connection):
mock_settings.ENDPOINTS = {}
mock_settings.ELASTICACHE_ENDPOINTS = ELASTICACHE_ENDPOINTS_REDIS
mock_time.time.return_value = 999.
mock_get_secondary_cache_source.return_value = _get_test_arn(AWS.ELASTICACHE)
mock_pipe = mock_get_connection.return_value.pipeline.return_value.__enter__.return_value
mock_pipe.get.return_value = None
mock_pipe.execute.side_effect = redis.WatchError
ret = acquire_lease('a', 1, 1, primary=False)
self.assertFalse(ret)
mock_pipe.watch.assert_called_with('lease-a')
mock_pipe.get.assert_called_with('lease-a')
mock_pipe.multi.assert_called_with()
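# lease values are serialized as 'steps:retries:expires:fence_token'; with time.time()
# mocked to 999., the expected expiry of 1299 implies a 300-second lease timeout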
mock_pipe.setex.assert_called_with('lease-a', 86400, '1:1:1299:1')
mock_pipe.execute.assert_called_with()
def test_aquire_lease_redis_available_lose(self,
mock_settings,
mock_time,
mock_get_primary_cache_source,
mock_get_connection):
mock_settings.ENDPOINTS = {}
mock_settings.ELASTICACHE_ENDPOINTS = ELASTICACHE_ENDPOINTS_REDIS
mock_time.time.return_value = 999.
mock_get_primary_cache_source.return_value = _get_test_arn(AWS.ELASTICACHE)
mock_pipe = mock_get_connection.return_value.pipeline.return_value.__enter__.return_value
mock_pipe.get.return_value = None
mock_pipe.execute.side_effect = redis.WatchError
ret = acquire_lease('a', 1, 1)
self.assertFalse(ret)
mock_pipe.watch.assert_called_with('lease-a')
mock_pipe.get.assert_called_with('lease-a')
mock_pipe.multi.assert_called_with()
mock_pipe.setex.assert_called_with('lease-a', 86400, '1:1:1299:1')
mock_pipe.execute.assert_called_with()
def test_aquire_lease_redis_leased_expired_lose(self,
mock_settings,
mock_time,
mock_get_primary_cache_source,
mock_get_connection):
mock_settings.ENDPOINTS = {}
mock_settings.ELASTICACHE_ENDPOINTS = ELASTICACHE_ENDPOINTS_REDIS
mock_time.time.return_value = 999.
mock_get_primary_cache_source.return_value = _get_test_arn(AWS.ELASTICACHE)
mock_pipe = mock_get_connection.return_value.pipeline.return_value.__enter__.return_value
mock_pipe.get.return_value = '99:99:0:99'
mock_pipe.execute.side_effect = redis.WatchError
ret = acquire_lease('a', 1, 1)
self.assertFalse(ret)
mock_pipe.watch.assert_called_with('lease-a')
mock_pipe.get.assert_called_with('lease-a')
mock_pipe.multi.assert_called_with()
mock_pipe.setex.assert_called_with('lease-a', 86400, '1:1:1299:100')
mock_pipe.execute.assert_called_with()
def test_release_lease_redis_owned_self_loses(self,
mock_settings,
mock_get_primary_cache_source,
mock_get_connection):
mock_settings.ENDPOINTS = {}
mock_settings.ELASTICACHE_ENDPOINTS = ELASTICACHE_ENDPOINTS_REDIS
mock_get_primary_cache_source.return_value = _get_test_arn(AWS.ELASTICACHE)
mock_pipe = mock_get_connection.return_value.pipeline.return_value.__enter__.return_value
mock_pipe.get.return_value = '99:99:99:99'
mock_pipe.execute.side_effect = redis.WatchError
ret = release_lease('a', 99, 99, 99)
self.assertFalse(ret)
mock_pipe.watch.assert_called_with('lease-a')
mock_pipe.get.assert_called_with('lease-a')
mock_pipe.multi.assert_called_with()
mock_pipe.setex.assert_called_with('lease-a', 86400, '-1:-1:0:99')
mock_pipe.execute.assert_called_with()
def _batch_insert_prob_redis(conn, names, all_hashes, colour, count=0):
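# Set the bit for `colour` in each stored bit-array (one per name/hash pair), under an
# optimistic WATCH on the row keys; retried up to 5 times on WatchError.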
r = conn
with r.pipeline() as pipe:
try:
pipe.watch(*names)
vals = get_vals(r, names, all_hashes)
pipe.multi()
for name, values, hs in zip(names, vals, all_hashes):
for val, h in zip(values, hs):
ba = BitArray()
if val is None:
val = b''
ba.frombytes(val)
ba.setbit(colour, 1)
pipe.hset(name, h, ba.tobytes())
pipe.execute()
except redis.WatchError:
logger.warning("Retrying %s %s " % (r, name))
if count < 5:
_batch_insert_prob_redis(conn, names, all_hashes, colour, count=count + 1)
else:
logger.warning(
"Failed %s %s. Too many retries. Contining regardless." % (r, name))
def get_current_task(self):
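# Return (task_id, task_data), re-reading TASK_DATA_KEY under a WATCH on TASK_ID_KEY
# only when the task id differs from the cached copy.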
with self.local_redis.pipeline() as pipe:
while True:
try:
pipe.watch(TASK_ID_KEY)
task_id = int(retry_get(pipe, TASK_ID_KEY))
if task_id == self.cached_task_id:
logger.debug('[worker] Returning cached task {}'.format(task_id))
break
pipe.multi()
pipe.get(TASK_DATA_KEY)
logger.info('[worker] Getting new task {}. Cached task was {}'.format(task_id, self.cached_task_id))
self.cached_task_id, self.cached_task_data = task_id, deserialize(pipe.execute()[0])
break
except redis.WatchError:
continue
return self.cached_task_id, self.cached_task_data
def incr(self, key, amount, maximum, ttl):
with self.client.pipeline() as pipe:
while True:
try:
pipe.watch(key)
value = int(pipe.get(key) or b"0")
value += amount
if value > maximum:
return False
pipe.multi()
pipe.set(key, value, px=ttl)
pipe.execute()
return True
except redis.WatchError:
continue
def decr(self, key, amount, minimum, ttl):
with self.client.pipeline() as pipe:
while True:
try:
pipe.watch(key)
value = int(pipe.get(key) or b"0")
value -= amount
if value < minimum:
return False
pipe.multi()
pipe.set(key, value, px=ttl)
pipe.execute()
return True
except redis.WatchError:
continue
def incr_and_sum(self, key, keys, amount, maximum, ttl):
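# Like incr(), but also bounds the sum across `keys`: the increment is rejected if either
# the single counter or the combined total would exceed `maximum`.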
with self.client.pipeline() as pipe:
while True:
try:
pipe.watch(key, *keys)
value = int(pipe.get(key) or b"0")
value += amount
if value > maximum:
return False
values = pipe.mget(keys)
total = amount + sum(int(n) for n in values if n)
if total > maximum:
return False
pipe.multi()
pipe.set(key, value, px=ttl)
pipe.execute()
return True
except redis.WatchError:
continue
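# Every snippet on this page follows the same optimistic-locking recipe: WATCH the key(s),
# read, queue writes after MULTI, EXECUTE, and retry (or give up) on redis.WatchError.
# Below is a minimal, self-contained sketch of that recipe (not taken from any of the
# projects above); it assumes only `pip install redis` and a Redis server on localhost.
import redis

def transfer(conn, src, dst, amount, max_retries=10):
    """Move `amount` from counter `src` to counter `dst` atomically.

    Returns True on success, False if `src` has an insufficient balance or the
    watched keys kept changing for `max_retries` attempts.
    """
    with conn.pipeline() as pipe:
        for _ in range(max_retries):
            try:
                pipe.watch(src, dst)                # immediate mode: optimistic lock
                balance = int(pipe.get(src) or b"0")
                if balance < amount:
                    return False
                pipe.multi()                        # switch to buffered/transaction mode
                pipe.decrby(src, amount)
                pipe.incrby(dst, amount)
                pipe.execute()                      # raises WatchError if src/dst changed
                return True
            except redis.WatchError:
                continue                            # another client touched the keys; retry
        return False

if __name__ == "__main__":
    r = redis.Redis()
    r.set("alice", 100)
    r.set("bob", 0)
    print(transfer(r, "alice", "bob", 25))          # True; alice=75, bob=25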
def save_enqueued(self, pipe):
"""
Prepare the job for enqueueing. Works via pipeline: nothing is done if a
WatchError is raised during the subsequent `pipeline.execute()`.
"""
job = self.create_job(status=JobStatus.QUEUED)
self.set_job_params(pipeline=pipe)
job.origin = self.origin
job.enqueued_at = utcnow()
if job.timeout is None:
job.timeout = self.timeout
job.save(pipeline=pipe)
self.job = job
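# Hypothetical caller-side sketch (not from the original library): save_enqueued() only
# buffers commands on `pipe`, so the caller owns the WATCH/retry loop and the final
# execute(). `queue` is assumed to be an instance of the class defining save_enqueued()
# above, and `queue.key` is an assumed attribute naming the Redis key to watch.
import redis

def enqueue_with_retry(queue, connection):
    with connection.pipeline() as pipe:
        while True:
            try:
                pipe.watch(queue.key)      # assumed attribute
                pipe.multi()
                queue.save_enqueued(pipe)  # method defined above
                pipe.execute()
                return queue.job           # set by save_enqueued()
            except redis.WatchError:
                continue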
def save_deferred(self, depends_on, pipe):
"""
Prepare the job to be deferred (added as a dependent). Works via pipeline: nothing
is done if a WatchError is raised during the subsequent `pipeline.execute()`.
"""
job = self.create_job(depends_on=depends_on, status=JobStatus.DEFERRED)
self.set_job_params(pipeline=pipe)
job.register_dependency(pipeline=pipe)
job.save(pipeline=pipe)
return job
def create(self, redis):
"""
Create the job by creating its context in redis. Each job creates len(hosts) meta keys;
conflict detection is done by checking whether any of the job meta keys already exist.
If there is no conflict, the job context (the `meta_keys`) is created via a redis
pipeline to avoid racing with concurrent operations.
"""
pipeline = redis.pipeline()
try:
pipeline.watch(*self.meta_keys)
for key in self.meta_keys:
if pipeline.exists(key):
raise JobConflictError("operate conflict, job already exists on some host(s)")
LOG.info("going to create job meta data <{0}>".format(';'.join(self.meta_keys)))
pipeline.multi()
for key in self.meta_keys:
pipeline.hmset(key, dict(startat=self._startat))
pipeline.execute()
LOG.info("job meta data create finished, <{0}>".format(';'.join(self.meta_keys)))
except WatchError:
LOG.info("conflict detected on job meta data create <{0}>".format(';'.join(self.meta_keys)))
raise JobConflictError("operate conflict, try again later")
finally:
pipeline.reset()
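# Hypothetical usage (names are illustrative, not from the original module): `job` is an
# instance of the class defining create() above, already populated with meta_keys/_startat.
import redis

conn = redis.StrictRedis()
try:
    job.create(conn)
except JobConflictError:
    LOG.info("another run already owns some of these hosts; backing off")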
def get_delayed_writes(self, delayed_write_key):
"""
Method to get all delayed write-cuboid keys for a single delayed_write_key
Args:
delayed_write_key (str): The delayed-write key whose queued write-cuboid keys should be drained
Returns:
list(str): List of delayed write-cuboid keys
"""
write_cuboid_key_list = []
with self.status_client.pipeline() as pipe:
try:
# Get all items in the list and cleanup, in a transaction so other procs can't add anything
pipe.watch(delayed_write_key)
pipe.multi()
# Get all items in the list
pipe.lrange(delayed_write_key, 0, -1)
# Delete the delayed-write-key as it should be empty now
pipe.delete(delayed_write_key)
# Delete its associated resource-delayed-write key that stores the resource string
pipe.delete("RESOURCE-{}".format(delayed_write_key))
# Execute.
write_cuboid_key_list = pipe.execute()
# If you got here things worked OK. Clean up the result. First entry in list is the LRANGE result
write_cuboid_key_list = write_cuboid_key_list[0]
# Keys are encoded
write_cuboid_key_list = [x.decode() for x in write_cuboid_key_list]
except redis.WatchError as _:
# Watch error occurred. Just bail out and let the daemon pick this up later.
return []
except Exception as e:
raise SpdbError("An error occurred while attempting to retrieve delay-write keys: \n {}".format(e),
ErrorCodes.REDIS_ERROR)
return write_cuboid_key_list
def release_gpus_in_use(driver_id, local_scheduler_id, gpu_ids, redis_client):
"""Release the GPUs that a given worker was using.
Note that this does not affect the local scheduler's bookkeeping. It only
affects the GPU allocations which are recorded in the primary Redis shard,
which are redundant with the local scheduler bookkeeping.
Args:
driver_id: The ID of the driver that is releasing some GPUs.
local_scheduler_id: The ID of the local scheduler that owns the GPUs
being released.
gpu_ids: The IDs of the GPUs being released.
redis_client: A client for the primary Redis shard.
"""
# Attempt to release GPU IDs atomically.
with redis_client.pipeline() as pipe:
while True:
try:
# If this key is changed before the transaction below (the
# multi/exec block), then the transaction will not take place.
pipe.watch(local_scheduler_id)
# Figure out which GPUs are currently in use.
result = redis_client.hget(local_scheduler_id, "gpus_in_use")
gpus_in_use = dict() if result is None else json.loads(
result.decode("ascii"))
assert driver_id in gpus_in_use
assert gpus_in_use[driver_id] >= len(gpu_ids)
gpus_in_use[driver_id] -= len(gpu_ids)
pipe.multi()
pipe.hset(local_scheduler_id, "gpus_in_use",
json.dumps(gpus_in_use))
pipe.execute()
# If a WatchError is not raised, then the operations should
# have gone through atomically.
break
except redis.WatchError:
# Another client must have changed the watched key between the
# time we started WATCHing it and the pipeline's execution. We
# should just retry.
continue
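# For orientation, a small self-contained illustration (not Ray's actual code) of the
# bookkeeping the function above mutates: a Redis hash keyed by the local scheduler ID,
# whose "gpus_in_use" field holds a JSON object mapping driver IDs to GPU counts.
# The key and driver names below are hypothetical.
import json
import redis

r = redis.Redis()
local_scheduler_id = "local_scheduler:0"

# Record that driver "driver-1" currently holds 4 GPUs on this scheduler.
r.hset(local_scheduler_id, "gpus_in_use", json.dumps({"driver-1": 4}))

# Releasing two of them rewrites the same field atomically.
release_gpus_in_use("driver-1", local_scheduler_id, gpu_ids=[0, 1], redis_client=r)
print(json.loads(r.hget(local_scheduler_id, "gpus_in_use")))  # {'driver-1': 2}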
def _release_lease_redis(cache_arn, correlation_id, steps, retries, fence_token):
"""
Releases a lease from redis.
"""
import redis
redis_conn = get_connection(cache_arn)
if not redis_conn:
return # pragma: no cover
with redis_conn.pipeline() as pipe:
try:
# get the current value of the lease (within a watch)
redis_key = LEASE_DATA.LEASE_KEY_PREFIX + correlation_id
pipe.watch(redis_key)
current_lease_value = pipe.get(redis_key)
pipe.multi()
# if there is already a lease holder, then we have a few options
if current_lease_value:
# split the current lease apart
current_steps, current_retries, current_time, current_fence_token = \
_deserialize_lease_value(current_lease_value)
# release it by:
# 1. setting the lease value to "unowned" (steps/retries = -1)
# 2. setting it as expired (expires = 0) with set
# 3. setting the fence token to the current value so it can be incremented later
if (current_steps, current_retries, current_fence_token) == (steps, retries, fence_token):
new_fence_token = fence_token
new_lease_value = _serialize_lease_value(-1, -1, 0, new_fence_token)
pipe.setex(redis_key, LEASE_DATA.LEASE_CLEANUP_TIMEOUT, new_lease_value)
# otherwise, something else owns the lease, so we can't release it
else:
return False
else:
# the lease is no longer owned by anyone
return False
# execute the transaction
pipe.execute()
# if we make it this far, we have released the lease
return True
except redis.WatchError:
return False
except redis.exceptions.ConnectionError:
logger.exception('')
return 0
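# The tests at the top of this page and the helper above rely on _serialize_lease_value /
# _deserialize_lease_value, which pack the lease as a 'steps:retries:expires:fence_token'
# string (e.g. '1:1:1299:1', or '-1:-1:0:99' for an unowned, expired lease). A minimal
# sketch of those helpers, inferred from the strings in the tests; the real library's
# implementations may differ.
def _serialize_lease_value(steps, retries, expires, fence_token):
    return '%d:%d:%d:%d' % (steps, retries, expires, fence_token)

def _deserialize_lease_value(value):
    if isinstance(value, bytes):
        value = value.decode('utf-8')
    steps, retries, expires, fence_token = (int(part) for part in value.split(':'))
    return steps, retries, expires, fence_token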
def _bump_timestamp(self, collection_id, parent_id, record=None,
modified_field=None, last_modified=None):
key = '{0}.{1}.timestamp'.format(collection_id, parent_id)
while 1:
with self._client.pipeline() as pipe:
try:
pipe.watch(key)
previous = pipe.get(key)
pipe.multi()
# XXX factorize code from memory and redis backends.
is_specified = (record is not None and
modified_field in record or
last_modified is not None)
if is_specified:
# If there is a timestamp in the new record,
# try to use it.
if last_modified is not None:
current = last_modified
else:
current = record[modified_field]
else:
current = utils.msec_time()
if previous and int(previous) >= current:
collection_timestamp = int(previous) + 1
else:
collection_timestamp = current
# Return the newly generated timestamp as the current one
# only if nothing else was specified.
is_equal = previous and int(previous) == current
if not is_specified or is_equal:
current = collection_timestamp
pipe.set(key, collection_timestamp)
pipe.execute()
return current
except redis.WatchError: # pragma: no cover
# Our timestamp has been modified by someone else, let's
# retry.
# XXX: untested.
continue
def add_to_page_out(self, temp_page_out_key, lookup_key, resolution, morton, time_sample):
"""
Method to add a key to the page-out tracking set
Args:
temp_page_out_key (str): Temporary set key used to check page-out membership for this write
lookup_key (str): Lookup key for a channel
resolution (int): level in the resolution hierarchy
morton (int): morton id for the cuboid
time_sample (int): time sample for cuboid
Returns:
bool: True if the cuboid was already in the page-out set, False if this call added it
"""
page_out_key = "PAGE-OUT&{}&{}".format(lookup_key, resolution)
in_page_out = True
cnt = 0
with self.status_client.pipeline() as pipe:
while 1:
try:
# Create temp set
pipe.watch(page_out_key)
pipe.multi()
pipe.sadd(temp_page_out_key, "{}&{}".format(time_sample, morton))
pipe.expire(temp_page_out_key, 15)
pipe.sdiff(temp_page_out_key, page_out_key)
pipe.sadd(page_out_key, "{}&{}".format(time_sample, morton))
result = pipe.execute()
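# result[2] is the SDIFF reply: a non-empty set means this cuboid was not yet in
# page-out (so this call just claimed it); empty means it is already being paged out.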
if len(result[2]) > 0:
in_page_out = False
else:
in_page_out = True
break
except redis.WatchError as e:
# Watch error occurred, try again!
cnt += 1
if cnt > 200:
raise SpdbError("Failed to add to page out due to timeout. {}".format(e),
ErrorCodes.REDIS_ERROR)
continue
except Exception as e:
raise SpdbError("Failed to check page-out set. {}".format(e),
ErrorCodes.REDIS_ERROR)
return in_page_out