def get_response_sspi(self, challenge=None):
    """Produce the next base64-encoded SSPI token for the handshake.

    :param challenge: Base64 text (or bytes) of the peer's challenge, or a
        falsy value when there is no challenge to answer yet.
    :return: The contents of the first SSPI output buffer as base64 text
        without line breaks, or ``None`` when ``authorize`` raises
        ``pywintypes.error`` (the traceback is printed to stdout).
    """
    dprint("pywin32 SSPI")
    if challenge:
        # b64decode accepts both text and bytes. The previous fallback,
        # base64.decodestring, was removed in Python 3.9.
        challenge = base64.b64decode(challenge)

    try:
        _, output_buffer = self.sspi_client.authorize(challenge)
    except pywintypes.error:
        traceback.print_exc(file=sys.stdout)
        return None

    token = output_buffer[0].Buffer
    if not isinstance(token, bytes):
        # Only text needs encoding; a bytes buffer is used as-is.
        token = token.encode("utf-8")

    # b64encode never inserts newlines, so no post-processing is needed
    # (encodebytes/encodestring wrapped their output with '\n').
    return base64.b64encode(token).decode("utf-8")
# Example source code for the Python error() class
def loadTestsFromName(self, name, module=None):
    """Load tests by name, wrapping individual test cases.

    Delegates to ``unittest.TestLoader.loadTestsFromName``. A returned
    ``TestSuite`` is passed through untouched, a single ``TestCase`` is
    replaced by ``self._getTestWrapper(test)``, and anything else is
    reported on stdout and returned as-is.
    """
    loaded = unittest.TestLoader.loadTestsFromName(self, name, module)
    if isinstance(loaded, unittest.TestSuite):
        # Suites are intentionally not wrapped (yet).
        return loaded
    if isinstance(loaded, unittest.TestCase):
        return self._getTestWrapper(loaded)
    print("XXX - what is", loaded)
    return loaded
# Lots of classes necessary to support one simple feature: we want a 3rd
# test result state - "SKIPPED" - to indicate that the test wasn't able
# to be executed for various reasons. Inspired by bzr's tests, but it
# has other concepts, such as "Expected Failure", which we don't bother
# with.
# win32 error codes that probably mean we need to be elevated (ie, if we
# aren't elevated, we treat these error codes as 'skipped')
def _GetServiceShortName(longName):
    """Look up a service's key name from its display name.

    Walks the subkeys of the Services registry key and returns the name of
    the first subkey whose "DisplayName" value equals ``longName``
    (compared case-insensitively). Returns ``None`` if nothing matches.

    Thanks to Andy McKay for this code.
    """
    access = win32con.KEY_READ | win32con.KEY_ENUMERATE_SUB_KEYS | win32con.KEY_QUERY_VALUE
    services_key = win32api.RegOpenKey(
        win32con.HKEY_LOCAL_MACHINE,
        "SYSTEM\\CurrentControlSet\\Services",
        0,
        access,
    )
    subkey_count = win32api.RegQueryInfoKey(services_key)[0]
    wanted = longName.lower()
    for index in range(subkey_count):
        short_name = win32api.RegEnumKey(services_key, index)
        service_key = win32api.RegOpenKey(services_key, short_name, 0, access)
        try:
            display_name = str(win32api.RegQueryValueEx(service_key, "DisplayName")[0])
        except win32api.error:
            # This subkey has no DisplayName value; move on to the next one.
            continue
        if display_name.lower() == wanted:
            return short_name
    return None
# Open a service given either its long or short name.
def RemoveService(serviceName):
    """Delete the named service and its related registrations.

    Steps, in order:
      1. Best-effort unload of the performance counter text strings
         (skipped silently if ``perfmon`` is unavailable or the call fails).
      2. Open the service control manager, open the service through
         ``SmartOpenService`` and delete it.
      3. Best-effort removal of the event log source from the registry.

    Errors raised while opening or deleting the service propagate to the
    caller; both handles are closed regardless.
    """
    try:
        import perfmon
        perfmon.UnloadPerfCounterTextStrings("python.exe "+serviceName)
    except (ImportError, win32api.error):
        # Performance counters are optional; ignore failures here.
        pass

    hscm = win32service.OpenSCManager(None, None, win32service.SC_MANAGER_ALL_ACCESS)
    try:
        hs = SmartOpenService(hscm, serviceName, win32service.SERVICE_ALL_ACCESS)
        try:
            win32service.DeleteService(hs)
        finally:
            # Close the service handle even when DeleteService fails, so a
            # failed removal does not leak it.
            win32service.CloseServiceHandle(hs)
    finally:
        win32service.CloseServiceHandle(hscm)

    import win32evtlogutil
    try:
        win32evtlogutil.RemoveSourceFromRegistry(serviceName)
    except win32api.error:
        # The event source may never have been registered.
        pass
