def FillConsoleOutputCharacter(stream_id, char, length, start):
handle = handles[stream_id]
char = c_char(char.encode())
length = wintypes.DWORD(length)
num_written = wintypes.DWORD(0)
# Note that this is hard-coded for ANSI (vs wide) bytes.
success = _FillConsoleOutputCharacterA(
handle, char, length, start, byref(num_written))
return num_written.value
python类c_char()的实例源码
def recv_into(self, buffer, nbytes=None):
# Read short on EOF.
if self._closed:
return 0
if nbytes is None:
nbytes = len(buffer)
buffer = (ctypes.c_char * nbytes).from_buffer(buffer)
processed_bytes = ctypes.c_size_t(0)
with self._raise_on_error():
result = Security.SSLRead(
self.context, buffer, nbytes, ctypes.byref(processed_bytes)
)
# There are some result codes that we want to treat as "not always
# errors". Specifically, those are errSSLWouldBlock,
# errSSLClosedGraceful, and errSSLClosedNoNotify.
if (result == SecurityConst.errSSLWouldBlock):
# If we didn't process any bytes, then this was just a time out.
# However, we can get errSSLWouldBlock in situations when we *did*
# read some data, and in those cases we should just read "short"
# and return.
if processed_bytes.value == 0:
# Timed out, no data read.
raise socket.timeout("recv timed out")
elif result in (SecurityConst.errSSLClosedGraceful, SecurityConst.errSSLClosedNoNotify):
# The remote peer has closed this connection. We should do so as
# well. Note that we don't actually return here because in
# principle this could actually be fired along with return data.
# It's unlikely though.
self.close()
else:
_assert_no_error(result)
# Ok, we read and probably succeeded. We should return whatever data
# was actually read.
return processed_bytes.value
def FillConsoleOutputCharacter(stream_id, char, length, start):
handle = handles[stream_id]
char = c_char(char.encode())
length = wintypes.DWORD(length)
num_written = wintypes.DWORD(0)
# Note that this is hard-coded for ANSI (vs wide) bytes.
success = _FillConsoleOutputCharacterA(
handle, char, length, start, byref(num_written))
return num_written.value
def __init__(self, array):
CTypeArray.__init__(self, c_char, array)
def recv_into(self, buffer, nbytes=None):
# Read short on EOF.
if self._closed:
return 0
if nbytes is None:
nbytes = len(buffer)
buffer = (ctypes.c_char * nbytes).from_buffer(buffer)
processed_bytes = ctypes.c_size_t(0)
with self._raise_on_error():
result = Security.SSLRead(
self.context, buffer, nbytes, ctypes.byref(processed_bytes)
)
# There are some result codes that we want to treat as "not always
# errors". Specifically, those are errSSLWouldBlock,
# errSSLClosedGraceful, and errSSLClosedNoNotify.
if (result == SecurityConst.errSSLWouldBlock):
# If we didn't process any bytes, then this was just a time out.
# However, we can get errSSLWouldBlock in situations when we *did*
# read some data, and in those cases we should just read "short"
# and return.
if processed_bytes.value == 0:
# Timed out, no data read.
raise socket.timeout("recv timed out")
elif result in (SecurityConst.errSSLClosedGraceful, SecurityConst.errSSLClosedNoNotify):
# The remote peer has closed this connection. We should do so as
# well. Note that we don't actually return here because in
# principle this could actually be fired along with return data.
# It's unlikely though.
self.close()
else:
_assert_no_error(result)
# Ok, we read and probably succeeded. We should return whatever data
# was actually read.
return processed_bytes.value
def create_shader(self, strings, shader_type):
count = len(strings)
# if we have no source code, ignore this shader
if count < 1:
return
# create the shader handle
shader = glCreateShader(shader_type)
shaderstrings = []
for string in strings:
shaderstrings.append(bytes(string, 'ascii'))
# convert the source strings into a ctypes pointer-to-char array, and
# upload them this is deep, dark, dangerous black magic - don't try
# stuff like this at home!
src = (c_char_p * count)(*shaderstrings)
glShaderSource(shader, count, cast(
pointer(src), POINTER(POINTER(c_char))), None)
# compile the shader
glCompileShader(shader)
temp = c_int(0)
# retrieve the compile status
glGetShaderiv(shader, GL_COMPILE_STATUS, byref(temp))
# if compilation failed, print the log
if not temp:
# retrieve the log length
glGetShaderiv(shader, GL_INFO_LOG_LENGTH, byref(temp))
# create a buffer for the log
buffer = create_string_buffer(temp.value)
# retrieve the log text
glGetShaderInfoLog(shader, temp, None, buffer)
# print the log to the console
print(buffer.value)
else:
# all is well, so attach the shader to the program
glAttachShader(self.handle, shader)
def recv_into(self, buffer, nbytes=None):
# Read short on EOF.
if self._closed:
return 0
if nbytes is None:
nbytes = len(buffer)
buffer = (ctypes.c_char * nbytes).from_buffer(buffer)
processed_bytes = ctypes.c_size_t(0)
with self._raise_on_error():
result = Security.SSLRead(
self.context, buffer, nbytes, ctypes.byref(processed_bytes)
)
# There are some result codes that we want to treat as "not always
# errors". Specifically, those are errSSLWouldBlock,
# errSSLClosedGraceful, and errSSLClosedNoNotify.
if (result == SecurityConst.errSSLWouldBlock):
# If we didn't process any bytes, then this was just a time out.
# However, we can get errSSLWouldBlock in situations when we *did*
# read some data, and in those cases we should just read "short"
# and return.
if processed_bytes.value == 0:
# Timed out, no data read.
raise socket.timeout("recv timed out")
elif result in (SecurityConst.errSSLClosedGraceful, SecurityConst.errSSLClosedNoNotify):
# The remote peer has closed this connection. We should do so as
# well. Note that we don't actually return here because in
# principle this could actually be fired along with return data.
# It's unlikely though.
self.close()
else:
_assert_no_error(result)
# Ok, we read and probably succeeded. We should return whatever data
# was actually read.
return processed_bytes.value
def tobytes(self):
return cast(self.buffer, POINTER(c_char))[0:self._len]
def FillConsoleOutputCharacter(stream_id, char, length, start):
handle = handles[stream_id]
char = c_char(char.encode())
length = wintypes.DWORD(length)
num_written = wintypes.DWORD(0)
# Note that this is hard-coded for ANSI (vs wide) bytes.
success = _FillConsoleOutputCharacterA(
handle, char, length, start, byref(num_written))
return num_written.value
def recv_into(self, buffer, nbytes=None):
# Read short on EOF.
if self._closed:
return 0
if nbytes is None:
nbytes = len(buffer)
buffer = (ctypes.c_char * nbytes).from_buffer(buffer)
processed_bytes = ctypes.c_size_t(0)
with self._raise_on_error():
result = Security.SSLRead(
self.context, buffer, nbytes, ctypes.byref(processed_bytes)
)
# There are some result codes that we want to treat as "not always
# errors". Specifically, those are errSSLWouldBlock,
# errSSLClosedGraceful, and errSSLClosedNoNotify.
if (result == SecurityConst.errSSLWouldBlock):
# If we didn't process any bytes, then this was just a time out.
# However, we can get errSSLWouldBlock in situations when we *did*
# read some data, and in those cases we should just read "short"
# and return.
if processed_bytes.value == 0:
# Timed out, no data read.
raise socket.timeout("recv timed out")
elif result in (SecurityConst.errSSLClosedGraceful, SecurityConst.errSSLClosedNoNotify):
# The remote peer has closed this connection. We should do so as
# well. Note that we don't actually return here because in
# principle this could actually be fired along with return data.
# It's unlikely though.
self.close()
else:
_assert_no_error(result)
# Ok, we read and probably succeeded. We should return whatever data
# was actually read.
return processed_bytes.value
def FillConsoleOutputCharacter(stream_id, char, length, start):
handle = handles[stream_id]
char = c_char(char.encode())
length = wintypes.DWORD(length)
num_written = wintypes.DWORD(0)
# Note that this is hard-coded for ANSI (vs wide) bytes.
success = _FillConsoleOutputCharacterA(
handle, char, length, start, byref(num_written))
return num_written.value
def FillConsoleOutputCharacter(stream_id, char, length, start):
handle = handles[stream_id]
char = c_char(char.encode())
length = wintypes.DWORD(length)
num_written = wintypes.DWORD(0)
# Note that this is hard-coded for ANSI (vs wide) bytes.
success = _FillConsoleOutputCharacterA(
handle, char, length, start, byref(num_written))
return num_written.value
def FillConsoleOutputCharacter(stream_id, char, length, start):
handle = handles[stream_id]
char = c_char(char.encode())
length = wintypes.DWORD(length)
num_written = wintypes.DWORD(0)
# Note that this is hard-coded for ANSI (vs wide) bytes.
success = _FillConsoleOutputCharacterA(
handle, char, length, start, byref(num_written))
return num_written.value
def FillConsoleOutputCharacter(stream_id, char, length, start):
handle = handles[stream_id]
char = c_char(char.encode())
length = wintypes.DWORD(length)
num_written = wintypes.DWORD(0)
# Note that this is hard-coded for ANSI (vs wide) bytes.
success = _FillConsoleOutputCharacterA(
handle, char, length, start, byref(num_written))
return num_written.value
def FillConsoleOutputCharacter(stream_id, char, length, start):
handle = handles[stream_id]
char = c_char(char.encode())
length = wintypes.DWORD(length)
num_written = wintypes.DWORD(0)
# Note that this is hard-coded for ANSI (vs wide) bytes.
success = _FillConsoleOutputCharacterA(
handle, char, length, start, byref(num_written))
return num_written.value
def FillConsoleOutputCharacter(stream_id, char, length, start):
handle = handles[stream_id]
char = c_char(char.encode())
length = wintypes.DWORD(length)
num_written = wintypes.DWORD(0)
# Note that this is hard-coded for ANSI (vs wide) bytes.
success = _FillConsoleOutputCharacterA(
handle, char, length, start, byref(num_written))
return num_written.value
def recv_into(self, buffer, nbytes=None):
# Read short on EOF.
if self._closed:
return 0
if nbytes is None:
nbytes = len(buffer)
buffer = (ctypes.c_char * nbytes).from_buffer(buffer)
processed_bytes = ctypes.c_size_t(0)
with self._raise_on_error():
result = Security.SSLRead(
self.context, buffer, nbytes, ctypes.byref(processed_bytes)
)
# There are some result codes that we want to treat as "not always
# errors". Specifically, those are errSSLWouldBlock,
# errSSLClosedGraceful, and errSSLClosedNoNotify.
if (result == SecurityConst.errSSLWouldBlock):
# If we didn't process any bytes, then this was just a time out.
# However, we can get errSSLWouldBlock in situations when we *did*
# read some data, and in those cases we should just read "short"
# and return.
if processed_bytes.value == 0:
# Timed out, no data read.
raise socket.timeout("recv timed out")
elif result in (SecurityConst.errSSLClosedGraceful, SecurityConst.errSSLClosedNoNotify):
# The remote peer has closed this connection. We should do so as
# well. Note that we don't actually return here because in
# principle this could actually be fired along with return data.
# It's unlikely though.
self.close()
else:
_assert_no_error(result)
# Ok, we read and probably succeeded. We should return whatever data
# was actually read.
return processed_bytes.value
def FillConsoleOutputCharacter(stream_id, char, length, start):
handle = handles[stream_id]
char = c_char(char.encode())
length = wintypes.DWORD(length)
num_written = wintypes.DWORD(0)
# Note that this is hard-coded for ANSI (vs wide) bytes.
success = _FillConsoleOutputCharacterA(
handle, char, length, start, byref(num_written))
return num_written.value
def _get_tcp_ipv4_sockets():
size = ctypes.c_uint(0)
try:
winproxy.GetExtendedTcpTable(None, ctypes.byref(size), ulAf=AF_INET)
except winproxy.IphlpapiError:
pass # Allow us to set size to the needed value
buffer = (ctypes.c_char * size.value)()
winproxy.GetExtendedTcpTable(buffer, ctypes.byref(size), ulAf=AF_INET)
t = get_MIB_TCPTABLE_OWNER_PID_from_buffer(buffer)
return list(t.table)
def _get_tcp_ipv6_sockets():
size = ctypes.c_uint(0)
try:
winproxy.GetExtendedTcpTable(None, ctypes.byref(size), ulAf=AF_INET6)
except winproxy.IphlpapiError:
pass # Allow us to set size to the needed value
buffer = (ctypes.c_char * size.value)()
winproxy.GetExtendedTcpTable(buffer, ctypes.byref(size), ulAf=AF_INET6)
t = get_MIB_TCP6TABLE_OWNER_PID_from_buffer(buffer)
return list(t.table)