def command(self, op=None, data=None, chk=0):
    """ Optionally send a request packet, then wait for the matching response.

    If `op` is not None, a packet made of an 8-byte '<BBHI' header
    (0x00, op, payload length, chk) followed by the payload is handed to
    self.write(). A `data` of None is sent as an empty payload.

    Returns a (val, body) tuple: the 32-bit value unpacked from the response
    header and the bytes that follow the 8-byte header.

    Raises FatalError if none of the 100 packets read is a matching response.
    """
    if op is not None:
        # The default data=None used to crash on len(None); treat it as "no payload".
        payload = b"" if data is None else data
        pkt = struct.pack('<BBHI', 0x00, op, len(payload), chk) + payload
        self.write(pkt)

    # tries to get a response until that response has the
    # same operation as the request or a retries limit has
    # exceeded. This is needed for some esp8266s that
    # reply with more sync responses than expected.
    for _ in range(100):
        p = self.read()
        if len(p) < 8:
            # too short to contain a response header
            continue
        (resp, op_ret, _len_ret, val) = struct.unpack('<BBHI', p[:8])
        if resp != 1:
            # first header byte must be 1 for a response packet
            continue
        body = p[8:]
        if op is None or op_ret == op:
            return val, body  # valid response received

    raise FatalError("Response doesn't match request")
# Example source code using Python's read() (collected snippets)
def __init__(self, load_file=None):
    """ Create an empty image, or parse one from `load_file` when it is given.

    `load_file` is a binary file object positioned at the start of the image.
    The 8-byte '<BBBBI' header is unpacked into magic, segment count,
    flash_mode, flash_size_freq and entrypoint; each segment is then read
    with self.load_segment() and the checksum with self.read_checksum().

    Raises FatalError if the magic byte differs from ESPROM.ESP_IMAGE_MAGIC
    or the header claims more than 16 segments.
    """
    super(ESPFirmwareImage, self).__init__()
    self.flash_mode = 0
    self.flash_size_freq = 0
    self.version = 1

    if load_file is not None:
        (magic, segments, self.flash_mode, self.flash_size_freq, self.entrypoint) = struct.unpack('<BBBBI', load_file.read(8))

        # some sanity check
        if magic != ESPROM.ESP_IMAGE_MAGIC or segments > 16:
            raise FatalError('Invalid firmware image magic=%d segments=%d' % (magic, segments))

        # range() instead of xrange(): available on both Python 2 and Python 3
        for _ in range(segments):
            self.load_segment(load_file)

        self.checksum = self.read_checksum(load_file)
def flash_digest(self, addr, length, digest_block_size=0):
    """ Request digests for a flash region and collect the replies.

    Writes the CMD_FLASH_DIGEST byte followed by a '<III' packed
    (addr, length, digest_block_size) to self._esp, then reads packets:
    every 16-byte packet is kept as a digest, and a 1-byte packet is the
    final status code.

    Returns (last_digest, earlier_digests).
    Raises FatalError on a non-zero status or a packet of any other length.
    """
    port = self._esp
    port.write(struct.pack('<B', self.CMD_FLASH_DIGEST))
    port.write(struct.pack('<III', addr, length, digest_block_size))

    collected = []
    while True:
        packet = port.read()
        size = len(packet)
        if size == 16:
            collected.append(packet)
            continue
        if size != 1:
            raise FatalError('Unexpected packet: %s' % hexify(packet))
        # single byte: status code that terminates the exchange
        (status_code,) = struct.unpack('<B', packet)
        if status_code != 0:
            raise FatalError('Write failure, status: %x' % status_code)
        break

    return collected[-1], collected[:-1]
def __checkType__(self, serial):
    """ Read one identification byte from `serial` and start the matching thread.

    "1" -> replies b'0', stores a SensorThread in self.threads["Sensor"] and returns it.
    "2" -> replies b'0', stores a ControlThread in self.threads["Control"] and returns it.
    Anything else -> replies b'1' and returns None.
    """
    kind = serial.read().decode("utf-8")
    print(kind)

    # Unknown identifier: reject the device first, then handle the known kinds.
    if kind != "1" and kind != "2":
        print("? ? ?? ???? : " + serial.port)
        serial.write(b'1')
        return None

    if kind == "1":
        print("??? ???? ?? : " + serial.port)
        serial.write(b'0')
        worker = self.threads["Sensor"] = SensorThread(serial)
        return worker

    print("??? ???? ?? : " + serial.port)
    serial.write(b'0')
    worker = self.threads["Control"] = ControlThread(serial)
    return worker
