[tools] Implement UF2 binary patching, add dumping images

This commit is contained in:
Kuba Szczodrzyński
2022-05-28 14:47:45 +02:00
parent 5df430f3be
commit b549790798
6 changed files with 373 additions and 111 deletions

99
tools/uf2ota/dump.py Normal file
View File

@@ -0,0 +1,99 @@
# Copyright (c) Kuba Szczodrzyński 2022-05-28.
from io import BytesIO, FileIO
from os import makedirs
from os.path import join
from typing import Dict, Tuple
from models import Opcode, Tag
from uf2 import UF2
from utils import inttole32, letoint, letosint
fs: Dict[str, Tuple[int, FileIO]] = {}
output_dir = ""
output_basename = ""
part1 = ""
part2 = ""
def write(part: str, offs: int, data: bytes):
global fs
if part not in fs or fs[part][0] != offs:
path = join(output_dir, output_basename + part + f"_0x{offs:x}.bin")
f = open(path, "wb")
if part in fs:
fs[part][1].close()
else:
f = fs[part][1]
fs[part] = (offs + f.write(data), f)
def update_parts(tags: Dict[Tag, bytes]):
global part1, part2
if Tag.LT_PART_1 in tags:
part1 = tags[Tag.LT_PART_1].decode()
part1 = ("1_" + part1) if part1 else None
if Tag.LT_PART_2 in tags:
part2 = tags[Tag.LT_PART_2].decode()
part2 = ("2_" + part2) if part2 else None
def uf2_dump(uf2: UF2, outdir: str):
global output_dir, output_basename
makedirs(outdir, exist_ok=True)
if Tag.LT_VERSION not in uf2.tags:
raise RuntimeError("Can only dump LibreTuya firmware images")
output_dir = outdir
output_basename = "_".join(
filter(
None,
[
uf2.tags.get(Tag.FIRMWARE, b"").decode(),
uf2.tags.get(Tag.VERSION, b"").decode(),
"lt" + uf2.tags[Tag.LT_VERSION].decode(),
uf2.tags.get(Tag.BOARD, b"").decode(),
],
)
)
output_basename += "_"
update_parts(uf2.tags)
for block in uf2.data:
# update target partition info
update_parts(block.tags)
# skip empty blocks
if not block.length:
continue
data1 = block.data if part1 else None
data2 = block.data if part2 else None
if Tag.LT_BINPATCH in block.tags:
# type 5, 6
data2 = bytearray(data2)
tag = block.tags[Tag.LT_BINPATCH]
binpatch = BytesIO(tag)
while binpatch.tell() < len(tag):
opcode = Opcode(binpatch.read(1)[0])
length = binpatch.read(1)[0]
data = binpatch.read(length)
if opcode == Opcode.DIFF32:
value = letosint(data[0:4])
for offs in data[4:]:
chunk = data2[offs : offs + 4]
chunk = letoint(chunk)
chunk += value
chunk = inttole32(chunk)
data2[offs : offs + 4] = chunk
data2 = bytes(data2)
if data1:
# types 1, 3, 4
write(part1, block.address, data1)
if data2:
# types 2, 3, 4
write(part2, block.address, data2)

View File

@@ -73,6 +73,10 @@ class Tag(IntEnum):
LT_BINPATCH = 0xB948DE # binary patch to convert OTA1->OTA2
class Opcode(IntEnum):
DIFF32 = 0xFE # difference between 32-bit values
class Flags:
not_main_flash: bool = False
file_container: bool = False
@@ -114,3 +118,71 @@ class Flags:
if self.has_tags:
flags.append("TAG")
return ",".join(flags)
class Input:
ota1_part: str = None
ota1_offs: int = 0
ota1_file: str = None
ota2_part: str = None
ota2_offs: int = 0
ota2_file: str = None
def __init__(self, input: str) -> None:
input = input.split(":")
n = len(input)
if n not in [2, 4]:
raise ValueError(
"Incorrect input format - should be part+offs:file[:part+offs:file]"
)
# just spread the same image twice for single-OTA scheme
if n == 2:
input += input
if input[0] and input[1]:
if "+" in input[0]:
(self.ota1_part, self.ota1_offs) = input[0].split("+")
self.ota1_offs = int(self.ota1_offs, 16)
else:
self.ota1_part = input[0]
self.ota1_file = input[1]
if input[2] and input[3]:
if "+" in input[2]:
(self.ota2_part, self.ota2_offs) = input[2].split("+")
self.ota2_offs = int(self.ota2_offs, 16)
else:
self.ota2_part = input[2]
self.ota2_file = input[3]
if self.ota1_offs != self.ota2_offs:
# currently, offsets cannot differ when storing images
# (this would require to actually store it twice)
raise ValueError(f"Offsets cannot differ ({self.ota1_file})")
@property
def is_single(self) -> bool:
return self.ota1_part == self.ota2_part and self.ota1_file == self.ota2_file
@property
def single_part(self) -> str:
return self.ota1_part or self.ota2_part
@property
def single_offs(self) -> int:
return self.ota1_offs or self.ota2_offs
@property
def single_file(self) -> str:
return self.ota1_file or self.ota2_file
@property
def has_ota1(self) -> bool:
return not not (self.ota1_part and self.ota1_file)
@property
def has_ota2(self) -> bool:
return not not (self.ota2_part and self.ota2_file)
@property
def is_simple(self) -> bool:
return self.ota1_file == self.ota2_file or not (self.has_ota1 and self.has_ota2)

