diff --git a/tools/uf2ota/.gitignore b/tools/uf2ota/.gitignore new file mode 100644 index 0000000..811d126 --- /dev/null +++ b/tools/uf2ota/.gitignore @@ -0,0 +1,2 @@ +*.uf2 +*.bin diff --git a/tools/uf2ota/models.py b/tools/uf2ota/models.py new file mode 100644 index 0000000..52f9277 --- /dev/null +++ b/tools/uf2ota/models.py @@ -0,0 +1,116 @@ +# Copyright (c) Kuba Szczodrzyński 2022-05-27. + +from enum import IntEnum + + +class Family(IntEnum): + INVALID = 0 + # Microsoft-defined families + ATMEGA32 = 0x16573617 # Microchip (Atmel) ATmega32 + SAML21 = 0x1851780A # Microchip (Atmel) SAML21 + NRF52 = 0x1B57745F # Nordic NRF52 + ESP32 = 0x1C5F21B0 # ESP32 + STM32L1 = 0x1E1F432D # ST STM32L1xx + STM32L0 = 0x202E3A91 # ST STM32L0xx + STM32WL = 0x21460FF0 # ST STM32WLxx + LPC55 = 0x2ABC77EC # NXP LPC55xx + STM32G0 = 0x300F5633 # ST STM32G0xx + GD32F350 = 0x31D228C6 # GD32F350 + STM32L5 = 0x04240BDF # ST STM32L5xx + STM32G4 = 0x4C71240A # ST STM32G4xx + MIMXRT10XX = 0x4FB2D5BD # NXP i.MX RT10XX + STM32F7 = 0x53B80F00 # ST STM32F7xx + SAMD51 = 0x55114460 # Microchip (Atmel) SAMD51 + STM32F4 = 0x57755A57 # ST STM32F401 + FX2 = 0x5A18069B # Cypress FX2 + STM32F2 = 0x5D1A0A2E # ST STM32F2xx + STM32F1 = 0x5EE21072 # ST STM32F103 + NRF52833 = 0x621E937A # Nordic NRF52833 + STM32F0 = 0x647824B6 # ST STM32F0xx + SAMD21 = 0x68ED2B88 # Microchip (Atmel) SAMD21 + STM32F3 = 0x6B846188 # ST STM32F3xx + STM32F407 = 0x6D0922FA # ST STM32F407 + STM32H7 = 0x6DB66082 # ST STM32H7xx + STM32WB = 0x70D16653 # ST STM32WBxx + ESP8266 = 0x7EAB61ED # ESP8266 + KL32L2 = 0x7F83E793 # NXP KL32L2x + STM32F407VG = 0x8FB060FE # ST STM32F407VG + NRF52840 = 0xADA52840 # Nordic NRF52840 + ESP32S2 = 0xBFDD4EEE # ESP32-S2 + ESP32S3 = 0xC47E5767 # ESP32-S3 + ESP32C3 = 0xD42BA06C # ESP32-C3 + ESP32C2 = 0x2B88D29C # ESP32-C2 + ESP32H2 = 0x332726F6 # ESP32-H2 + RP2040 = 0xE48BFF56 # Raspberry Pi RP2040 + STM32L4 = 0x00FF6919 # ST STM32L4xx + GD32VF103 = 0x9AF03E33 # GigaDevice GD32VF103 + # LibreTuya defined families + RTL8710A = 0x9FFFD543 # Realtek Ameba1 + RTL8710B = 0x22E0D6FC # Realtek AmebaZ + RTL8720C = 0xE08F7564 # Realtek AmebaZ2 + RTL8720D = 0x3379CFE2 # Realtek AmebaD + BK7231T = 0x675A40B0 # Beken 7231T + BK7231N = 0x7B3EF230 # Beken 7231N + BL602 = 0xDE1270B7 # Boufallo 602 + XR809 = 0x51E903A8 # Xradiotech 809 + + +class Tag(IntEnum): + VERSION = 0x9FC7BC # version of firmware file - UTF8 semver string + PAGE_SIZE = 0x0BE9F7 # page size of target device (32 bit unsigned number) + SHA2 = 0xB46DB0 # SHA-2 checksum of firmware (can be of various size) + DEVICE = 0x650D9D # description of device (UTF8) + DEVICE_ID = 0xC8A729 # device type identifier + # LibreTuya custom tags + OTA_VERSION = 0x5D57D0 # format version + BOARD = 0xCA25C8 # board name (lowercase code) + FIRMWARE = 0x00DE43 # firmware description / name + LT_VERSION = 0x59563D # LT version (semver) + LT_PART_1 = 0x805946 # OTA1 partition name + LT_PART_2 = 0xA1E4D7 # OTA2 partition name + LT_HAS_OTA1 = 0xBBD965 # image has any data for OTA1 + LT_HAS_OTA2 = 0x92280E # image has any data for OTA2 + LT_BINPATCH = 0xB948DE # binary patch to convert OTA1->OTA2 + + +class Flags: + not_main_flash: bool = False + file_container: bool = False + has_family_id: bool = False + has_md5: bool = False + has_tags: bool = False + + def encode(self) -> int: + val = 0 + if self.not_main_flash: + val |= 0x00000001 + if self.file_container: + val |= 0x00001000 + if self.has_family_id: + val |= 0x00002000 + if self.has_md5: + val |= 0x00004000 + if self.has_tags: + val |= 0x00008000 + return val + + def decode(self, data: int): + self.not_main_flash = (data & 0x00000001) != 0 + self.file_container = (data & 0x00001000) != 0 + self.has_family_id = (data & 0x00002000) != 0 + self.has_md5 = (data & 0x00004000) != 0 + self.has_tags = (data & 0x00008000) != 0 + + def __str__(self) -> str: + flags = [] + if self.not_main_flash: + flags.append("NMF") + if self.file_container: + flags.append("FC") + if self.has_family_id: + flags.append("FID") + if self.has_md5: + flags.append("MD5") + if self.has_tags: + flags.append("TAG") + return ",".join(flags) diff --git a/tools/uf2ota/pyproject.toml b/tools/uf2ota/pyproject.toml new file mode 100644 index 0000000..dc407ce --- /dev/null +++ b/tools/uf2ota/pyproject.toml @@ -0,0 +1,17 @@ +[tool.poetry] +name = "uf2ota" +version = "0.1.0" +description = "UF2 OTA update format" +authors = ["Kuba Szczodrzyński "] +license = "MIT" + +[tool.poetry.dependencies] +python = "^3.7" + +[tool.poetry.dev-dependencies] +black = "^22.3.0" +isort = "^5.10.1" + +[build-system] +requires = ["poetry-core>=1.0.0"] +build-backend = "poetry.core.masonry.api" diff --git a/tools/uf2ota/uf2.py b/tools/uf2ota/uf2.py new file mode 100644 index 0000000..cc1f682 --- /dev/null +++ b/tools/uf2ota/uf2.py @@ -0,0 +1,124 @@ +# Copyright (c) Kuba Szczodrzyński 2022-05-27. + +from io import BytesIO, FileIO +from typing import Dict, List + +from models import Family, Flags, Tag +from uf2_block import Block +from utils import intto8, inttole16, inttole32 + + +class UF2: + f: FileIO + seq: int = 0 + + family: Family = Family.INVALID + tags: Dict[Tag, bytes] = {} + data: List[Block] = [] + + def __init__(self, f: FileIO) -> None: + self.f = f + + def store( + self, + address: int, + data: bytes, + tags: Dict[Tag, bytes] = {}, + block_size: int = 256, + ): + if len(data) <= block_size: + block = Block(self.family) + block.tags = tags + block.address = address + block.data = data + block.length = len(data) + self.data.append(block) + return + for offs in range(0, len(data), block_size): + block = Block(self.family) + block.tags = tags + data_part = data[offs : offs + block_size] + block.address = address + offs + block.data = data_part + block.length = len(data_part) + self.data.append(block) + tags = {} + + def put_str(self, tag: Tag, value: str): + self.tags[tag] = value.encode("utf-8") + + def put_int32le(self, tag: Tag, value: int): + self.tags[tag] = inttole32(value) + + def put_int16le(self, tag: Tag, value: int): + self.tags[tag] = inttole16(value) + + def put_int8(self, tag: Tag, value: int): + self.tags[tag] = intto8(value) + + def read(self) -> bool: + while True: + data = self.f.read(512) + if len(data) not in [0, 512]: + print(f"Block size invalid ({len(data)=})") + return False + if not len(data): + break + block = Block() + if not block.decode(data): + return False + + if self.family != Family.INVALID and self.family != block.family: + print(f"Mismatched family ({self.family=} != {block.family=})") + return False + self.family = block.family + + self.tags.update(block.tags) + if block.length and not block.flags.not_main_flash: + self.data.append(block) + return True + + def dump(self): + print(f"Family: {self.family.name}") + print(f"Tags:") + for k, v in self.tags.items(): + if "\\x" not in str(v): + v = v.decode() + else: + v = v.hex() + print(f" - {k.name}: {v}") + print(f"Data chunks: {len(self.data)}") + print(f"Total binary size: {sum(bl.length for bl in self.data)}") + + @property + def block_count(self) -> int: + cnt = len(self.data) + if self.tags: + cnt += 1 + return cnt + + def write_header(self): + bl = Block(self.family) + bl.flags.has_tags = True + bl.flags.not_main_flash = True + bl.block_seq = 0 + bl.block_count = self.block_count + bl.tags = self.tags + self.f.write(bl.encode()) + + def write(self): + if self.tags and self.seq == 0: + self.write_header() + self.seq += 1 + + bio = BytesIO() + for bl in self.data: + bl.block_count = self.block_count + bl.block_seq = self.seq + bio.write(bl.encode()) + if self.seq % 128 == 0: + # write the buffer every 64 KiB + self.f.write(bio.getvalue()) + bio = BytesIO() + self.seq += 1 + self.f.write(bio.getvalue()) diff --git a/tools/uf2ota/uf2_block.py b/tools/uf2ota/uf2_block.py new file mode 100644 index 0000000..eda837e --- /dev/null +++ b/tools/uf2ota/uf2_block.py @@ -0,0 +1,126 @@ +# Copyright (c) Kuba Szczodrzyński 2022-05-27. + +from math import ceil +from typing import Dict + +from models import Family, Flags, Tag +from utils import intto8, inttole24, inttole32, letoint + + +class Block: + flags: Flags + + address: int = 0 + length: int = 0 + + block_seq: int = 0 + block_count: int = 0 + + file_size: int = 0 + family: Family + + data: bytes = None + md5_data: bytes = None + tags: Dict[Tag, bytes] = {} + + def __init__(self, family: Family = Family.INVALID) -> None: + self.flags = Flags() + self.family = family + if self.family != Family.INVALID: + self.flags.has_family_id = True + + def encode(self) -> bytes: + self.flags.has_tags = not not self.tags + # UF2 magic 1 and 2 + data = b"\x55\x46\x32\x0A\x57\x51\x5D\x9E" + # encode integer variables + data += inttole32(self.flags.encode()) + data += inttole32(self.address) + data += inttole32(self.length) + data += inttole32(self.block_seq) + data += inttole32(self.block_count) + if self.flags.file_container: + data += inttole32(self.file_size) + elif self.flags.has_family_id: + data += inttole32(self.family.value) + else: + data += b"\x00\x00\x00\x00" + if not self.data: + self.data = b"" + # append tags + tags = b"" + if self.flags.has_tags: + for k, v in self.tags.items(): + tag_size = 4 + len(v) + tags += intto8(tag_size) + tags += inttole24(k.value) + tags += v + tag_size %= 4 + if tag_size: + tags += b"\x00" * (4 - tag_size) + # append block data with padding + data += self.data + data += tags + data += b"\x00" * (476 - len(self.data) - len(tags)) + data += b"\x30\x6F\xB1\x0A" # magic 3 + return data + + def decode(self, data: bytes) -> bool: + # check block size + if len(data) != 512: + print(f"Invalid block size ({len(data)=})") + return False + # check Magic 1 + if letoint(data[0:4]) != 0x0A324655: + print(f"Invalid Magic 1 ({data[0:4]=})") + return False + # check Magic 2 + if letoint(data[4:8]) != 0x9E5D5157: + print(f"Invalid Magic 2 ({data[4:8]=})") + return False + # check Magic 3 + if letoint(data[508:512]) != 0x0AB16F30: + print(f"Invalid Magic 13({data[508:512]=})") + return False + + self.flags.decode(letoint(data[8:12])) + self.address = letoint(data[12:16]) + self.length = letoint(data[16:20]) + self.block_seq = letoint(data[20:24]) + self.block_count = letoint(data[24:28]) + if self.flags.file_container: + self.file_size = letoint(data[28:32]) + if self.flags.has_family_id: + self.family = Family(letoint(data[28:32])) + + if self.flags.has_md5: + self.md5_data = data[484:508] # last 24 bytes of data[] + + # decode tags + self.tags = {} + if self.flags.has_tags: + tags = data[32 + self.length :] + i = 0 + while i < len(tags): + length = tags[i] + if not length: + break + tag_type = letoint(tags[i + 1 : i + 4]) + tag_data = tags[i + 4 : i + length] + self.tags[Tag(tag_type)] = tag_data + i += length + i = int(ceil(i / 4) * 4) + + self.data = data[32 : 32 + self.length] + return True + + def __str__(self) -> str: + flags = self.flags + address = hex(self.address) + length = hex(self.length) + block_seq = self.block_seq + block_count = self.block_count + file_size = self.file_size + family = self.family.name + tags = [(k.name, v) for k, v in self.tags.items()] + return f"Block[{block_seq}/{block_count}]({flags=}, {address=}, {length=}, {file_size=}, {family=}, {tags=})" diff --git a/tools/uf2ota/uf2ota.py b/tools/uf2ota/uf2ota.py new file mode 100644 index 0000000..6e2594d --- /dev/null +++ b/tools/uf2ota/uf2ota.py @@ -0,0 +1,166 @@ +# Copyright (c) Kuba Szczodrzyński 2022-05-27. + +from argparse import ArgumentParser +from zlib import crc32 + +from models import Family, Tag +from uf2 import UF2 + + +class Input: + ota1_part: str = None + ota1_offs: int = 0 + ota1_file: str = None + ota2_part: str = None + ota2_offs: int = 0 + ota2_file: str = None + + def __init__(self, input: str) -> None: + input = input.split(":") + n = len(input) + if n not in [2, 4]: + print("Incorrect input format - should be part+offs:file[:part+offs:file]") + exit() + # just spread the same image twice for single-OTA scheme + if n == 2: + input += input + + if input[0] and input[1]: + if "+" in input[0]: + (self.ota1_part, self.ota1_offs) = input[0].split("+") + self.ota1_offs = int(self.ota1_offs, 16) + else: + self.ota1_part = input[0] + self.ota1_file = input[1] + if input[2] and input[3]: + if "+" in input[2]: + (self.ota2_part, self.ota2_offs) = input[2].split("+") + self.ota2_offs = int(self.ota2_offs, 16) + else: + self.ota2_part = input[2] + self.ota2_file = input[3] + + if self.is_simple and self.ota1_offs != self.ota2_offs: + # currently, offsets cannot differ when storing one image only + # (this would require to actually store it twice) + print( + f"Offsets cannot differ in single-image/two-partition scheme ({self.ota1_file})" + ) + exit() + + @property + def is_single(self) -> bool: + return self.ota1_part == self.ota2_part and self.ota1_file == self.ota2_file + + @property + def single_part(self) -> str: + return self.ota1_part or self.ota2_part + + @property + def single_offs(self) -> int: + return self.ota1_offs or self.ota2_offs + + @property + def single_file(self) -> str: + return self.ota1_file or self.ota2_file + + @property + def has_ota1(self) -> bool: + return not not (self.ota1_part and self.ota1_file) + + @property + def has_ota2(self) -> bool: + return not not (self.ota2_part and self.ota2_file) + + @property + def is_simple(self) -> bool: + return self.ota1_file == self.ota2_file or not (self.has_ota1 and self.has_ota2) + + +def cli(): + parser = ArgumentParser("uf2ota", description="UF2 OTA update format") + parser.add_argument("action", choices=["dump", "write"]) + parser.add_argument("inputs", nargs="+", type=str) + parser.add_argument("--output", help="Output .uf2 binary", type=str) + parser.add_argument("--family", help="Family name", type=str) + parser.add_argument("--board", help="Board name/code", type=str) + parser.add_argument("--version", help="LibreTuya core version", type=str) + parser.add_argument("--fw", help="Firmware name:version", type=str) + args = parser.parse_args() + + if args.action == "dump": + with open(args.inputs[0], "rb") as f: + uf2 = UF2(f) + if uf2.read(): + uf2.dump() + return + + out = args.output or "out.uf2" + with open(out, "wb") as f: + uf2 = UF2(f) + + try: + uf2.family = next(f for f in Family if f.name == args.family) + except: + families = ", ".join(f.name for f in Family)[9:] + print(f"Invalid family name - should be one of {families}") + return + + # store global tags (for entire file) + if not args.board: + print("Missing board name (--board)") + return + uf2.put_str(Tag.BOARD, args.board.lower()) + + if not args.version: + print("Missing LT version (--version)") + return + uf2.put_str(Tag.LT_VERSION, args.version) + + if args.fw: + (fw_name, fw_ver) = args.fw.split(":") + uf2.put_str(Tag.FIRMWARE, fw_name) + uf2.put_str(Tag.VERSION, fw_ver) + + uf2.put_str(Tag.DEVICE, "LibreTuya") + key = f"LibreTuya {args.board.lower()}" + uf2.put_int32le(Tag.DEVICE_ID, crc32(key.encode())) + + any_ota1 = False + any_ota2 = False + + for input in args.inputs: + input = Input(input) + + any_ota1 = any_ota1 or input.has_ota1 + any_ota2 = any_ota2 or input.has_ota2 + + if input.is_simple: + # single input image: + # - same image and partition (2 args) + # - same image but different partitions (4 args) + # - only OTA1 image + # - only OTA2 image + with open(input.single_file, "rb") as f: + data = f.read() + # store local tags (for this image only) + tags = {} + tags[Tag.LT_PART_1] = ( + input.ota1_part.encode() if input.has_ota1 else b"" + ) + tags[Tag.LT_PART_2] = ( + input.ota2_part.encode() if input.has_ota2 else b"" + ) + uf2.store(input.single_offs, data, tags) + continue + + # different images and partitions for both OTA schemes + raise NotImplementedError("Image binary patching is not yet implemented") + + uf2.put_int8(Tag.LT_HAS_OTA1, any_ota1 * 1) + uf2.put_int8(Tag.LT_HAS_OTA2, any_ota2 * 1) + uf2.write() + + +if __name__ == "__main__": + cli() diff --git a/tools/uf2ota/utils.py b/tools/uf2ota/utils.py new file mode 100644 index 0000000..09af380 --- /dev/null +++ b/tools/uf2ota/utils.py @@ -0,0 +1,41 @@ +# Copyright (c) Kuba Szczodrzyński 2022-05-27. + + +def bswap(data: bytes) -> bytes: + return bytes(reversed(data)) + + +def betoint(data: bytes) -> int: + return int.from_bytes(data, byteorder="big") + + +def letoint(data: bytes) -> int: + return int.from_bytes(data, byteorder="little") + + +def inttobe32(data: int) -> bytes: + return data.to_bytes(length=4, byteorder="big") + + +def inttole32(data: int) -> bytes: + return data.to_bytes(length=4, byteorder="little") + + +def inttobe24(data: int) -> bytes: + return data.to_bytes(length=3, byteorder="big") + + +def inttole24(data: int) -> bytes: + return data.to_bytes(length=3, byteorder="little") + + +def inttobe16(data: int) -> bytes: + return data.to_bytes(length=2, byteorder="big") + + +def inttole16(data: int) -> bytes: + return data.to_bytes(length=2, byteorder="little") + + +def intto8(data: int) -> bytes: + return data.to_bytes(length=1, byteorder="big")