def get_iface_from_addr(addr):
    """Work out on which interface the provided address is configured.

    Every address reported by ``netifaces`` for every interface is compared
    against ``addr``; a trailing ``%...`` suffix on a reported address is
    dropped before the comparison.

    :param addr: address to look for.
    :returns: name of the first interface carrying ``addr``.
    :raises Exception: if no interface reports ``addr``.
    """
    # Loop-invariant, so compile once instead of once per address.
    # Matches link-local style addresses carrying a "%..." suffix.
    ll_key = re.compile("(.+)%.*")
    for iface in netifaces.interfaces():
        addresses = netifaces.ifaddresses(iface)
        for inet_type in addresses:
            for _addr in addresses[inet_type]:
                _addr = _addr['addr']
                # link local
                raw = ll_key.match(_addr)
                if raw:
                    _addr = raw.group(1)
                if _addr == addr:
                    log("Address '%s' is configured on iface '%s'" %
                        (addr, iface))
                    return iface
    msg = "Unable to infer net iface on which '%s' is configured" % (addr)
    raise Exception(msg)
# Example source code using Python's compile()
def parse_treasury_csv_column(column):
    """
    Parse a treasury CSV column into a more human-readable format.

    Columns start with 'RIFLGFC', followed by Y or M (year or month), followed
    by a two-digit number signifying number of years/months, followed by _N.B.
    We only care about the middle two entries, which we turn into a string like
    3month or 30year.

    Raises ValueError if ``column`` does not have that shape.
    """
    column_re = re.compile(
        r"^(?P<prefix>RIFLGFC)"
        r"(?P<unit>[YM])"
        r"(?P<periods>[0-9]{2})"
        # The dot is part of the literal suffix; unescaped it would accept
        # any character there (e.g. "_NxB").
        r"(?P<suffix>_N\.B)$"
    )
    match = column_re.match(column)
    if match is None:
        raise ValueError("Couldn't parse CSV column %r." % column)
    unit, periods = get_unit_and_periods(match.groupdict())
    # Roundtrip through int to coerce '06' into '6'.
    return str(int(periods)) + ('year' if unit == 'Y' else 'month')
def get_transcript_gc_content(self, transcript_obj):
    """Return the G/C fraction over the intervals of ``transcript_obj``.

    Intervals whose ``chrom`` is not a key of ``self.chroms`` are skipped.
    For the rest, the slice ``[start:end]`` of the chromosome sequence is
    scanned for c/C/g/G and ``interval.length`` is added to the denominator.
    Returns 0 when the accumulated length is not positive.
    """
    gc_matcher = re.compile('[cCgG]')
    gc_total = 0
    span_total = 0
    for iv in transcript_obj.intervals:
        if iv.chrom in self.chroms:
            bases = self.chroms[iv.chrom][iv.start:iv.end]
            gc_total += len(gc_matcher.findall(bases))
            span_total += iv.length
    if span_total > 0:
        return float(gc_total) / float(span_total)
    return 0
# NOTE: these stub classes are necessary to maintain backwards compatibility with old refdata (1.2 or older)
def default(self, line):
    """Execute ``line`` as a Python statement in the current frame.

    A single leading ``!`` is removed first.  The code is compiled in
    ``'single'`` mode and run against ``self.curframe.f_globals`` and
    ``self.curframe_locals`` while ``sys.stdin``, ``sys.stdout`` and
    ``sys.displayhook`` are temporarily replaced by the instance's own;
    they are restored afterwards even if the statement raises.

    Any exception (including one raised by ``compile``) is reported on
    ``self.stdout`` as ``*** <ExceptionName>: <value>`` instead of being
    propagated.
    """
    if line[:1] == '!':
        line = line[1:]
    # Named so they do not shadow the ``locals``/``globals`` builtins.
    frame_locals = self.curframe_locals
    frame_globals = self.curframe.f_globals
    try:
        code = compile(line + '\n', '<stdin>', 'single')
        save_stdout = sys.stdout
        save_stdin = sys.stdin
        save_displayhook = sys.displayhook
        try:
            sys.stdin = self.stdin
            sys.stdout = self.stdout
            sys.displayhook = self.displayhook
            # Function-call form is valid on both Python 2 and Python 3.
            exec(code, frame_globals, frame_locals)
        finally:
            sys.stdout = save_stdout
            sys.stdin = save_stdin
            sys.displayhook = save_displayhook
    except:  # noqa: E722 - deliberately broad: user code must not escape
        t, v = sys.exc_info()[:2]
        # Legacy string exceptions carry their name as the "type" itself.
        if isinstance(t, str):
            exc_type_name = t
        else:
            exc_type_name = t.__name__
        self.stdout.write('*** %s: %s\n' % (exc_type_name, v))