View File

@@ -5,7 +5,7 @@ from typing import Dict, List
from models import Family, Flags, Tag
from uf2_block import Block
from utils import intto8, inttole16, inttole32
from utils import align_down, align_up, intto8, inttole16, inttole32
class UF2:
@@ -56,7 +56,7 @@ class UF2:
def put_int8(self, tag: Tag, value: int):
self.tags[tag] = intto8(value)
def read(self) -> bool:
def read(self, block_tags: bool = True) -> bool:
while True:
data = self.f.read(512)
if len(data) not in [0, 512]:
@@ -73,7 +73,8 @@ class UF2:
return False
self.family = block.family
self.tags.update(block.tags)
if block_tags or not block.length:
self.tags.update(block.tags)
if block.length and not block.flags.not_main_flash:
self.data.append(block)
return True
@@ -98,13 +99,33 @@ class UF2:
return cnt
def write_header(self):
comment = "Hi! Please visit https://kuba2k2.github.io/libretuya/ to read specifications of this file format."
bl = Block(self.family)
bl.flags.has_tags = True
bl.flags.not_main_flash = True
bl.block_seq = 0
bl.block_count = self.block_count
bl.tags = self.tags
self.f.write(bl.encode())
data = bl.encode()
# add comment in the unused space
tags_len = align_up(Block.get_tags_length(bl.tags), 16)
comment_len = len(comment)
if 476 - 16 >= tags_len + comment_len:
space = 476 - 16 - tags_len
start = (space - comment_len) / 2
start = align_down(start, 16)
padding1 = b"\x00" * start
padding2 = b"\x00" * (476 - tags_len - comment_len - start)
data = (
data[0 : 32 + tags_len]
+ padding1
+ comment.encode()
+ padding2
+ data[-4:]
)
self.f.write(data)
def write(self):
if self.tags and self.seq == 0:

View File

@@ -4,7 +4,7 @@ from math import ceil
from typing import Dict
from models import Family, Flags, Tag
from utils import intto8, inttole24, inttole32, letoint
from utils import align_up, intto8, inttole24, inttole32, letoint
class Block:
@@ -114,6 +114,17 @@ class Block:
self.data = data[32 : 32 + self.length]
return True
@staticmethod
def get_tags_length(tags: Dict[Tag, bytes]) -> int:
out = 0
# add tag headers
out += 4 * len(tags)
# add all tag lengths, padded to 4 bytes
out += sum(align_up(l, 4) for l in map(len, tags.values()))
# add final 0x00 tag
out += 4
return out
def __str__(self) -> str:
flags = self.flags
address = hex(self.address)

View File

