[tools] Add UF2 OTA writer tool

This commit is contained in:
Kuba Szczodrzyński
2022-05-27 20:53:08 +02:00
parent f3e8bcd74a
commit 5df430f3be
7 changed files with 592 additions and 0 deletions

2
tools/uf2ota/.gitignore vendored Normal file
View File

@@ -0,0 +1,2 @@
*.uf2
*.bin

116
tools/uf2ota/models.py Normal file
View File

@@ -0,0 +1,116 @@
# Copyright (c) Kuba Szczodrzyński 2022-05-27.
from enum import IntEnum
class Family(IntEnum):
INVALID = 0
# Microsoft-defined families
ATMEGA32 = 0x16573617 # Microchip (Atmel) ATmega32
SAML21 = 0x1851780A # Microchip (Atmel) SAML21
NRF52 = 0x1B57745F # Nordic NRF52
ESP32 = 0x1C5F21B0 # ESP32
STM32L1 = 0x1E1F432D # ST STM32L1xx
STM32L0 = 0x202E3A91 # ST STM32L0xx
STM32WL = 0x21460FF0 # ST STM32WLxx
LPC55 = 0x2ABC77EC # NXP LPC55xx
STM32G0 = 0x300F5633 # ST STM32G0xx
GD32F350 = 0x31D228C6 # GD32F350
STM32L5 = 0x04240BDF # ST STM32L5xx
STM32G4 = 0x4C71240A # ST STM32G4xx
MIMXRT10XX = 0x4FB2D5BD # NXP i.MX RT10XX
STM32F7 = 0x53B80F00 # ST STM32F7xx
SAMD51 = 0x55114460 # Microchip (Atmel) SAMD51
STM32F4 = 0x57755A57 # ST STM32F401
FX2 = 0x5A18069B # Cypress FX2
STM32F2 = 0x5D1A0A2E # ST STM32F2xx
STM32F1 = 0x5EE21072 # ST STM32F103
NRF52833 = 0x621E937A # Nordic NRF52833
STM32F0 = 0x647824B6 # ST STM32F0xx
SAMD21 = 0x68ED2B88 # Microchip (Atmel) SAMD21
STM32F3 = 0x6B846188 # ST STM32F3xx
STM32F407 = 0x6D0922FA # ST STM32F407
STM32H7 = 0x6DB66082 # ST STM32H7xx
STM32WB = 0x70D16653 # ST STM32WBxx
ESP8266 = 0x7EAB61ED # ESP8266
KL32L2 = 0x7F83E793 # NXP KL32L2x
STM32F407VG = 0x8FB060FE # ST STM32F407VG
NRF52840 = 0xADA52840 # Nordic NRF52840
ESP32S2 = 0xBFDD4EEE # ESP32-S2
ESP32S3 = 0xC47E5767 # ESP32-S3
ESP32C3 = 0xD42BA06C # ESP32-C3
ESP32C2 = 0x2B88D29C # ESP32-C2
ESP32H2 = 0x332726F6 # ESP32-H2
RP2040 = 0xE48BFF56 # Raspberry Pi RP2040
STM32L4 = 0x00FF6919 # ST STM32L4xx
GD32VF103 = 0x9AF03E33 # GigaDevice GD32VF103
# LibreTuya defined families
RTL8710A = 0x9FFFD543 # Realtek Ameba1
RTL8710B = 0x22E0D6FC # Realtek AmebaZ
RTL8720C = 0xE08F7564 # Realtek AmebaZ2
RTL8720D = 0x3379CFE2 # Realtek AmebaD
BK7231T = 0x675A40B0 # Beken 7231T
BK7231N = 0x7B3EF230 # Beken 7231N
BL602 = 0xDE1270B7 # Boufallo 602
XR809 = 0x51E903A8 # Xradiotech 809
class Tag(IntEnum):
VERSION = 0x9FC7BC # version of firmware file - UTF8 semver string
PAGE_SIZE = 0x0BE9F7 # page size of target device (32 bit unsigned number)
SHA2 = 0xB46DB0 # SHA-2 checksum of firmware (can be of various size)
DEVICE = 0x650D9D # description of device (UTF8)
DEVICE_ID = 0xC8A729 # device type identifier
# LibreTuya custom tags
OTA_VERSION = 0x5D57D0 # format version
BOARD = 0xCA25C8 # board name (lowercase code)
FIRMWARE = 0x00DE43 # firmware description / name
LT_VERSION = 0x59563D # LT version (semver)
LT_PART_1 = 0x805946 # OTA1 partition name
LT_PART_2 = 0xA1E4D7 # OTA2 partition name
LT_HAS_OTA1 = 0xBBD965 # image has any data for OTA1
LT_HAS_OTA2 = 0x92280E # image has any data for OTA2
LT_BINPATCH = 0xB948DE # binary patch to convert OTA1->OTA2
class Flags:
not_main_flash: bool = False
file_container: bool = False
has_family_id: bool = False
has_md5: bool = False
has_tags: bool = False
def encode(self) -> int:
val = 0
if self.not_main_flash:
val |= 0x00000001
if self.file_container:
val |= 0x00001000
if self.has_family_id:
val |= 0x00002000
if self.has_md5:
val |= 0x00004000
if self.has_tags:
val |= 0x00008000
return val
def decode(self, data: int):
self.not_main_flash = (data & 0x00000001) != 0
self.file_container = (data & 0x00001000) != 0
self.has_family_id = (data & 0x00002000) != 0
self.has_md5 = (data & 0x00004000) != 0
self.has_tags = (data & 0x00008000) != 0
def __str__(self) -> str:
flags = []
if self.not_main_flash:
flags.append("NMF")
if self.file_container:
flags.append("FC")
if self.has_family_id:
flags.append("FID")
if self.has_md5:
flags.append("MD5")
if self.has_tags:
flags.append("TAG")
return ",".join(flags)