def __exit__(self, exc_type, exc_value, tb):
if exc_type is None:
try:
exc_name = self.expected.__name__
except AttributeError:
exc_name = str(self.expected)
raise self.failureException(
"{0} not raised".format(exc_name))
if not issubclass(exc_type, self.expected):
# let unexpected exceptions pass through
return False
self.exception = exc_value # store for later retrieval
if self.expected_regexp is None:
return True
expected_regexp = self.expected_regexp
if isinstance(expected_regexp, basestring):
expected_regexp = re.compile(expected_regexp)
if not expected_regexp.search(str(exc_value)):
raise self.failureException('"%s" does not match "%s"' %
(expected_regexp.pattern, str(exc_value)))
return True
def descriptions(self, group_pattern):
    """Get descriptions for a range of groups.

    Sends ``LIST NEWSGROUPS <group_pattern>`` and, unless the response code
    is 215, retries with ``XGTITLE <group_pattern>``.  Returns
    ``(resp, lines)`` where ``lines`` holds a ``(group, description)``
    tuple for every stripped reply line of the form
    ``<group><spaces/tabs><description>``; other lines are dropped.
    """
    line_pat = re.compile("^(?P<group>[^ \t]+)[ \t]+(.*)$")
    # Try the more std (acc. to RFC2980) LIST NEWSGROUPS first
    resp, raw_lines = self.longcmd('LIST NEWSGROUPS ' + group_pattern)
    if resp[:3] != "215":
        # Fall back to the deprecated XGTITLE, which either raises an
        # error or answers in the same layout as LIST NEWSGROUPS.
        resp, raw_lines = self.longcmd('XGTITLE ' + group_pattern)
    found = (line_pat.search(raw.strip()) for raw in raw_lines)
    lines = [hit.group(1, 2) for hit in found if hit]
    return resp, lines
def xhdr(self, hdr, str, file=None):
"""Process an XHDR command (optional server extension). Arguments:
- hdr: the header type (e.g. 'subject')
- str: an article nr, a message id, or a range nr1-nr2
Returns:
- resp: server response if successful
- list: list of (nr, value) strings"""
pat = re.compile('^([0-9]+) ?(.*)\n?')
resp, lines = self.longcmd('XHDR ' + hdr + ' ' + str, file)
for i in range(len(lines)):
line = lines[i]
m = pat.match(line)
if m:
lines[i] = m.group(1, 2)
return resp, lines
def parse150(resp):
    '''Parse the '150' response for a RETR request.
    Returns the expected transfer size or None; size is not guaranteed to
    be present in the 150 message.
    Raises error_reply if the response does not start with '150'.
    '''
    global _150_re
    if resp[:3] != '150':
        # Call form of raise: valid on both Python 2 and Python 3.
        raise error_reply(resp)
    if _150_re is None:
        # Compiled lazily on first use and cached in the module global.
        import re
        # Raw string so the regex escapes are not treated as (invalid)
        # string escapes; the compiled pattern is unchanged.
        _150_re = re.compile(r"150 .* \((\d+) bytes\)", re.IGNORECASE)
    m = _150_re.match(resp)
    if not m:
        return None
    # int() already yields an arbitrary-precision integer for a run of
    # digits, so no separate long() fallback is needed.
    return int(m.group(1))
def parse227(resp):
    '''Parse the '227' response for a PASV request.
    Raises error_reply if the response does not start with '227'.
    Raises error_proto if it does not contain '(h1,h2,h3,h4,p1,p2)'
    Return ('host.addr.as.numbers', port#) tuple.'''
    global _227_re
    if resp[:3] != '227':
        # Call form of raise: valid on both Python 2 and Python 3.
        raise error_reply(resp)
    if _227_re is None:
        # Compiled lazily on first use and cached in the module global.
        import re
        _227_re = re.compile(r'(\d+),(\d+),(\d+),(\d+),(\d+),(\d+)')
    m = _227_re.search(resp)
    if not m:
        raise error_proto(resp)
    numbers = m.groups()
    # First four numbers are the dotted host, last two the port bytes.
    host = '.'.join(numbers[:4])
    port = (int(numbers[4]) << 8) + int(numbers[5])
    return host, port
