def check_redis():
if HOST_ROLE == 'MASTER':
SETINEL_HOST = MASTER_SETINEL_HOST
REDIS_MASTER = MASTER_REDIS_MASTER
else:
SETINEL_HOST = SLAVE_SETINEL_HOST
REDIS_MASTER = SLAVE_REDIS_MASTER
s = redis.StrictRedis(host=SETINEL_HOST, port=26379, socket_timeout=0.1)
try:
h = s.execute_command("SENTINEL get-master-addr-by-name mymaster")[0].decode("utf-8")
print(h)
if h == REDIS_MASTER:
print('Other host is redis master')
sys.exit()
else:
pass
except Exception as e:
print(e.args[0])
sys.exit()
python类StrictRedis()的实例源码
def check_redis():
if HOST_ROLE == 'MASTER':
SETINEL_HOST = MASTER_SETINEL_HOST
REDIS_MASTER = MASTER_REDIS_MASTER
else:
SETINEL_HOST = SLAVE_SETINEL_HOST
REDIS_MASTER = SLAVE_REDIS_MASTER
s = redis.StrictRedis(host=SETINEL_HOST, port=26379, socket_timeout=0.1)
try:
h = s.execute_command("SENTINEL get-master-addr-by-name mymaster")[0].decode("utf-8")
print(h)
if h == REDIS_MASTER:
print('Other host is redis master')
sys.exit()
else:
pass
except Exception as e:
print(e.args[0])
sys.exit()
#--------------
def get_redis_connection():
return redis.StrictRedis(
host=settings.REDIS_HOST, port=settings.REDIS_PORT, db=settings.REDIS_DB, password=settings.REDIS_PASSWORD,
)
def store(request):
cl = redis.StrictRedis()
cl.flushdb()
return QueueStore(cl)
def app(request):
cl = redis.StrictRedis()
cl.flushdb()
return Application(Manager(QueueStore(cl), ResultStore(cl)))
def manager(request):
cl = redis.StrictRedis()
cl.flushdb()
return Manager(QueueStore(cl), ResultStore(cl))
def store(request):
cl = redis.StrictRedis()
cl.flushdb()
return ResultStore(cl)
def set_redis(key,data,secnd):
try:
r = redis.StrictRedis()
data = json.dumps(data)
dataset = (str(key),str(data))
a = r.set('%s' %key,'%s' %data)
#rdis.bgsave()
b = r.expire('%s' %key, secnd)
return a
except Exception:
return False
def get_redis(key):
try:
r = redis.StrictRedis()
data = r.get(key)
return json.loads(data)
except Exception:
return False
def __init__(self, settings):
self.r = redis.StrictRedis(
host=settings['redis']['host'],
port=settings['redis']['port'],
db=settings['redis']['db'])
def __init__(self, app=None):
self.app = app
self.master = StrictRedis()
self.slave = self.master
if app is not None: # pragma: no cover
self.init_app(app)
def setUp(self):
super(TestNews, self).setUp()
self.connection = StrictRedis()
self.connection.flushall()
test_contributions_guard.py 文件源码
项目:FRG-Crowdsourcing
作者: 97amarnathk
项目源码
文件源码
阅读 20
收藏 0
点赞 0
评论 0
def setUp(self):
self.connection = StrictRedis()
self.connection.flushall()
self.guard = ContributionsGuard(self.connection)
self.anon_user = {'user_id': None, 'user_ip': '127.0.0.1'}
self.auth_user = {'user_id': 33, 'user_ip': None}
self.task = Task(id=22)
def setUp(self):
super(TestMaintenance, self).setUp()
self.connection = StrictRedis()
self.connection.flushall()
def setUp(self):
super(TestWebHooks, self).setUp()
self.connection = StrictRedis()
self.connection.flushall()
self.project = ProjectFactory.create()
self.webhook_payload = dict(project_id=self.project.id,
project_short_name=self.project.short_name)
def init(cls):
cls.r = redis.StrictRedis(host='127.0.0.1',
port=Config.STATS_SERVER_PORT, db=0)
cls.r.set('rate', str(0.0).encode('utf-8'))
cls.r.set('npoints', str(0).encode('utf-8'))
cls.r.set('time_msec', str(0).encode('utf-8'))
def get_client(ip_port):
ip_port = normalize_ip_port(ip_port)
pid = os.getpid()
with _lock:
o = _pid_client[ip_port]
if pid not in o:
o[pid] = redis.StrictRedis(*ip_port)
return _pid_client[ip_port][pid]
def _get_connection(self):
conn = getattr(_thread_local, "_redisconn", None)
if conn:
return conn
conn = redis.StrictRedis(host=self.host, port=self.port, db=self.db)
setattr(_thread_local, "_redisconn", conn)
return conn
def get_redis_connection(self):
return redis.StrictRedis(connection_pool=self.connection_pool)
subscription_manager.py 文件源码
项目:graphql-python-subscriptions
作者: hballard
项目源码
文件源码
阅读 19
收藏 0
点赞 0
评论 0
def __init__(self, host='localhost', port=6379, *args, **kwargs):
redis.connection.socket = gevent.socket
self.redis = redis.StrictRedis(host, port, *args, **kwargs)
self.pubsub = self.redis.pubsub()
self.subscriptions = {}
self.sub_id_counter = 0
self.greenlet = None