@@ -3,83 +3,18 @@
from argparse import ArgumentParser
from zlib import crc32
from models import Family, Tag
from dump import uf2_dump
from models import Family, Input, Tag
from uf2 import UF2
from uf2_block import Block
from utils import binpatch32
class Input:
ota1_part: str = None
ota1_offs: int = 0
ota1_file: str = None
ota2_part: str = None
ota2_offs: int = 0
ota2_file: str = None
def __init__(self, input: str) -> None:
input = input.split(":")
n = len(input)
if n not in [2, 4]:
print("Incorrect input format - should be part+offs:file[:part+offs:file]")
exit()
# just spread the same image twice for single-OTA scheme
if n == 2:
input += input
if input[0] and input[1]:
if "+" in input[0]:
(self.ota1_part, self.ota1_offs) = input[0].split("+")
self.ota1_offs = int(self.ota1_offs, 16)
else:
self.ota1_part = input[0]
self.ota1_file = input[1]
if input[2] and input[3]:
if "+" in input[2]:
(self.ota2_part, self.ota2_offs) = input[2].split("+")
self.ota2_offs = int(self.ota2_offs, 16)
else:
self.ota2_part = input[2]
self.ota2_file = input[3]
if self.is_simple and self.ota1_offs != self.ota2_offs:
# currently, offsets cannot differ when storing one image only
# (this would require to actually store it twice)
print(
f"Offsets cannot differ in single-image/two-partition scheme ({self.ota1_file})"
)
exit()
@property
def is_single(self) -> bool:
return self.ota1_part == self.ota2_part and self.ota1_file == self.ota2_file
@property
def single_part(self) -> str:
return self.ota1_part or self.ota2_part
@property
def single_offs(self) -> int:
return self.ota1_offs or self.ota2_offs
@property
def single_file(self) -> str:
return self.ota1_file or self.ota2_file
@property
def has_ota1(self) -> bool:
return not not (self.ota1_part and self.ota1_file)
@property
def has_ota2(self) -> bool:
return not not (self.ota2_part and self.ota2_file)
@property
def is_simple(self) -> bool:
return self.ota1_file == self.ota2_file or not (self.has_ota1 and self.has_ota2)
BLOCK_SIZE = 256
def cli():
parser = ArgumentParser("uf2ota", description="UF2 OTA update format")
parser.add_argument("action", choices=["dump", "write"])
parser.add_argument("action", choices=["info", "dump", "write"])
parser.add_argument("inputs", nargs="+", type=str)
parser.add_argument("--output", help="Output .uf2 binary", type=str)
parser.add_argument("--family", help="Family name", type=str)
@@ -88,11 +23,22 @@ def cli():
parser.add_argument("--fw", help="Firmware name:version", type=str)
args = parser.parse_args()
if args.action == "dump":
if args.action == "info":
with open(args.inputs[0], "rb") as f:
uf2 = UF2(f)
if uf2.read():
uf2.dump()
if not uf2.read():
raise RuntimeError("Reading UF2 failed")
uf2.dump()
return
if args.action == "dump":
input = args.inputs[0]
outdir = input + "_dump"
with open(input, "rb") as f:
uf2 = UF2(f)
if not uf2.read(block_tags=False):
raise RuntimeError("Reading UF2 failed")
uf2_dump(uf2, outdir)
return
out = args.output or "out.uf2"
@@ -103,28 +49,24 @@ def cli():
uf2.family = next(f for f in Family if f.name == args.family)
except:
families = ", ".join(f.name for f in Family)[9:]
print(f"Invalid family name - should be one of {families}")
return
raise ValueError(f"Invalid family name - should be one of {families}")
# store global tags (for entire file)
if not args.board:
print("Missing board name (--board)")
return
uf2.put_str(Tag.BOARD, args.board.lower())
if args.board:
uf2.put_str(Tag.BOARD, args.board.lower())
key = f"LibreTuya {args.board.lower()}"
uf2.put_int32le(Tag.DEVICE_ID, crc32(key.encode()))
if not args.version:
print("Missing LT version (--version)")
return
uf2.put_str(Tag.LT_VERSION, args.version)
if args.version:
uf2.put_str(Tag.LT_VERSION, args.version)
if args.fw:
(fw_name, fw_ver) = args.fw.split(":")
uf2.put_str(Tag.FIRMWARE, fw_name)
uf2.put_str(Tag.VERSION, fw_ver)
uf2.put_int8(Tag.OTA_VERSION, 1)
uf2.put_str(Tag.DEVICE, "LibreTuya")
key = f"LibreTuya {args.board.lower()}"
uf2.put_int32le(Tag.DEVICE_ID, crc32(key.encode()))
any_ota1 = False
any_ota2 = False
@@ -135,6 +77,12 @@ def cli():
any_ota1 = any_ota1 or input.has_ota1
any_ota2 = any_ota2 or input.has_ota2
# store local tags (for this image only)
tags = {
Tag.LT_PART_1: input.ota1_part.encode() if input.has_ota1 else b"",
Tag.LT_PART_2: input.ota2_part.encode() if input.has_ota2 else b"",
}
if input.is_simple:
# single input image:
# - same image and partition (2 args)
@@ -143,19 +91,41 @@ def cli():
# - only OTA2 image
with open(input.single_file, "rb") as f:
data = f.read()
# store local tags (for this image only)
tags = {}
tags[Tag.LT_PART_1] = (
input.ota1_part.encode() if input.has_ota1 else b""
)
tags[Tag.LT_PART_2] = (
input.ota2_part.encode() if input.has_ota2 else b""
)
uf2.store(input.single_offs, data, tags)
uf2.store(input.single_offs, data, tags, block_size=BLOCK_SIZE)
continue
# different images and partitions for both OTA schemes
raise NotImplementedError("Image binary patching is not yet implemented")
with open(input.ota1_file, "rb") as f:
data1 = f.read()
with open(input.ota2_file, "rb") as f:
data2 = f.read()
if len(data1) != len(data2):
raise RuntimeError(
f"Images must have same lengths ({len(data1)} vs {len(data2)})"
)
for i in range(0, len(data1), 256):
block1 = data1[i : i + 256]
block2 = data2[i : i + 256]
if block1 == block2:
# blocks are identical, simply store them
uf2.store(
input.single_offs + i, block1, tags, block_size=BLOCK_SIZE
)
tags = {}
continue
# calculate max binpatch length (incl. existing tags and binpatch tag header)
max_length = 476 - BLOCK_SIZE - Block.get_tags_length(tags) - 4
# try 32-bit binpatch for best space optimization
binpatch = binpatch32(block1, block2, bladdr=i)
if len(binpatch) > max_length:
raise RuntimeError(
f"Binary patch too long - {len(binpatch)} > {max_length}"
)
tags[Tag.LT_BINPATCH] = binpatch
uf2.store(input.single_offs + i, block1, tags, block_size=BLOCK_SIZE)
tags = {}
uf2.put_int8(Tag.LT_HAS_OTA1, any_ota1 * 1)
uf2.put_int8(Tag.LT_HAS_OTA2, any_ota2 * 1)