def __FindSvcDeps(findName):
    """Build the service dependency map and resolve it for ``findName``.

    Enumerates every subkey of the Services registry key and, for each one
    that has a "DependOnService" value, records the subkey under the
    lower-cased name of each service it depends on. The resulting mapping
    (dependency name -> list of services that depend on it) is handed to
    ``__ResolveDeps`` together with ``findName``.
    """
    if type(findName) is pywintypes.UnicodeType:
        findName = str(findName)

    dependents = {}
    services_key = win32api.RegOpenKey(
        win32con.HKEY_LOCAL_MACHINE, "SYSTEM\\CurrentControlSet\\Services"
    )
    index = 0
    while True:
        try:
            service = win32api.RegEnumKey(services_key, index)
        except win32api.error:
            # Enumeration stops at the first error (presumably the index
            # ran past the last subkey).
            break
        index += 1
        service_key = win32api.RegOpenKey(services_key, service)
        try:
            depends_on, _value_type = win32api.RegQueryValueEx(service_key, "DependOnService")
        except win32api.error:
            # No DependOnService value: this service depends on nothing.
            depends_on = ()
        for dependency in depends_on:
            dependents.setdefault(dependency.lower(), []).append(service)
    return __ResolveDeps(findName, dependents)
def RestartService(serviceName, args = None, waitSeconds = 30, machine = None):
    """Stop the service, and then start it again (with some tolerance for allowing it to stop.)

    A "service not active" error while stopping is ignored. Starting is
    then attempted up to ``waitSeconds`` times, sleeping one second after
    each "service already running" error. Any other ``pywintypes.error``
    is re-raised. If every attempt fails, a message is printed.
    """
    try:
        StopService(serviceName, machine)
    except pywintypes.error as stop_error:
        # Only "service not running" is acceptable here.
        if stop_error.winerror != winerror.ERROR_SERVICE_NOT_ACTIVE:
            raise

    # The old instance may need a while to go away, so retry the start.
    for _attempt in range(waitSeconds):
        try:
            StartService(serviceName, args, machine)
        except pywintypes.error as start_error:
            if start_error.winerror != winerror.ERROR_SERVICE_ALREADY_RUNNING:
                raise
            win32api.Sleep(1000)
        else:
            return
    print("Gave up waiting for the old service to stop!")
def GetServiceClassString(cls, argv = None):
    """Return the dotted "module.ClassName" string for a service class.

    The module is found with ``pickle.whichmodule``. When the class lives
    in ``__main__``, the module part is instead the full path of the
    script named by ``argv[0]`` (``sys.argv`` by default) with its
    extension removed. ``error`` is raised if that path cannot be resolved.
    """
    import pickle

    if argv is None:
        argv = sys.argv

    class_name = cls.__name__
    module_name = pickle.whichmodule(cls, class_name)
    if module_name != '__main__':
        return module_name + "." + class_name

    try:
        script_path = win32api.GetFullPathName(argv[0])
        directory = os.path.split(script_path)[0]
        # The path may be a short (8.3) filename; ask FindFiles for the
        # long name of the file and rebuild the path from it.
        long_name = win32api.FindFiles(script_path)[0][8]
        script_path = os.path.join(directory, long_name)
    except win32api.error:
        raise error("Could not resolve the path name '%s' to a full path" % (argv[0]))
    return os.path.splitext(script_path)[0] + "." + class_name
def supported(cls, stream=sys.stdout):
    """Report whether curses-based colouring can be used on ``stream``.

    Returns False when ``stream`` is not a TTY, when ``curses`` cannot be
    imported, or when querying the terminal fails. Otherwise returns True
    only if the terminal reports more than two colours.
    """
    if not stream.isatty():
        # Automatic colouring is only attempted on TTYs.
        return False
    try:
        import curses
    except ImportError:
        return False

    try:
        try:
            color_count = curses.tigetnum("colors")
        except curses.error:
            # The terminal was not initialised yet; set it up and retry.
            curses.setupterm()
            color_count = curses.tigetnum("colors")
        return color_count > 2
    except Exception:
        # Any failure means we guess "not supported".
        return False
