def unquote_unreserved(uri):
    """Un-escape any percent-escape sequences in a URI that are unreserved
    characters. This leaves all reserved, illegal and non-ASCII bytes encoded.

    :rtype: str
    """
    parts = uri.split('%')
    for i in range(1, len(parts)):
        h = parts[i][0:2]
        if len(h) == 2 and h.isalnum():
            try:
                c = chr(int(h, 16))
            except ValueError:
                raise InvalidURL("Invalid percent-escape sequence: '%s'" % h)

            if c in UNRESERVED_SET:
                parts[i] = c + parts[i][2:]
            else:
                parts[i] = '%' + parts[i]
        else:
            parts[i] = '%' + parts[i]
    return ''.join(parts)
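A quick usage sketch follows; UNRESERVED_SET and InvalidURL normally come from requests (requests.models / requests.exceptions), so minimal stand-ins are defined here purely so the call runs:

import string

UNRESERVED_SET = frozenset(string.ascii_letters + string.digits + "-._~")  # stand-in for requests' set

class InvalidURL(ValueError):  # stand-in for requests.exceptions.InvalidURL
    pass

print(unquote_unreserved("http://example.com/%7Euser/a%2Fb"))
# -> http://example.com/~user/a%2Fb   ('~' is unreserved, '/' stays percent-encoded)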
Python split() usage examples (source code)
Example source: application.py, from project almond-nnparser (author: Stanford-Mobisocial-IoT-Lab)
def get_language(self, locale):
    '''
    Convert a locale tag into a preloaded language
    '''
    split_tag = re.split("[_\\.\\-]", locale)
    # try with language and country
    language = None
    if len(split_tag) >= 2:
        language = self._languages.get(split_tag[0] + "-" + split_tag[1], None)
    if language is None and len(split_tag) >= 1:
        language = self._languages.get(split_tag[0], None)
    # fall back to English if the language is not recognized or
    # the locale was not specified
    if language:
        return language
    else:
        return self._languages['en']
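The locale splitting on its own, for reference (the self._languages mapping is populated elsewhere in the application):

import re

split_tag = re.split("[_\\.\\-]", "zh_CN.UTF-8")
print(split_tag)                          # ['zh', 'CN', 'UTF', '8']
print(split_tag[0] + "-" + split_tag[1])  # 'zh-CN' is looked up first, then just 'zh'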
def is_valid_cidr(string_network):
    """
    Very simple check of the cidr format in no_proxy variable.

    :rtype: bool
    """
    if string_network.count('/') == 1:
        try:
            mask = int(string_network.split('/')[1])
        except ValueError:
            return False

        if mask < 1 or mask > 32:
            return False

        try:
            socket.inet_aton(string_network.split('/')[0])
        except socket.error:
            return False
    else:
        return False
    return True
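Usage sketch (the function above needs 'import socket'):

import socket

print(is_valid_cidr('192.168.1.0/24'))  # True
print(is_valid_cidr('192.168.1.0/33'))  # False: mask outside 1..32
print(is_valid_cidr('192.168.1.0'))     # False: no '/' present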
def attempt_special(self, cmd_str):
    # Special command handling
    fields = cmd_str.split(" ")
    SKIP = False
    command_set = []
    if cmd_str[0] == '@' or cmd_str[0] == '%':
        special_cmd = fields[0][1:]
        if special_cmd == "script":
            command_set = self.parse_script(fields[1], *fields[2:])
        elif special_cmd in special_commands:
            try:
                special_commands[special_cmd](*fields[1:])
                SKIP = True
            except TypeError as e:
                print(e)
                print("Likely incorrect usage of '%s'" % special_cmd)
    return SKIP, command_set
def super_handle(self, command_set):
    handled = None
    for command_i, command in enumerate(command_set):
        to_capture = []
        try:
            to_capture = self.uncaptured_variable_blocks[command_i]
        except:
            pass
        handled = self.handle_command(command.split(" "), to_capture, self.variables, self.meta)
        if handled:
            if "captured" in handled:
                self.variables.update(handled["captured"])
        print("")
    #####################################
    # TODO return aggregate message for scripts instead of last message
    return handled
def get_proc_etime(self, pid):
    fmt = subprocess.getoutput("ps -A -opid,etime | grep '^ *%d ' | awk '{print $NF}'" % pid).strip()
    if fmt == '':
        return -1
    parts = fmt.split('-')
    days = int(parts[0]) if len(parts) == 2 else 0
    fmt = parts[-1]
    parts = fmt.split(':')
    hours = int(parts[0]) if len(parts) == 3 else 0
    parts = parts[len(parts)-2:]
    minutes = int(parts[0])
    seconds = int(parts[1])
    return ((days * 24 + hours) * 60 + minutes) * 60 + seconds
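The ps etime column has the shape [[DD-]HH:]MM:SS; here is the same two-stage split as a standalone sketch, without the subprocess call:

def parse_etime(fmt):
    parts = fmt.split('-')
    days = int(parts[0]) if len(parts) == 2 else 0
    parts = parts[-1].split(':')
    hours = int(parts[0]) if len(parts) == 3 else 0
    minutes, seconds = int(parts[-2]), int(parts[-1])
    return ((days * 24 + hours) * 60 + minutes) * 60 + seconds

print(parse_etime('1-02:03:04'))  # 93784 (1 day, 2 h, 3 min, 4 s)
print(parse_etime('03:04'))       # 184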
# compute the billing val this running hour
# if isreal is True, it will also make users' beans decrease to pay for the bill.
# return the billing value in this running hour
Example source: sam-to-bedseq.py, from project personal-identification-pipeline (author: TeamErlich)
def build_sam_tags(flds):
    """
    Given a list of fields from a SAM file
    (all fields, including the first 11 fixed fields),
    returns a dictionary with the SAM tags (e.g. 'MD', 'NM').
    Tags with type 'i' are converted to integers.
    Tags with type 'f' are converted to floats.

    Example:
        tags = build_sam_tags(["NM:i:0", "MD:Z:77"])
        => tags = {'NM': 0, 'MD': '77'}
    """
    # Split tags into tuples of (name, type, value),
    # e.g. ["NM:i:0", "MD:Z:77"] => [('NM', 'i', '0'), ('MD', 'Z', '77')]
    # maxsplit=2 keeps any ':' inside a tag's value intact
    in_tags = [tuple(x.split(':', 2)) for x in flds[11:]]
    out_tags = {}
    for n, t, v in in_tags:
        if t == "i":
            v = int(v)
        elif t == 'f':
            v = float(v)
        out_tags[n] = v
    return out_tags
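Usage sketch with a hypothetical SAM line split into fields (the 'Xf' float tag is invented for illustration):

sam_fields = "read1\t0\tchr1\t100\t60\t8M\t*\t0\t0\tACGTACGT\tFFFFFFFF\tNM:i:0\tMD:Z:77\tXf:f:1.5".split('\t')
print(build_sam_tags(sam_fields))  # {'NM': 0, 'MD': '77', 'Xf': 1.5}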
def slice_arg(s):
    """
    Parse a string that describes a slice with start and end.

    >>> slice_arg('2:-3')
    slice(2, -3, None)
    >>> slice_arg(':-3')
    slice(None, -3, None)
    >>> slice_arg('2:')
    slice(2, None, None)
    """
    start, end = s.split(':')
    start = None if start == '' else int(start)
    end = None if end == '' else int(end)
    return slice(start, end)
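The returned slice object can be applied directly:

print(['a', 'b', 'c', 'd', 'e'][slice_arg('1:-1')])  # ['b', 'c', 'd']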
def __init__(self, fullpath):
    fn = os.path.split(fullpath)[-1]
    dot_parts = fn.split(".")
    if dot_parts[-1] == "fastq":
        name = dot_parts[-2]
    elif len(dot_parts) > 2 and dot_parts[-2] == "fastq":
        name = dot_parts[-3]
    else:
        raise NameError("%s is not a fastq file" % fullpath)
    all_flds = name.split("_")
    flds = all_flds[-4:]
    self.prefix = "_".join(all_flds[:-4])
    self.s = flds[0][1:]
    self.lane = int(flds[1][2:])
    self.read = flds[2]
    self.group = int(flds[3])
    self.filename = fullpath
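How the splits decompose a typical bcl2fastq-style filename; a hypothetical path, shown as a standalone sketch rather than through the class:

import os

fn = os.path.split('/data/Sample1_S1_L001_R1_001.fastq')[-1]
name = fn.split('.')[-2]        # 'Sample1_S1_L001_R1_001'
all_flds = name.split('_')
print(all_flds[-4:])            # ['S1', 'L001', 'R1', '001']
print('_'.join(all_flds[:-4]))  # 'Sample1' (the prefix); s='1', lane=1, read='R1', group=1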
def get_run_data(fn):
    """ Parse flowcell + lane from the first FASTQ record.
    NOTE: we don't check whether there are multiple FC / lanes in this file.
    NOTE: taken from longranger/mro/stages/reads/setup_chunks
    """
    if fn[-2:] == 'gz':
        reader = gzip.open(fn)
    else:
        reader = open(fn, 'r')

    gen = read_generator_fastq(reader)
    try:
        (name, seq, qual) = gen.next()
        (flowcell, lane) = re.split(':', name)[2:4]
        return (flowcell, lane)
    except StopIteration:
        # empty fastq
        raise ValueError('Could not extract flowcell and lane from FASTQ file. File is empty: %s' % fn)
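The header parsing on its own, with a hypothetical Illumina read name:

import re

name = 'ST-E00314:132:HLCJTCCXX:7:1101:10003:44458'
flowcell, lane = re.split(':', name)[2:4]
print(flowcell, lane)  # HLCJTCCXX 7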
def set_data_field(record, field_name, field_val):
    assert(len(record.samples) == 1)
    new_format = record.FORMAT
    new_fields = new_format.split(':')
    if not(field_name in new_fields):
        new_fields = new_fields + [field_name]
        new_format = ':'.join(new_fields)

    sample_call = get_record_sample_call(record)
    data = sample_call.data
    data_dict = data._asdict()
    data_dict[field_name] = field_val

    new_sample_vals = []
    for field in new_fields:
        new_sample_vals.append(data_dict[field])

    # Note - the old way of passing the fields to pyVCF is memory intensive
    # because a fresh type is allocated for each call to make_calldata_tuple
    #data_instantiator = vcf.model.make_calldata_tuple(new_fields)
    #data = data_instantiator(*new_sample_vals)
    data = FakeNamedTuple(new_fields, new_sample_vals)

    sample_call.data = data
    record.samples[0] = sample_call
    record.FORMAT = new_format
def get_locus_info(locus):
    """ Returns chrom, start and stop from locus string.
    Enforces standardization of how locus is represented.
    chrom:start_stop (start and stop should be ints or 'None')
    """
    chrom, start_stop = locus.split(':')
    if chrom == 'None':
        chrom = None

    start, stop = re.split(r"\.\.|-", start_stop)
    if start == 'None':
        start = None
    else:
        start = int(float(start))
    if stop == 'None':
        stop = None
    else:
        stop = int(float(stop))
    return (str(chrom), start, stop)
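Usage sketch (the function above needs 'import re'); both '-' and '..' separators are accepted:

print(get_locus_info('chr1:1000-2000'))   # ('chr1', 1000, 2000)
print(get_locus_info('chr1:1000..2000'))  # ('chr1', 1000, 2000)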
def get_target_regions_dict(targets_file):
    """ Gets the target regions from a targets file as a chrom-indexed dictionary,
    with every entry given as a list of (start, end) tuples
    """
    targets = {}
    for line in targets_file:
        info = line.strip().split('\t')
        if line.startswith(('browser', 'track', '-browser', '-track', '#')):
            continue
        if len(line.strip()) == 0:
            continue
        chrom = info[0]
        start = int(info[1])
        end = int(info[2])
        chrom_targs = targets.setdefault(chrom, [])
        chrom_targs.append((start, end))
    return targets
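Usage sketch with an in-memory BED-like input; a list of lines stands in for the open file:

bed_lines = [
    'track name=targets\n',
    'chr1\t100\t200\n',
    'chr1\t300\t400\n',
    'chr2\t50\t150\n',
]
print(get_target_regions_dict(bed_lines))
# {'chr1': [(100, 200), (300, 400)], 'chr2': [(50, 150)]}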
def read_reference(self):
    profiles = {}
    reference = {}
    reference_file = open(self.path + "Reference.txt", "rU")
    bin_size = float("nan")
    for line in reference_file:
        line = line.strip("\n").strip("\r")
        (key, value) = line.split("\t")
        if key == "BinSize":
            bin_size = int(value)
        else:
            reference[key] = float(value)
        # if BinSize else
    # for line
    reference_file.close()
    profiles["BinSize"] = bin_size
    profiles["Reference"] = reference
    self.profiles = profiles
    return profiles
# read_reference
#...........................................................................
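A standalone sketch of the same parsing on hypothetical in-memory lines (tab-separated key/value pairs plus one BinSize row):

reference, bin_size = {}, float('nan')
for line in ['BinSize\t1000000\n', '1:1\t0.000123\n', '1:2\t0.000118\n']:
    key, value = line.strip('\n').strip('\r').split('\t')
    if key == 'BinSize':
        bin_size = int(value)
    else:
        reference[key] = float(value)
print(bin_size, reference)  # 1000000 {'1:1': 0.000123, '1:2': 0.000118}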
def debug_print(self, msg):
    """Print 'msg' to stdout if the global DEBUG (taken from the
    DISTUTILS_DEBUG environment variable) flag is true.
    """
    from distutils.debug import DEBUG
    if DEBUG:
        print msg
        sys.stdout.flush()
# -- Option validation methods -------------------------------------
# (these are very handy in writing the 'finalize_options()' method)
#
# NB. the general philosophy here is to ensure that a particular option
# value meets certain type and value constraints. If not, we try to
# force it into conformance (eg. if we expect a list but have a string,
# split the string on comma and/or whitespace). If we can't force the
# option into conformance, raise DistutilsOptionError. Thus, command
# classes need do nothing more than (eg.)
# self.ensure_string_list('foo')
# and they can be guaranteed that thereafter, self.foo will be
# a list of strings.
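A small illustration of the coercion described above (not distutils' own implementation):

import re

def to_string_list(val):
    # Split a comma- and/or whitespace-separated string into a list; pass lists through.
    return re.split(r'[,\s]+', val) if isinstance(val, str) else val

print(to_string_list('foo, bar baz'))  # ['foo', 'bar', 'baz']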
def push(self, data):
    """Push some new data into this object."""
    # Handle any previous leftovers
    data, self._partial = self._partial + data, ''
    # Crack into lines, but preserve the newlines on the end of each
    parts = NLCRE_crack.split(data)
    # The *ahem* interesting behaviour of re.split when supplied grouping
    # parentheses is that the last element of the resulting list is the
    # data after the final RE.  In the case of a NL/CR terminated string,
    # this is the empty string.
    self._partial = parts.pop()
    #GAN 29Mar09  bugs 1555570, 1721862  Confusion at 8K boundary ending with \r:
    # is there a \n to follow later?
    if not self._partial and parts and parts[-1].endswith('\r'):
        self._partial = parts.pop(-2) + parts.pop()
    # parts is a list of strings, alternating between the line contents
    # and the eol character(s).  Gather up a list of lines after
    # re-attaching the newlines.
    lines = []
    for i in range(len(parts) // 2):
        lines.append(parts[i*2] + parts[i*2+1])
    self.pushlines(lines)
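What the comment above describes, using a pattern with the same shape as NLCRE_crack (the capturing group keeps each line ending in the result, and the final element is whatever trailed the last newline):

import re

crack = re.compile(r'(\r\n|\r|\n)')
print(crack.split('one\ntwo\r\npart'))  # ['one', '\n', 'two', '\r\n', 'part']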
def which(program):
    # type: (str) -> Optional[str]
    program = exename(program)
    fpath, _ = os.path.split(program)
    if fpath:
        if is_executable(program):
            return program
    else:
        for path in [os.path.abspath(os.curdir)] + os.environ['PATH'].split(os.pathsep):
            path = path.strip('"')
            exe_file = os.path.join(unifilename(path), unifilename(program))
            if is_executable(exe_file):
                return exe_file
    return None
# ----------------------------------------------------------------------
def reformat(self, sourcefile, destfile, configfile):
    # type: (str, str, str) -> None
    formatstyle = style_make()
    with open(configfile) as fp:
        for line in fp.readlines():
            line = line.rstrip()
            if line.startswith('#'):
                continue
            parts = line.split('=')
            if len(parts) == 2:
                optionname, value = parts
                set_option(formatstyle, optionname, value)
    sourcedata = readbinary(sourcefile)
    data = self.formatcode(formatstyle, sourcedata, filename=sourcefile)
    if data is None:
        data = b''
    writebinary(destfile, data)
# ----------------------------------------------------------------------
def reformat(self, sourcefile, destfile, configfile):
    # type: (str, str, str) -> None
    formatstyle = style_make()
    with open(configfile) as fp:
        for line in fp.readlines():
            line = line.rstrip()
            if line.startswith('#'):
                continue
            parts = re.split(r'\s+=\s+', line)
            if len(parts) == 2:
                optionname, value = parts
                set_option(formatstyle, optionname, value)
    sourcedata = readbinary(sourcefile)
    data = self.formatcode(formatstyle, sourcedata, filename=sourcefile)
    if data is None:
        data = b''
    writebinary(destfile, data)
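For comparison, the two reformat variants split a config line differently (hypothetical lines):

import re

print('IndentWidth=4'.split('='))                # ['IndentWidth', '4']
print(re.split(r'\s+=\s+', 'indent_width = 4'))  # ['indent_width', '4']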
# ----------------------------------------------------------------------
# Functions for the in-memory cache
def unpack_exeresult(buf):
    # type: (bytes) -> Tuple[int, bytes, bytes]
    def unpack_error():
        # type: () -> None
        raise ValueError('invalid buffer in unpack_exeresult')

    buf = binary_type(buf)
    pos = buf.find(b'|')
    if pos < 0:
        unpack_error()
    lengths, data = buf[:pos], buf[pos + 1:]
    try:
        numvalues = [int(s) for s in lengths.split()]
    except ValueError:
        numvalues = []
    if len(numvalues) != 3:
        unpack_error()
    returncode, outlen, errlen = numvalues
    if outlen + errlen != len(data):
        unpack_error()
    return returncode, data[:outlen], data[outlen:outlen + errlen]
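The packed layout is three space-separated integers (return code, stdout length, stderr length), a '|' separator, then the stdout bytes immediately followed by the stderr bytes; for example:

print(unpack_exeresult(b'0 3 4|outerr!'))  # (0, b'out', b'err!')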
# ----------------------------------------------------------------------
def formatters_for_files(filenames):
    # type: (List[str]) -> List[str]
    """Returns a list of formatter names that support every extension of these filenames.
    """
    exts = set()  # type: Set[str]
    for f in filenames:
        root, ext = os.path.splitext(f)
        ext = ext.lower()
        if not ext and root.startswith('.'):
            # Recognize extension-only filenames as well.
            ext = root.lower()
        exts.add(ext)
    supported = []
    for fmt, fmtexts in SUPPORTED_EXTS:
        fmt_exts = set(fmtexts.split())  # type: Set[str]
        if not exts or exts.issubset(fmt_exts):
            supported.append(fmt)
    return supported
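Usage sketch; SUPPORTED_EXTS is defined elsewhere in the surrounding project as (formatter, 'ext ext ...') pairs, so a hypothetical stand-in is used here:

SUPPORTED_EXTS = [('clang-format', '.c .h .cpp .hpp'), ('yapf', '.py')]
print(formatters_for_files(['a.cpp', 'b.h']))  # ['clang-format']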
def get_release_quality(release_name):
    if release_name is None: return
    try: release_name = release_name.encode('utf-8')
    except: pass
    try:
        release_name = release_name.upper()
        fmt = re.sub(r'(.+)(\.|\(|\[|\s)(\d{4}|S\d*E\d*|S\d*)(\.|\)|\]|\s)', '', release_name)
        fmt = re.split(r'\.|\(|\)|\[|\]|\s|-', fmt)
        fmt = [i.lower() for i in fmt]
        if '1080p' in fmt: quality = '1080p'
        elif '720p' in fmt: quality = 'HD'
        else: quality = 'SD'
        if any(i in ['dvdscr', 'r5', 'r6'] for i in fmt): quality = 'SCR'
        elif any(i in ['camrip', 'tsrip', 'hdcam', 'hdts', 'dvdcam', 'dvdts', 'cam', 'telesync', 'ts'] for i in fmt): quality = 'CAM'
        info = []
        if '3d' in fmt or '.3D.' in release_name: info.append('3D')
        if any(i in ['hevc', 'h265', 'x265'] for i in fmt): info.append('HEVC')
        return quality, info
    except:
        return 'SD', []
def __init__(self, xmap_file, r_cmap_file, q_cmap_file, confidence_score, reference):
    self.xmap = xmap_file
    self.rcmap = r_cmap_file
    self.qcmap = q_cmap_file
    self.confidence_score = confidence_score
    self.ref = reference
    self.name = xmap_file.rsplit('.', 1)[0].split('/')[-1]
    self.XmapTable = None
    self.filtered_XmapTable = None
    self.RcmapTable = None
    self.QcmapTable = None
    self.ref_id = None
    self.ref_inf = None
    self.cmap = None
    self.unqualified = None
    self.qualified = None
    self.mapped = None
    self.unmapped = None
    self.BN = None
    self.detail = None
    self.no_data = None
    self.kicked = None
def pathtype(cls, path=None):
    path = os.path.abspath(path or getcwd())
    depth = 0
    while cd(path):
        tpath = path
        path = Repo.findparent(path)
        if path:
            depth += 1
            path = os.path.split(path)[0]
            if tpath == path:  # Reached root.
                break
        else:
            break
    return "directory" if depth == 0 else ("program" if depth == 1 else "library")
def __init__(self, path=None, print_warning=False):
    path = os.path.abspath(path or getcwd())
    self.path = path
    self.is_cwd = True

    while cd(path):
        tpath = path
        if os.path.isfile(os.path.join(path, Cfg.file)):
            self.path = path
            self.is_cwd = False
            break
        path = os.path.split(path)[0]
        if tpath == path:  # Reached root.
            break

    self.name = os.path.basename(self.path)
    self.is_classic = os.path.isfile(os.path.join(self.path, 'mbed.bld'))

    # is_cwd flag indicates that current dir is assumed to be root, not root repo
    if self.is_cwd and print_warning:
        warning(
            "Could not find mbed program in current path \"%s\".\n"
            "You can fix this by calling \"mbed new .\" in the root of your program." % self.path)