api.py 文件源码

python
阅读 23 收藏 0 点赞 0 评论 0

项目:gnocchi 作者: gnocchixyz 项目源码 文件源码
def post(self):
        """Create an archive policy from the request body.

        The handler first calls ``enforce`` with an empty target, then
        validates the request body against a voluptuous schema built at
        call time, turns the validated body into an
        ``archive_policy.ArchivePolicy``, calls ``enforce`` again with
        that object, and finally asks the request's indexer to create it.

        On success the location ``/archive_policy/<name>`` is handed to
        ``set_resp_location_hdr``, the response status is set to 201 and
        the object returned by the indexer is returned.

        Error handling visible here:
          * ``ValueError`` from ``ArchivePolicy.from_dict`` -> ``abort(400)``
          * ``indexer.ArchivePolicyAlreadyExists`` -> ``abort(409)``
        """
        enforce("create archive policy", {})

        # NOTE(jd): Initialize this one at run-time because we rely on conf
        request_conf = pecan.request.conf
        allowed_methods = (
            archive_policy.ArchivePolicy.VALID_AGGREGATION_METHODS_VALUES
        )

        name_key = voluptuous.Required("name")
        back_window_key = voluptuous.Required("back_window", default=0)
        # When the caller omits aggregation methods, fall back to the
        # configured defaults.
        methods_key = voluptuous.Required(
            "aggregation_methods",
            default=set(
                request_conf.archive_policy.default_aggregation_methods))
        # Validate against a list schema built from the allowed values,
        # then coerce the result to a set.
        methods_validator = voluptuous.All(
            list(allowed_methods), voluptuous.Coerce(set))
        definition_key = voluptuous.Required("definition")
        # Schema for a single entry of the "definition" list.
        definition_entry = {
            "granularity": Timespan,
            "points": PositiveNotNullInt,
            "timespan": Timespan,
        }
        # The definition list must contain at least one entry.
        definition_validator = voluptuous.All(
            [definition_entry], voluptuous.Length(min=1))

        ArchivePolicySchema = voluptuous.Schema({
            name_key: six.text_type,
            back_window_key: PositiveOrNullInt,
            methods_key: methods_validator,
            definition_key: definition_validator,
        })

        payload = deserialize_and_validate(ArchivePolicySchema)

        # Build the policy object; constructor-level validation failures
        # are reported to the client as a 400.
        try:
            policy = archive_policy.ArchivePolicy.from_dict(payload)
        except ValueError as exc:
            abort(400, six.text_type(exc))

        # Second policy check, this time against the concrete object.
        enforce("create archive policy", policy)

        try:
            policy = pecan.request.indexer.create_archive_policy(policy)
        except indexer.ArchivePolicyAlreadyExists as exc:
            abort(409, six.text_type(exc))

        set_resp_location_hdr("/archive_policy/" + policy.name)
        pecan.response.status = 201
        return policy
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号