def supported(cls, stream=sys.stdout):
    """Report whether win32console colouring can be used.

    Returns False if ``win32console`` cannot be imported or if setting the
    text attribute on the standard output handle raises
    ``pywintypes.error``. Returns True otherwise. ``stream`` is accepted
    but not consulted.
    """
    try:
        import win32console
        screen_buffer = win32console.GetStdHandle(win32console.STD_OUT_HANDLE)
    except ImportError:
        return False

    import pywintypes

    # Red + green + blue foreground: a harmless attribute to probe with.
    probe_attributes = (
        win32console.FOREGROUND_RED
        | win32console.FOREGROUND_GREEN
        | win32console.FOREGROUND_BLUE
    )
    try:
        screen_buffer.SetConsoleTextAttribute(probe_attributes)
    except pywintypes.error:
        return False
    return True
def test_nested_explicit(self):
    """Logging levels in a nested namespace, all explicitly set."""
    next_msg = self.next_message

    parent = logging.getLogger("INF")
    parent.setLevel(logging.INFO)
    child = logging.getLogger("INF.ERR")
    child.setLevel(logging.ERROR)

    # At or above the child's ERROR level: these should log.
    child.log(logging.CRITICAL, next_msg())
    child.error(next_msg())

    # Below the child's ERROR level: these should not log.
    for emit in (child.warning, child.info, child.debug):
        emit(next_msg())

    expected = [
        ('INF.ERR', 'CRITICAL', '1'),
        ('INF.ERR', 'ERROR', '2'),
    ]
    self.assert_log_lines(expected)
def serve_forever(self, poll_interval):
    """
    Drive the :mod:`asyncore` loop over this server's socket map until it
    terminates normally.

    :param poll_interval: The interval, in seconds, used in the underlying
                          :func:`select` or :func:`poll` call by
                          :func:`asyncore.loop`.
    """
    try:
        asyncore.loop(poll_interval, map=self._map)
    except OSError:
        # On FreeBSD 8, closing the server repeatably raises this error.
        # It is only swallowed once the server is neither connected nor
        # accepting, i.e. it has been closed.
        server_closed = not self.connected and not self.accepting
        if not server_closed:
            raise
def test_config0_using_cp_ok(self):
    """A simple config, passed as a ConfigParser, overrides the defaults."""
    with captured_stdout() as output:
        parser = configparser.ConfigParser()
        parser.read_file(io.StringIO(textwrap.dedent(self.config0)))
        logging.config.fileConfig(parser)

        root = logging.getLogger()
        # INFO is not expected to produce output.
        root.info(self.next_message())
        # ERROR is expected to produce output.
        root.error(self.next_message())

        expected_on_stdout = [
            ('ERROR', '2'),
        ]
        self.assert_log_lines(expected_on_stdout, stream=output)
        # The original logger output stays empty.
        self.assert_log_lines([])
def test_noserver(self):
    """Logging through the socket handler after the server is stopped."""
    handler = self.sock_hdlr
    log = self.root_logger

    # Avoid timing-related failures due to SocketHandler's own hard-wired
    # one-second timeout on socket.create_connection() (issue #16264).
    handler.retryStart = 2.5

    # Kill the server.
    self.server.stop(2.0)

    # The logging call should try to connect, which should fail.
    try:
        raise RuntimeError('Deliberate mistake')
    except RuntimeError:
        log.exception('Never sent')
    log.error('Never sent, either')

    # Wait until just past the handler's retry time before logging again.
    checked_at = time.time()
    self.assertGreater(handler.retryTime, checked_at)
    time.sleep(handler.retryTime - checked_at + 0.001)
    log.error('Nor this')
def test_output(self):
    """The log message sent to the SysLogHandler is properly received."""
    logger = logging.getLogger("slh")

    def log_and_expect(expected_bytes):
        # Emit the same message each time and compare what was received.
        logger.error("sp\xe4m")
        self.handled.wait()
        self.assertEqual(self.log_output, expected_bytes)

    # Handler as configured by the fixture: the payload ends with a NUL.
    log_and_expect(b'<11>sp\xc3\xa4m\x00')

    # With append_nul switched off the trailing NUL disappears.
    self.handled.clear()
    self.sl_hdlr.append_nul = False
    log_and_expect(b'<11>sp\xc3\xa4m')

    # A configured ident is placed in front of the message.
    self.handled.clear()
    self.sl_hdlr.ident = "h\xe4m-"
    log_and_expect(b'<11>h\xc3\xa4m-sp\xc3\xa4m')