View File

@@ -1,6 +1,11 @@
# Copyright (c) Kuba Szczodrzyński 2022-05-27.
from typing import Dict, List, Tuple
from models import Opcode
def bswap(data: bytes) -> bytes:
return bytes(reversed(data))
@@ -13,29 +18,113 @@ def letoint(data: bytes) -> int:
return int.from_bytes(data, byteorder="little")
def inttobe32(data: int) -> bytes:
return data.to_bytes(length=4, byteorder="big")
def betosint(data: bytes) -> int:
return int.from_bytes(data, byteorder="big", signed=True)
def letosint(data: bytes) -> int:
return int.from_bytes(data, byteorder="little", signed=True)
def inttole32(data: int) -> bytes:
return data.to_bytes(length=4, byteorder="little")
def inttobe24(data: int) -> bytes:
return data.to_bytes(length=3, byteorder="big")
def inttole24(data: int) -> bytes:
return data.to_bytes(length=3, byteorder="little")
def inttobe16(data: int) -> bytes:
return data.to_bytes(length=2, byteorder="big")
def inttole16(data: int) -> bytes:
return data.to_bytes(length=2, byteorder="little")
def intto8(data: int) -> bytes:
return data.to_bytes(length=1, byteorder="big")
def sinttole32(data: int) -> bytes:
return data.to_bytes(length=4, byteorder="little", signed=True)
def sinttole24(data: int) -> bytes:
return data.to_bytes(length=3, byteorder="little", signed=True)
def sinttole16(data: int) -> bytes:
return data.to_bytes(length=2, byteorder="little", signed=True)
def sintto8(data: int) -> bytes:
return data.to_bytes(length=1, byteorder="little", signed=True)
def align_up(x: int, n: int) -> int:
return int((x - 1) // n + 1) * n
def align_down(x: int, n: int) -> int:
return int(x // n) * n
def bindiff(
data1: bytes, data2: bytes, width: int = 1, single: bool = False
) -> Dict[int, Tuple[bytes, bytes]]:
out: Dict[int, Tuple[bytes, bytes]] = {}
offs = -1
diff1 = b""
diff2 = b""
for i in range(0, len(data1), width):
block1 = data1[i : i + width]
block2 = data2[i : i + width]
if block1 == block2:
# blocks are equal again
if offs != -1:
# store and reset current difference
out[offs] = (diff1, diff2)
offs = -1
diff1 = b""
diff2 = b""
continue
# blocks still differ
if single:
# single block per difference, so just store it
out[i] = (block1, block2)
else:
if offs == -1:
# difference starts here
offs = i
diff1 += block1
diff2 += block2
return out
def binpatch32(block1: bytes, block2: bytes, bladdr: int = 0) -> bytes:
# compare blocks:
# - in 4 byte (32 bit) chunks
# - report a single chunk in each difference
diffs = bindiff(block1, block2, width=4, single=True)
binpatch: Dict[int, List[int]] = {}
# gather all repeating differences (i.e. memory offsets for OTA1/OTA2)
for offs, diff in diffs.items():
(diff1, diff2) = diff
diff1 = letoint(diff1)
diff2 = letoint(diff2)
diff = diff2 - diff1
if diff in binpatch:
# difference already in this binpatch, add the offset
binpatch[diff].append(offs)
else:
# a new difference value
binpatch[diff] = [offs]
# print(f"Block at 0x{bladdr:x}+{offs:02x} -> {diff1:08x} - {diff2:08x} = {diff2-diff1:x}")
# print(f"Block at 0x{bladdr:x}: {len(binpatch)} difference(s) at {sum(len(v) for v in binpatch.values())} offsets")
# write binary patches
out = b""
for diff, offs in binpatch.items():
out += intto8(Opcode.DIFF32.value)
out += intto8(len(offs) + 4)
out += sinttole32(diff)
out += bytes(offs)
return out