def gather_stage(harvester, job):
    '''Calls the harvester's gather_stage, returning harvest object ids, with
    some error handling.

    This is split off from gather_callback so that tests can call it without
    dealing with queue stuff.
    '''
    job.gather_started = datetime.datetime.utcnow()
    try:
        harvest_object_ids = harvester.gather_stage(job)
    except (Exception, KeyboardInterrupt):
        harvest_objects = model.Session.query(HarvestObject).filter_by(
            harvest_job_id=job.id
        )
        for harvest_object in harvest_objects:
            model.Session.delete(harvest_object)
        model.Session.commit()
        raise
    finally:
        job.gather_finished = datetime.datetime.utcnow()
        job.save()
    return harvest_object_ids
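The docstring notes that gather_stage is split out so tests can call it directly, without the queue machinery. A minimal test-style sketch of such a call; StubHarvester and make_test_job() are hypothetical helpers, not part of the harvester codebase:

# Hypothetical test sketch: a stub harvester exercised through gather_stage().
class StubHarvester(object):
    def gather_stage(self, job):
        # Pretend two remote records were discovered.
        return ['object-id-1', 'object-id-2']

job = make_test_job()                      # assumed to return a HarvestJob-like object
ids = gather_stage(StubHarvester(), job)   # no queue involved
assert ids == ['object-id-1', 'object-id-2']
assert job.gather_finished is not None     # set in the finally block above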
def filter(self, values: set):
    if not isinstance(values, set):
        raise ValueError("values must be a set")
    if values:
        key = "{}:tasks:processed".format(self.name)
        lua = """
        local results = {}
        for _, e in pairs(ARGV) do
            local x = redis.call('sismember', KEYS[1], e)
            if x == 1 then
                table.insert(results, e)
            end
        end
        return results
        """
        script = self.db.register_script(lua)
        results = script(keys=[key], args=[self.codec.dumps(v) for v in values])
        return values - {self.codec.loads(e) for e in results}
    # Nothing to check against the processed set
    return values
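A usage sketch for the filter above, assuming an object that exposes a name, a redis-py client as db, and a codec with dumps/loads; TaskQueue and JSONCodec are illustrative stand-ins, not names from the original source:

# Hypothetical usage: members already recorded as processed are filtered out.
import json
import redis

class JSONCodec:
    @staticmethod
    def dumps(v):
        return json.dumps(v)
    @staticmethod
    def loads(s):
        return json.loads(s)

queue = TaskQueue(name="crawler", db=redis.StrictRedis(), codec=JSONCodec())
queue.db.sadd("crawler:tasks:processed", json.dumps("task-1"))
remaining = queue.filter({"task-1", "task-2"})
# remaining == {"task-2"}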
def queue_purge(self, queue=None):
    '''
    Purge the consumer's queue.

    The ``queue`` parameter exists only for compatibility and is
    ignored.
    '''
    # Use a script to make the operation atomic
    lua_code = b'''
        local routing_key = KEYS[1]
        local message_key = ARGV[1]
        local count = 0
        while true do
            local s = redis.call("lpop", routing_key)
            if s == false then
                break
            end
            local value = cjson.decode(s)
            local id = value[message_key]
            local persistance_key = routing_key .. ":" .. id
            redis.call("del", persistance_key)
            count = count + 1
        end
        return count
    '''
    script = self.redis.register_script(lua_code)
    return script(keys=[self.routing_key], args=[self.message_key])
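Running the pop/decode/delete loop as a single Lua script keeps the purge atomic on the server, so a concurrent consumer cannot see a queued message whose persistence copy has already been deleted. A rough sketch of the key layout the script assumes; the key names and the 'delivery_tag' field are illustrative stand-ins for self.routing_key and self.message_key:

# Illustrative only: the data shape queue_purge() expects to find in Redis.
import json
import redis

r = redis.StrictRedis()
msg = {"delivery_tag": "abc123", "body": "hello"}
r.rpush("my-routing-key", json.dumps(msg))          # queued message
r.set("my-routing-key:abc123", json.dumps(msg))     # persistence copy
# queue_purge() would LPOP "my-routing-key", read msg["delivery_tag"],
# DEL "my-routing-key:abc123", and return the number of purged messages.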
def __call__(self, record_id, unique_key=None):
    # Support DAL shortcut query: table(record_id)
    q = self.id  # This will call the __getattr__ below,
                 # returning a MockQuery
    # Instructs MockQuery to behave as db(table.id == record_id)
    q.op = 'eq'
    q.value = record_id
    q.unique_key = unique_key
    row = q.select()
    return row[0] if row else Storage()
def __init__(
    self,
    expiry=60,
    hosts=None,
    prefix="asgi:",
    group_expiry=86400,
    capacity=100,
    channel_capacity=None,
    symmetric_encryption_keys=None,
    stats_prefix="asgi-meta:",
    connection_kwargs=None,
):
    super(RedisChannelLayer, self).__init__(
        expiry=expiry,
        hosts=hosts,
        prefix=prefix,
        group_expiry=group_expiry,
        capacity=capacity,
        channel_capacity=channel_capacity,
        symmetric_encryption_keys=symmetric_encryption_keys,
        stats_prefix=stats_prefix,
        connection_kwargs=connection_kwargs,
    )
    self.hosts = self._setup_hosts(hosts)
    # Precalculate some values for ring selection
    self.ring_size = len(self.hosts)
    # Create connections ahead of time (they won't call out just yet, but
    # we want to connection-pool them later)
    self._connection_list = self._generate_connections(
        self.hosts,
        redis_kwargs=connection_kwargs or {},
    )
    self._receive_index_generator = itertools.cycle(range(len(self.hosts)))
    self._send_index_generator = itertools.cycle(range(len(self.hosts)))
    self._register_scripts()

### Setup ###
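A minimal construction sketch, assuming this is the RedisChannelLayer from asgi_redis (Channels 1.x); the host and capacity values are illustrative, not defaults mandated by the class:

# Illustrative construction; adjust hosts/capacity to the deployment.
channel_layer = RedisChannelLayer(
    hosts=[("localhost", 6379)],   # one entry per ring member
    expiry=60,                     # seconds an undelivered message survives
    group_expiry=86400,            # seconds a group membership survives
    capacity=100,                  # per-channel backlog limit
)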
def clear_date(date):
    conn = getconn(date)
    pattern = '*_{}*'.format(date)

    def flush_all(r):
        print('clearing {} on {}'.format(r, date))
        r.flushdb()

    def clear_conn_lua(r):
        script = '''
        for _, k in ipairs(redis.call('keys', ARGV[1])) do
            redis.call('del', k)
        end
        '''
        r.eval(script, 0, pattern)

    def clear_conn(r):
        print('clearing {} on {}'.format(r, date))
        p = r.pipeline()
        cursor = 0
        while True:
            cursor, keys = r.scan(cursor, match=pattern, count=100000)
            print('cursor {}, keys {}'.format(cursor, len(keys)))
            if len(keys):
                p.delete(*keys)
            if int(cursor) == 0:
                break
        p.execute()

    tasks = []
    for r in conn.conns:
        t = threading.Thread(target=flush_all, args=(r,))
        t.start()
        tasks.append(t)
    for t in tasks:
        t.join()
def deleteBanchoSessions(self):
    """
    Remove all `peppy:sessions:*` redis keys.
    Call at bancho startup to delete old cached sessions

    :return:
    """
    try:
        # TODO: Make function or some redis meme
        glob.redis.eval("return redis.call('del', unpack(redis.call('keys', ARGV[1])))", 0, "peppy:sessions:*")
    except redis.RedisError:
        pass
Source file: ch11_listing_source.py, from the redis-in-action project (https://github.com/josiahcarlson/redis-in-action).
def script_load(script):
    sha = [None]                                             #A
    def call(conn, keys=[], args=[], force_eval=False):      #B
        if not force_eval:
            if not sha[0]:                                   #C
                sha[0] = conn.execute_command(               #D
                    "SCRIPT", "LOAD", script, parse="LOAD")  #D
            try:
                return conn.execute_command(                 #E
                    "EVALSHA", sha[0], len(keys), *(keys + args))  #E
            except redis.exceptions.ResponseError as msg:
                if not msg.args[0].startswith("NOSCRIPT"):   #F
                    raise                                    #F
        return conn.execute_command(                         #G
            "EVAL", script, len(keys), *(keys + args))       #G
    return call                                              #H
# <end id="script-load"/>
#A Store the cached SHA1 hash of the result of SCRIPT LOAD in a list so we can change it later from within the call() function
#B When calling the "loaded script", you must provide the connection, the set of keys that the script will manipulate, and any other arguments to the function
#C We will only try loading the script if we don't already have a cached SHA1 hash
#D Load the script if we don't already have the SHA1 hash cached
#E Execute the command from the cached SHA1
#F If the error was unrelated to a missing script, re-raise the exception
#G If we received a script-related error, or if we need to force-execute the script, directly execute the script, which will automatically cache the script on the server (with the same SHA1 that we've already cached) when done
#H Return the function that automatically loads and executes scripts when called
#END
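A quick usage sketch of the helper above; the Lua script, key name, and increment value are arbitrary choices for illustration:

# Illustrative usage of script_load().
import redis

conn = redis.Redis()
incr_by = script_load("""
    return redis.call('incrby', KEYS[1], ARGV[1])
""")
print(incr_by(conn, keys=['hits'], args=[5]))   # first call: SCRIPT LOAD, then EVALSHA
print(incr_by(conn, keys=['hits'], args=[5]))   # later calls reuse the cached SHA1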
def fetch_and_import_stages(harvester, obj):
    obj.fetch_started = datetime.datetime.utcnow()
    obj.state = "FETCH"
    obj.save()
    success_fetch = harvester.fetch_stage(obj)
    obj.fetch_finished = datetime.datetime.utcnow()
    obj.save()
    if success_fetch is True:
        # If no errors were found, call the import method
        obj.import_started = datetime.datetime.utcnow()
        obj.state = "IMPORT"
        obj.save()
        success_import = harvester.import_stage(obj)
        obj.import_finished = datetime.datetime.utcnow()
        if success_import:
            obj.state = "COMPLETE"
            if success_import == 'unchanged':
                obj.report_status = 'not modified'
                obj.save()
                return
        else:
            obj.state = "ERROR"
        obj.save()
    elif success_fetch == 'unchanged':
        obj.state = 'COMPLETE'
        obj.report_status = 'not modified'
        obj.save()
        return
    else:
        obj.state = "ERROR"
        obj.save()
    if obj.state == 'ERROR':
        obj.report_status = 'errored'
    elif obj.current == False:
        obj.report_status = 'deleted'
    elif len(model.Session.query(HarvestObject)
             .filter_by(package_id=obj.package_id)
             .limit(2)
             .all()) == 2:
        obj.report_status = 'updated'
    else:
        obj.report_status = 'added'
    obj.save()
def transfer_to_redis(request):
    """
    Push the client configuration data into Redis.
    """
    success, msg = False, ''
    try:
        config_data = get_config_redis_json()
        logger.debug(config_data)
        r = redis.StrictRedis(host=settings.REDIS_HOST, port=settings.REDIS_PORT,
                              db=settings.REDIS_DB, password=settings.REDIS_PASSWORD)
        # Buffer commands in a transactional pipeline (transaction=True)
        pipe = r.pipeline(transaction=True)
        # Delete the existing configuration keys first
        pattern_delete_lua = """
        local keys = redis.call('keys', ARGV[1])
        for i = 1, table.getn(keys) do
            redis.call('del', keys[i])
        end
        """
        pattern_delete = r.register_script(pattern_delete_lua)
        pattern_delete(keys=[''], args=['%s:*' % settings.CLIENT_CONFIG_REDIS_PREFIX], client=pipe)
        for t in config_data:
            logger.debug(t)
            #
            # client = {}
            # for k, v in t.iteritems():
            #     if k != 'endpoints':
            #         client[k] = v
            pipe.set('%s:%s' % (settings.CLIENT_CONFIG_REDIS_PREFIX, t['app_id']), json_dumps(t))
            # for s in t['endpoints']:
            #     pipe.set('%s:%s:%s:%s' % (settings.PROXY_CONFIG_REDIS_PREFIX, t['access_key'], s['name'], s['version']),
            #              json_dumps(s))
        # pipe.delete('config:*')
        # The execute() call sends all buffered commands to the server, returning
        # a list of responses, one for each command.
        pipe.execute()
        success = True
    except Exception as e:
        msg = 'Failed to transfer the configuration data to Redis.'
        logger.error(e)
        logger.error(traceback.format_exc())
    return http_response_json({'success': success, 'msg': msg})
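The pattern delete above relies on the blocking KEYS command. A sketch of the same cleanup done incrementally with redis-py's scan_iter instead, which avoids blocking the server on large keyspaces; the prefix value stands in for settings.CLIENT_CONFIG_REDIS_PREFIX:

# Alternative cleanup sketch using SCAN instead of KEYS (illustrative names).
import redis

r = redis.StrictRedis()
prefix = "client-config"
pipe = r.pipeline(transaction=True)
for key in r.scan_iter(match="%s:*" % prefix, count=500):
    pipe.delete(key)
pipe.execute()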