def find_templates():
    """
    Load python modules from templates directory and get templates list

    The ``templates`` directory is looked up next to this source file.
    Every non-hidden ``*.py`` file in it is imported as
    ``templates.<module>`` (falling back to ``ross.templates.<module>``),
    and every module-level name ending in ``templates`` is expected to be
    an iterable of ``(regex_text, data_func)`` pairs.

    :return: list of tuples (pairs):
    [(compiled regex, lambda regex_match: return message_data)]
    """
    templates = []
    # str.rstrip() strips a *set of characters*, not a suffix, so derive the
    # directory and the module names with os.path helpers instead.
    templates_directory = os.path.join(
        os.path.dirname(inspect.getsourcefile(lambda: 0)), 'templates')
    for template_file in os.listdir(templates_directory):
        if template_file.startswith('.') or not template_file.endswith('.py'):
            continue
        # "happy.py" -> "happy" (rstrip('.py') would have produced "ha").
        module_name = os.path.splitext(template_file)[0]
        # Hack for dev development and distutils
        try:
            template_module = importlib.import_module(
                'templates.{}'.format(module_name))
        except ImportError:
            template_module = importlib.import_module(
                'ross.templates.{}'.format(module_name))
        # Iterate through the items in the template module.
        # If a variable name ends with 'templates',
        # extend the templates list with it.
        for (name, content) in template_module.__dict__.items():
            if name.endswith('templates'):
                for (regex_text, data_func) in content:
                    templates.append(
                        (re.compile(regex_text, re.IGNORECASE), data_func))
    return templates
def list_nics(nic_type=None):
    """Return a list of nics of given type(s)

    :param nic_type: a single interface-name prefix (string), an iterable
        of prefixes, or a falsy value to list every interface.
    :returns: interface names in the order first seen in the output of the
        ``ip`` command, without duplicates.

    With prefixes, ``ip addr show label <prefix>*`` is run per prefix; a
    name of the form ``<prefix>N.M@...`` is reported as ``<prefix>N.M``,
    anything else as the second whitespace-separated field minus colons.
    Without prefixes, ``ip a`` is parsed for ``<index>: <name>:`` lines and
    anything from ``@`` onwards is dropped from the name.
    """
    if isinstance(nic_type, six.string_types):
        int_types = [nic_type]
    else:
        int_types = nic_type
    interfaces = []
    if nic_type:
        for int_type in int_types:
            cmd = ['ip', 'addr', 'show', 'label', int_type + '*']
            ip_output = subprocess.check_output(cmd).decode('UTF-8')
            # The prefix is matched literally everywhere else (startswith),
            # so escape it rather than letting it act as regex syntax.
            vlan_key = re.compile(
                '.*: (' + re.escape(int_type) + r'[0-9]+\.[0-9]+)@.*')
            for line in ip_output.split('\n'):
                fields = line.split()
                # Blank or single-token lines carry no interface name.
                if len(fields) < 2 or not fields[1].startswith(int_type):
                    continue
                matched = vlan_key.search(line)
                if matched:
                    iface = matched.groups()[0]
                else:
                    iface = fields[1].replace(":", "")
                if iface not in interfaces:
                    interfaces.append(iface)
    else:
        cmd = ['ip', 'a']
        ip_output = subprocess.check_output(cmd).decode('UTF-8').split('\n')
        ip_output = (line.strip() for line in ip_output if line)
        # Raw string: "\s" is a regex escape, not a string escape.
        key = re.compile(r'^[0-9]+:\s+(.+):')
        for line in ip_output:
            matched = key.search(line)
            if matched:
                iface = matched.group(1)
                iface = iface.partition("@")[0]
                if iface not in interfaces:
                    interfaces.append(iface)
    return interfaces
