# Copyright (c) Kuba SzczodrzyƄski 2022-06-10. import sys from os.path import dirname, join sys.path.append(join(dirname(__file__), "..", "..")) from argparse import ArgumentParser, FileType from binascii import crc32 from dataclasses import dataclass, field from enum import Enum, IntFlag from io import SEEK_SET, FileIO from os import stat from struct import Struct from time import time from typing import Generator, Tuple, Union from tools.util.bitint import BitInt from tools.util.bkcrypto import BekenCrypto from tools.util.crc16 import CRC16 from tools.util.fileio import readbin, writebin from tools.util.intbin import ( ByteGenerator, align_up, betoint, biniter, fileiter, geniter, inttobe16, inttole32, letoint, pad_data, pad_up, ) class DataType(Enum): BINARY = "BINARY" PADDING_SIZE = "PADDING_SIZE" RBL = "RBL" DataTuple = Tuple[DataType, Union[bytes, int]] DataUnion = Union[bytes, DataTuple] DataGenerator = Generator[DataUnion, None, None] class OTAAlgorithm(IntFlag): NONE = 0 CRYPT_XOR = 1 CRYPT_AES256 = 2 COMPRESS_GZIP = 256 COMPRESS_QUICKLZ = 512 COMPRESS_FASTLZ = 768 @dataclass class RBL: ota_algo: OTAAlgorithm = OTAAlgorithm.NONE timestamp: float = field(default_factory=time) name: Union[str, bytes] = "app" version: Union[str, bytes] = "1.00" sn: Union[str, bytes] = "0" * 23 data_crc: int = 0 data_hash: int = 0x811C9DC5 # https://github.com/znerol/py-fnvhash/blob/master/fnvhash/__init__.py raw_size: int = 0 data_size: int = 0 container_size: int = 0 has_part_table: bool = False @property def container_size_crc(self) -> int: return int(self.container_size + (self.container_size // 32) * 2) def update(self, data: bytes): self.data_crc = crc32(data, self.data_crc) for byte in data: if self.data_size < self.raw_size: self.data_hash ^= byte self.data_hash *= 0x01000193 self.data_hash %= 0x100000000 self.data_size += 1 def serialize(self) -> bytes: if isinstance(self.name, str): self.name = self.name.encode() if isinstance(self.version, str): self.version = self.version.encode() if isinstance(self.sn, str): self.sn = self.sn.encode() # based on https://github.com/khalednassar/bk7231tools/blob/main/bk7231tools/analysis/rbl.py struct = Struct("<4sII16s24s24sIIII") # without header CRC rbl = struct.pack( b"RBL\x00", self.ota_algo, int(self.timestamp), pad_data(self.name, 16, 0x00), pad_data(self.version, 24, 0x00), pad_data(self.sn, 24, 0x00), self.data_crc, self.data_hash, self.raw_size, self.data_size, ) return rbl + inttole32(crc32(rbl)) @classmethod def deserialize(cls, data: bytes) -> "RBL": crc_found = letoint(data[-4:]) data = data[:-4] crc_expected = crc32(data) if crc_expected != crc_found: raise ValueError( f"Invalid RBL CRC (expected {crc_expected:X}, found {crc_found:X})" ) struct = Struct(" None: if coeffs: if isinstance(coeffs, str): coeffs = bytes.fromhex(coeffs) if len(coeffs) != 16: raise ValueError( f"Invalid length of encryption coefficients: {len(coeffs)}" ) coeffs = list(map(BitInt, map(betoint, biniter(coeffs, 4)))) self.crypto = BekenCrypto(coeffs) def crc(self, data: ByteGenerator, type: DataType = None) -> DataGenerator: for block in geniter(data, 32): crc = CRC16.CMS.calc(block) block += inttobe16(crc) if type: yield (type, block) else: yield block def uncrc(self, data: ByteGenerator, check: bool = True) -> ByteGenerator: for block in geniter(data, 34): if check: crc = CRC16.CMS.calc(block[0:32]) crc_found = betoint(block[32:34]) if crc != crc_found: print(f"CRC invalid: expected={crc:X}, found={crc_found:X}") return yield block[0:32] def crypt(self, addr: int, data: ByteGenerator) -> ByteGenerator: for word in geniter(data, 4): word = letoint(word) word = self.crypto.encrypt_u32(addr, word) word = inttole32(word) yield word addr += 4 def package( self, f: FileIO, addr: int, size: int, rbl: RBL, partial: bool = False, ) -> DataGenerator: if not rbl.container_size: raise ValueError("RBL must have a total size when packaging") crc_total = 0 # yield all data as (type, bytes) tuples, if partial mode enabled type_binary = DataType.BINARY if partial else None type_padding = DataType.PADDING_SIZE if partial else None type_rbl = DataType.RBL if partial else None # when to stop reading input data data_end = size if rbl.has_part_table: data_end = size - 0xC0 # do not encrypt the partition table # set RBL size including one 16-byte padding rbl.raw_size = align_up(size + 16, 32) + 16 # encrypt the input file, padded to 32 bytes data_crypt_gen = self.crypt( addr, fileiter(f, size=32, padding=0xFF, count=data_end) ) # iterate over encrypted 32-byte blocks for block in geniter(data_crypt_gen, 32): # add CRC16 and yield yield from self.crc(block, type_binary) crc_total += 2 rbl.update(block) # temporary buffer for small-size operations buf = b"\xff" * 16 # add 16 bytes of padding if rbl.has_part_table: # add an unencrypted partition table buf += f.read(0xC0) # update RBL rbl.update(buf) # add last padding with different values rbl.update(b"\x10" * 16) # add last padding with normal values buf += b"\xff" * 16 # yield the temporary buffer yield from self.crc(buf, type_binary) crc_total += 2 * (len(buf) // 32) # pad the entire container with 0xFF, excluding RBL and its CRC16 pad_size = pad_up(rbl.data_size + crc_total, rbl.container_size_crc) - 102 if type_padding: yield (type_padding, pad_size) else: for _ in range(pad_size): yield b"\xff" # yield RBL with CRC16 yield from self.crc(rbl.serialize(), type_rbl) def auto_int(x): return int(x, 0) def add_common_args(parser): parser.add_argument( "coeffs", type=str, help="Encryption coefficients (hex string, 32 chars)" ) parser.add_argument("input", type=FileType("rb"), help="Input file") parser.add_argument("output", type=FileType("wb"), help="Output file") parser.add_argument("addr", type=auto_int, help="Memory address (dec/hex)") if __name__ == "__main__": parser = ArgumentParser(description="Encrypt/decrypt Beken firmware binaries") sub = parser.add_subparsers(dest="action", required=True) encrypt = sub.add_parser("encrypt", help="Encrypt binary files without packaging") add_common_args(encrypt) encrypt.add_argument("-c", "--crc", help="Include CRC16", action="store_true") decrypt = sub.add_parser("decrypt", description="Decrypt unpackaged binary files") add_common_args(decrypt) decrypt.add_argument( "-C", "--no-crc-check", help="Do not check CRC16 (if present)", action="store_true", ) package = sub.add_parser( "package", description="Package raw binary files as RBL containers" ) add_common_args(package) package.add_argument( "size", type=auto_int, help="RBL total size (excl. CRC) (dec/hex)" ) package.add_argument( "-n", "--name", type=str, help="Firmware name (default: app)", default="app", required=False, ) package.add_argument( "-v", "--version", type=str, help="Firmware version (default: 1.00)", default="1.00", required=False, ) unpackage = sub.add_parser( "unpackage", description="Unpackage a single RBL container" ) add_common_args(unpackage) unpackage.add_argument( "offset", type=auto_int, help="Offset in input file (dec/hex)" ) unpackage.add_argument( "size", type=auto_int, help="Container total size (incl. CRC) (dec/hex)" ) args = parser.parse_args() bk = BekenBinary(args.coeffs) f: FileIO = args.input size = stat(args.input.name).st_size start = time() if args.action == "encrypt": print(f"Encrypting '{f.name}' ({size} bytes)") if args.crc: print(f" - calculating 32-byte block CRC16...") gen = bk.crc(bk.crypt(args.addr, f)) else: print(f" - as raw binary, without CRC16...") gen = bk.crypt(args.addr, f) if args.action == "decrypt": print(f"Decrypting '{f.name}' ({size} bytes)") if size % 34 == 0: if args.no_crc_check: print(f" - has CRC16, skipping checks...") else: print(f" - has CRC16, checking...") gen = bk.crypt(args.addr, bk.uncrc(f, check=not args.no_crc_check)) elif size % 4 != 0: raise ValueError("Input file has invalid length") else: print(f" - raw binary, no CRC") gen = bk.crypt(args.addr, f) if args.action == "package": print(f"Packaging {args.name} '{f.name}' for memory address 0x{args.addr:X}") rbl = RBL(name=args.name, version=args.version) if args.name == "bootloader": rbl.has_part_table = True print(f" - in bootloader mode; partition table unencrypted") rbl.container_size = args.size print(f" - container size (excl. CRC): 0x{rbl.container_size:X}") print(f" - container size (incl. CRC): 0x{rbl.container_size_crc:X}") gen = bk.package(f, args.addr, size, rbl) if args.action == "unpackage": print(f"Unpackaging '{f.name}' (at 0x{args.offset:X}, size 0x{args.size:X})") f.seek(args.offset + args.size - 102, SEEK_SET) rbl = f.read(102) rbl = b"".join(bk.uncrc(rbl)) rbl = RBL.deserialize(rbl) print(f" - found '{rbl.name}' ({rbl.version}), size {rbl.data_size}") f.seek(0, SEEK_SET) crc_size = (rbl.data_size - 16) // 32 * 34 gen = bk.crypt(args.addr, bk.uncrc(fileiter(f, 32, 0xFF, crc_size))) written = 0 for data in gen: args.output.write(data) written += len(data) print(f" - wrote {written} bytes in {time()-start:.3f} s")