def calcAll(genotypeRDD, gwasRDD, thresholdlist, logsnp, samplenum,calltableRDD=False):
logger.info("Started calculating PRS at each threshold")
prsMap={}
thresholdNoMaxSorted=sorted(thresholdlist, reverse=True)
thresholdmax=max(thresholdlist)
idlog={}
start=time.time()
for threshold in thresholdNoMaxSorted:
tic=time.time()
gwasFilteredBC=sc.broadcast(filterGWASByP_DF(GWASdf=gwasRDD, pcolumn=gwas_p, idcolumn=gwas_id, oddscolumn=gwas_or, pHigh=threshold, logOdds=log_or))
#gwasFiltered=spark.sql("SELECT snpid, gwas_or_float FROM gwastable WHERE gwas_p_float < {:f}".format(threshold)
logger.info("Filtered GWAS at threshold of {}. Time spent : {:.1f} seconds".format(str(threshold), time.time()-tic))
checkpoint=time.time()
filteredgenotype=genotypeRDD.filter(lambda line: line[0] in gwasFilteredBC.value)
assert calltableRDD, "Error, calltable must be provided"
filteredcalltable=calltableRDD.filter(lambda line: line[0] in gwasFilteredBC.value )
if not filteredgenotype.isEmpty():
#assert filteredcalltable.count()==filteredgenotype.count(), "Error, call table have different size from genotype"
if logsnp:
idlog[threshold]=filteredgenotype.map(lambda line:line[0]).collect()
prsMap[threshold]=calcPRSFromGeno(filteredgenotype, gwasFilteredBC.value,samplenum=samplenum, calltable=filteredcalltable)
logger.info("Finished calculating PRS at threshold of {}. Time spent : {:.1f} seconds".format(str(threshold), time.time()-checkpoint))
else:
logger.warn("No SNPs left at threshold {}" .format(threshold))
return prsMap, idlog
评论列表
文章目录