From bdffa7ef53dc218d03a793ad2f01bbf8649f5f9a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kuba=20Szczodrzy=C5=84ski?= Date: Thu, 2 Jun 2022 23:02:02 +0200 Subject: [PATCH] [core] Add UF2-based uploader --- builder/frameworks/realtek-ambz-sdk.py | 28 ----- builder/main.py | 4 +- builder/uf2.py | 32 +++++ builder/utils.py | 5 +- tools/uf2ota/uf2.py | 5 + tools/upload/binpatch.py | 26 ++++ tools/upload/ctx.py | 157 +++++++++++++++++++++++++ tools/{ => upload}/rtltool.py | 0 tools/upload/uf2_rtltool.py | 67 +++++++++++ tools/upload/uf2upload.py | 53 +++++++++ 10 files changed, 346 insertions(+), 31 deletions(-) create mode 100644 tools/upload/binpatch.py create mode 100644 tools/upload/ctx.py rename tools/{ => upload}/rtltool.py (100%) create mode 100644 tools/upload/uf2_rtltool.py create mode 100644 tools/upload/uf2upload.py diff --git a/builder/frameworks/realtek-ambz-sdk.py b/builder/frameworks/realtek-ambz-sdk.py index c4c3b26..6847192 100644 --- a/builder/frameworks/realtek-ambz-sdk.py +++ b/builder/frameworks/realtek-ambz-sdk.py @@ -1,6 +1,5 @@ # Copyright (c) Kuba Szczodrzyński 2022-04-20. -import sys from os.path import join from SCons.Script import Builder, DefaultEnvironment @@ -271,31 +270,6 @@ env.Append( ), ) -# Uploader -upload_protocol = env.subst("$UPLOAD_PROTOCOL") -upload_source = "" -upload_actions = [] -# from platform-espressif32/builder/main.py -if upload_protocol == "uart": - env.Replace( - UPLOADER=join("$TOOLS_DIR", "rtltool.py"), - UPLOADERFLAGS=[ - "--port", - "$UPLOAD_PORT", - "--go", # run firmware after uploading - "wf", # Write a binary file to Flash data - ], - UPLOADCMD='"$PYTHONEXE" "$UPLOADER" $UPLOADERFLAGS $FLASH_OTA1_OFFSET "$BUILD_DIR/$IMG_FW"', - ) - upload_actions = [ - env.VerboseAction(env.AutodetectUploadPort, "Looking for upload port..."), - env.VerboseAction("$UPLOADCMD", "Uploading $IMG_FW"), - ] -elif upload_protocol == "custom": - upload_actions = [env.VerboseAction("$UPLOADCMD", "Uploading $IMG_FW")] -else: - sys.stderr.write("Warning! Unknown upload protocol %s\n" % upload_protocol) - # Bootloader library boot_all = board.get("build.amb_boot_all") target_boot = env.StaticLibrary( @@ -325,6 +299,4 @@ env.Replace( "${BUILD_DIR}/image_${FLASH_OTA2_OFFSET}.ota2.bin", ), ], - # uploader - UPLOAD_ACTIONS=upload_actions, ) diff --git a/builder/main.py b/builder/main.py index 330fea5..8f04de3 100644 --- a/builder/main.py +++ b/builder/main.py @@ -73,7 +73,9 @@ target_elf = env.BuildProgram() targets = [target_elf] if "UF2OTA" in env: - targets.append(env.BuildUF2OTA(target_elf)) + target_uf2 = env.BuildUF2OTA(target_elf) + targets.append(target_uf2) + env.AddUF2Uploader(target_uf2) elif "IMG_FW" in env: target_fw = env.subst("$IMG_FW") env.AddPlatformTarget("upload", target_fw, env["UPLOAD_ACTIONS"], "Upload") diff --git a/builder/uf2.py b/builder/uf2.py index c850d60..21cfc4c 100644 --- a/builder/uf2.py +++ b/builder/uf2.py @@ -1,5 +1,6 @@ # Copyright (c) Kuba Szczodrzyński 2022-06-02. +import sys from datetime import datetime from os.path import basename, join, normpath @@ -26,6 +27,8 @@ def env_uf2ota(env, *args, **kwargs): f"lt{lt_version}", ] output = join("${BUILD_DIR}", "_".join(output)) + ".uf2" + env["UF2OUT"] = output + env["UF2OUT_BASE"] = basename(output) cmd = [ "@${UF2OTA_PY}", @@ -44,6 +47,34 @@ def env_uf2ota(env, *args, **kwargs): env.Execute(" ".join(cmd)) +def env_uf2upload(env, target): + protocol = env.subst("${UPLOAD_PROTOCOL}") + actions = [] + # from platform-espressif32/builder/main.py + if protocol == "uart": + # upload via UART + env["UPLOADERFLAGS"] = [ + "${UF2OUT}", + "uart", + "${UPLOAD_PORT}", + ] + actions = [ + env.VerboseAction(env.AutodetectUploadPort, "Looking for upload port..."), + ] + elif protocol == "custom": + actions = [ + env.VerboseAction("${UPLOADCMD}", "Uploading firmware"), + ] + else: + sys.stderr.write("Warning! Unknown upload protocol %s\n" % protocol) + return + + # add main upload target + env.Replace(UPLOADER="${UF2UPLOAD_PY}", UPLOADCMD="${UPLOADER} ${UPLOADERFLAGS}") + actions.append(env.VerboseAction("${UPLOADCMD}", "Uploading ${UF2OUT_BASE}")) + env.AddPlatformTarget("upload", target, actions, "Upload") + + env.Append( BUILDERS=dict( BuildUF2OTA=Builder( @@ -51,3 +82,4 @@ env.Append( ) ) ) +env.AddMethod(env_uf2upload, "AddUF2Uploader") diff --git a/builder/utils.py b/builder/utils.py index 289cf6c..770acfd 100644 --- a/builder/utils.py +++ b/builder/utils.py @@ -33,8 +33,9 @@ def env_add_defaults(env, family_name: str, sdk_name: str): LDSCRIPT_SDK=board.get("build.ldscript_sdk"), LDSCRIPT_ARDUINO=board.get("build.ldscript_arduino"), # Link2Bin tool - LINK2BIN="${PYTHONEXE} ${LT_DIR}/tools/link2bin.py", - UF2OTA_PY="${PYTHONEXE} ${LT_DIR}/tools/uf2ota/uf2ota.py", + LINK2BIN='"${PYTHONEXE}" "${LT_DIR}/tools/link2bin.py"', + UF2OTA_PY='"${PYTHONEXE}" "${LT_DIR}/tools/uf2ota/uf2ota.py"', + UF2UPLOAD_PY='"${PYTHONEXE}" "${LT_DIR}/tools/upload/uf2upload.py"', ) env.Replace(**vars) for k, v in vars.items(): diff --git a/tools/uf2ota/uf2.py b/tools/uf2ota/uf2.py index 2e03894..8e44c56 100644 --- a/tools/uf2ota/uf2.py +++ b/tools/uf2ota/uf2.py @@ -75,6 +75,11 @@ class UF2: return False self.family = block.family + if block.block_seq != self.seq: + print(f"Mismatched sequence number ({self.seq} != {block.block_seq}") + return False + self.seq += 1 + if block_tags or not block.length: self.tags.update(block.tags) if block.length and not block.flags.not_main_flash: diff --git a/tools/upload/binpatch.py b/tools/upload/binpatch.py new file mode 100644 index 0000000..4a2b1a4 --- /dev/null +++ b/tools/upload/binpatch.py @@ -0,0 +1,26 @@ +# Copyright (c) Kuba Szczodrzyński 2022-06-02. + +from io import BytesIO + +from tools.uf2ota.models import Opcode +from tools.util.intbin import inttole32, letoint, letosint + + +def binpatch_diff32(data: bytearray, patch: bytes) -> bytearray: + diff = letosint(patch[0:4]) + for offs in patch[4:]: + value = letoint(data[offs : offs + 4]) + value += diff + data[offs : offs + 4] = inttole32(value) + return data + + +def binpatch_apply(data: bytearray, binpatch: bytes) -> bytearray: + io = BytesIO(binpatch) + while io.tell() < len(binpatch): + opcode = io.read(1)[0] + length = io.read(1)[0] + bpdata = io.read(length) + if opcode == Opcode.DIFF32: + data = binpatch_diff32(data, bpdata) + return data diff --git a/tools/upload/ctx.py b/tools/upload/ctx.py new file mode 100644 index 0000000..7c866fc --- /dev/null +++ b/tools/upload/ctx.py @@ -0,0 +1,157 @@ +# Copyright (c) Kuba Szczodrzyński 2022-06-02. + +from datetime import datetime +from io import BytesIO +from typing import Dict, Tuple + +from tools.uf2ota.models import Tag +from tools.uf2ota.uf2 import UF2 +from tools.upload.binpatch import binpatch_apply +from tools.util.intbin import letoint +from tools.util.obj import get +from tools.util.platform import get_board_manifest + + +class UploadContext: + + uf2: UF2 + + seq: int = 0 + + part1: str = None + part2: str = None + + has_ota1: bool + has_ota2: bool + + board_manifest: dict = None + + def __init__(self, uf2: UF2) -> None: + self.uf2 = uf2 + self.has_ota1 = uf2.tags.get(Tag.LT_HAS_OTA1, None) == b"\x01" + self.has_ota2 = uf2.tags.get(Tag.LT_HAS_OTA2, None) == b"\x01" + + @property + def fw_name(self) -> str: + return self.uf2.tags.get(Tag.FIRMWARE, b"").decode() + + @property + def fw_version(self) -> str: + return self.uf2.tags.get(Tag.VERSION, b"").decode() + + @property + def lt_version(self) -> str: + return self.uf2.tags.get(Tag.LT_VERSION, b"").decode() + + @property + def board(self) -> str: + return self.uf2.tags.get(Tag.BOARD, b"").decode() + + @property + def build_date(self) -> datetime: + if Tag.BUILD_DATE not in self.uf2.tags: + return None + return datetime.fromtimestamp(letoint(self.uf2.tags[Tag.BUILD_DATE])) + + def get_offset(self, part: str, offs: int) -> int: + if not self.board_manifest: + self.board_manifest = get_board_manifest(self.board) + part = get(self.board_manifest, f"flash.{part}") + (offset, length) = map(lambda x: int(x, 16), part.split("+")) + if offs >= length: + return None + return offset + offs + + def read(self, ota_idx: int = 1) -> Tuple[str, int, bytes]: + """Read next available data block for the specified OTA scheme. + + Returns: + Tuple[str, int, bytes]: target partition, relative offset, data block + """ + + if ota_idx not in [1, 2]: + print(f"Invalid OTA index - {ota_idx}") + return None + + if ota_idx == 1 and not self.has_ota1: + print(f"No data for OTA index - {ota_idx}") + return None + if ota_idx == 2 and not self.has_ota2: + print(f"No data for OTA index - {ota_idx}") + return None + + for _ in range(self.seq, len(self.uf2.data)): + block = self.uf2.data[self.seq] + self.seq += 1 + + part1 = block.tags.get(Tag.LT_PART_1, None) + part2 = block.tags.get(Tag.LT_PART_2, None) + + if part1 and part2: + self.part1 = part1.decode() + self.part2 = part2.decode() + elif part1 or part2: + print(f"Only one target partition specified - {part1} / {part2}") + return None + + if not block.data: + continue + + part = None + if ota_idx == 1: + part = self.part1 + elif ota_idx == 2: + part = self.part2 + if not part: + continue + + # got data and target partition + offs = block.address + data = block.data + + if ota_idx == 2 and Tag.LT_BINPATCH in block.tags: + binpatch = block.tags[Tag.LT_BINPATCH] + data = bytearray(data) + data = binpatch_apply(data, binpatch) + data = bytes(data) + + return (part, offs, data) + return (None, 0, None) + + def collect(self, ota_idx: int = 1) -> Dict[int, BytesIO]: + """Read all UF2 blocks. Gather continuous data parts into sections + and their flashing offsets. + + Returns: + Dict[int, BytesIO]: map of flash offsets to streams with data + """ + + out: Dict[int, BytesIO] = {} + while True: + ret = self.read(ota_idx) + if not ret: + return False + (part, offs, data) = ret + if not data: + break + offs = self.get_offset(part, offs) + if offs is None: + return False + + # find BytesIO in the dict + for io_offs, io_data in out.items(): + if io_offs + len(io_data.getvalue()) == offs: + io_data.write(data) + offs = 0 + break + if offs == 0: + continue + + # create BytesIO at specified offset + io = BytesIO() + io.write(data) + out[offs] = io + # rewind BytesIO back to start + for io in out.values(): + io.seek(0) + return out diff --git a/tools/rtltool.py b/tools/upload/rtltool.py similarity index 100% rename from tools/rtltool.py rename to tools/upload/rtltool.py diff --git a/tools/upload/uf2_rtltool.py b/tools/upload/uf2_rtltool.py new file mode 100644 index 0000000..8d3e9db --- /dev/null +++ b/tools/upload/uf2_rtltool.py @@ -0,0 +1,67 @@ +# Copyright (c) Kuba Szczodrzyński 2022-06-02. + +from io import BytesIO + +from tools.upload.ctx import UploadContext +from tools.upload.rtltool import RTLXMD +from tools.util.intbin import letoint + + +def upload_uart( + ctx: UploadContext, + port: str, + baud: int = None, + **kwargs, +) -> bool: + prefix = "| |--" + rtl = RTLXMD(port=port) + print(prefix, f"Connecting to {port}...") + if not rtl.connect(): + print(prefix, f"Failed to connect on port {port}") + return False + + # read system data to get active OTA index + io = BytesIO() + if not rtl.ReadBlockFlash(io, offset=0x9000, size=256): + print(prefix, "Failed to read from 0x9000") + return False + # get as bytes + system = io.getvalue() + if len(system) != 256: + print(prefix, f"Length invalid while reading from 0x9000 - {len(system)}") + return False + # read OTA switch value + ota_switch = bin(letoint(system[4:8]))[2:] + # count 0-bits + ota_idx = 1 + (ota_switch.count("0") % 2) + # validate OTA2 address in system data + if ota_idx == 2: + ota2_addr = letoint(system[0:4]) & 0xFFFFFF + part_addr = ctx.get_offset("ota2", 0) + if ota2_addr != part_addr: + print( + prefix, + f"Invalid OTA2 address on chip - found {ota2_addr}, expected {part_addr}", + ) + return False + + print(prefix, f"Flashing image to OTA {ota_idx}...") + # collect continuous blocks of data + parts = ctx.collect(ota_idx=ota_idx) + # write blocks to flash + for offs, data in parts.items(): + offs |= 0x8000000 + length = len(data.getvalue()) + data.seek(0) + print(prefix, f"Writing {length} bytes to 0x{offs:06x}") + if not rtl.WriteBlockFlash(data, offs, length): + print(prefix, f"Writing failed at 0x{offs:x}") + return False + return True + + +def upload(ctx: UploadContext, protocol: str, **kwargs) -> bool: + if protocol == "uart": + return upload_uart(ctx, **kwargs) + print(f"Unknown upload protocol - {protocol}") + return False diff --git a/tools/upload/uf2upload.py b/tools/upload/uf2upload.py new file mode 100644 index 0000000..5acdffc --- /dev/null +++ b/tools/upload/uf2upload.py @@ -0,0 +1,53 @@ +# Copyright (c) Kuba Szczodrzyński 2022-06-02. + +import sys +from os.path import dirname, join +from time import time + +sys.path.append(join(dirname(__file__), "..", "..")) +sys.path.append(join(dirname(__file__), "..", "uf2ota")) + +from argparse import ArgumentParser, FileType + +from tools.uf2ota.uf2 import UF2 +from tools.upload.ctx import UploadContext + +# TODO document this tool + +if __name__ == "__main__": + parser = ArgumentParser("uf2upload", description="UF2 uploader") + parser.add_argument("file", type=FileType("rb"), help=".uf2 file") + + subp = parser.add_subparsers(dest="protocol", help="Upload protocol", required=True) + + parser_uart = subp.add_parser("uart", help="UART uploader") + parser_uart.add_argument("port", type=str, help="Serial port device") + parser_uart.add_argument("-b", "--baud", type=int, help="Serial baudrate") + + args = parser.parse_args() + + uf2 = UF2(args.file) + if not uf2.read(block_tags=False): + exit(1) + + ctx = UploadContext(uf2) + + print( + f"|-- {ctx.fw_name} {ctx.fw_version} @ {ctx.build_date} -> {ctx.board} via {args.protocol}" + ) + + start = time() + + args = dict(args._get_kwargs()) + if uf2.family.code == "ambz": + from uf2_rtltool import upload + + if not upload(ctx, **args): + exit(1) + else: + print(f"Unsupported upload family - {uf2.family.name}") + exit(1) + + duration = time() - start + print(f"|-- Finished in {duration:.3f} s") + exit(0)