[core] Add UF2-based uploader

This commit is contained in:
Kuba Szczodrzyński
2022-06-02 23:02:02 +02:00
parent 22d40825bb
commit bdffa7ef53
10 changed files with 346 additions and 31 deletions

View File

@@ -1,6 +1,5 @@
# Copyright (c) Kuba Szczodrzyński 2022-04-20.
import sys
from os.path import join
from SCons.Script import Builder, DefaultEnvironment
@@ -271,31 +270,6 @@ env.Append(
),
)
# Uploader
upload_protocol = env.subst("$UPLOAD_PROTOCOL")
upload_source = ""
upload_actions = []
# from platform-espressif32/builder/main.py
if upload_protocol == "uart":
env.Replace(
UPLOADER=join("$TOOLS_DIR", "rtltool.py"),
UPLOADERFLAGS=[
"--port",
"$UPLOAD_PORT",
"--go", # run firmware after uploading
"wf", # Write a binary file to Flash data
],
UPLOADCMD='"$PYTHONEXE" "$UPLOADER" $UPLOADERFLAGS $FLASH_OTA1_OFFSET "$BUILD_DIR/$IMG_FW"',
)
upload_actions = [
env.VerboseAction(env.AutodetectUploadPort, "Looking for upload port..."),
env.VerboseAction("$UPLOADCMD", "Uploading $IMG_FW"),
]
elif upload_protocol == "custom":
upload_actions = [env.VerboseAction("$UPLOADCMD", "Uploading $IMG_FW")]
else:
sys.stderr.write("Warning! Unknown upload protocol %s\n" % upload_protocol)
# Bootloader library
boot_all = board.get("build.amb_boot_all")
target_boot = env.StaticLibrary(
@@ -325,6 +299,4 @@ env.Replace(
"${BUILD_DIR}/image_${FLASH_OTA2_OFFSET}.ota2.bin",
),
],
# uploader
UPLOAD_ACTIONS=upload_actions,
)

View File

@@ -73,7 +73,9 @@ target_elf = env.BuildProgram()
targets = [target_elf]
if "UF2OTA" in env:
targets.append(env.BuildUF2OTA(target_elf))
target_uf2 = env.BuildUF2OTA(target_elf)
targets.append(target_uf2)
env.AddUF2Uploader(target_uf2)
elif "IMG_FW" in env:
target_fw = env.subst("$IMG_FW")
env.AddPlatformTarget("upload", target_fw, env["UPLOAD_ACTIONS"], "Upload")

View File

