replica.py source code

python

Project: rucio  Author: rucio01
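from datetime import datetime

from sqlalchemy.exc import DatabaseError

# Rucio-internal imports; module paths assumed from the Rucio code base.
from rucio.core.rse import get_rse_id
from rucio.db.sqla import models


# NOTE: `session` must be a live SQLAlchemy session. The default of None is
# presumably filled in by a session decorator that this excerpt does not show.
def touch_collection_replicas(collection_replicas, session=None):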
    """
    Update the accessed_at timestamp of the given collection replicas.

    :param collection_replicas: the list of collection replicas.
    :param session: The database session in use.

    :returns: True, if successful, False otherwise.
    """

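    # rse_ids caches RSE name -> id so each distinct RSE is looked up only once;
    # now is the fallback timestamp shared by all replicas in this call.
    rse_ids, now = {}, datetime.utcnow()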
    for collection_replica in collection_replicas:
        if 'rse_id' not in collection_replica:
            if collection_replica['rse'] not in rse_ids:
                rse_ids[collection_replica['rse']] = get_rse_id(rse=collection_replica['rse'], session=session)
            collection_replica['rse_id'] = rse_ids[collection_replica['rse']]

        try:
            query = session.query(models.CollectionReplica).filter_by(
                scope=collection_replica['scope'],
                name=collection_replica['name'],
                rse_id=collection_replica['rse_id'])
            # Prefer a caller-supplied timestamp; otherwise fall back to now.
            query.update({'accessed_at': collection_replica.get('accessed_at') or now},
                         synchronize_session=False)
        except DatabaseError:
            return False

    return True
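
The function above does two things besides the database update: it resolves each missing `rse_id` with at most one lookup per distinct RSE name, and it picks a timestamp with `accessed_at or now`. Below is a minimal sketch of just those two steps, with no database involved. The helper names `resolve_rse_ids` and `effective_accessed_at`, the stub `fake_lookup`, and the sample scope, dataset, and RSE names are all hypothetical and not part of Rucio; `fake_lookup` stands in for `get_rse_id`.

```python
from datetime import datetime


def resolve_rse_ids(collection_replicas, lookup):
    """Fill in a missing 'rse_id' on each replica dict.

    `lookup` is called at most once per distinct RSE name; results are cached.
    """
    cache = {}
    for replica in collection_replicas:
        if 'rse_id' not in replica:
            rse = replica['rse']
            if rse not in cache:
                cache[rse] = lookup(rse)
            replica['rse_id'] = cache[rse]
    return collection_replicas


def effective_accessed_at(replica, now):
    """Return the replica's own 'accessed_at' if set and truthy, else `now`."""
    return replica.get('accessed_at') or now


# Hypothetical stand-in for get_rse_id that records every call it receives.
calls = []


def fake_lookup(rse):
    calls.append(rse)
    return 'id-' + rse


replicas = [
    {'scope': 'user.alice', 'name': 'dataset_a', 'rse': 'SITE_A'},
    {'scope': 'user.alice', 'name': 'dataset_b', 'rse': 'SITE_A'},
    {'scope': 'user.alice', 'name': 'dataset_c', 'rse_id': 'known',
     'accessed_at': datetime(2020, 1, 1)},
]

resolve_rse_ids(replicas, fake_lookup)
now = datetime(2024, 1, 1)  # fixed value so the example is deterministic
timestamps = [effective_accessed_at(r, now) for r in replicas]
```

Two replicas share `SITE_A`, so the stub is consulted once, and the third replica is left alone because it already carries an `rse_id`. Note that the `or` fallback also replaces an explicit `accessed_at` of `None`, which matches the behaviour of the original function.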