#?? ???
def detect_chip(port=DEFAULT_PORT, baud=ESP_ROM_BAUD, connect_mode='default_reset'):
    """ Identify the connected chip over serial and return a loader for it.

    An ESPLoader is opened on `port` and connected using `connect_mode`;
    the register at ESPLoader.UART_DATA_REG_ADDR is then read once and
    compared with DATE_REG_VALUE of ESP8266ROM and ESP32ROM, in that order.

    Returns an instance of the first matching class, built on the already
    opened port so no second connection is made.
    Raises FatalError when the value matches neither class.
    """
    loader = ESPLoader(port, baud)
    loader.connect(connect_mode)
    print('Detecting chip type...', end='')
    sys.stdout.flush()
    date_reg = loader.read_reg(ESPLoader.UART_DATA_REG_ADDR)

    matched = next(
        (cls for cls in (ESP8266ROM, ESP32ROM) if date_reg == cls.DATE_REG_VALUE),
        None,
    )
    if matched is None:
        print('')
        raise FatalError("Unexpected UART datecode value 0x%08x. Failed to autodetect chip type." % date_reg)

    # don't connect a second time
    inst = matched(loader._port, baud)
    print(' %s' % inst.CHIP_NAME)
    return inst
def command(self, op=None, data=b"", chk=0, wait_response=True):
    """ Send an optional request and, unless told otherwise, await its reply.

    When `op` is not None a '<BBHI' header (0x00, op, len(data), chk) plus
    `data` is written with self.write(). With wait_response=False the
    function returns None right after that.

    Otherwise up to 100 packets are read with self.read(); the first one
    that is at least 8 bytes long, starts with byte 1 and (if `op` was
    given) echoes the same op yields the return value (val, payload).

    Raises FatalError when no such packet arrives within 100 reads.
    """
    if op is not None:
        header = struct.pack(b'<BBHI', 0x00, op, len(data), chk)
        self.write(header + data)

    if not wait_response:
        return

    # Some esp8266s reply with more sync responses than expected, so keep
    # reading until the reply matches the request or the retry budget is spent.
    attempts_left = 100
    while attempts_left > 0:
        attempts_left -= 1
        packet = self.read()
        if len(packet) >= 8:
            resp, op_ret, _len_ret, val = struct.unpack('<BBHI', packet[:8])
            if resp == 1 and (op is None or op_ret == op):
                return val, packet[8:]

    raise FatalError("Response doesn't match request")
def LoadFirmwareImage(chip, filename):
    """ Load a firmware image. Can be for ESP8266 or ESP32. ESP8266 images will be examined to determine if they are
    original ROM firmware images (ESPFirmwareImage) or "v2" OTA bootloader images.

    Returns a BaseFirmwareImage subclass, either ESPFirmwareImage (v1) or OTAFirmwareImage (v2).

    Raises FatalError if the file is empty or its first byte is not a known image magic.
    """
    with open(filename, 'rb') as f:
        if chip == 'esp32':
            return ESP32FirmwareImage(f)

        # Otherwise, ESP8266 so look at magic to determine the image type
        first_byte = f.read(1)
        if not first_byte:
            # ord() on an empty read raises a bare TypeError; report it as a bad image instead
            raise FatalError("Invalid image: file %s is empty" % filename)
        magic = ord(first_byte)
        f.seek(0)  # the image classes parse from the start of the file

        if magic == ESPLoader.ESP_IMAGE_MAGIC:
            return ESPFirmwareImage(f)
        elif magic == ESPBOOTLOADER.IMAGE_V2_MAGIC:
            return OTAFirmwareImage(f)
        else:
            raise FatalError("Invalid image magic number: %d" % magic)