def get_iface_from_addr(addr):
    """Work out on which interface the provided address is configured.

    Every address reported by ``netifaces`` for every interface is compared
    against ``addr``; a trailing ``%...`` suffix on a reported address is
    dropped before the comparison.

    :param addr: address to look for.
    :returns: name of the first interface carrying ``addr``.
    :raises Exception: if no interface reports ``addr``.
    """
    # Loop-invariant, so compile once instead of once per address.
    # Matches link-local style addresses carrying a "%..." suffix.
    ll_key = re.compile("(.+)%.*")
    for iface in netifaces.interfaces():
        addresses = netifaces.ifaddresses(iface)
        for inet_type in addresses:
            for _addr in addresses[inet_type]:
                _addr = _addr['addr']
                # link local
                raw = ll_key.match(_addr)
                if raw:
                    _addr = raw.group(1)
                if _addr == addr:
                    log("Address '%s' is configured on iface '%s'" %
                        (addr, iface))
                    return iface
    msg = "Unable to infer net iface on which '%s' is configured" % (addr)
    raise Exception(msg)
def rmq_wait_for_cluster(self, deployment, init_sleep=15, timeout=1200):
    """Block until the rabbitmq-server units report cluster readiness.

    Sleeps for ``init_sleep`` seconds first (skipped when falsy); the pause
    is likely needed after a config change because the status message may
    not flip to non-ready immediately.  Then delegates to
    ``deployment._auto_wait_for_status`` with a pattern for the ready
    message, the given ``timeout`` and only 'rabbitmq-server' included.
    """
    if init_sleep:
        time.sleep(init_sleep)
    wait_kwargs = {
        'message': re.compile('^Unit is ready and clustered$'),
        'timeout': timeout,
        'include_only': ['rabbitmq-server'],
    }
    deployment._auto_wait_for_status(**wait_kwargs)
def splituser(host):
    '''urllib.splituser(), but six's support of this seems broken

    Splits ``host`` at its last '@' into ``(user, host)``; returns
    ``(None, host)`` when the pattern does not match.
    '''
    found = re.match('^(.*)@(.*)$', host)
    if found is None:
        return None, host
    return found.group(1, 2)
def splitpasswd(user):
    '''urllib.splitpasswd(), but six's support of this is missing

    Splits ``user`` at its first ':' into ``(user, password)``; returns
    ``(user, None)`` when there is no ':'.
    '''
    found = re.match('^([^:]*):(.*)$', user, re.S)
    if found is None:
        return user, None
    return found.group(1, 2)
def strip_tags(text, strip_punctuation=False):
    """Return only the text of ``text`` with the HTML markup removed.

    When ``strip_punctuation`` is true, every run of characters taken from
    ``p`` is removed from the extracted text as well.
    """
    # NOTE(review): ``p`` is not defined in this function; presumably a
    # module-level string of punctuation characters - verify.
    soup = BeautifulSoup(text)
    if not strip_punctuation:
        return soup.get_text()
    punctuation_run = re.compile('[{}]+'.format(re.escape(p)))
    return punctuation_run.sub('', soup.get_text())
def __init__(self):
    """Set up the thread, default state, key-name patterns and displays.

    All four event callbacks start out as no-ops that return True.
    """
    threading.Thread.__init__(self)
    self.finished = threading.Event()

    # Initial pointer position and modifier state.
    self.mouse_position_x = self.mouse_position_y = 0
    self.ison = dict(shift=False, caps=False)

    # Pre-compiled patterns applied to key names.
    self.isshift = re.compile('^Shift')
    self.iscaps = re.compile('^Caps_Lock')
    self.shiftablechar = re.compile('^[a-z0-9]$|^minus$|^equal$|^bracketleft$|^bracketright$|^semicolon$|^backslash$|^apostrophe$|^comma$|^period$|^slash$|^grave$')
    self.logrelease = re.compile('.*')
    self.isspace = re.compile('^space$')

    # Default handlers do nothing; each hook gets its own function object.
    for hook_name in ('KeyDown', 'KeyUp',
                      'MouseAllButtonsDown', 'MouseAllButtonsUp'):
        setattr(self, hook_name, lambda x: True)

    self.contextEventMask = [X.KeyPress, X.MotionNotify]

    # Two separate display connections are opened.
    self.local_dpy = display.Display()
    self.record_dpy = display.Display()
