def wmfactory_directory(self, request):
m = []
for user in pwd.getpwall():
pw_name, pw_passwd, pw_uid, pw_gid, pw_gecos, pw_dir, pw_shell \
= user
realname = string.split(pw_gecos,',')[0]
if not realname:
realname = pw_name
if os.path.exists(os.path.join(pw_dir, self.userDirName)):
m.append({
'href':'%s/'%pw_name,
'text':'%s (file)'%realname
})
twistdsock = os.path.join(pw_dir, self.userSocketName)
if os.path.exists(twistdsock):
linknm = '%s.twistd' % pw_name
m.append({
'href':'%s/'%linknm,
'text':'%s (twistd)'%realname})
return m
python类split()的实例源码
def getMessagePayload(self):
self.logger.debug("Preparing client->device message payload")
salon = -127
try:
salon = read_temp()
except Exception as e:
self.logger.error("error reading local temp")
self.logger.exception(e)
piwnica = -127
relay = 0
try:
os.system("sudo ifconfig eth0 192.168.1.101 netmask 255.255.255.0")
txt = urllib2.urlopen(relay1_addr).read()
lines = string.split(txt, '\n')
piwnica = float(lines[1])
relay = int(lines[0])
except Exception as e:
self.logger.error("error reading data from {0}".format(relay1_addr))
self.logger.exception(e)
payloadDict = {"values":{}}
payloadDict["values"]["relay"] = relay
if salon > -127:
payloadDict["values"]["salon"] = salon
if piwnica > -127:
payloadDict["values"]["piwnica"] = piwnica
payload = json.dumps(payloadDict)
return payload
def get_build_version():
"""Return the version of MSVC that was used to build Python.
For Python 2.3 and up, the version number is included in
sys.version. For earlier versions, assume the compiler is MSVC 6.
"""
prefix = "MSC v."
i = string.find(sys.version, prefix)
if i == -1:
return 6
i = i + len(prefix)
s, rest = sys.version[i:].split(" ", 1)
majorVersion = int(s[:-2]) - 6
minorVersion = int(s[2:3]) / 10.0
# I don't think paths are affected by minor version in version 6
if majorVersion == 6:
minorVersion = 0
if majorVersion >= 6:
return majorVersion + minorVersion
# else we don't know what version of the compiler this is
return None
def find_exe(self, exe):
"""Return path to an MSVC executable program.
Tries to find the program in several places: first, one of the
MSVC program search paths from the registry; next, the directories
in the PATH environment variable. If any of those work, return an
absolute path that is known to exist. If none of them work, just
return the original program name, 'exe'.
"""
for p in self.__paths:
fn = os.path.join(os.path.abspath(p), exe)
if os.path.isfile(fn):
return fn
# didn't find it; try existing path
for p in string.split(os.environ['Path'],';'):
fn = os.path.join(os.path.abspath(p),exe)
if os.path.isfile(fn):
return fn
return exe
def _format_changelog(self, changelog):
"""Format the changelog correctly and convert it to a list of strings
"""
if not changelog:
return changelog
new_changelog = []
for line in string.split(string.strip(changelog), '\n'):
line = string.strip(line)
if line[0] == '*':
new_changelog.extend(['', line])
elif line[0] == '-':
new_changelog.append(line)
else:
new_changelog.append(' ' + line)
# strip trailing newline inserted by first changelog entry
if not new_changelog[0]:
del new_changelog[0]
return new_changelog
# _format_changelog()
# class bdist_rpm
def source_synopsis(file):
line = file.readline()
while line[:1] == '#' or not strip(line):
line = file.readline()
if not line: break
line = strip(line)
if line[:4] == 'r"""': line = line[1:]
if line[:3] == '"""':
line = line[3:]
if line[-1:] == '\\': line = line[:-1]
while not strip(line):
line = file.readline()
if not line: break
result = strip(split(line, '"""')[0])
else: result = None
return result
def locate(path, forceload=0):
"""Locate an object by name or dotted path, importing as necessary."""
parts = [part for part in split(path, '.') if part]
module, n = None, 0
while n < len(parts):
nextmodule = safeimport(join(parts[:n+1], '.'), forceload)
if nextmodule: module, n = nextmodule, n + 1
else: break
if module:
object = module
for part in parts[n:]:
try: object = getattr(object, part)
except AttributeError: return None
return object
else:
if hasattr(__builtin__, path):
return getattr(__builtin__, path)
# --------------------------------------- interactive interpreter interface
def help(self, request):
if type(request) is type(''):
request = request.strip()
if request == 'help': self.intro()
elif request == 'keywords': self.listkeywords()
elif request == 'symbols': self.listsymbols()
elif request == 'topics': self.listtopics()
elif request == 'modules': self.listmodules()
elif request[:8] == 'modules ':
self.listmodules(split(request)[1])
elif request in self.symbols: self.showsymbol(request)
elif request in self.keywords: self.showtopic(request)
elif request in self.topics: self.showtopic(request)
elif request: doc(request, 'Help on %s:')
elif isinstance(request, Helper): self()
else: doc(request, 'Help on %s:')
self.output.write('\n')
def _norm_version(version, build=''):
""" Normalize the version and build strings and return a single
version string using the format major.minor.build (or patchlevel).
"""
l = string.split(version,'.')
if build:
l.append(build)
try:
ints = map(int,l)
except ValueError:
strings = l
else:
strings = map(str,ints)
version = string.join(strings[:3],'.')
return version
def loadConfig(self, config):
configFile = config.getConfigFile()
if not isfile(configFile):
self.logger.warn("Config file %s does not exist. Using defaults." % configFile)
return config
self.logger.debug("Loading config from %s." % configFile)
loader = ConfigParser.SafeConfigParser(allow_no_value=True)
loader.add_section("main")
loader.set("main", "log_file", config.logFile)
loader.set("main", "migration_dirs", join(config.migrationDirs, ":"))
loader.set("main", "pre_migration_dirs", join(config.preMigrationDirs, ":"))
loader.set("main", "post_migration_dirs", join(config.postMigrationDirs, ":"))
loader.set("main", "state_dir", config.stateDir)
loader.set("main", "run_dir", config.runDir)
loader.read(configFile)
config.logFile = loader.get("main", "log_file")
config.migrationDirs = split(loader.get("main", "migration_dirs"), ":")
config.preMigrationDirs = split(loader.get("main", "pre_migration_dirs"), ":")
config.postMigrationDirs = split(loader.get("main", "post_migration_dirs"), ":")
config.stateDir = loader.get("main", "state_dir")
config.runDir = loader.get("main", "run_dir")
return config
def pattern_scan(string):
words = string.split()
wcount = len(words)
eng_count = english_test(string)
print "\n\t\tNumber of Wrods : ", wcount
print "\n\t\tEnglish words =", eng_count
ratio = float("0.0")
ratio = float(eng_count)/float(wcount)
if (ratio > float("0.5")):
print "\n\t\t> The given string is a collection of English words"
return False
else :
return True
def _split(self, definition):
"""
In our YAML parameter definition line, split the key part from the value part.
:param definition: a parameter definition from our deployfish.yml
:type definition: string
:rtype: 2-tuple of strings
"""
key = definition
value = None
delimiter_loc = definition.find('=')
if delimiter_loc > 0:
key = definition[:delimiter_loc]
if len(definition) > delimiter_loc + 1:
value = definition[delimiter_loc + 1:].strip('"')
else:
value = ""
return (key, value)
def _getNetworksNetstatProto(self, proto) :
if(proto == "tcp") :
current_list = self.networks_netstat.list_tcp
argproto = "-t"
elif(proto == "udp") :
current_list = self.networks_netstat.list_udp
argproto = "-u"
i, o = os.popen2("/bin/netstat -an " + argproto)
j = o.readline()
j = o.readline()
j = o.readline()
while(j != ""):
liste = j.split()
if(proto == "tcp") :
current_list.append([liste[3], liste[4], liste[5]])
elif(proto == "udp") :
current_list.append([liste[3], liste[4], None])
j = o.readline()
o.close()
i.close()
def _getNetworksProcProto(self, proto) :
if(proto == "tcp") :
current_list = self.networks_proc.list_tcp
elif(proto == "udp") :
current_list = self.networks_proc.list_udp
try :
fichier = open("/proc/net/" + proto, "r")
except IOError :
print "No such file /proc/net/" + proto
sys.exit(-1)
liste = fichier.readlines()
fichier.close()
liste.pop(0)
for i in liste :
l = string.split(i)
if(proto == "tcp") :
current_list.append([self._iphexatodec(l[1]), self._iphexatodec(l[2]), self.tranlateState[l[3]]])
elif(proto == "udp") :
current_list.append([self._iphexatodec(l[1]), self._iphexatodec(l[2]), None])
def parsePhrase(self, phrase):
"""Decode the phrase, and return a 64bit OTP
I will raise Unauthorized if the parity is wrong
TODO: Add support for hex (MUST) and the '2nd scheme'(SHOULD)"""
words = string.split(phrase)
for i in xrange(len(words)):
words[i] = string.upper(words[i])
b = 0L
for i in xrange(0,5):
b = b | ((long(dict.index(words[i])) << ((4-i)*11L+9L)))
tmp = dict.index(words[5])
b = b | (tmp & 0x7FC ) >> 2
if (tmp & 3) <> self.calculateParity(b):
raise Unauthorized("Parity error")
digest = longToString(b)
return digest
def irc_RPL_NAMREPLY(self,prefix,params):
"""
RPL_NAMREPLY
>> NAMES #bnl
<< :Arlington.VA.US.Undernet.Org 353 z3p = #bnl :pSwede Dan-- SkOyg AG
"""
group=string.lower(params[2][1:])
users=string.split(params[3])
for ui in range(len(users)):
while users[ui][0] in ["@","+"]: # channel modes
users[ui]=users[ui][1:]
if not self._namreplies.has_key(group):
self._namreplies[group]=[]
self._namreplies[group].extend(users)
for nickname in users:
try:
self._ingroups[nickname].append(group)
except:
self._ingroups[nickname]=[group]
def parsemsg(s):
"""Breaks a message from an IRC server into its prefix, command, and arguments.
"""
prefix = ''
trailing = []
if not s:
raise IRCBadMessage("Empty line.")
if s[0] == ':':
prefix, s = s[1:].split(' ', 1)
if s.find(' :') != -1:
s, trailing = s.split(' :', 1)
args = s.split()
args.append(trailing)
else:
args = s.split()
command = args.pop(0)
return prefix, command, args
def split(str, length = 80):
"""I break a message into multiple lines.
I prefer to break at whitespace near str[length]. I also break at \\n.
@returns: list of strings
"""
if length <= 0:
raise ValueError("Length must be a number greater than zero")
r = []
while len(str) > length:
w, n = str[:length].rfind(' '), str[:length].find('\n')
if w == -1 and n == -1:
line, str = str[:length], str[length:]
else:
i = n == -1 and w or n
line, str = str[:i], str[i+1:]
r.append(line)
if len(str):
r.extend(str.split('\n'))
return r
def ctcpQuery_DCC(self, user, channel, data):
"""Initiate a Direct Client Connection
"""
if not data: return
dcctype = data.split(None, 1)[0].upper()
handler = getattr(self, "dcc_" + dcctype, None)
if handler:
if self.dcc_sessions is None:
self.dcc_sessions = []
data = data[len(dcctype)+1:]
handler(user, channel, data)
else:
nick = string.split(user,"!")[0]
self.ctcpMakeReply(nick, [('ERRMSG',
"DCC %s :Unknown DCC type '%s'"
% (data, dcctype))])
self.quirkyMessage("%s offered unknown DCC type %s"
% (user, dcctype))
def modeTocSignon(self):
flap=self.readFlap()
if flap==None:
return "TocSignon"
if flap[0]!=DATA: raise TOCParseError
data=string.split(flap[1]," ")
if data[0]!="toc_signon": raise TOCParseError
for i in data:
if not i:data.remove(i)
password=unroast(data[4])
if not(self.authorize(data[1],int(data[2]),data[3],password)):
self.sendError(BAD_NICKNAME)
self.transport.loseConnection()
return
self.sendFlap(DATA,"SIGN_ON:TOC1.0")
self.sendFlap(DATA,"NICK:%s"%self.saved.nick)
self.sendFlap(DATA,"CONFIG:%s"%self.saved.config)
# sending user configuration goes here
return "Connected"
def modeConnected(self):
flap=self.readFlap()
while flap!=None:
if flap[0] not in [DATA,KEEP_ALIVE]: raise TOCParseError
flapdata=string.split(flap[1]," ",1)
tocname=flapdata[0][4:]
if len(flapdata)==2:
data=flapdata[1]
else:
data=""
func=getattr(self,"toc_"+tocname,None)
if func!=None:
func(data)
else:
self.toc_unknown(tocname,data)
flap=self.readFlap()
return "Connected"
def toc_add_buddy(self,data):
"""
adds users to the buddy list
toc_add_buddy <buddyname1> [<buddyname2>] [<buddyname3>]...
"""
buddies=map(normalize,string.split(data," "))
for b in buddies:
if b not in self.buddylist:
self.buddylist.append(b)
for buddy in buddies:
try:
buddy=self.factory.users[buddy]
except:
pass
else:
self.buddyUpdate(buddy)
def toc_send_im(self,data):
"""
incoming instant message
toc_send_im <screenname> <quoted message> [auto]
"""
username,data=string.split(data," ",1)
auto=0
if data[-4:]=="auto":
auto=1
data=data[:-5]
data=unquote(data)
if not(self.factory.users.has_key(username)):
self.sendError(NOT_AVAILABLE,username)
return
user=self.factory.users[username]
if not(self.canContact(user)):
self.sendError(NOT_AVAILABLE,username)
return
user.hearWhisper(self,data,auto)
def __init__(self, path, defaultType="text/html", ignoredExts=(), registry=None, allowExt=0):
"""Create a file with the given path.
"""
resource.Resource.__init__(self)
filepath.FilePath.__init__(self, path)
# Remove the dots from the path to split
self.defaultType = defaultType
if ignoredExts in (0, 1) or allowExt:
warnings.warn("ignoredExts should receive a list, not a boolean")
if ignoredExts or allowExt:
self.ignoredExts = ['*']
else:
self.ignoredExts = []
else:
self.ignoredExts = list(ignoredExts)
self.registry = registry or Registry()
def lineReceived(self, line):
parts = string.split(line)
if not parts:
parts = ['']
if len(parts) == 1:
slash_w = 0
else:
slash_w = 1
user = parts[-1]
if '@' in user:
host_place = string.rfind(user, '@')
user = user[:host_place]
host = user[host_place+1:]
return self.forwardQuery(slash_w, user, host)
if user:
return self.getUser(slash_w, user)
else:
return self.getDomain(slash_w)
def get_build_version():
"""Return the version of MSVC that was used to build Python.
For Python 2.3 and up, the version number is included in
sys.version. For earlier versions, assume the compiler is MSVC 6.
"""
prefix = "MSC v."
i = string.find(sys.version, prefix)
if i == -1:
return 6
i = i + len(prefix)
s, rest = sys.version[i:].split(" ", 1)
majorVersion = int(s[:-2]) - 6
minorVersion = int(s[2:3]) / 10.0
# I don't think paths are affected by minor version in version 6
if majorVersion == 6:
minorVersion = 0
if majorVersion >= 6:
return majorVersion + minorVersion
# else we don't know what version of the compiler this is
return None
def find_exe(self, exe):
"""Return path to an MSVC executable program.
Tries to find the program in several places: first, one of the
MSVC program search paths from the registry; next, the directories
in the PATH environment variable. If any of those work, return an
absolute path that is known to exist. If none of them work, just
return the original program name, 'exe'.
"""
for p in self.__paths:
fn = os.path.join(os.path.abspath(p), exe)
if os.path.isfile(fn):
return fn
# didn't find it; try existing path
for p in string.split(os.environ['Path'],';'):
fn = os.path.join(os.path.abspath(p),exe)
if os.path.isfile(fn):
return fn
return exe
def source_synopsis(file):
line = file.readline()
while line[:1] == '#' or not strip(line):
line = file.readline()
if not line: break
line = strip(line)
if line[:4] == 'r"""': line = line[1:]
if line[:3] == '"""':
line = line[3:]
if line[-1:] == '\\': line = line[:-1]
while not strip(line):
line = file.readline()
if not line: break
result = strip(split(line, '"""')[0])
else: result = None
return result
def locate(path, forceload=0):
"""Locate an object by name or dotted path, importing as necessary."""
parts = [part for part in split(path, '.') if part]
module, n = None, 0
while n < len(parts):
nextmodule = safeimport(join(parts[:n+1], '.'), forceload)
if nextmodule: module, n = nextmodule, n + 1
else: break
if module:
object = module
else:
object = __builtin__
for part in parts[n:]:
try:
object = getattr(object, part)
except AttributeError:
return None
return object
# --------------------------------------- interactive interpreter interface
def help(self, request):
if type(request) is type(''):
request = request.strip()
if request == 'help': self.intro()
elif request == 'keywords': self.listkeywords()
elif request == 'symbols': self.listsymbols()
elif request == 'topics': self.listtopics()
elif request == 'modules': self.listmodules()
elif request[:8] == 'modules ':
self.listmodules(split(request)[1])
elif request in self.symbols: self.showsymbol(request)
elif request in self.keywords: self.showtopic(request)
elif request in self.topics: self.showtopic(request)
elif request: doc(request, 'Help on %s:')
elif isinstance(request, Helper): self()
else: doc(request, 'Help on %s:')
self.output.write('\n')