def __init__(self, load_file=None):
    """ Build an empty ESP32 image, or parse one from `load_file` when supplied.

    Parsing reads the common header via self.load_common_header(), then a
    16-byte extended header which is compared against self.EXTENDED_HEADER
    (a mismatch only prints a warning), then every segment and finally the
    checksum.
    """
    super(ESP32FirmwareImage, self).__init__()
    self.flash_mode = 0
    self.flash_size_freq = 0
    self.version = 1

    if load_file is None:
        return

    segment_count = self.load_common_header(load_file, ESPLoader.ESP_IMAGE_MAGIC)
    extended = list(struct.unpack(self.EXTENDED_HEADER_STRUCT_FMT, load_file.read(16)))

    # these bytes are expected to be unused
    if extended != self.EXTENDED_HEADER:
        print("WARNING: ESP32 image header contains unknown flags. Possibly this image is from a different version of esptool.py")

    for _index in range(segment_count):
        self.load_segment(load_file)

    self.checksum = self.read_checksum(load_file)
def _read_elf_file(self, f):
    """ Validate the ELF file header read from `f` and load its sections.

    Reads the first 0x34 bytes of `f`, stores the entry point in
    self.entrypoint and hands the section header offset, section count and
    string-table index on to self._read_sections().

    Raises FatalError when the header cannot be unpacked, the ELF magic is
    wrong, e_machine is not 0x5e, the section header entry size differs
    from self.LEN_SEC_HEADER, or the file declares zero section headers.
    """
    # read the ELF file header
    LEN_FILE_HEADER = 0x34
    try:
        (ident,_type,machine,_version,
         self.entrypoint,_phoff,shoff,_flags,
         _ehsize, _phentsize,_phnum, shentsize,
         shnum,shstrndx) = struct.unpack("<16sHHLLLLLHHHHHH", f.read(LEN_FILE_HEADER))
    except struct.error as e:
        raise FatalError("Failed to read a valid ELF header from %s: %s" % (self.name, e))

    if byte(ident, 0) != 0x7f or ident[1:4] != b'ELF':
        raise FatalError("%s has invalid ELF magic header" % self.name)
    if machine != 0x5e:
        raise FatalError("%s does not appear to be an Xtensa ELF file. e_machine=%04x" % (self.name, machine))
    if shentsize != self.LEN_SEC_HEADER:
        # The message previously had two placeholders for three arguments, which
        # raised TypeError instead of this FatalError; the expected size is now
        # formatted from LEN_SEC_HEADER.
        raise FatalError("%s has unexpected section header entry size 0x%x (not 0x%x)" % (self.name, shentsize, self.LEN_SEC_HEADER))
    if shnum == 0:
        raise FatalError("%s has 0 section headers" % (self.name))
    self._read_sections(f, shoff, shnum, shstrndx)
def expand_file_arguments():
    """ Expand every "@file" entry of sys.argv into the arguments stored in that file.

    Each line of the referenced text file is split with shlex.split(), so
    arguments may be separated by newlines or spaces. The values take the
    position of the "@file" argument, in file order. When at least one
    expansion happened, the resulting command line is printed and sys.argv
    is replaced with the expanded list.
    """
    expanded_args = []
    saw_file_arg = False

    for arg in sys.argv:
        if not arg.startswith("@"):
            expanded_args.append(arg)
            continue
        saw_file_arg = True
        with open(arg[1:], "r") as arg_file:
            for line in arg_file:
                expanded_args.extend(shlex.split(line))

    if saw_file_arg:
        print("esptool.py %s" % (" ".join(expanded_args[1:])))
        sys.argv = expanded_args
def read(self):
    """ Return the next item produced by the self._slip_reader iterator. """
    packet = next(self._slip_reader)
    return packet
def read_reg(self, addr):
    """ Read the register at `addr` with the ESP_READ_REG command and return its value.

    Raises FatalError.WithResult when the first byte of the response data
    is non-zero.
    """
    # check_command is deliberately not used here: read_reg() runs during chip
    # type detection, and the success check (STATUS_BYTES_LENGTH) differs
    # between chip types (!)
    request = struct.pack('<I', addr)
    val, data = self.command(self.ESP_READ_REG, request)
    status = byte(data, 0)
    if status == 0:
        return val
    raise FatalError.WithResult("Failed to read register address %08x" % addr, data)
