def read_texture_names(self):
"""Iterate through all brush textures in the map."""
tex_data = self.get_lump(BSP_LUMPS.TEXDATA_STRING_DATA)
tex_table = self.get_lump(BSP_LUMPS.TEXDATA_STRING_TABLE)
# tex_table is an array of int offsets into tex_data. tex_data is a
# null-terminated block of strings.
table_offsets = struct.unpack(
# The number of ints followed by 'i', e.g. '512i' - the repeat count for the struct format.
str(len(tex_table) // struct.calcsize('i')) + 'i',
tex_table,
)
for off in table_offsets:
# Look for the NULL at the end - strings are limited to 128 chars.
str_off = 0
for str_off in range(off, off + 128):
if tex_data[str_off] == 0:
yield tex_data[off: str_off].decode('ascii')
break
else:
# Reached the 128 char limit without finding a null.
raise ValueError('Bad string at', off, 'in BSP! ("{}")'.format(
tex_data[off:str_off]
))
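# --- Example (not from the BSP code above): a minimal sketch of the layout the
# comments describe - an int offset table pointing into a block of
# null-terminated strings. The texture names here are made up.
import struct

strings = [b'TOOLS/TOOLSNODRAW', b'CONCRETE/CONCRETEFLOOR001A']
tex_data = b''
offsets = []
for s in strings:
    offsets.append(len(tex_data))
    tex_data += s + b'\0'
tex_table = struct.pack(str(len(offsets)) + 'i', *offsets)

# Unpack the offsets the same way read_texture_names() does.
count = len(tex_table) // struct.calcsize('i')
for off in struct.unpack(str(count) + 'i', tex_table):
    end = tex_data.index(b'\0', off)
    print(tex_data[off:end].decode('ascii'))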
def get_size(self):
# XXX: even more hackish, we need a better way
if self.type.is_void():
return 0
elif self.type.is_bool():
# not strictly correct, but we cannot return 1/8
return 0
import struct
return struct.calcsize(self.get_fmt())
# =============================================
# hand-written union subclasses
# =============================================
#
# As of now, the compiler is not capable of generating different subclasses
# for each union tag. In the meantime, write it by hand
def check(self, fmt, value):
from random import randrange
# build a buffer which is surely big enough to contain what we need
# and check:
# 1) that we correctly write the bytes we expect
# 2) that we do NOT write outside the bounds
#
pattern = [six.int2byte(randrange(256)) for _ in range(256)]
pattern = b''.join(pattern)
buf = bytearray(pattern)
buf2 = bytearray(pattern)
offset = 16
pack_into(ord(fmt), buf, offset, value)
struct.pack_into(fmt, buf2, offset, value)
assert buf == buf2
#
# check that it raises if it's out of bound
out_of_bound = 256-struct.calcsize(fmt)+1
pytest.raises(IndexError, "pack_into(ord(fmt), buf, out_of_bound, value)")
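# --- Example (not part of the test above): plain struct.pack_into() shows the
# behaviour the test compares against - it writes calcsize(fmt) bytes at the
# given offset and refuses offsets that would run past the end of the buffer
# (raising struct.error, whereas the hand-rolled pack_into above is expected
# to raise IndexError).
import struct

buf = bytearray(16)
struct.pack_into('i', buf, 4, 0x01020304)
assert buf[4:8] in (b'\x04\x03\x02\x01', b'\x01\x02\x03\x04')  # either endianness
try:
    struct.pack_into('i', buf, len(buf) - struct.calcsize('i') + 1, 0)
except struct.error:
    print('out-of-bound offset rejected')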
def _get_launcher(self, kind):
if struct.calcsize('P') == 8: # 64-bit
bits = '64'
else:
bits = '32'
name = '%s%s.exe' % (kind, bits)
# Issue 31: don't hardcode an absolute package name, but
# determine it relative to the current package
distlib_package = __name__.rsplit('.', 1)[0]
result = finder(distlib_package).find(name).bytes
return result
# Public API follows
def is_64bit():
return struct.calcsize("P") == 8
def _process_pHYs(self, data):
# http://www.w3.org/TR/PNG/#11pHYs
self.phys = data
fmt = "!LLB"
if len(data) != struct.calcsize(fmt):
raise FormatError("pHYs chunk has incorrect length.")
self.x_pixels_per_unit, self.y_pixels_per_unit, unit = struct.unpack(fmt,data)
self.unit_is_meter = bool(unit)
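# --- Example: a stand-alone round trip of the pHYs layout handled above -
# two unsigned 32-bit ints plus one byte, big-endian, 9 bytes total.
# The 2835 px/metre value is just an illustrative figure (~72 DPI).
import struct

fmt = '!LLB'
payload = struct.pack(fmt, 2835, 2835, 1)
assert len(payload) == struct.calcsize(fmt) == 9
x_ppu, y_ppu, unit = struct.unpack(fmt, payload)
print(x_ppu, y_ppu, bool(unit))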
def __unpack__(self, type_, buf, _size=None):
fmt = self.endian + type_
size = struct.calcsize(fmt) if _size is None else _size
try:
unpacked = struct.unpack(fmt, buf[:size]), buf[size:]
except struct.error as exc:
raise_from(UnpackError("Unable to unpack structure"), exc)
else:
return unpacked
def __len__(self):
fmt = ''
more_len = 0
for field in self._field_names_:
fmt_ = self._field_format_[field]
if isinstance(fmt_, StructArray):
fmt += fmt_.get_struct()
elif isinstance(fmt_, type) and issubclass(fmt_, Struct):
more_len = len(fmt)
elif fmt_ != 'variable':
fmt += fmt_
hdr_len = struct.calcsize(fmt) + more_len
if hasattr(self, 'data'):
hdr_len += len(self.data)
return hdr_len
def pack_main_header_keywords(hdr):
"""
Replace the 'keywords' field of the given BLUE header dictionary
with the X-Midas packed (str) form of the main header keywords.
The order of the key value pairs is indeterminate.
In packed form, keys are separated from values by a single '=',
key-value pairs are separated from one another by a single '\0'
and all values are stringized using str(). Hence, each key value
pair takes up keylength + stringized value length + 2 characters.
If the resulting packed string is longer than the max allowed for
the BLUE header main keywords (96 characters) the string is
truncated. If no 'keywords' field is present or if it is an empty
dict, then the keyword fields are updated to represent an empty
main header keywords section.
"""
keydict = hdr.get('keywords', {})
if keydict:
hdr['keywords'] = '\0'.join([k + '=' + str(v)
for k,v in keydict.items()]) + '\0'
hdr['keylength'] = min(len(hdr['keywords']),
struct.calcsize(_bluestructs['HEADER']
['lookups']['keywords'][1]))
if hdr['keylength'] < len(hdr['keywords']):
print("WARNING: Main header keywords truncated")
else:
hdr['keywords'] = '\0'
hdr['keylength'] = 0
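# --- Example: the packing the docstring describes, done by hand so it does
# not depend on the module's _bluestructs table. The keyword names and values
# are made up; 96 is the main-header keyword limit mentioned above.
keydict = {'MISSION': 'demo', 'PASS': 3}
packed = '\0'.join(k + '=' + str(v) for k, v in keydict.items()) + '\0'
keylength = min(len(packed), 96)
print(repr(packed))    # e.g. 'MISSION=demo\x00PASS=3\x00'
print(keylength)       # 20 here, never more than 96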
def _getCaps(self):
bits = struct.calcsize(self.datatype) * 8
if self.datatype in ('f', 'd'):
return gst.Caps('audio/x-raw-float,endianness=%s,width=%d,rate=%d,channels=1' % (self.ENDIANNESS, bits, self.sample_rate))
else:
# In struct module, unsigned types are uppercase, signed are lower
if self.datatype.isupper():
signed = 'false'
else:
signed = 'true'
return gst.Caps('audio/x-raw-int,endianness=%s,signed=%s,width=%d,depth=%d,rate=%d,channels=1' % (self.ENDIANNESS, signed, bits, bits, self.sample_rate))
def __init__(self, name, PortTypeClass, PortTransferType=TRANSFER_TYPE, logger=None, noData=None ):
self.name = name
self.logger = logger
self.PortType = PortTypeClass
self.PortTransferType=PortTransferType
self.outConnections = {} # key=connectionId, value=port
self.stats = OutStats(self.name, PortTransferType )
self.port_lock = threading.Lock()
self.sriDict = {} # key=streamID value=SriMapStruct
self.filterTable = []
if noData==None:
self.noData = []
else:
self.noData = noData
# Determine maximum transfer size in advance
self.byteSize = 1
if self.PortTransferType:
self.byteSize = struct.calcsize(PortTransferType)
# Multiply by some number < 1 to leave some margin for the CORBA header
self.maxSamplesPerPush = int(MAX_TRANSFER_BYTES*.9)/self.byteSize
# Make sure maxSamplesPerPush is even so that complex data case is handled properly
if self.maxSamplesPerPush%2 != 0:
self.maxSamplesPerPush = self.maxSamplesPerPush - 1
if self.logger == None:
self.logger = logging.getLogger("redhawk.bulkio.outport."+name)
if self.logger:
self.logger.debug('bulkio::OutPort CTOR port:' + str(self.name))
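# --- Example: the transfer-size arithmetic above with a made-up
# MAX_TRANSFER_BYTES (the real limit comes from the bulkio module) and
# 32-bit float samples; integer division keeps the sample count an int.
import struct

MAX_TRANSFER_BYTES = 2097152            # hypothetical CORBA transfer limit
byte_size = struct.calcsize('f')        # 4 bytes per float sample
max_samples = int(MAX_TRANSFER_BYTES * .9) // byte_size
if max_samples % 2 != 0:                # keep an even count for complex data
    max_samples -= 1
print(byte_size, max_samples)           # 4 471858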
def __init__(self, name, element_type ):
self.enabled = True
self.bitSize = struct.calcsize(element_type) * 8
self.historyWindow = 10
self.receivedStatistics = {}
self.name = name
self.receivedStatistics_idx = {}
self.activeStreamIDs = []
self.connection_errors={}
def __init__(self, name, element_type ):
self.enabled = True
self.flushTime = None
self.historyWindow = 10
self.receivedStatistics = []
self.name = name
self.receivedStatistics_idx = 0
self.bitSize = struct.calcsize(element_type) * 8
self.activeStreamIDs = []
for i in range(self.historyWindow):
self.receivedStatistics.append(self.statPoint())
self.runningStats = None
def FileHeader(self):
"""Return the per-file header as a string."""
dt = self.date_time
dosdate = (dt[0] - 1980) << 9 | dt[1] << 5 | dt[2]
dostime = dt[3] << 11 | dt[4] << 5 | (dt[5] // 2)
if self.flag_bits & 0x08:
# Set these to zero because we write them after the file data
CRC = compress_size = file_size = 0
else:
CRC = self.CRC
compress_size = self.compress_size
file_size = self.file_size
extra = self.extra
if file_size > ZIP64_LIMIT or compress_size > ZIP64_LIMIT:
# File is larger than what fits into a 4 byte integer,
# fall back to the ZIP64 extension
fmt = '<HHQQ'
extra = extra + struct.pack(fmt,
1, struct.calcsize(fmt)-4, file_size, compress_size)
file_size = 0xffffffff
compress_size = 0xffffffff
self.extract_version = max(45, self.extract_version)
self.create_version = max(45, self.extract_version)
filename, flag_bits = self._encodeFilenameFlags()
header = struct.pack(structFileHeader, stringFileHeader,
self.extract_version, self.reserved, flag_bits,
self.compress_type, dostime, dosdate, CRC,
compress_size, file_size,
len(filename), len(extra))
return header + filename + extra
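# --- Example: the ZIP64 'extra' record built in the fallback above - header
# ID 1, a data size of calcsize('<HHQQ') - 4 == 16, then the two 8-byte sizes.
# The sizes here are arbitrary values large enough to need the extension.
import struct

fmt = '<HHQQ'
extra = struct.pack(fmt, 1, struct.calcsize(fmt) - 4, 2**33, 2**32)
assert struct.calcsize(fmt) == 20 and len(extra) == 20
header_id, data_size, file_size, compress_size = struct.unpack(fmt, extra)
print(header_id, data_size, file_size, compress_size)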
def _findSoname_ldconfig(name):
import struct
if struct.calcsize('l') == 4:
machine = os.uname()[4] + '-32'
else:
machine = os.uname()[4] + '-64'
mach_map = {
'x86_64-64': 'libc6,x86-64',
'ppc64-64': 'libc6,64bit',
'sparc64-64': 'libc6,64bit',
's390x-64': 'libc6,64bit',
'ia64-64': 'libc6,IA-64',
}
abi_type = mach_map.get(machine, 'libc6')
# XXX assuming GLIBC's ldconfig (with option -p)
expr = r'(\S+)\s+\((%s(?:, OS ABI:[^\)]*)?)\)[^/]*(/[^\(\)\s]*lib%s\.[^\(\)\s]*)' \
% (abi_type, re.escape(name))
f = os.popen('/sbin/ldconfig -p 2>/dev/null')
try:
data = f.read()
finally:
f.close()
res = re.search(expr, data)
if not res:
return None
return res.group(1)
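# --- Example: what the regular expression above extracts, run over a single
# made-up line of `ldconfig -p` output; group(1) is the soname that gets
# returned.
import re

abi_type = 'libc6,x86-64'
name = 'c'
expr = r'(\S+)\s+\((%s(?:, OS ABI:[^\)]*)?)\)[^/]*(/[^\(\)\s]*lib%s\.[^\(\)\s]*)' \
       % (abi_type, re.escape(name))
line = '\tlibc.so.6 (libc6,x86-64, OS ABI: Linux 3.2.0) => /lib/x86_64-linux-gnu/libc.so.6'
res = re.search(expr, line)
print(res.group(1))   # libc.so.6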
def _check_size(typ, typecode=None):
# Check if sizeof(ctypes_type) against struct.calcsize. This
# should protect somewhat against a misconfigured libffi.
from struct import calcsize
if typecode is None:
# Most _type_ codes are the same as used in struct
typecode = typ._type_
actual, required = sizeof(typ), calcsize(typecode)
if actual != required:
raise SystemError("sizeof(%s) wrong: %d instead of %d" % \
(typ, actual, required))
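# --- Example: the same consistency idea applied directly with ctypes -
# sizeof() of each ctypes type should agree with calcsize() of its _type_ code.
import ctypes
import struct

for typ, code in ((ctypes.c_int, 'i'), (ctypes.c_double, 'd'), (ctypes.c_void_p, 'P')):
    assert ctypes.sizeof(typ) == struct.calcsize(code), (typ, code)
print('ctypes and struct agree on basic type sizes')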