python类from_url()的实例源码

test_db.py 文件源码 项目:coscup-line-bot 作者: ncuoolab 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_update_command(self):
        r = redis.from_url(REDIS_URL)
        get_dao().add_commands(gen_test_commands(10, 'zh_TW'))
        assert 10 == len(r.keys('COMMAND::zh_TW::*'))
        get_dao().update_commands(gen_test_commands(20, 'en'))
        assert 0 == len(r.keys('COMMAND::zh_TW::*'))
        assert 20 == len(r.keys('COMMAND::en::*'))
        assert 20 == len(r.keys('COMMAND::*'))
uifunc.py 文件源码 项目:map-of-innovation 作者: AnanseGroup 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def index(self):
        # Return a rendered front page  template
        markers = []
        indices = {
          "name": "name",
          "city": "city",
          "country": "country",
          "website": "primary_website",
          "primarytype": "primary_type",
          "multitypes": "types_multiple",
          "description": "description",
          "latitude": "latitude",
          "longitude":"longitude",
          "services": "services"
        }
        if os.environ.get("REDIS_URL") :
          redis_url = os.environ.get("REDIS_URL")
        else:
          redis_url = "localhost"
        r = redis.from_url(redis_url)
        i = 0 
        for key in r.scan_iter():
          marker = {}
          row = r.hgetall(key)
          for header in indices.keys():
            marker[header] = unicode(row[str(indices[header])], errors='replace')
          markers.append(marker)
        c.markers = json.dumps(markers)     
        return render('/makermap.html')
baseapi.py 文件源码 项目:map-of-innovation 作者: AnanseGroup 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def archiveSpace(self):
    #archive a space
    skey = request.params.get("id")
    if os.environ.get("REDIS_URL") :
        redis_url = os.environ.get("REDIS_URL")
    else:
        redis_url = "localhost"
    r = redis.from_url(redis_url)
    r.hset(skey,'archived',True) 
    return {'sucess':'true'}
tests.py 文件源码 项目:turingtweets 作者: jjfeore 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_markov():
    """Return a redis connection."""
    from turingtweets.scripts.builddict import gen_markov
    gen_markov()
    host_url = os.environ.get('REDIS_URL')
    chains = redis.from_url(host_url)
    return chains
nlp.py 文件源码 项目:turingtweets 作者: jjfeore 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def gen_tweet():
    """Read the redis, and build a fake tweet from that."""
    host_url = os.environ.get('REDIS_URL')
    chains = redis.from_url(host_url)
    markov_chains = chains.get('markov_tweets')
    markov_chains = pickle.loads(markov_chains)
    the_tweet = None

    while not the_tweet:
        the_tweet = markov_chains.make_short_sentence(140, 70)
    return the_tweet
controller.py 文件源码 项目:bqueryd 作者: visualfabriq 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def __init__(self, redis_url='redis://127.0.0.1:6379/0', loglevel=logging.INFO):

        self.redis_url = redis_url
        self.redis_server = redis.from_url(redis_url)
        self.context = zmq.Context()
        self.socket = self.context.socket(zmq.ROUTER)
        self.socket.setsockopt(zmq.LINGER, 500)
        self.socket.setsockopt(zmq.ROUTER_MANDATORY, 1)  # Paranoid for debugging purposes
        self.socket.setsockopt(zmq.SNDTIMEO, 1000)  # Short timeout
        self.poller = zmq.Poller()
        self.poller.register(self.socket, zmq.POLLIN | zmq.POLLOUT)

        self.node_name = socket.gethostname()
        self.address = bind_to_random_port(self.socket, 'tcp://' + get_my_ip(), min_port=14300, max_port=14399,
                                           max_tries=100)
        with open(os.path.join(RUNFILES_LOCATION, 'bqueryd_controller.address'), 'w') as F:
            F.write(self.address)
        with open(os.path.join(RUNFILES_LOCATION, 'bqueryd_controller.pid'), 'w') as F:
            F.write(str(os.getpid()))

        self.logger = bqueryd.logger.getChild('controller').getChild(self.address)
        self.logger.setLevel(loglevel)

        self.msg_count_in = 0
        self.rpc_results = []  # buffer of results that are ready to be returned to callers
        self.rpc_segments = {}  # Certain RPC calls get split and divided over workers, this dict tracks the original RPCs
        self.worker_map = {}  # maintain a list of connected workers TODO get rid of unresponsive ones...
        self.files_map = {}  # shows on which workers a file is available on
        self.worker_out_messages = {None: []}  # A dict of buffers, used to round-robin based on message affinity
        self.worker_out_messages_sequence = [None]  # used to round-robin the outgoing messages
        self.is_running = True
        self.last_heartbeat = 0
        self.others = {}  # A dict of other Controllers running on other DQE nodes
        self.start_time = time.time()
rpc.py 文件源码 项目:bqueryd 作者: visualfabriq 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def get_download_data(self):
        redis_server = redis.from_url(self.redis_url)
        tickets = set(redis_server.keys(bqueryd.REDIS_TICKET_KEY_PREFIX+'*'))
        data = {}
        for ticket in tickets:
            tmp = redis_server.hgetall(ticket)
            data[ticket] = tmp
        return data
