python类get()的实例源码

app.py 文件源码 项目:scrummasterbot 作者: ariyorinoari 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def handle_text_message(event):
    """Handle an incoming text message for the planning-poker bot.

    Two message shapes are recognised:

    * ``???`` starts a new poker round: the per-source counter in redis is
      incremented and the planning-poker message is sent as a reply.
    * ``#<number> <value>`` casts a vote for round ``<number>``. ``<value>``
      is looked up in ``mapping`` (by value) to find the hash field that is
      incremented for the vote.

    Any other text is ignored.

    :param event: the message event; ``event.message.text``,
        ``event.source`` and ``event.reply_token`` are read.
    """
    text = event.message.text
    sourceId = getSourceId(event.source)
    matcher = re.match(r'^#(\d+) (.+)', text)

    if text == '???':
        poker_mutex = Mutex(redis, POKER_MUTEX_KEY_PREFIX+ sourceId)
        poker_mutex.lock()
        if poker_mutex.is_lock():
            number = str(redis.incr(sourceId)).encode('utf-8')
            line_bot_api.reply_message(
               event.reply_token,
               generate_planning_poker_message(number))
            # Hold the lock for the timeout window, then release it if it is
            # still held.
            time.sleep(POKER_MUTEX_TIMEOUT)
            if poker_mutex.is_lock():
                poker_mutex.unlock()
    elif matcher is not None:
        number = matcher.group(1)
        value = matcher.group(2)
        # redis.get returns None when no round has ever been started for this
        # source; leave it as None so the vote is reported as invalid below
        # instead of crashing on None.encode().
        current = redis.get(sourceId)
        if current is not None:
            current = current.encode('utf-8')
        vote_key = sourceId + number 
        status = redis.hget(vote_key, 'status')
        if status is None:
            if number != current:
                line_bot_api.reply_message(
                    event.reply_token,
                    TextMessage(text=MESSAGE_INVALID_VOTE.format(number)))
                return
            poker_mutex = Mutex(redis, POKER_MUTEX_KEY_PREFIX+ sourceId)
            vote_mutex = Mutex(redis, VOTE_MUTEX_KEY_PREFIX  + sourceId)
            # Reverse lookup: first key of `mapping` whose value equals the
            # voted text. Works on both Python 2 and 3 dict views.
            location = next(
                (key for key, card in mapping.items() if card == value),
                None)
            if location is None:
                # The text after "#<number> " is not a known card; ignore it
                # rather than raising from the handler.
                return
            vote_mutex.lock()
            if vote_mutex.is_lock():
                # This caller obtained the vote lock: wait for the voting
                # window, count its own vote, then publish the result.
                time.sleep(VOTE_MUTEX_TIMEOUT)
                redis.hincrby(vote_key, location)
                line_bot_api.reply_message(
                    event.reply_token,
                    genenate_voting_result_message(vote_key)
                )
                redis.hset(vote_key, 'status', 'complete')
                vote_mutex.unlock()
                # NOTE(review): every other call site uses unlock(); confirm
                # that Mutex really exposes release().
                poker_mutex.release()
            else:
                # Lock not obtained: just record the vote.
                redis.hincrby(vote_key, location)
        else:
            # A status is already stored for this round, so voting is over.
            line_bot_api.reply_message(
                event.reply_token,
                TextMessage(text=MESSAGE_END_POKER.format(number)))
rosie-ci.py 文件源码 项目:rosie-ci 作者: adafruit 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def travis():
    """Handle a Travis CI webhook notification.

    The request's ``Signature`` header is base64-decoded and checked against
    the Travis public key over the ``payload`` form field. The payload is then
    parsed as JSON and dispatched on its ``state``:

    * ``started``: queue ``load_code`` for the pull request, tag or branch,
      set an upload lock in redis (20 minute expiry) and mark the commit
      pending.
    * ``passed`` / ``failed``: mark the commit pending, drop the upload lock
      and call ``test_commit``.
    * ``cancelled``: drop the upload lock and mark the commit as errored.
    * a payload whose ``status`` is ``None``: mark the commit as errored.

    Aborts with 401 when the signature header is missing or the signature
    check fails, and with 500 when the public key cannot be retrieved.

    :returns: a JSON response ``{'status': 'received'}``.
    """
    encoded_signature = request.headers.get('Signature')
    if encoded_signature is None:
        # Without a signature the request cannot be authenticated; previously
        # b64decode(None) raised and surfaced as a server error.
        abort(401)
    signature = base64.b64decode(encoded_signature)
    try:
        public_key = _get_travis_public_key()
    except requests.Timeout:
        print("Timed out when attempting to retrieve Travis CI public key")
        abort(500)
    except requests.RequestException:
        print("Failed to retrieve Travis CI public key")
        abort(500)
    try:
        check_authorized(signature, public_key, request.form["payload"])
    except SignatureError:
        abort(401)
    data = json.loads(request.form["payload"])

    repo = data["repository"]["owner_name"] + "/" + data["repository"]["name"]
    build_number = data["id"]
    sha = data["commit"]
    if data["type"] == "pull_request":
        sha = data["head_commit"]
    tag = None
    if data["type"] == "push" and data["tag"] is not None:
        tag = data["tag"]
    print(data)

    # Tagged builds are keyed by tag name, everything else by commit sha.
    key = sha
    if tag is not None:
        key = tag

    upload_lock = "upload-lock:" + sha

    if data["state"] in ("started", ):
        print("travis started", key)
        # Handle pulls differently.
        if data["pull_request"]:
            load_code.delay(repo, "pull/" + str(data["pull_request_number"]) + "/head")
        elif data["tag"]:
            # Use the payload value directly: the local `tag` is only set for
            # "push" events and could be None here.
            load_code.delay(repo, "refs/tags/" + data["tag"])
        else:
            load_code.delay(repo, "refs/heads/" + data["branch"])
        redis.setex(upload_lock, 20 * 60, "locked")
        set_status(repo, sha, "pending", data["build_url"], "Waiting on Travis to complete.")
    elif data["state"] in ("passed", "failed"):
        print("travis finished")
        key = repo + "/" + key
        set_status(repo, sha, "pending", "https://rosie-ci.ngrok.io/log/" + key, "Queueing Rosie test.")
        redis.delete(upload_lock)
        test_commit(repo, sha, tag)
    elif data["state"] in ("cancelled", ):
        # Membership test, not identity: `is` against a tuple literal is
        # always False, which made this branch unreachable.
        print("travis cancelled")
        redis.delete(upload_lock)
        set_status(repo, sha, "error", data["build_url"], "Travis cancelled.")
    elif data["status"] is None:
        set_status(repo, sha, "error", data["build_url"], "Travis error.")
    else:
        print("unhandled state:", data["state"])
        print(data)
    return jsonify({'status': 'received'})


问题


面经


文章

微信
公众号

扫码关注公众号