def lock(f, flags):
hfile = msvcrt.get_osfhandle(_fd(f))
overlapped = OVERLAPPED()
ret = LockFileEx(hfile, flags, 0, 0, 0xFFFF0000, byref(overlapped))
return bool(ret)
python类get_osfhandle()的实例源码
def unlock(f):
hfile = msvcrt.get_osfhandle(_fd(f))
overlapped = OVERLAPPED()
ret = UnlockFileEx(hfile, 0, 0, 0xFFFF0000, byref(overlapped))
return bool(ret)
def lock(file, flags):
hfile = msvcrt.get_osfhandle(file.fileno())
overlapped = OVERLAPPED()
LockFileEx(hfile, flags, 0, 0, 0xFFFF0000, ctypes.byref(overlapped))
def unlock(file):
hfile = msvcrt.get_osfhandle(file.fileno())
overlapped = OVERLAPPED()
UnlockFileEx(hfile, 0, 0, 0xFFFF0000, ctypes.byref(overlapped))
def pipe_non_blocking_set(fd):
# Constant could define globally but avoid polluting the name-space
# thanks to: http://stackoverflow.com/questions/34504970
import msvcrt
from ctypes import windll, byref, wintypes, WinError, POINTER
from ctypes.wintypes import HANDLE, DWORD, BOOL
LPDWORD = POINTER(DWORD)
PIPE_NOWAIT = wintypes.DWORD(0x00000001)
def pipe_no_wait(pipefd):
SetNamedPipeHandleState = windll.kernel32.SetNamedPipeHandleState
SetNamedPipeHandleState.argtypes = [HANDLE, LPDWORD, LPDWORD, LPDWORD]
SetNamedPipeHandleState.restype = BOOL
h = msvcrt.get_osfhandle(pipefd)
res = windll.kernel32.SetNamedPipeHandleState(h, byref(PIPE_NOWAIT), None, None)
if res == 0:
print(WinError())
return False
return True
return pipe_no_wait(fd)
def lock(f, flags):
hfile = msvcrt.get_osfhandle(_fd(f))
overlapped = OVERLAPPED()
ret = LockFileEx(hfile, flags, 0, 0, 0xFFFF0000, byref(overlapped))
return bool(ret)
def unlock(f):
hfile = msvcrt.get_osfhandle(_fd(f))
overlapped = OVERLAPPED()
ret = UnlockFileEx(hfile, 0, 0, 0xFFFF0000, byref(overlapped))
return bool(ret)
def _lock_file(f, exclusive):
overlapped = OVERLAPPED()
overlapped.Offset = 0
overlapped.OffsetHigh = 0
overlapped.hEvent = 0
f._lock_file_overlapped_p = ctypes.pointer(overlapped)
handle = msvcrt.get_osfhandle(f.fileno())
if not LockFileEx(handle, 0x2 if exclusive else 0x0, 0,
whole_low, whole_high, f._lock_file_overlapped_p):
raise OSError('Locking file failed: %r' % ctypes.FormatError())
def _unlock_file(f):
assert f._lock_file_overlapped_p
handle = msvcrt.get_osfhandle(f.fileno())
if not UnlockFileEx(handle, 0,
whole_low, whole_high, f._lock_file_overlapped_p):
raise OSError('Unlocking file failed: %r' % ctypes.FormatError())
def _lock_file(f, exclusive):
overlapped = OVERLAPPED()
overlapped.Offset = 0
overlapped.OffsetHigh = 0
overlapped.hEvent = 0
f._lock_file_overlapped_p = ctypes.pointer(overlapped)
handle = msvcrt.get_osfhandle(f.fileno())
if not LockFileEx(handle, 0x2 if exclusive else 0x0, 0,
whole_low, whole_high, f._lock_file_overlapped_p):
raise OSError('Locking file failed: %r' % ctypes.FormatError())
def _unlock_file(f):
assert f._lock_file_overlapped_p
handle = msvcrt.get_osfhandle(f.fileno())
if not UnlockFileEx(handle, 0,
whole_low, whole_high, f._lock_file_overlapped_p):
raise OSError('Unlocking file failed: %r' % ctypes.FormatError())
def _lock_file(f, exclusive):
overlapped = OVERLAPPED()
overlapped.Offset = 0
overlapped.OffsetHigh = 0
overlapped.hEvent = 0
f._lock_file_overlapped_p = ctypes.pointer(overlapped)
handle = msvcrt.get_osfhandle(f.fileno())
if not LockFileEx(handle, 0x2 if exclusive else 0x0, 0,
whole_low, whole_high, f._lock_file_overlapped_p):
raise OSError('Locking file failed: %r' % ctypes.FormatError())
def _unlock_file(f):
assert f._lock_file_overlapped_p
handle = msvcrt.get_osfhandle(f.fileno())
if not UnlockFileEx(handle, 0,
whole_low, whole_high, f._lock_file_overlapped_p):
raise OSError('Unlocking file failed: %r' % ctypes.FormatError())
def add_reader(self, fd, callback):
" Start watching the file descriptor for read availability. "
h = msvcrt.get_osfhandle(fd)
self._read_fds[h] = callback
def remove_reader(self, fd):
" Stop watching the file descriptor for read availability. "
h = msvcrt.get_osfhandle(fd)
if h in self._read_fds:
del self._read_fds[h]
def __init__(self, recognize_paste=True):
self._fdcon = None
self.recognize_paste = recognize_paste
# When stdin is a tty, use that handle, otherwise, create a handle from
# CONIN$.
if sys.stdin.isatty():
self.handle = windll.kernel32.GetStdHandle(STD_INPUT_HANDLE)
else:
self._fdcon = os.open('CONIN$', os.O_RDWR | os.O_BINARY)
self.handle = msvcrt.get_osfhandle(self._fdcon)
def _stdin_raw_nonblock(self):
"""Use the raw Win32 handle of sys.stdin to do non-blocking reads"""
# WARNING: This is experimental, and produces inconsistent results.
# It's possible for the handle not to be appropriate for use
# with WaitForSingleObject, among other things.
handle = msvcrt.get_osfhandle(sys.stdin.fileno())
result = WaitForSingleObject(handle, 100)
if result == WAIT_FAILED:
raise ctypes.WinError()
elif result == WAIT_TIMEOUT:
print(".", end='')
return None
else:
data = ctypes.create_string_buffer(256)
bytesRead = DWORD(0)
print('?', end='')
if not ReadFile(handle, data, 256,
ctypes.byref(bytesRead), None):
raise ctypes.WinError()
# This ensures the non-blocking works with an actual console
# Not checking the error, so the processing will still work with
# other handle types
FlushConsoleInputBuffer(handle)
data = data.value
data = data.replace('\r\n', '\n')
data = data.replace('\r', '\n')
print(repr(data) + " ", end='')
return data
def _run_stdio(self):
"""Runs the process using the system standard I/O.
IMPORTANT: stdin needs to be asynchronous, so the Python
sys.stdin object is not used. Instead,
msvcrt.kbhit/getwch are used asynchronously.
"""
# Disable Line and Echo mode
#lpMode = DWORD()
#handle = msvcrt.get_osfhandle(sys.stdin.fileno())
#if GetConsoleMode(handle, ctypes.byref(lpMode)):
# set_console_mode = True
# if not SetConsoleMode(handle, lpMode.value &
# ~(ENABLE_ECHO_INPUT | ENABLE_LINE_INPUT | ENABLE_PROCESSED_INPUT)):
# raise ctypes.WinError()
if self.mergeout:
return self.run(stdout_func = self._stdout_raw,
stdin_func = self._stdin_raw_block)
else:
return self.run(stdout_func = self._stdout_raw,
stdin_func = self._stdin_raw_block,
stderr_func = self._stderr_raw)
# Restore the previous console mode
#if set_console_mode:
# if not SetConsoleMode(handle, lpMode.value):
# raise ctypes.WinError()
def add_reader(self, fd, callback):
" Start watching the file descriptor for read availability. "
h = msvcrt.get_osfhandle(fd)
self._read_fds[h] = callback
def remove_reader(self, fd):
" Stop watching the file descriptor for read availability. "
h = msvcrt.get_osfhandle(fd)
if h in self._read_fds:
del self._read_fds[h]