View File

@@ -0,0 +1,17 @@
[tool.poetry]
name = "uf2ota"
version = "0.1.0"
description = "UF2 OTA update format"
authors = ["Kuba Szczodrzyński <kuba@szczodrzynski.pl>"]
license = "MIT"
[tool.poetry.dependencies]
python = "^3.7"
[tool.poetry.dev-dependencies]
black = "^22.3.0"
isort = "^5.10.1"
[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"

124
tools/uf2ota/uf2.py Normal file
View File

@@ -0,0 +1,124 @@
# Copyright (c) Kuba Szczodrzyński 2022-05-27.
from io import BytesIO, FileIO
from typing import Dict, List
from models import Family, Flags, Tag
from uf2_block import Block
from utils import intto8, inttole16, inttole32
class UF2:
f: FileIO
seq: int = 0
family: Family = Family.INVALID
tags: Dict[Tag, bytes] = {}
data: List[Block] = []
def __init__(self, f: FileIO) -> None:
self.f = f
def store(
self,
address: int,
data: bytes,
tags: Dict[Tag, bytes] = {},
block_size: int = 256,
):
if len(data) <= block_size:
block = Block(self.family)
block.tags = tags
block.address = address
block.data = data
block.length = len(data)
self.data.append(block)
return
for offs in range(0, len(data), block_size):
block = Block(self.family)
block.tags = tags
data_part = data[offs : offs + block_size]
block.address = address + offs
block.data = data_part
block.length = len(data_part)
self.data.append(block)
tags = {}
def put_str(self, tag: Tag, value: str):
self.tags[tag] = value.encode("utf-8")
def put_int32le(self, tag: Tag, value: int):
self.tags[tag] = inttole32(value)
def put_int16le(self, tag: Tag, value: int):
self.tags[tag] = inttole16(value)
def put_int8(self, tag: Tag, value: int):
self.tags[tag] = intto8(value)
def read(self) -> bool:
while True:
data = self.f.read(512)
if len(data) not in [0, 512]:
print(f"Block size invalid ({len(data)=})")
return False
if not len(data):
break
block = Block()
if not block.decode(data):
return False
if self.family != Family.INVALID and self.family != block.family:
print(f"Mismatched family ({self.family=} != {block.family=})")
return False
self.family = block.family
self.tags.update(block.tags)
if block.length and not block.flags.not_main_flash:
self.data.append(block)
return True
def dump(self):
print(f"Family: {self.family.name}")
print(f"Tags:")
for k, v in self.tags.items():
if "\\x" not in str(v):
v = v.decode()
else:
v = v.hex()
print(f" - {k.name}: {v}")
print(f"Data chunks: {len(self.data)}")
print(f"Total binary size: {sum(bl.length for bl in self.data)}")
@property
def block_count(self) -> int:
cnt = len(self.data)
if self.tags:
cnt += 1
return cnt
def write_header(self):
bl = Block(self.family)
bl.flags.has_tags = True
bl.flags.not_main_flash = True
bl.block_seq = 0
bl.block_count = self.block_count
bl.tags = self.tags
self.f.write(bl.encode())
def write(self):
if self.tags and self.seq == 0:
self.write_header()
self.seq += 1
bio = BytesIO()
for bl in self.data:
bl.block_count = self.block_count
bl.block_seq = self.seq
bio.write(bl.encode())
if self.seq % 128 == 0:
# write the buffer every 64 KiB
self.f.write(bio.getvalue())
bio = BytesIO()
self.seq += 1
self.f.write(bio.getvalue())

126
tools/uf2ota/uf2_block.py Normal file
View File

@@ -0,0 +1,126 @@
# Copyright (c) Kuba Szczodrzyński 2022-05-27.
from math import ceil
from typing import Dict
from models import Family, Flags, Tag
from utils import intto8, inttole24, inttole32, letoint
class Block:
flags: Flags
address: int = 0
length: int = 0
block_seq: int = 0
block_count: int = 0
file_size: int = 0
family: Family
data: bytes = None
md5_data: bytes = None
tags: Dict[Tag, bytes] = {}
def __init__(self, family: Family = Family.INVALID) -> None:
self.flags = Flags()
self.family = family
if self.family != Family.INVALID:
self.flags.has_family_id = True
def encode(self) -> bytes:
self.flags.has_tags = not not self.tags
# UF2 magic 1 and 2
data = b"\x55\x46\x32\x0A\x57\x51\x5D\x9E"
# encode integer variables
data += inttole32(self.flags.encode())
data += inttole32(self.address)
data += inttole32(self.length)
data += inttole32(self.block_seq)
data += inttole32(self.block_count)
if self.flags.file_container:
data += inttole32(self.file_size)
elif self.flags.has_family_id:
data += inttole32(self.family.value)
else:
data += b"\x00\x00\x00\x00"
if not self.data:
self.data = b""
# append tags
tags = b""
if self.flags.has_tags:
for k, v in self.tags.items():
tag_size = 4 + len(v)
tags += intto8(tag_size)
tags += inttole24(k.value)
tags += v
tag_size %= 4
if tag_size:
tags += b"\x00" * (4 - tag_size)
# append block data with padding
data += self.data
data += tags
data += b"\x00" * (476 - len(self.data) - len(tags))
data += b"\x30\x6F\xB1\x0A" # magic 3
return data
def decode(self, data: bytes) -> bool:
# check block size
if len(data) != 512:
print(f"Invalid block size ({len(data)=})")
return False
# check Magic 1
if letoint(data[0:4]) != 0x0A324655:
print(f"Invalid Magic 1 ({data[0:4]=})")
return False
# check Magic 2
if letoint(data[4:8]) != 0x9E5D5157:
print(f"Invalid Magic 2 ({data[4:8]=})")
return False
# check Magic 3
if letoint(data[508:512]) != 0x0AB16F30:
print(f"Invalid Magic 13({data[508:512]=})")
return False
self.flags.decode(letoint(data[8:12]))
self.address = letoint(data[12:16])
self.length = letoint(data[16:20])
self.block_seq = letoint(data[20:24])
self.block_count = letoint(data[24:28])
if self.flags.file_container:
self.file_size = letoint(data[28:32])
if self.flags.has_family_id:
self.family = Family(letoint(data[28:32]))
if self.flags.has_md5:
self.md5_data = data[484:508] # last 24 bytes of data[]
# decode tags
self.tags = {}
if self.flags.has_tags:
tags = data[32 + self.length :]
i = 0
while i < len(tags):
length = tags[i]
if not length:
break
tag_type = letoint(tags[i + 1 : i + 4])
tag_data = tags[i + 4 : i + length]
self.tags[Tag(tag_type)] = tag_data
i += length
i = int(ceil(i / 4) * 4)
self.data = data[32 : 32 + self.length]
return True
def __str__(self) -> str:
flags = self.flags
address = hex(self.address)
length = hex(self.length)
block_seq = self.block_seq
block_count = self.block_count
file_size = self.file_size
family = self.family.name
tags = [(k.name, v) for k, v in self.tags.items()]
return f"Block[{block_seq}/{block_count}]({flags=}, {address=}, {length=}, {file_size=}, {family=}, {tags=})"

166
tools/uf2ota/uf2ota.py Normal file
View File

@@ -0,0 +1,166 @@
# Copyright (c) Kuba Szczodrzyński 2022-05-27.
from argparse import ArgumentParser
from zlib import crc32
from models import Family, Tag
from uf2 import UF2
class Input:
ota1_part: str = None
ota1_offs: int = 0
ota1_file: str = None
ota2_part: str = None
ota2_offs: int = 0
ota2_file: str = None
def __init__(self, input: str) -> None:
input = input.split(":")
n = len(input)
if n not in [2, 4]:
print("Incorrect input format - should be part+offs:file[:part+offs:file]")
exit()
# just spread the same image twice for single-OTA scheme
if n == 2:
input += input
if input[0] and input[1]:
if "+" in input[0]:
(self.ota1_part, self.ota1_offs) = input[0].split("+")
self.ota1_offs = int(self.ota1_offs, 16)
else:
self.ota1_part = input[0]
self.ota1_file = input[1]
if input[2] and input[3]:
if "+" in input[2]:
(self.ota2_part, self.ota2_offs) = input[2].split("+")
self.ota2_offs = int(self.ota2_offs, 16)
else:
self.ota2_part = input[2]
self.ota2_file = input[3]
if self.is_simple and self.ota1_offs != self.ota2_offs:
# currently, offsets cannot differ when storing one image only
# (this would require to actually store it twice)
print(
f"Offsets cannot differ in single-image/two-partition scheme ({self.ota1_file})"
)
exit()
@property
def is_single(self) -> bool:
return self.ota1_part == self.ota2_part and self.ota1_file == self.ota2_file
@property
def single_part(self) -> str:
return self.ota1_part or self.ota2_part
@property
def single_offs(self) -> int:
return self.ota1_offs or self.ota2_offs
@property
def single_file(self) -> str:
return self.ota1_file or self.ota2_file
@property
def has_ota1(self) -> bool:
return not not (self.ota1_part and self.ota1_file)
@property
def has_ota2(self) -> bool:
return not not (self.ota2_part and self.ota2_file)
@property
def is_simple(self) -> bool:
return self.ota1_file == self.ota2_file or not (self.has_ota1 and self.has_ota2)
def cli():
parser = ArgumentParser("uf2ota", description="UF2 OTA update format")
parser.add_argument("action", choices=["dump", "write"])
parser.add_argument("inputs", nargs="+", type=str)
parser.add_argument("--output", help="Output .uf2 binary", type=str)
parser.add_argument("--family", help="Family name", type=str)
parser.add_argument("--board", help="Board name/code", type=str)
parser.add_argument("--version", help="LibreTuya core version", type=str)
parser.add_argument("--fw", help="Firmware name:version", type=str)
args = parser.parse_args()
if args.action == "dump":
with open(args.inputs[0], "rb") as f:
uf2 = UF2(f)
if uf2.read():
uf2.dump()
return
out = args.output or "out.uf2"
with open(out, "wb") as f:
uf2 = UF2(f)
try:
uf2.family = next(f for f in Family if f.name == args.family)
except:
families = ", ".join(f.name for f in Family)[9:]
print(f"Invalid family name - should be one of {families}")
return
# store global tags (for entire file)
if not args.board:
print("Missing board name (--board)")
return
uf2.put_str(Tag.BOARD, args.board.lower())
if not args.version:
print("Missing LT version (--version)")
return
uf2.put_str(Tag.LT_VERSION, args.version)
if args.fw:
(fw_name, fw_ver) = args.fw.split(":")
uf2.put_str(Tag.FIRMWARE, fw_name)
uf2.put_str(Tag.VERSION, fw_ver)
uf2.put_str(Tag.DEVICE, "LibreTuya")
key = f"LibreTuya {args.board.lower()}"
uf2.put_int32le(Tag.DEVICE_ID, crc32(key.encode()))
any_ota1 = False
any_ota2 = False
for input in args.inputs:
input = Input(input)
any_ota1 = any_ota1 or input.has_ota1
any_ota2 = any_ota2 or input.has_ota2
if input.is_simple:
# single input image:
# - same image and partition (2 args)
# - same image but different partitions (4 args)
# - only OTA1 image
# - only OTA2 image
with open(input.single_file, "rb") as f:
data = f.read()
# store local tags (for this image only)
tags = {}
tags[Tag.LT_PART_1] = (
input.ota1_part.encode() if input.has_ota1 else b""
)
tags[Tag.LT_PART_2] = (
input.ota2_part.encode() if input.has_ota2 else b""
)
uf2.store(input.single_offs, data, tags)
continue
# different images and partitions for both OTA schemes
raise NotImplementedError("Image binary patching is not yet implemented")
uf2.put_int8(Tag.LT_HAS_OTA1, any_ota1 * 1)
uf2.put_int8(Tag.LT_HAS_OTA2, any_ota2 * 1)
uf2.write()
if __name__ == "__main__":
cli()

41
tools/uf2ota/utils.py Normal file
View File

@@ -0,0 +1,41 @@
# Copyright (c) Kuba Szczodrzyński 2022-05-27.
def bswap(data: bytes) -> bytes:
return bytes(reversed(data))
def betoint(data: bytes) -> int:
return int.from_bytes(data, byteorder="big")
def letoint(data: bytes) -> int:
return int.from_bytes(data, byteorder="little")
def inttobe32(data: int) -> bytes:
return data.to_bytes(length=4, byteorder="big")
def inttole32(data: int) -> bytes:
return data.to_bytes(length=4, byteorder="little")
def inttobe24(data: int) -> bytes:
return data.to_bytes(length=3, byteorder="big")
def inttole24(data: int) -> bytes:
return data.to_bytes(length=3, byteorder="little")
def inttobe16(data: int) -> bytes:
return data.to_bytes(length=2, byteorder="big")
def inttole16(data: int) -> bytes:
return data.to_bytes(length=2, byteorder="little")
def intto8(data: int) -> bytes:
return data.to_bytes(length=1, byteorder="big")