def run_stub(self, stub=None):
    """ Upload a stub to RAM, start it and return a loader wrapping this one.

    `stub` is a mapping that may hold 'text' and 'data' blobs with matching
    'text_start' / 'data_start' addresses, plus an 'entry' address. When
    omitted, self.STUB_CODE is used, unless self.IS_STUB is set, in which
    case FatalError is raised.

    Each blob is sent in chunks of self.ESP_RAM_BLOCK bytes. After
    mem_finish() the next packet read must be b'OHAI', otherwise FatalError
    is raised. Returns self.STUB_CLASS(self).
    """
    if stub is None:
        if self.IS_STUB:
            raise FatalError("Not possible for a stub to load another stub (memory likely to overlap.)")
        stub = self.STUB_CODE

    # Upload
    print("Uploading stub...")
    for field in ('text', 'data'):
        if field not in stub:
            continue
        load_addr = stub[field + "_start"]
        payload = stub[field]
        total = len(payload)
        block_size = self.ESP_RAM_BLOCK
        n_blocks = (total + block_size - 1) // block_size  # round up
        self.mem_begin(total, n_blocks, block_size, load_addr)
        for seq, start in enumerate(range(0, total, block_size)):
            self.mem_block(payload[start:start + block_size], seq)

    print("Running stub...")
    self.mem_finish(stub['entry'])

    greeting = self.read()
    if greeting != b'OHAI':
        raise FatalError("Failed to start stub. Unexpected response: %s" % greeting)
    print("Stub running...")
    return self.STUB_CLASS(self)
def load_common_header(self, load_file, expected_magic):
    """ Parse the 8-byte image header and return the segment count.

    The '<BBBBI' header is read from `load_file`; flash_mode,
    flash_size_freq and entrypoint are stored on self before validation.
    Raises FatalError when the magic byte differs from `expected_magic` or
    the header announces more than 16 segments.
    """
    fields = struct.unpack('<BBBBI', load_file.read(8))
    magic, segments = fields[0], fields[1]
    self.flash_mode, self.flash_size_freq, self.entrypoint = fields[2:]

    if not (magic == expected_magic and segments <= 16):
        raise FatalError('Invalid firmware image magic=%d segments=%d' % (magic, segments))
    return segments
def load_segment(self, f, is_irom_segment=False):
    """ Read one segment from the image file `f`, record it and return it.

    A segment is an 8-byte '<II' (offset, size) header followed by `size`
    bytes of data. The new ImageSegment, which also records the file
    position where the segment started, is appended to self.segments.
    Raises FatalError when the file ends before `size` bytes were read.
    """
    start_pos = f.tell()
    offset, size = struct.unpack('<II', f.read(8))
    self.warn_if_unusual_segment(offset, size, is_irom_segment)

    payload = f.read(size)
    actual = len(payload)
    if actual < size:
        raise FatalError('End of file reading segment 0x%x, length %d (actual length %d)' % (offset, size, actual))

    segment = ImageSegment(offset, payload, start_pos)
    self.segments.append(segment)
    return segment
def read_checksum(self, f):
    """ Return the checksum byte that follows the just-read image data in `f`. """
    # The image is padded so that the file length is a multiple of 16 bytes
    # and the checksum occupies the last byte; skip the padding first.
    align_file_position(f, 16)
    checksum_byte = f.read(1)
    return ord(checksum_byte)
def _read_sections(self, f, section_header_offs, shstrndx):
    """ Populate self.sections from the ELF section header table in `f`.

    Everything from `section_header_offs` to the end of the file is treated
    as the section header table, made of 0x28-byte entries. Entries of type
    ELFFile.SEC_TYPE_PROGBITS with a non-zero address are turned into
    ELFSection objects, with names looked up in the string table found at
    entry index `shstrndx`.

    Raises FatalError when no header bytes are found or `shstrndx` does not
    address an entry in the table.
    """
    f.seek(section_header_offs)
    raw_headers = f.read()
    LEN_SEC_HEADER = 0x28
    if len(raw_headers) == 0:
        raise FatalError("No section header found at offset %04x in ELF file." % section_header_offs)
    if len(raw_headers) % LEN_SEC_HEADER != 0:
        print('WARNING: Unexpected ELF section header length %04x is not mod-%02x' % (len(raw_headers),LEN_SEC_HEADER))

    # start offset of every entry within the header table
    entry_offsets = range(0, len(raw_headers), LEN_SEC_HEADER)

    def parse_entry(offs):
        # only the first six 32-bit fields of an entry are needed
        fields = struct.unpack_from("<LLLLLL", raw_headers[offs:])
        name_offs, sec_type, _flags, lma, sec_offs, size = fields
        return (name_offs, sec_type, lma, size, sec_offs)

    parsed = [parse_entry(offs) for offs in entry_offsets]
    progbits = [entry for entry in parsed if entry[1] == ELFFile.SEC_TYPE_PROGBITS]

    # locate the string table section
    strtab_entry_offs = shstrndx * LEN_SEC_HEADER
    if strtab_entry_offs not in entry_offsets:
        raise FatalError("ELF file has no STRTAB section at shstrndx %d" % shstrndx)
    _, sec_type, _, sec_size, sec_offs = parse_entry(strtab_entry_offs)
    if sec_type != ELFFile.SEC_TYPE_STRTAB:
        print('WARNING: ELF file has incorrect STRTAB section type 0x%02x' % sec_type)
    f.seek(sec_offs)
    string_table = f.read(sec_size)

    def lookup_string(offs):
        # names are NUL-terminated strings inside the string table
        end = string_table.index(b'\x00', offs)
        return string_table[offs:end]

    def read_data(offs, size):
        f.seek(offs)
        return f.read(size)

    # build the real ELFSection list: name from the string table, data from the file
    sections = []
    for (n_offs, _type, lma, size, offs) in progbits:
        if lma == 0:
            continue
        sections.append(ELFSection(lookup_string(n_offs), lma, read_data(offs, size)))
    self.sections = sections
