def get_pfn_to_rse(pfns, session=None):
"""
Get the RSE associated to a list of PFNs.
:param pfns: The list of pfn.
:param session: The database session in use.
"""
unknown_replicas = {}
storage_elements = []
se_condition = []
dict_rse = {}
surls = clean_surls(pfns)
scheme = surls[0].split(':')[0]
for surl in surls:
if surl.split(':')[0] != scheme:
raise exception.InvalidType('The PFNs specified must have the same protocol')
split_se = surl.split('/')[2].split(':')
storage_element = split_se[0]
if storage_element not in storage_elements:
storage_elements.append(storage_element)
se_condition.append(models.RSEProtocols.hostname == storage_element)
query = session.query(models.RSE.rse, models.RSEProtocols.scheme, models.RSEProtocols.hostname, models.RSEProtocols.port, models.RSEProtocols.prefix).\
filter(models.RSEProtocols.rse_id == models.RSE.id).filter(and_(or_(*se_condition), models.RSEProtocols.scheme == scheme)).filter(models.RSE.staging_area == false())
protocols = {}
for rse, protocol, hostname, port, prefix in query.yield_per(10000):
protocols[rse] = ('%s://%s%s' % (protocol, hostname, prefix), '%s://%s:%s%s' % (protocol, hostname, port, prefix))
hint = None
for surl in surls:
if hint and (surl.find(protocols[hint][0]) > -1 or surl.find(protocols[hint][1]) > -1):
dict_rse[hint].append(surl)
else:
mult_rse_match = 0
for rse in protocols:
if surl.find(protocols[rse][0]) > -1 or surl.find(protocols[rse][1]) > -1:
mult_rse_match += 1
if mult_rse_match > 1:
print 'ERROR, multiple matches : %s at %s' % (surl, rse)
raise exception.RucioException('ERROR, multiple matches : %s at %s' % (surl, rse))
hint = rse
if hint not in dict_rse:
dict_rse[hint] = []
dict_rse[hint].append(surl)
if mult_rse_match == 0:
if 'unknown' not in unknown_replicas:
unknown_replicas['unknown'] = []
unknown_replicas['unknown'].append(surl)
return scheme, dict_rse, unknown_replicas
评论列表
文章目录