def _search_for_query(self, query):
if query in self._search_pattern_cache:
return self._search_pattern_cache[query]
# Build pattern: include all characters
pattern = []
for c in query:
# pattern.append('[^{0}]*{0}'.format(re.escape(c)))
pattern.append('.*?{0}'.format(re.escape(c)))
pattern = ''.join(pattern)
search = re.compile(pattern, re.IGNORECASE).search
self._search_pattern_cache[query] = search
return search
def defSyntax(self):
    '''Define re patterns according to syntax.

    Sets ``self._img_re`` (image markup), ``self._h_re_base`` (verbose
    header pattern template with one ``%s`` slot for the allowed number of
    marker characters) and ``self._all_h_re`` (the template filled with
    ``1,6``) for ``self.syntax`` equal to 'markdown' or 'zim'.

    Raises Exception for any other syntax.
    '''
    # The patterns are str patterns; the LOCALE flag is rejected for those
    # on Python 3 and affects none of the constructs used here, so only
    # MULTILINE (and VERBOSE for the header templates) is passed.
    #------------------REGEX patterns------------------
    if self.syntax=='markdown':
        self._img_re=re.compile('^(.*)!\\[(.+?)\\]\\((.+?)\\)', re.M)
        self._h_re_base = r'''
(^(.+)[ \t]*\n(=+|-+)[ \t]*\n+)
|
(^(\#{%s}) # \1 = string of #'s
[ \t]*
(.+?) # \2 = Header text
[ \t]*
(?<!\\) # ensure not an escaped trailing '#'
\#* # optional closing #'s (not counted)
\n+
)
'''
        self._all_h_re=re.compile(self._h_re_base %'1,6', re.X | re.M)
    elif self.syntax=='zim':
        self._img_re=re.compile('^(.*)\\{\\{(.+?)\\}\\}(.*)$', re.M)
        self._h_re_base = r'''
^(\={%s}) # \1 = string of ='s
[ \t]*
(.+?) # \2 = Header text
[ \t]*
\1
\n+
'''
        self._all_h_re=re.compile(self._h_re_base %'1,6', re.X | re.M)
    else:
        raise Exception("Unknown syntax %s" %self.syntax)
    return
def createNoteBook(title,geeknote=None,verbose=True):
    """Create an Evernote notebook called ``title`` unless it already exists.

    The title is stripped, truncated to MAX_NOTEBOOK_TITLE_LEN and, if it is
    written as a markdown header, reduced to the header text.

    :param title: requested notebook name.
    :param geeknote: connector to use; when None a GeekNoteConnector is
        created and connected.
    :param verbose: accepted for interface compatibility; not used here.
    :returns: 0 when the notebook already exists or was created, otherwise
        the result of ``tools.exitErr()``.
    """
    #-------------------Trunc title-------------------
    title=title.strip()
    title=truncStr(title,MAX_NOTEBOOK_TITLE_LEN)
    #-------Make sure title doesnt start with #-------
    tp=textparse.TextParser('markdown')
    _h_re=re.compile(tp._h_re_base %'1,', re.X | re.M)
    # NOTE(review): assumes tp._h_re_base is the markdown header template,
    # whose alternatives both end in a newline - confirm. The title was
    # stripped above, so a newline is appended for matching only; without
    # it a one-line "# title" never matched.
    m=_h_re.match(title+'\n')
    if m:
        # group(6) is the text of a "#"-style header. It is None when the
        # underline-style alternative matched, whose text is group(2).
        header_text=m.group(6) or m.group(2)
        if header_text:
            title=header_text
    #---------------------Connect---------------------
    if geeknote is None:
        geeknote=GeekNoteConnector()
        geeknote.connectToEvertone()
    #-----------------Check if exists-----------------
    notebooks=geeknote.getEvernote().findNotebooks()
    out.preloader.stop()
    if not isinstance(title,unicode):
        title=unicode(title,'utf8')
    notebooks=[unicode(ii.name,'utf8') for ii in notebooks]
    if title in notebooks:
        out.successMessage('Notebook already exists.')
        return 0
    else:
        out.preloader.setMessage("Creating notebook...")
        result = geeknote.getEvernote().createNotebook(name=title)
        if result:
            out.successMessage("Notebook has been successfully created.")
            return 0
        else:
            out.failureMessage("Error while the process "
                               "of creating the notebook.")
            return tools.exitErr()