def slip_reader(port):
    """ Generator that decodes SLIP packets arriving on a serial port.

    Each iteration yields the payload of one complete packet. Bytes are
    fetched in bulk (whatever port.inWaiting() reports, or a single byte
    when nothing is pending) so that slow systems are not bogged down by
    many one-byte reads.

    Raises FatalError when a read returns no data, when a packet does not
    begin with 0xC0, or when 0xDB is followed by anything but 0xDC / 0xDD.
    """
    unescape = {b'\xdc': b'\xc0', b'\xdd': b'\xdb'}
    packet = None      # None while waiting for the 0xC0 that opens a packet
    escaping = False   # True right after a 0xDB escape byte
    while True:
        pending = port.inWaiting()
        chunk = port.read(pending if pending != 0 else 1)
        if chunk == b'':
            raise FatalError("Timed out waiting for packet %s" % ("header" if packet is None else "content"))
        for octet in chunk:
            if type(octet) is int:
                octet = bytes([octet])  # python 2/3 compat
            if packet is None:
                # waiting for packet header
                if octet != b'\xc0':
                    raise FatalError('Invalid head of packet (%r)' % octet)
                packet = b""
            elif escaping:
                # second byte of an escape sequence
                escaping = False
                replacement = unescape.get(octet)
                if replacement is None:
                    raise FatalError('Invalid SLIP escape (%r%r)' % (b'\xdb', octet))
                packet += replacement
            elif octet == b'\xdb':
                escaping = True
            elif octet == b'\xc0':
                # end of packet
                yield packet
                packet = None
            else:
                packet += octet
def dump_mem(esp, args):
    """ Dump a memory region to a file, one 32-bit word per register read.

    Reads args.size // 4 words starting at args.address with
    esp.read_reg() and writes each as a little-endian '<I' value to
    args.filename. Progress is printed every 1024 bytes written.
    """
    # 'with' guarantees the output file is closed, even if a register read fails
    with open(args.filename, 'wb') as f:
        for i in range(args.size // 4):
            d = esp.read_reg(args.address + (i * 4))
            f.write(struct.pack(b'<I', d))
            written = f.tell()
            if written % 1024 == 0:
                print('\r%d bytes read... (%d %%)' % (written,
                                                      written * 100 // args.size),
                      end=' ')
            sys.stdout.flush()
    print('Done!')
def make_image(args):
    """ Build a firmware image from raw segment files and save it.

    args.segfile and args.segaddr are parallel lists: each file's contents
    become an ImageSegment at the matching address. The image entry point
    is taken from args.entrypoint and the result is saved to args.output.

    Raises FatalError when no segment files are given or when the number
    of files and addresses differ.
    """
    image = ESPFirmwareImage()
    if len(args.segfile) == 0:
        raise FatalError('No segments specified')
    if len(args.segfile) != len(args.segaddr):
        raise FatalError('Number of specified files does not match number of specified addresses')
    for (seg, addr) in zip(args.segfile, args.segaddr):
        # close each segment file promptly instead of leaking the handle
        with open(seg, 'rb') as seg_file:
            data = seg_file.read()
        image.segments.append(ImageSegment(addr, data))
    image.entrypoint = args.entrypoint
    image.save(args.output)