@@ -1,5 +1,6 @@
# Copyright (c) Kuba Szczodrzyński 2022-06-02.
import sys
from datetime import datetime
from os.path import basename, join, normpath
@@ -26,6 +27,8 @@ def env_uf2ota(env, *args, **kwargs):
f"lt{lt_version}",
]
output = join("${BUILD_DIR}", "_".join(output)) + ".uf2"
env["UF2OUT"] = output
env["UF2OUT_BASE"] = basename(output)
cmd = [
"@${UF2OTA_PY}",
@@ -44,6 +47,34 @@ def env_uf2ota(env, *args, **kwargs):
env.Execute(" ".join(cmd))
def env_uf2upload(env, target):
protocol = env.subst("${UPLOAD_PROTOCOL}")
actions = []
# from platform-espressif32/builder/main.py
if protocol == "uart":
# upload via UART
env["UPLOADERFLAGS"] = [
"${UF2OUT}",
"uart",
"${UPLOAD_PORT}",
]
actions = [
env.VerboseAction(env.AutodetectUploadPort, "Looking for upload port..."),
]
elif protocol == "custom":
actions = [
env.VerboseAction("${UPLOADCMD}", "Uploading firmware"),
]
else:
sys.stderr.write("Warning! Unknown upload protocol %s\n" % protocol)
return
# add main upload target
env.Replace(UPLOADER="${UF2UPLOAD_PY}", UPLOADCMD="${UPLOADER} ${UPLOADERFLAGS}")
actions.append(env.VerboseAction("${UPLOADCMD}", "Uploading ${UF2OUT_BASE}"))
env.AddPlatformTarget("upload", target, actions, "Upload")
env.Append(
BUILDERS=dict(
BuildUF2OTA=Builder(
@@ -51,3 +82,4 @@ env.Append(
)
)
)
env.AddMethod(env_uf2upload, "AddUF2Uploader")

View File

@@ -33,8 +33,9 @@ def env_add_defaults(env, family_name: str, sdk_name: str):
LDSCRIPT_SDK=board.get("build.ldscript_sdk"),
LDSCRIPT_ARDUINO=board.get("build.ldscript_arduino"),
# Link2Bin tool
LINK2BIN="${PYTHONEXE} ${LT_DIR}/tools/link2bin.py",
UF2OTA_PY="${PYTHONEXE} ${LT_DIR}/tools/uf2ota/uf2ota.py",
LINK2BIN='"${PYTHONEXE}" "${LT_DIR}/tools/link2bin.py"',
UF2OTA_PY='"${PYTHONEXE}" "${LT_DIR}/tools/uf2ota/uf2ota.py"',
UF2UPLOAD_PY='"${PYTHONEXE}" "${LT_DIR}/tools/upload/uf2upload.py"',
)
env.Replace(**vars)
for k, v in vars.items():

View File

@@ -75,6 +75,11 @@ class UF2:
return False
self.family = block.family
if block.block_seq != self.seq:
print(f"Mismatched sequence number ({self.seq} != {block.block_seq}")
return False
self.seq += 1
if block_tags or not block.length:
self.tags.update(block.tags)
if block.length and not block.flags.not_main_flash:

26
tools/upload/binpatch.py Normal file
View File

@@ -0,0 +1,26 @@
# Copyright (c) Kuba Szczodrzyński 2022-06-02.
from io import BytesIO
from tools.uf2ota.models import Opcode
from tools.util.intbin import inttole32, letoint, letosint
def binpatch_diff32(data: bytearray, patch: bytes) -> bytearray:
diff = letosint(patch[0:4])
for offs in patch[4:]:
value = letoint(data[offs : offs + 4])
value += diff
data[offs : offs + 4] = inttole32(value)
return data
def binpatch_apply(data: bytearray, binpatch: bytes) -> bytearray:
io = BytesIO(binpatch)
while io.tell() < len(binpatch):
opcode = io.read(1)[0]
length = io.read(1)[0]
bpdata = io.read(length)
if opcode == Opcode.DIFF32:
data = binpatch_diff32(data, bpdata)
return data

157
tools/upload/ctx.py Normal file
View File

@@ -0,0 +1,157 @@
# Copyright (c) Kuba Szczodrzyński 2022-06-02.
from datetime import datetime
from io import BytesIO
from typing import Dict, Tuple
from tools.uf2ota.models import Tag
from tools.uf2ota.uf2 import UF2
from tools.upload.binpatch import binpatch_apply
from tools.util.intbin import letoint
from tools.util.obj import get
from tools.util.platform import get_board_manifest
class UploadContext:
uf2: UF2
seq: int = 0
part1: str = None
part2: str = None
has_ota1: bool
has_ota2: bool
board_manifest: dict = None
def __init__(self, uf2: UF2) -> None:
self.uf2 = uf2
self.has_ota1 = uf2.tags.get(Tag.LT_HAS_OTA1, None) == b"\x01"
self.has_ota2 = uf2.tags.get(Tag.LT_HAS_OTA2, None) == b"\x01"
@property
def fw_name(self) -> str:
return self.uf2.tags.get(Tag.FIRMWARE, b"").decode()
@property
def fw_version(self) -> str:
return self.uf2.tags.get(Tag.VERSION, b"").decode()
@property
def lt_version(self) -> str:
return self.uf2.tags.get(Tag.LT_VERSION, b"").decode()
@property
def board(self) -> str:
return self.uf2.tags.get(Tag.BOARD, b"").decode()
@property
def build_date(self) -> datetime:
if Tag.BUILD_DATE not in self.uf2.tags:
return None
return datetime.fromtimestamp(letoint(self.uf2.tags[Tag.BUILD_DATE]))
def get_offset(self, part: str, offs: int) -> int:
if not self.board_manifest:
self.board_manifest = get_board_manifest(self.board)
part = get(self.board_manifest, f"flash.{part}")
(offset, length) = map(lambda x: int(x, 16), part.split("+"))
if offs >= length:
return None
return offset + offs
def read(self, ota_idx: int = 1) -> Tuple[str, int, bytes]:
"""Read next available data block for the specified OTA scheme.
Returns:
Tuple[str, int, bytes]: target partition, relative offset, data block
"""
if ota_idx not in [1, 2]:
print(f"Invalid OTA index - {ota_idx}")
return None
if ota_idx == 1 and not self.has_ota1:
print(f"No data for OTA index - {ota_idx}")
return None
if ota_idx == 2 and not self.has_ota2:
print(f"No data for OTA index - {ota_idx}")
return None
for _ in range(self.seq, len(self.uf2.data)):
block = self.uf2.data[self.seq]
self.seq += 1
part1 = block.tags.get(Tag.LT_PART_1, None)
part2 = block.tags.get(Tag.LT_PART_2, None)
if part1 and part2:
self.part1 = part1.decode()
self.part2 = part2.decode()
elif part1 or part2:
print(f"Only one target partition specified - {part1} / {part2}")
return None
if not block.data:
continue
part = None
if ota_idx == 1:
part = self.part1
elif ota_idx == 2:
part = self.part2
if not part:
continue
# got data and target partition
offs = block.address
data = block.data
if ota_idx == 2 and Tag.LT_BINPATCH in block.tags:
binpatch = block.tags[Tag.LT_BINPATCH]
data = bytearray(data)
data = binpatch_apply(data, binpatch)
data = bytes(data)
return (part, offs, data)
return (None, 0, None)
def collect(self, ota_idx: int = 1) -> Dict[int, BytesIO]:
"""Read all UF2 blocks. Gather continuous data parts into sections
and their flashing offsets.
Returns:
Dict[int, BytesIO]: map of flash offsets to streams with data
"""
out: Dict[int, BytesIO] = {}
while True:
ret = self.read(ota_idx)
if not ret:
return False
(part, offs, data) = ret
if not data:
break
offs = self.get_offset(part, offs)
if offs is None:
return False
# find BytesIO in the dict
for io_offs, io_data in out.items():
if io_offs + len(io_data.getvalue()) == offs:
io_data.write(data)
offs = 0
break
if offs == 0:
continue
# create BytesIO at specified offset
io = BytesIO()
io.write(data)
out[offs] = io
# rewind BytesIO back to start
for io in out.values():
io.seek(0)
return out

View File

@@ -0,0 +1,67 @@
# Copyright (c) Kuba Szczodrzyński 2022-06-02.
from io import BytesIO
from tools.upload.ctx import UploadContext
from tools.upload.rtltool import RTLXMD
from tools.util.intbin import letoint
def upload_uart(
ctx: UploadContext,
port: str,
baud: int = None,
**kwargs,
) -> bool:
prefix = "| |--"
rtl = RTLXMD(port=port)
print(prefix, f"Connecting to {port}...")
if not rtl.connect():
print(prefix, f"Failed to connect on port {port}")
return False
# read system data to get active OTA index
io = BytesIO()
if not rtl.ReadBlockFlash(io, offset=0x9000, size=256):
print(prefix, "Failed to read from 0x9000")
return False
# get as bytes
system = io.getvalue()
if len(system) != 256:
print(prefix, f"Length invalid while reading from 0x9000 - {len(system)}")
return False
# read OTA switch value
ota_switch = bin(letoint(system[4:8]))[2:]
# count 0-bits
ota_idx = 1 + (ota_switch.count("0") % 2)
# validate OTA2 address in system data
if ota_idx == 2:
ota2_addr = letoint(system[0:4]) & 0xFFFFFF
part_addr = ctx.get_offset("ota2", 0)
if ota2_addr != part_addr:
print(
prefix,
f"Invalid OTA2 address on chip - found {ota2_addr}, expected {part_addr}",
)
return False
print(prefix, f"Flashing image to OTA {ota_idx}...")
# collect continuous blocks of data
parts = ctx.collect(ota_idx=ota_idx)
# write blocks to flash
for offs, data in parts.items():
offs |= 0x8000000
length = len(data.getvalue())
data.seek(0)
print(prefix, f"Writing {length} bytes to 0x{offs:06x}")
if not rtl.WriteBlockFlash(data, offs, length):
print(prefix, f"Writing failed at 0x{offs:x}")
return False
return True
def upload(ctx: UploadContext, protocol: str, **kwargs) -> bool:
if protocol == "uart":
return upload_uart(ctx, **kwargs)
print(f"Unknown upload protocol - {protocol}")
return False

53
tools/upload/uf2upload.py Normal file
View File

@@ -0,0 +1,53 @@
# Copyright (c) Kuba Szczodrzyński 2022-06-02.
import sys
from os.path import dirname, join
from time import time
sys.path.append(join(dirname(__file__), "..", ".."))
sys.path.append(join(dirname(__file__), "..", "uf2ota"))
from argparse import ArgumentParser, FileType
from tools.uf2ota.uf2 import UF2
from tools.upload.ctx import UploadContext
# TODO document this tool
if __name__ == "__main__":
parser = ArgumentParser("uf2upload", description="UF2 uploader")
parser.add_argument("file", type=FileType("rb"), help=".uf2 file")
subp = parser.add_subparsers(dest="protocol", help="Upload protocol", required=True)
parser_uart = subp.add_parser("uart", help="UART uploader")
parser_uart.add_argument("port", type=str, help="Serial port device")
parser_uart.add_argument("-b", "--baud", type=int, help="Serial baudrate")
args = parser.parse_args()
uf2 = UF2(args.file)
if not uf2.read(block_tags=False):
exit(1)
ctx = UploadContext(uf2)
print(
f"|-- {ctx.fw_name} {ctx.fw_version} @ {ctx.build_date} -> {ctx.board} via {args.protocol}"
)
start = time()
args = dict(args._get_kwargs())
if uf2.family.code == "ambz":
from uf2_rtltool import upload
if not upload(ctx, **args):
exit(1)
else:
print(f"Unsupported upload family - {uf2.family.name}")
exit(1)
duration = time() - start
print(f"|-- Finished in {duration:.3f} s")
exit(0)