rpc.py 文件源码 项目:bqueryd 作者: visualfabriq 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def delete_download(self, ticket):
        redis_server = redis.from_url(self.redis_url)
        tmp = redis_server.hgetall(ticket)
        count = 0
        for k, v in tmp.items():
            count += redis_server.hdel(ticket, k)
        return count
test_mutex.py 文件源码 项目:scrummasterbot 作者: ariyorinoari 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def setUp(self):
        self.redis = redis.from_url(TestLock.REDIS_URL)
        self.mutex= Mutex(self.redis, TestLock.KEY)
test_mutex.py 文件源码 项目:scrummasterbot 作者: ariyorinoari 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_multi_process(self):
        mutex = self.mutex

        def lock_success(self):
            r = redis.from_url(TestLock._REDIS_URL)
            m = Mutex(self.redis, TestLock._KEY)
            m.lock()
            self.assertTrue(m.is_lock())
            time.sleep(10)
            m.unlock()

        def lock_error(self):
            r = redis.from_url(TestLock._REDIS_URL)
            m = Mutex(self.redis, TestLock._KEY)
            m.lock()
            self.assertFalse(m.is_lock())

        jobs = [
            Process(target=lock_success, args=(self, )),
            Process(target=lock_error, args=(self, ))
        ]

        for i in jobs:
            i.start()

        for i in jobs:
            i.join()
statcollection.py 文件源码 项目:scrappy 作者: DormyMo 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def __init__(self, crawler):
        self._dump = crawler.settings.getbool('STATS_DUMP')#default: STATS_DUMP = True
        redis_url = crawler.settings.get('REDIS_URL', REDIS_URL)
        self.stats_key = crawler.settings.get('STATS_KEY', 'scrappy:stats')
        self.server = redis.from_url(redis_url)
connection.py 文件源码 项目:scrappy 作者: DormyMo 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def from_settings(settings):
    url = settings.get('REDIS_URL',  REDIS_URL)
    host = settings.get('REDIS_HOST', REDIS_HOST)
    port = settings.get('REDIS_PORT', REDIS_PORT)

    # REDIS_URL takes precedence over host/port specification.
    if url:
        return redis.from_url(url)
    else:
        return redis.Redis(host=host, port=port)
SMSWorker.py 文件源码 项目:mWorkerService 作者: smices 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def main(msg, config=None, silent=False):
    """
    Job enqueue
    :param msg:str
    :param config:object
    :return:
    """
    queue_dsn = config["queue"]["dsn"]
    redis_conn = redis.from_url(queue_dsn)

    q = Queue('high', connection=redis_conn)
    ret = q.enqueue(push_messenger, msg, result_ttl=60)
    print ret
    return ret
SMSWorker.py 文件源码 项目:mWorkerService 作者: smices 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def main(config):
    global worker_config
    worker_config = config

    listen = config["listen"].values()

    queue_dsn = config["queue"]["dsn"]

    conn = redis.from_url(queue_dsn)

    with Connection(conn):
        worker = Worker(map(Queue, listen))
        worker.work()
PushWorker.py 文件源码 项目:mWorkerService 作者: smices 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def main(config=None):

    listen = config["listen"].values()

    queue_dsn = config["queue"]["dsn"]

    conn = redis.from_url(queue_dsn)

    with Connection(conn):
        worker = Worker(map(Queue, listen))
        worker.work()
redis.py 文件源码 项目:zeus 作者: getsentry 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def init_app(self, app):
        self.redis = redis.from_url(app.config['REDIS_URL'])
        self.logger = app.logger
flask_tus.py 文件源码 项目:posm-opendronemap-api 作者: posm 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def redis_connect(self):
        return redis.from_url(os.environ.get('REDIS_URL', 'redis://'))
redis.py 文件源码 项目:rqalpha-mod-fxdayu-source 作者: xingetouzi 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def __init__(self, redis_url):
        import redis
        self._client = redis.from_url(redis_url)
redis_data_source.py 文件源码 项目:rqalpha 作者: ricequant 项目源码 文件源码 阅读 12 收藏 0 点赞 0 评论 0
def __init__(self, path, redis_uri):
        super(RedisDataSource, self).__init__(path)
        self._env = Environment.get_instance()
        import redis
        self._redis_client = redis.from_url(redis_uri)
backends.py 文件源码 项目:flask-caching 作者: sh4nks 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def redis(app, config, args, kwargs):
    try:
        from redis import from_url as redis_from_url
    except ImportError:
        raise RuntimeError('no redis module found')

    kwargs.update(dict(
        host=config.get('CACHE_REDIS_HOST', 'localhost'),
        port=config.get('CACHE_REDIS_PORT', 6379),
    ))
    password = config.get('CACHE_REDIS_PASSWORD')
    if password:
        kwargs['password'] = password

    key_prefix = config.get('CACHE_KEY_PREFIX')
    if key_prefix:
        kwargs['key_prefix'] = key_prefix

    db_number = config.get('CACHE_REDIS_DB')
    if db_number:
        kwargs['db'] = db_number

    redis_url = config.get('CACHE_REDIS_URL')
    if redis_url:
        kwargs['host'] = redis_from_url(
            redis_url,
            db=kwargs.pop('db', None),
        )

    return RedisCache(*args, **kwargs)


问题


面经


文章

微信
公众号

扫码关注公众号