def test_config_10_ok(self):
    """Apply config10 and check which of four messages reach stdout."""
    with captured_stdout() as captured:
        self.apply_config(self.config10)

        # Message 1: expected in the output.
        logging.getLogger("compiler.parser").warning(self.next_message())
        # Message 2: not output, because filtered.
        logging.getLogger('compiler').warning(self.next_message())
        # Message 3: not output, because filtered.
        logging.getLogger('compiler.lexer').warning(self.next_message())
        # Message 4: output, as not filtered.
        logging.getLogger("compiler.parser.codegen").error(self.next_message())

        expected = [
            ('WARNING', '1'),
            ('ERROR', '4'),
        ]
        self.assert_log_lines(expected, stream=captured)
def test_listen_config_10_ok(self):
    """Send config10 as JSON via the listener and check what reaches stdout."""
    with captured_stdout() as captured:
        self.setup_via_listener(json.dumps(self.config10))

        # Message 1: expected in the output.
        logging.getLogger("compiler.parser").warning(self.next_message())
        # Message 2: not output, because filtered.
        logging.getLogger('compiler').warning(self.next_message())
        # Message 3: not output, because filtered.
        logging.getLogger('compiler.lexer').warning(self.next_message())
        # Message 4: output, as not filtered.
        logging.getLogger("compiler.parser.codegen").error(self.next_message())

        expected = [
            ('WARNING', '1'),
            ('ERROR', '4'),
        ]
        self.assert_log_lines(expected, stream=captured)
def unlock_and_close(self):
    """Close and unlock the file using the win32 primitive.

    If the file is marked as locked, it is unlocked with
    ``win32file.UnlockFileEx``. An "already unlocked" error is ignored and
    any other ``pywintypes.error`` is re-raised. The lock flag is then
    cleared and the file handle, if there is one, is closed.
    """
    if self._locked:
        try:
            hfile = win32file._get_osfhandle(self._fh.fileno())
            win32file.UnlockFileEx(hfile, 0, -0x10000,
                                   pywintypes.OVERLAPPED())
        except pywintypes.error as e:
            # Exceptions cannot be indexed on Python 3 (e[0] raises
            # TypeError), so read the Windows error code from the attribute.
            if e.winerror != _Win32Opener.FILE_ALREADY_UNLOCKED_ERROR:
                raise
    self._locked = False
    if self._fh:
        self._fh.close()
def unlock_and_close(self):
    """Close and unlock the file using the win32 primitive.

    If the file is marked as locked, it is unlocked with
    ``win32file.UnlockFileEx``. An "already unlocked" error is ignored and
    any other ``pywintypes.error`` is re-raised. The lock flag is then
    cleared and the file handle, if there is one, is closed.
    """
    if self._locked:
        try:
            hfile = win32file._get_osfhandle(self._fh.fileno())
            win32file.UnlockFileEx(hfile, 0, -0x10000, pywintypes.OVERLAPPED())
        except pywintypes.error as e:
            # "except X, e" is Python 2-only syntax, and exceptions cannot
            # be indexed on Python 3, so use "as e" and the winerror
            # attribute instead of e[0].
            if e.winerror != _Win32Opener.FILE_ALREADY_UNLOCKED_ERROR:
                raise
    self._locked = False
    if self._fh:
        self._fh.close()
def start_spectate(self):
    """Create and start the spectate thread unless one already exists.

    The new thread's ``error`` signal is connected to
    ``self.spectate_error`` with a queued connection before it is started.
    """
    if self.spectate is not None:
        return
    thread = self.spectate = SpectateThread()
    thread.error.connect(self.spectate_error, Qt.QueuedConnection)
    thread.start()
def spectate_error(self, error):
    """Show ``error`` in a critical message box, then stop spectating."""
    dialog = QMessageBox(self)
    dialog.setIcon(QMessageBox.Critical)
    dialog.setText(error)
    dialog.show()
    self.stop_spectate()