python类action_get()的实例源码

ceph_ops.py 文件源码 项目:charm-ceph-proxy 作者: openstack 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def pool_get():
    key = action_get("key")
    pool_name = action_get("pool_name")
    try:
        value = check_output(['ceph', 'osd', 'pool', 'get', pool_name, key])
        return value
    except CalledProcessError as e:
        action_fail(e.message)
ceph_ops.py 文件源码 项目:charm-ceph-proxy 作者: openstack 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def set_pool():
    key = action_get("key")
    value = action_get("value")
    pool_name = action_get("pool_name")
    pool_set(service='ceph', pool_name=pool_name, key=key, value=value)
ceph_ops.py 文件源码 项目:charm-ceph-proxy 作者: openstack 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def delete_pool_snapshot():
    pool_name = action_get("pool-name")
    snapshot_name = action_get("snapshot-name")
    remove_pool_snapshot(service='ceph',
                         pool_name=pool_name,
                         snapshot_name=snapshot_name)


# Note only one or the other can be set
ceph_ops.py 文件源码 项目:charm-ceph-proxy 作者: openstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def set_pool_max_bytes():
    pool_name = action_get("pool-name")
    max_bytes = action_get("max")
    set_pool_quota(service='ceph',
                   pool_name=pool_name,
                   max_bytes=max_bytes)
create-cache-tier.py 文件源码 项目:charm-ceph-proxy 作者: openstack 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def make_cache_tier():
    backer_pool = action_get("backer-pool")
    cache_pool = action_get("cache-pool")
    cache_mode = action_get("cache-mode")

    # Pre flight checks
    if not pool_exists('admin', backer_pool):
        log("Please create {} pool before calling create-cache-tier".format(
            backer_pool))
        action_fail("create-cache-tier failed. Backer pool {} must exist "
                    "before calling this".format(backer_pool))

    if not pool_exists('admin', cache_pool):
        log("Please create {} pool before calling create-cache-tier".format(
            cache_pool))
        action_fail("create-cache-tier failed. Cache pool {} must exist "
                    "before calling this".format(cache_pool))

    pool = Pool(service='admin', name=backer_pool)
    try:
        pool.add_cache_tier(cache_pool=cache_pool, mode=cache_mode)
    except CalledProcessError as err:
        log("Add cache tier failed with message: {}".format(
            err.message))
        action_fail("create-cache-tier failed.  Add cache tier failed with "
                    "message: {}".format(err.message))
test_hookenv.py 文件源码 项目:charm-helpers 作者: juju 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_action_get_with_key(self, check_output):
        action_data = 'bar'
        check_output.return_value = json.dumps(action_data).encode('UTF-8')

        result = hookenv.action_get(key='foo')

        self.assertEqual(result, 'bar')
        check_output.assert_called_with(['action-get', 'foo', '--format=json'])
test_hookenv.py 文件源码 项目:charm-helpers 作者: juju 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_action_get_without_key(self, check_output):
        check_output.return_value = json.dumps(dict(foo='bar')).encode('UTF-8')

        result = hookenv.action_get()

        self.assertEqual(result['foo'], 'bar')
        check_output.assert_called_with(['action-get', '--format=json'])


问题


面经


文章

微信
公众号

扫码关注公众号