diff --git a/TODO.md b/TODO.md index e857982..fe12c46 100644 --- a/TODO.md +++ b/TODO.md @@ -14,8 +14,7 @@ ### Tools -- move all UF2 assembling/uploading/processing tools (as well as `uf2ota` C library) to a separate repository, possibly rewriting parts of it again. Make these tools CLI-usable -- write OpenOCD flashers, using uf2ota library + FAL for partitions (same repo as above) +- write OpenOCD flashers, using uf2ota library + FAL for partitions (in ltchiptool repository) ### Serial @@ -31,10 +30,8 @@ ## BK7231 -- WiFi events - implement OTA - fix WiFi on BK7231N, test other functionality -- add generic board definition - fix SSL (mbedTLS) - I2C (Wire) - SPI @@ -42,6 +39,5 @@ ## RTL8710B -- add generic board definition - move to GNU++11 (and verify that it works) - take all stdio functions from stdio.h - rewrite most of Wiring (it was copied from `ambd_arduino`, and is ugly) diff --git a/arduino/libretuya/libraries/Update/Update.h b/arduino/libretuya/libraries/Update/Update.h index dbc6ada..6c5e4ca 100644 --- a/arduino/libretuya/libraries/Update/Update.h +++ b/arduino/libretuya/libraries/Update/Update.h @@ -2,8 +2,7 @@ #include #include - -#include "uf2ota/uf2ota.h" +#include // No Error #define UPDATE_ERROR_OK (0) diff --git a/arduino/libretuya/libraries/Update/uf2ota/uf2binpatch.c b/arduino/libretuya/libraries/Update/uf2ota/uf2binpatch.c deleted file mode 100644 index 9c510f0..0000000 --- a/arduino/libretuya/libraries/Update/uf2ota/uf2binpatch.c +++ /dev/null @@ -1,32 +0,0 @@ -/* Copyright (c) Kuba Szczodrzyński 2022-05-29. */ - -#include "uf2priv.h" - -uf2_err_t uf2_binpatch(uint8_t *data, const uint8_t *binpatch, uint8_t binpatch_len) { - const uint8_t *binpatch_end = binpatch + binpatch_len; - // +2 to make sure opcode and length is present - while ((binpatch + 2) < binpatch_end) { - uf2_opcode_t opcode = binpatch[0]; - uint8_t len = binpatch[1]; - switch (opcode) { - case UF2_OPC_DIFF32: - uf2_binpatch_diff32(data, binpatch + 1); - break; - } - // advance by opcode + length + data - binpatch += len + 2; - } - return UF2_ERR_OK; -} - -void uf2_binpatch_diff32(uint8_t *data, const uint8_t *patch) { - uint8_t num_offs = patch[0] - 4; // read offset count - uint32_t diff = *((uint32_t *)(patch + 1)); // read diff value - patch += 5; // skip num_offs and diff value - for (uint8_t i = 0; i < num_offs; i++) { - // patch the data - uint8_t offs = patch[i]; - uint32_t *value = (uint32_t *)(data + offs); - *(value) += diff; - } -} diff --git a/arduino/libretuya/libraries/Update/uf2ota/uf2binpatch.h b/arduino/libretuya/libraries/Update/uf2ota/uf2binpatch.h deleted file mode 100644 index 33068ce..0000000 --- a/arduino/libretuya/libraries/Update/uf2ota/uf2binpatch.h +++ /dev/null @@ -1,26 +0,0 @@ -/* Copyright (c) Kuba Szczodrzyński 2022-05-29. */ - -#pragma once - -#include "uf2types.h" - -/** - * @brief Apply binary patch to data. - * - * @param data input data - * @param data_len input data length - * @param binpatch binary patch data - * @param binpatch_len binary patch data length - * @return uf2_err_t error code - */ -uf2_err_t uf2_binpatch(uint8_t *data, const uint8_t *binpatch, uint8_t binpatch_len); - -/** - * Apply DIFF32 binary patch. - * - * @param data input data - * @param len input data length - * @param patch patch data, incl. length byte - * @return uf2_err_t error code - */ -void uf2_binpatch_diff32(uint8_t *data, const uint8_t *patch); diff --git a/arduino/libretuya/libraries/Update/uf2ota/uf2ota.c b/arduino/libretuya/libraries/Update/uf2ota/uf2ota.c deleted file mode 100644 index 1d2d4de..0000000 --- a/arduino/libretuya/libraries/Update/uf2ota/uf2ota.c +++ /dev/null @@ -1,100 +0,0 @@ -/* Copyright (c) Kuba Szczodrzyński 2022-05-29. */ - -#include "uf2priv.h" - -uf2_ota_t *uf2_ctx_init(uint8_t ota_idx, uint32_t family_id) { - uf2_ota_t *ctx = (uf2_ota_t *)zalloc(sizeof(uf2_ota_t)); - ctx->ota_idx = ota_idx; - ctx->family_id = family_id; - return ctx; -} - -uf2_info_t *uf2_info_init() { - uf2_info_t *info = (uf2_info_t *)zalloc(sizeof(uf2_info_t)); - return info; -} - -void uf2_info_free(uf2_info_t *info) { - if (!info) - return; - free(info->fw_name); - free(info->fw_version); - free(info->lt_version); - free(info->board); - free(info); -} - -uf2_err_t uf2_check_block(uf2_ota_t *ctx, uf2_block_t *block) { - if (block->magic1 != UF2_MAGIC_1) - return UF2_ERR_MAGIC; - if (block->magic2 != UF2_MAGIC_2) - return UF2_ERR_MAGIC; - if (block->magic3 != UF2_MAGIC_3) - return UF2_ERR_MAGIC; - if (block->file_container) - // ignore file containers, for now - return UF2_ERR_IGNORE; - if (!block->has_family_id || block->file_size != ctx->family_id) - // require family_id - return UF2_ERR_FAMILY; - return UF2_ERR_OK; -} - -uf2_err_t uf2_parse_header(uf2_ota_t *ctx, uf2_block_t *block, uf2_info_t *info) { - if (!block->has_tags || block->file_container || block->len) - // header must have tags and no data - return UF2_ERR_NOT_HEADER; - - uf2_err_t err = uf2_parse_block(ctx, block, info); - if (err) - return err; - - if ((ctx->ota_idx == 1 && !ctx->has_ota1) || !ctx->has_ota2) - return UF2_ERR_OTA_WRONG; - return UF2_ERR_OK; -} - -uf2_err_t uf2_write(uf2_ota_t *ctx, uf2_block_t *block) { - if (ctx->seq == 0) - return uf2_parse_header(ctx, block, NULL); - if (block->not_main_flash || !block->len) - // ignore blocks not meant for flashing - return UF2_ERR_IGNORE; - - uf2_err_t err = uf2_parse_block(ctx, block, NULL); - if (err) - return err; - - if (!ctx->part1 && !ctx->part2) - // no partitions set at all - return UF2_ERR_PART_UNSET; - - fal_partition_t part = uf2_get_target_part(ctx); - if (!part) - // image is not for current OTA scheme - return UF2_ERR_IGNORE; - - if (ctx->ota_idx == 2 && ctx->binpatch_len) { - // apply binpatch - err = uf2_binpatch(block->data, ctx->binpatch, ctx->binpatch_len); - if (err) - return err; - } - - int ret; - // erase sectors if needed - if (!uf2_is_erased(ctx, block->addr, block->len)) { - ret = fal_partition_erase(part, block->addr, block->len); - if (ret < 0) - return UF2_ERR_ERASE_FAILED; - ctx->erased_offset = block->addr; - ctx->erased_length = ret; - } - // write data to flash - ret = fal_partition_write(part, block->addr, block->data, block->len); - if (ret < 0) - return UF2_ERR_WRITE_FAILED; - if (ret != block->len) - return UF2_ERR_WRITE_LENGTH; - return UF2_ERR_OK; -} diff --git a/arduino/libretuya/libraries/Update/uf2ota/uf2ota.h b/arduino/libretuya/libraries/Update/uf2ota/uf2ota.h deleted file mode 100644 index aadfbbb..0000000 --- a/arduino/libretuya/libraries/Update/uf2ota/uf2ota.h +++ /dev/null @@ -1,68 +0,0 @@ -/* Copyright (c) Kuba Szczodrzyński 2022-05-28. */ - -#pragma once - -#ifdef __cplusplus -extern "C" { -#endif // __cplusplus - -#include "uf2types.h" - -/** - * @brief Create an UF2 OTA context. - * - * @param ota_idx target OTA index - * @param family_id expected family ID - * @return uf2_ota_t* heap-allocated structure - */ -uf2_ota_t *uf2_ctx_init(uint8_t ota_idx, uint32_t family_id); - -/** - * @brief Create an UF2 Info structure. - * - * @return uf2_info_t* heap-allocated structure - */ -uf2_info_t *uf2_info_init(); - -/** - * @brief Free values in the info structure AND the structure itself. - * - * @param info structure to free; may be NULL - */ -void uf2_info_free(uf2_info_t *info); - -/** - * @brief Check if block is valid. - * - * @param ctx context - * @param block block to check - * @return uf2_err_t error code; UF2_ERR_OK and UF2_ERR_IGNORE denote valid blocks - */ -uf2_err_t uf2_check_block(uf2_ota_t *ctx, uf2_block_t *block); - -/** - * @brief Parse header block (LibreTuya UF2 first block). - * - * Note: caller should call uf2_check_block() first. - * - * @param ctx context - * @param block block to parse - * @param info structure to write firmware info, NULL if not used - * @return uf2_err_t error code - */ -uf2_err_t uf2_parse_header(uf2_ota_t *ctx, uf2_block_t *block, uf2_info_t *info); - -/** - * @brief Write the block to flash memory. - * - * Note: caller should call uf2_check_block() first. - * - * @param ctx context - * @param block block to write - * @return uf2_err_t error code - */ -uf2_err_t uf2_write(uf2_ota_t *ctx, uf2_block_t *block); - -#ifdef __cplusplus -} // extern "C" -#endif diff --git a/arduino/libretuya/libraries/Update/uf2ota/uf2priv.c b/arduino/libretuya/libraries/Update/uf2ota/uf2priv.c deleted file mode 100644 index eee59f6..0000000 --- a/arduino/libretuya/libraries/Update/uf2ota/uf2priv.c +++ /dev/null @@ -1,146 +0,0 @@ -/* Copyright (c) Kuba Szczodrzyński 2022-05-29. */ - -#include "uf2priv.h" - -uf2_err_t uf2_parse_block(uf2_ota_t *ctx, uf2_block_t *block, uf2_info_t *info) { - if (block->block_seq != ctx->seq) - // sequence number must match - return UF2_ERR_SEQ_MISMATCH; - ctx->seq++; // increment sequence number after checking it - - if (!block->has_tags) - // no tags in this block, no further processing needed - return UF2_ERR_OK; - - if (block->len > (476 - 4 - 4)) - // at least one tag + last tag must fit - return UF2_ERR_DATA_TOO_LONG; - - uint8_t *tags_start = block->data + block->len; - uint8_t tags_len = 476 - block->len; - uint8_t tags_pos = 0; - if (block->has_md5) - tags_len -= 24; - - ctx->binpatch_len = 0; // binpatch applies to one block only - char *part1 = NULL; - char *part2 = NULL; - - uf2_tag_type_t type; - while (tags_pos < tags_len) { - uint8_t len = uf2_read_tag(tags_start + tags_pos, &type); - if (!len) - break; - tags_pos += 4; // skip tag header - uint8_t *tag = tags_start + tags_pos; - - char **str_dest = NULL; // char* to copy the tag into - - switch (type) { - case UF2_TAG_OTA_VERSION: - if (tag[0] != 1) - return UF2_ERR_OTA_VER; - break; - case UF2_TAG_FIRMWARE: - if (info) - str_dest = &(info->fw_name); - break; - case UF2_TAG_VERSION: - if (info) - str_dest = &(info->fw_version); - break; - case UF2_TAG_LT_VERSION: - if (info) - str_dest = &(info->lt_version); - break; - case UF2_TAG_BOARD: - if (info) - str_dest = &(info->board); - break; - case UF2_TAG_HAS_OTA1: - ctx->has_ota1 = tag[0]; - break; - case UF2_TAG_HAS_OTA2: - ctx->has_ota2 = tag[0]; - break; - case UF2_TAG_PART_1: - str_dest = &(part1); - break; - case UF2_TAG_PART_2: - str_dest = &(part2); - break; - case UF2_TAG_BINPATCH: - ctx->binpatch = tag; - ctx->binpatch_len = len; - break; - default: - break; - } - - if (str_dest) { - *str_dest = (char *)zalloc(len + 1); - memcpy(*str_dest, tag, len); - } - // align position to 4 bytes - tags_pos += (((len - 1) / 4) + 1) * 4; - } - - if (part1 && part2) { - // update current target partition - uf2_err_t err = uf2_update_parts(ctx, part1, part2); - if (err) - return err; - } else if (part1 || part2) { - // only none or both partitions can be specified - return UF2_ERR_PART_ONE; - } - - return UF2_ERR_OK; -} - -uint8_t uf2_read_tag(const uint8_t *data, uf2_tag_type_t *type) { - uint8_t len = data[0]; - if (!len) - return 0; - uint32_t tag_type = *((uint32_t *)data); - if (!tag_type) - return 0; - *type = tag_type >> 8; // remove tag length byte - return len - 4; -} - -uf2_err_t uf2_update_parts(uf2_ota_t *ctx, char *part1, char *part2) { - // reset both target partitions - ctx->part1 = NULL; - ctx->part2 = NULL; - // reset offsets as they probably don't apply to this partition - ctx->erased_offset = 0; - ctx->erased_length = 0; - - if (part1[0]) { - ctx->part1 = (fal_partition_t)fal_partition_find(part1); - if (!ctx->part1) - return UF2_ERR_PART_404; - } - if (part2[0]) { - ctx->part2 = (fal_partition_t)fal_partition_find(part2); - if (!ctx->part2) - return UF2_ERR_PART_404; - } - - return UF2_ERR_OK; -} - -fal_partition_t uf2_get_target_part(uf2_ota_t *ctx) { - if (ctx->ota_idx == 1) - return ctx->part1; - if (ctx->ota_idx == 2) - return ctx->part2; - return NULL; -} - -bool uf2_is_erased(uf2_ota_t *ctx, uint32_t offset, uint32_t length) { - uint32_t erased_end = ctx->erased_offset + ctx->erased_length; - uint32_t end = offset + length; - return (offset >= ctx->erased_offset) && (end <= erased_end); -} diff --git a/arduino/libretuya/libraries/Update/uf2ota/uf2priv.h b/arduino/libretuya/libraries/Update/uf2ota/uf2priv.h deleted file mode 100644 index 6b6692c..0000000 --- a/arduino/libretuya/libraries/Update/uf2ota/uf2priv.h +++ /dev/null @@ -1,61 +0,0 @@ -/* Copyright (c) Kuba Szczodrzyński 2022-05-28. */ - -#pragma once - -// include family stdlib APIs -#include - -#include "uf2binpatch.h" -#include "uf2types.h" - -/** - * @brief Parse a block and extract information from tags. - * - * @param ctx context - * @param block block to parse - * @param info structure to write firmware info, NULL if not used - * @return uf2_err_t error code - */ -uf2_err_t uf2_parse_block(uf2_ota_t *ctx, uf2_block_t *block, uf2_info_t *info); - -/** - * @brief Parse a tag. - * - * @param data pointer to tag header beginning - * @param type [out] parsed tag type - * @return uint8_t parsed tag data length (excl. header); 0 if invalid/last tag - */ -uint8_t uf2_read_tag(const uint8_t *data, uf2_tag_type_t *type); - -/** - * @brief Update destination partitions in context. - * - * Partition names cannot be NULL. - * - * Returns UF2_ERR_IGNORE if specified partitions don't match the - * current OTA index. - * - * @param ctx context - * @param part1 partition 1 name or empty string - * @param part2 partition 2 name or empty string - * @return uf2_err_t error code - */ -uf2_err_t uf2_update_parts(uf2_ota_t *ctx, char *part1, char *part2); - -/** - * @brief Get target flashing partition, depending on OTA index. - * - * @param ctx context - * @return fal_partition_t target partition or NULL if not set - */ -fal_partition_t uf2_get_target_part(uf2_ota_t *ctx); - -/** - * Check if specified flash memory region was already erased during update. - * - * @param ctx context - * @param offset offset to check - * @param length length to check - * @return bool true/false - */ -bool uf2_is_erased(uf2_ota_t *ctx, uint32_t offset, uint32_t length); diff --git a/arduino/libretuya/libraries/Update/uf2ota/uf2types.h b/arduino/libretuya/libraries/Update/uf2ota/uf2types.h deleted file mode 100644 index 2ecf80a..0000000 --- a/arduino/libretuya/libraries/Update/uf2ota/uf2types.h +++ /dev/null @@ -1,104 +0,0 @@ -/* Copyright (c) Kuba Szczodrzyński 2022-05-28. */ - -#pragma once - -#include -#include - -#include - -#define UF2_MAGIC_1 0x0A324655 -#define UF2_MAGIC_2 0x9E5D5157 -#define UF2_MAGIC_3 0x0AB16F30 - -#define UF2_BLOCK_SIZE sizeof(uf2_block_t) - -typedef struct __attribute__((packed)) { - // 32 byte header - uint32_t magic1; - uint32_t magic2; - - // flags split as bitfields - bool not_main_flash : 1; - uint16_t dummy1 : 11; - bool file_container : 1; - bool has_family_id : 1; - bool has_md5 : 1; - bool has_tags : 1; - uint16_t dummy2 : 16; - - uint32_t addr; - uint32_t len; - uint32_t block_seq; - uint32_t block_count; - uint32_t file_size; // or familyID; - uint8_t data[476]; - uint32_t magic3; -} uf2_block_t; - -typedef struct { - uint32_t seq; // current block sequence number - - uint8_t *binpatch; // current block's binpatch (if any) -> pointer inside block->data - uint8_t binpatch_len; // binpatch length - - bool has_ota1; // image has any data for OTA1 - bool has_ota2; // image has any data for OTA2 - - uint8_t ota_idx; // target OTA index - uint32_t family_id; // expected family ID - - uint32_t erased_offset; // offset of region erased during update - uint32_t erased_length; // length of erased region - - fal_partition_t part1; // OTA1 target partition - fal_partition_t part2; // OTA2 target partition -} uf2_ota_t; - -typedef struct { - char *fw_name; - char *fw_version; - char *lt_version; - char *board; -} uf2_info_t; - -typedef enum { - UF2_TAG_VERSION = 0x9FC7BC, // version of firmware file - UTF8 semver string - UF2_TAG_PAGE_SIZE = 0x0BE9F7, // page size of target device (32 bit unsigned number) - UF2_TAG_SHA2 = 0xB46DB0, // SHA-2 checksum of firmware (can be of various size) - UF2_TAG_DEVICE = 0x650D9D, // description of device (UTF8) - UF2_TAG_DEVICE_ID = 0xC8A729, // device type identifier - // LibreTuya custom, tags - UF2_TAG_OTA_VERSION = 0x5D57D0, // format version - UF2_TAG_BOARD = 0xCA25C8, // board name (lowercase code) - UF2_TAG_FIRMWARE = 0x00DE43, // firmware description / name - UF2_TAG_BUILD_DATE = 0x822F30, // build date/time as Unix timestamp - UF2_TAG_LT_VERSION = 0x59563D, // LT version (semver) - UF2_TAG_PART_1 = 0x805946, // OTA1 partition name - UF2_TAG_PART_2 = 0xA1E4D7, // OTA2 partition name - UF2_TAG_HAS_OTA1 = 0xBBD965, // image has any data for OTA1 - UF2_TAG_HAS_OTA2 = 0x92280E, // image has any data for OTA2 - UF2_TAG_BINPATCH = 0xB948DE, // binary patch to convert OTA1->OTA2 -} uf2_tag_type_t; - -typedef enum { - UF2_OPC_DIFF32 = 0xFE, -} uf2_opcode_t; - -typedef enum { - UF2_ERR_OK = 0, - UF2_ERR_IGNORE, // block should be ignored - UF2_ERR_MAGIC, // wrong magic numbers - UF2_ERR_FAMILY, // family ID mismatched - UF2_ERR_NOT_HEADER, // block is not a header - UF2_ERR_OTA_VER, // unknown/invalid OTA format version - UF2_ERR_OTA_WRONG, // no data for current OTA index - UF2_ERR_PART_404, // no partition with that name - UF2_ERR_PART_ONE, // only one partition tag in a block - UF2_ERR_PART_UNSET, // image broken - attempted to write without target partition - UF2_ERR_DATA_TOO_LONG, // data too long - tags won't fit - UF2_ERR_SEQ_MISMATCH, // sequence number mismatched - UF2_ERR_ERASE_FAILED, // erasing flash failed - UF2_ERR_WRITE_FAILED, // writing to flash failed - UF2_ERR_WRITE_LENGTH, // wrote fewer data than requested -} uf2_err_t; diff --git a/builder/arduino-common.py b/builder/arduino-common.py index a3ec38c..a6b4367 100644 --- a/builder/arduino-common.py +++ b/builder/arduino-common.py @@ -76,6 +76,19 @@ env.AddLibrary( ], ) +# Sources - uf2ota library +ltchiptool_dir = platform.get_package_dir(f"tool-ltchiptool") +env.AddLibrary( + name="uf2ota", + base_dir=ltchiptool_dir, + srcs=[ + "+", + ], + includes=[ + "+<.>", + ], +) + # Sources - board variant env.AddLibrary( name="board_${VARIANT}", diff --git a/builder/frameworks/beken-72xx-sdk.py b/builder/frameworks/beken-72xx-sdk.py index 829ca0b..30bab99 100644 --- a/builder/frameworks/beken-72xx-sdk.py +++ b/builder/frameworks/beken-72xx-sdk.py @@ -590,7 +590,7 @@ env.BuildLibraries() # Main firmware outputs and actions env.Replace( # linker command (encryption + packaging) - LINK="${LINK2BIN} ${VARIANT} '' ''", + LINK="${LTCHIPTOOL} link2bin ${VARIANT} '' ''", # UF2OTA input list UF2OTA=[ # app binary image (enc+crc), OTA1 (uploader) only diff --git a/builder/frameworks/realtek-ambz-sdk.py b/builder/frameworks/realtek-ambz-sdk.py index 4040f7b..ec5d6c7 100644 --- a/builder/frameworks/realtek-ambz-sdk.py +++ b/builder/frameworks/realtek-ambz-sdk.py @@ -294,7 +294,7 @@ env.BuildLibraries() # Main firmware outputs and actions env.Replace( # linker command (dual .bin outputs) - LINK="${LINK2BIN} ${VARIANT} xip1 xip2", + LINK="${LTCHIPTOOL} link2bin ${VARIANT} xip1 xip2", # default output .bin name IMG_FW="image_${FLASH_OTA1_OFFSET}.ota1.bin", # UF2OTA input list diff --git a/builder/utils/env.py b/builder/utils/env.py index 11df221..1cff8cc 100644 --- a/builder/utils/env.py +++ b/builder/utils/env.py @@ -2,16 +2,15 @@ from os.path import join +from ltchiptool import Family from SCons.Script import DefaultEnvironment -from tools.util.platform import get_family - env = DefaultEnvironment() def env_add_defaults(env, platform, board): # Get Family object for this board - family = get_family(short_name=board.get("build.family")) + family = Family.get(short_name=board.get("build.family")) # Default environment variables vars = dict( SDK_DIR=platform.get_package_dir(family.framework), @@ -36,10 +35,8 @@ def env_add_defaults(env, platform, board): VARIANT=board.get("build.variant"), LDSCRIPT_SDK=board.get("build.ldscript_sdk"), LDSCRIPT_ARDUINO=board.get("build.ldscript_arduino"), - # Link2Bin tool - LINK2BIN='"${PYTHONEXE}" "${LT_DIR}/tools/link2bin.py"', - UF2OTA_PY='"${PYTHONEXE}" "${LT_DIR}/tools/uf2ota/uf2ota.py"', - UF2UPLOAD_PY='"${PYTHONEXE}" "${LT_DIR}/tools/upload/uf2upload.py"', + # ltchiptool variables + LTCHIPTOOL='"${PYTHONEXE}" -m ltchiptool', # Fix for link2bin to get tmpfile name in argv LINKCOM="${LINK} ${LINKARGS}", LINKARGS="${TEMPFILE('-o $TARGET $LINKFLAGS $__RPATH $SOURCES $_LIBDIRFLAGS $_LIBFLAGS', '$LINKCOMSTR')}", diff --git a/builder/utils/uf2.py b/builder/utils/uf2.py index 503ef41..eb359e2 100644 --- a/builder/utils/uf2.py +++ b/builder/utils/uf2.py @@ -35,14 +35,13 @@ def env_uf2ota(env, *args, **kwargs): env["UF2OUT_BASE"] = basename(output) cmd = [ - "@${UF2OTA_PY}", + "@${LTCHIPTOOL} uf2 write", f'--output "{output}"', "--family ${FAMILY}", "--board ${VARIANT}", f"--version {lt_version}", f'--fw "{project_name}:{project_version}"', f"--date {int(now.timestamp())}", - "write", inputs, ] @@ -76,7 +75,10 @@ def env_uf2upload(env, target): return # add main upload target - env.Replace(UPLOADER="${UF2UPLOAD_PY}", UPLOADCMD="${UPLOADER} ${UPLOADERFLAGS}") + env.Replace( + UPLOADER="${LTCHIPTOOL} uf2 upload", + UPLOADCMD="${UPLOADER} ${UPLOADERFLAGS}", + ) actions.append(env.VerboseAction("${UPLOADCMD}", "Uploading ${UF2OUT_BASE}")) env.AddPlatformTarget("upload", target, actions, "Upload") diff --git a/tools/util/markdown.py b/docs/markdown.py similarity index 94% rename from tools/util/markdown.py rename to docs/markdown.py index 9bde7fa..b90bec7 100644 --- a/tools/util/markdown.py +++ b/docs/markdown.py @@ -3,8 +3,6 @@ from os.path import join from typing import List -from tools.util.fileio import writetext - class Markdown: items: List[str] @@ -15,7 +13,9 @@ class Markdown: self.output = join(dir, f"{name}.md") def write(self): - writetext(self.output, self.items) + with open(self.output, "w", encoding="utf-8") as f: + f.write("\n".join(self.items)) + f.write("\n") def pad(self, s: str, i: int) -> str: return s + " " * (i - len(s)) diff --git a/docs/update_docs.py b/docs/update_docs.py index 2857ab2..f5febc6 100644 --- a/docs/update_docs.py +++ b/docs/update_docs.py @@ -4,29 +4,19 @@ import sys from os.path import dirname, join sys.path.append(join(dirname(__file__), "..")) + import re -from typing import Dict, List, Set, Tuple +from typing import Dict, List, Set import colorama from colorama import Fore, Style - -from tools.util.fileio import readjson, readtext -from tools.util.markdown import Markdown -from tools.util.obj import get, sizeof -from tools.util.platform import ( - get_board_list, - get_board_manifest, - get_families, - get_family, -) +from ltchiptool import Board, Family +from ltchiptool.util import readjson, readtext, sizeof +from markdown import Markdown OUTPUT = join(dirname(__file__), "status") -def load_boards() -> Dict[str, dict]: - return {board: get_board_manifest(board) for board in get_board_list()} - - def load_chip_type_h() -> str: code = readtext( join( @@ -43,24 +33,15 @@ def load_chip_type_h() -> str: return code -def check_mcus(boards: List[Tuple[str, dict]]) -> bool: - for board_name, board in boards: +def check_mcus(boards: List[Board]) -> bool: + for board in boards: # check if all boards' MCUs are defined in families.json - family_name: str = get(board, "build.family") - mcu_name: str = get(board, "build.mcu") - family = get_family(short_name=family_name) - if not family: - print( - Fore.RED - + f"ERROR: Family '{family_name}' of board '{board_name}' does not exist" - + Style.RESET_ALL - ) - return False - mcus = [mcu.lower() for mcu in family.mcus] + mcu_name: str = board["build.mcu"] + mcus = [mcu.lower() for mcu in board.family.mcus] if mcu_name not in mcus: print( Fore.RED - + f"ERROR: MCU '{mcu_name}' of board '{board_name}' is not defined for family '{family_name}'" + + f"ERROR: MCU '{mcu_name}' of board '{board.name}' is not defined for family '{board.family.name}'" + Style.RESET_ALL ) return False @@ -69,19 +50,19 @@ def check_mcus(boards: List[Tuple[str, dict]]) -> bool: def get_family_mcus() -> Set[str]: out = [] - for family in get_families(): + for family in Family.get_all(): out += family.mcus return set(out) def get_family_names() -> Set[str]: - return set(family.short_name for family in get_families()) + return set(family.short_name for family in Family.get_all()) -def get_board_mcus(boards: List[Tuple[str, dict]]) -> Set[str]: +def get_board_mcus(boards: List[Board]) -> Set[str]: out = set() - for _, board in boards: - mcu_name: str = get(board, "build.mcu") + for board in boards: + mcu_name: str = board["build.mcu"] out.add(mcu_name.upper()) return out @@ -103,28 +84,27 @@ def get_enum_families(code: str) -> Set[str]: return set(family[2:] for family in get_enum_keys(code, "ChipFamily")) -def board_sort(tpl): - generic = tpl[0].lower().startswith("generic") - vendor = get(tpl[1], "vendor") +def board_json_sort(tpl): + return tpl[1]["mcu"], tpl[0] + + +def board_obj_sort(board: Board): + generic = board.is_generic + vendor = board.vendor if vendor == "N/A": vendor = "\xff" generic = False return ( not generic, # reverse vendor, - get(tpl[1], "build.mcu"), - get(tpl[1], "mcu"), - tpl[0], + board["build.mcu"], + board["mcu"], + board.name, ) -def get_board_symbol(board_name: str, board: dict) -> str: - symbol = get(board, "symbol") - if not symbol and board_name.startswith("generic-"): - symbol = board_name[8:] - else: - symbol = symbol or board_name.upper() - return symbol +def get_board_symbol(board: Board) -> str: + return board.symbol or board.generic_name or board.name.upper() def write_chips(mcus: List[str]): @@ -133,7 +113,7 @@ def write_chips(mcus: List[str]): md.write() -def write_boards(boards: List[Tuple[str, dict]]): +def write_boards(boards: List[Board]): md = Markdown(OUTPUT, "supported_boards") header = [ "Name", @@ -149,34 +129,33 @@ def write_boards(boards: List[Tuple[str, dict]]): rows = [] vendor_prev = "" - for board_name, board in boards: - family = get_family(short_name=get(board, "build.family")) + for board in boards: # add board vendor as a row - vendor = get(board, "vendor") + vendor = board["vendor"] if vendor_prev != vendor: rows.append([f"**{vendor}**"]) vendor_prev = vendor # count total pin count & IO count pins = "-" - pinout: Dict[str, dict] = get(board, "pcb.pinout") + pinout: Dict[str, dict] = board["pcb.pinout"] if pinout: pinout = [pin for name, pin in pinout.items() if name.isnumeric()] pins_total = len(pinout) pins_io = sum(1 for pin in pinout if "ARD" in pin) pins = f"{pins_total} ({pins_io} I/O)" # format row values - symbol = get_board_symbol(board_name, board) - board_url = f"[{symbol}](../../boards/{board_name}/README.md)" + symbol = get_board_symbol(board) + board_url = f"[{symbol}](../../boards/{board.name}/README.md)" row = [ board_url, - get(board, "build.mcu").upper(), - sizeof(get(board, "upload.flash_size")), - sizeof(get(board, "upload.maximum_ram_size")), + board["build.mcu"].upper(), + sizeof(board["upload.flash_size"]), + sizeof(board["upload.maximum_ram_size"]), pins, - "✔️" if "wifi" in get(board, "connectivity") else "❌", - "✔️" if "ble" in get(board, "connectivity") else "❌", - "✔️" if "zigbee" in get(board, "connectivity") else "❌", - f"`{family.name}`", + "✔️" if "wifi" in board["connectivity"] else "❌", + "✔️" if "ble" in board["connectivity"] else "❌", + "✔️" if "zigbee" in board["connectivity"] else "❌", + f"`{board.family.name}`", ] rows.append(row) md.add_table(header, *rows) @@ -204,7 +183,7 @@ def write_unsupported_boards( series_rows = [] series_rows.append([f"**{series_name.upper()} Series**"]) boards = series[series_name] - for board_name, board in sorted(boards.items(), key=board_sort): + for board_name, board in sorted(boards.items(), key=board_json_sort): if board_name in supported: continue row = [ @@ -236,7 +215,7 @@ def write_families(): ] rows = [] - for family in get_families(): + for family in Family.get_all(): row = [ # Title "[{}]({})".format( @@ -274,14 +253,14 @@ def write_families(): md.write() -def write_boards_list(boards: List[Tuple[str, dict]]): +def write_boards_list(boards: List[Board]): md = Markdown(dirname(__file__), join("..", "boards", "SUMMARY")) items = [] - for board_name, board in boards: - symbol = get_board_symbol(board_name, board) - if board_name.startswith("generic-"): - symbol = get(board, "name") - items.append(f"[{symbol}](../boards/{board_name}/README.md)") + for board in boards: + symbol = get_board_symbol(board) + if board.is_generic: + symbol = board["name"] + items.append(f"[{symbol}](../boards/{board.name}/README.md)") md.add_list(*items) md.write() @@ -289,18 +268,17 @@ def write_boards_list(boards: List[Tuple[str, dict]]): if __name__ == "__main__": colorama.init() - boards = load_boards() - boards = sorted(boards.items(), key=board_sort) + boards = map(Board, Board.get_list()) + boards = sorted(boards, key=board_obj_sort) code = load_chip_type_h() errors = False - for name, board in boards: - variant = get(board, "build.variant") - if name != variant: + for board in boards: + if board.name != board["source"]: print( Fore.RED - + f"ERROR: Invalid build.variant of '{name}': '{variant}'" + + f"ERROR: Invalid build.variant of '{board['source']}': '{board.name}'" + Style.RESET_ALL ) errors = True @@ -347,5 +325,5 @@ if __name__ == "__main__": write_unsupported_boards( series=data, name=f"unsupported_{name}", - supported=[tpl[0] for tpl in boards], + supported=[board.name for board in boards], ) diff --git a/platform.json b/platform.json index 3865abc..da24a8e 100644 --- a/platform.json +++ b/platform.json @@ -120,6 +120,11 @@ "~1.100301.0" ] }, + "tool-ltchiptool": { + "type": "uploader", + "version": "https://github.com/libretuya/ltchiptool#v1.2.1", + "note": "This is used only for C/C++ code from ltchiptool." + }, "tool-openocd": { "type": "uploader", "optional": true, diff --git a/platform.py b/platform.py index 159ea62..755e729 100644 --- a/platform.py +++ b/platform.py @@ -1,26 +1,48 @@ # Copyright (c) Kuba Szczodrzyński 2022-04-20. +import importlib import json import sys +from os import system from os.path import dirname, join from typing import Dict -from platformio import util from platformio.debug.config.base import DebugConfigBase from platformio.debug.exception import DebugInvalidOptionsError -from platformio.managers.platform import PlatformBase from platformio.package.exception import MissingPackageManifestError from platformio.package.manager.base import BasePackageManager from platformio.package.meta import PackageItem, PackageSpec +from platformio.platform.base import PlatformBase from platformio.platform.board import PlatformBoardConfig +from semantic_version import Version -# Make tools available -sys.path.insert(0, dirname(__file__)) -from tools.util.platform import get_board_manifest +# Install & import tools +def check_ltchiptool(): + global ltchiptool + import ltchiptool + + importlib.reload(ltchiptool) + if Version(ltchiptool.get_version()) < Version("1.3.1"): + raise ImportError("Version too old") + + +try: + check_ltchiptool() +except (ImportError, AttributeError): + print("Installing/updating ltchiptool") + system(" ".join([sys.executable, "-m", "pip install -U ltchiptool"])) + try: + check_ltchiptool() + except (ImportError, AttributeError) as e: + print( + f"!!! Installing ltchiptool failed, or version outdated. Cannot continue: {e}" + ) + raise e # Remove current dir so it doesn't conflict with PIO -sys.path.remove(dirname(__file__)) +if dirname(__file__) in sys.path: + sys.path.remove(dirname(__file__)) libretuya_packages = None manifest_default = {"version": "0.0.0", "description": "", "keywords": []} @@ -189,7 +211,7 @@ class LibretuyaPlatform(PlatformBase): def update_board(self, board: PlatformBoardConfig): if "_base" in board: - board._manifest = get_board_manifest(board._manifest) + board._manifest = ltchiptool.Board.get_data(board._manifest) # add "arduino" framework has_arduino = any("arduino" in fw for fw in board.manifest["frameworks"]) diff --git a/tools/link2bin.py b/tools/link2bin.py deleted file mode 100644 index 3f9c624..0000000 --- a/tools/link2bin.py +++ /dev/null @@ -1,241 +0,0 @@ -# Copyright (c) Kuba Szczodrzyński 2022-05-31. - -import sys -from os.path import dirname, join - -sys.path.append(join(dirname(__file__), "..")) - -import shlex -from argparse import ArgumentParser -from enum import Enum -from os import stat, unlink -from os.path import basename, dirname, isfile, join -from shutil import copyfile -from subprocess import PIPE, Popen -from typing import IO, Dict, List, Tuple - -from tools.util.fileio import chext, isnewer, readtext -from tools.util.models import Family -from tools.util.obj import get -from tools.util.platform import get_board_manifest, get_family - - -class SocType(Enum): - UNSET = () - # (index, toolchain prefix, has dual-OTA, argument count) - AMBZ = (1, "arm-none-eabi-", True, 0) - BK72XX = (2, "arm-none-eabi-", False, 0) - - def cmd(self, program: str, args: List[str] = []) -> IO[bytes]: - program = self.prefix + program - cmd = [program] + args - try: - process = Popen(cmd, stdout=PIPE) - except FileNotFoundError: - if isinstance(cmd, list): - cmd = " ".join(cmd) - print(f"Toolchain not found while running: '{cmd}'") - exit(1) - return process.stdout - - @property - def prefix(self) -> str: - return self.value[1] - - @property - def dual_ota(self) -> bool: - return self.value[2] - - @property - def soc_argc(self) -> int: - return self.value[3] - - def nm(self, input: str) -> Dict[str, int]: - out = {} - stdout = self.cmd("gcc-nm", [input]) - for line in stdout.readlines(): - line = line.decode().strip().split(" ") - if len(line) != 3: - continue - out[line[2]] = int(line[0], 16) - return out - - def objcopy( - self, - input: str, - output: str, - sections: List[str] = [], - fmt: str = "binary", - ) -> str: - # print graph element - print(f"| | |-- {basename(output)}") - if isnewer(input, output): - args = [] - for section in sections: - args += ["-j", section] - args += ["-O", fmt] - args += [input, output] - self.cmd("objcopy", args).read() - return output - - -# _ _ _ _ _ _ _ _ -# | | | | | (_) (_) | (_) -# | | | | |_ _| |_| |_ _ ___ ___ -# | | | | __| | | | __| |/ _ \/ __| -# | |__| | |_| | | | |_| | __/\__ \ -# \____/ \__|_|_|_|\__|_|\___||___/ -def checkfile(path: str): - if not isfile(path) or stat(path).st_size == 0: - print(f"Generated file not found: {path}") - exit(1) - - -# ______ _ ______ _ ____ _____ _ _ -# | ____| | | ____| | | | _ \_ _| \ | | -# | |__ | | | |__ | |_ ___ | |_) || | | \| | -# | __| | | | __| | __/ _ \ | _ < | | | . ` | -# | |____| |____| | | || (_) | | |_) || |_| |\ | -# |______|______|_| \__\___/ |____/_____|_| \_| -def elf2bin( - soc: SocType, - family: Family, - board: dict, - input: str, - ota_idx: int = 1, - args: List[str] = [], -) -> Tuple[int, str]: - checkfile(input) - func = None - - if soc == SocType.AMBZ: - from tools.soc.link2bin_ambz import elf2bin_ambz - - func = elf2bin_ambz - elif soc == SocType.BK72XX: - from tools.soc.link2bin_bk72xx import elf2bin_bk72xx - - func = elf2bin_bk72xx - - if func: - return func(soc, family, board, input, ota_idx, *args) - raise NotImplementedError(f"SoC ELF->BIN not implemented: {soc}") - - -# _ _ _ -# | | (_) | | -# | | _ _ __ | | _____ _ __ -# | | | | '_ \| |/ / _ \ '__| -# | |____| | | | | < __/ | -# |______|_|_| |_|_|\_\___|_| -def ldargs_parse( - args: List[str], - ld_ota1: str, - ld_ota2: str, -) -> List[Tuple[str, List[str]]]: - args1 = list(args) - args2 = list(args) - elf1 = elf2 = None - for i, arg in enumerate(args): - if ".elf" in arg: - if not ld_ota1: - # single-OTA chip, return the output name - return [(arg, args)] - # append OTA index in filename - args1[i] = elf1 = chext(arg, "ota1.elf") - args2[i] = elf2 = chext(arg, "ota2.elf") - if arg.endswith(".ld") and ld_ota1: - # use OTA2 linker script - args2[i] = arg.replace(ld_ota1, ld_ota2) - if not elf1 or not elf2: - print("Linker output .elf not found in arguments") - return None - return [(elf1, args1), (elf2, args2)] - - -def link2bin( - soc: SocType, - family: Family, - board: dict, - ld_args: List[str], - ld_ota1: str = None, - ld_ota2: str = None, - soc_args: List[str] = [], -) -> List[str]: - elfs = [] - if soc.dual_ota: - # process linker arguments for dual-OTA chips - elfs = ldargs_parse(ld_args, ld_ota1, ld_ota2) - else: - # just get .elf output name for single-OTA chips - elfs = ldargs_parse(ld_args, None, None) - - if not elfs: - return None - - ota_idx = 1 - for elf, ldargs in elfs: - # print graph element - print(f"|-- Image {ota_idx}: {basename(elf)}") - if isfile(elf): - unlink(elf) - soc.cmd(f"gcc", args=ldargs).read() - checkfile(elf) - # generate a set of binaries for the SoC - elf2bin(soc, family, board, elf, ota_idx, soc_args) - ota_idx += 1 - - if soc.dual_ota: - # copy OTA1 file as firmware.elf to make PIO understand it - elf, _ = ldargs_parse(ld_args, None, None)[0] - copyfile(elfs[0][0], elf) - - -if __name__ == "__main__": - parser = ArgumentParser( - prog="link2bin", - description="Link to BIN format", - prefix_chars="#", - ) - parser.add_argument("board", type=str, help="Target board name") - parser.add_argument("ota1", type=str, help=".LD file OTA1 pattern") - parser.add_argument("ota2", type=str, help=".LD file OTA2 pattern") - parser.add_argument("args", type=str, nargs="*", help="SoC+linker arguments") - args = parser.parse_args() - - try: - board = get_board_manifest(args.board) - except FileNotFoundError: - print(f"Board not found: {args.board}") - exit(1) - - family = get_family(short_name=get(board, "build.family")) - soc_types = {soc.name.lower(): soc for soc in SocType} - soc = soc_types.get(family.code, soc_types.get(family.parent_code, None)) - if not soc: - print(f"SoC type not found. Tried {family.code}, {family.parent_code}") - exit(1) - - if not args.args: - print(f"Linker arguments must not be empty") - exit(1) - - try: - while True: - i = next(i for i, a in enumerate(args.args) if a.startswith("@")) - arg = args.args.pop(i) - argv = readtext(arg[1:]) - argv = shlex.split(argv) - args.args = args.args[0:i] + argv + args.args[i:] - except StopIteration: - pass - - link2bin( - soc, - family, - board, - args.args[soc.soc_argc :], - args.ota1, - args.ota2, - args.args[: soc.soc_argc], - ) diff --git a/tools/soc/link2bin_ambz.py b/tools/soc/link2bin_ambz.py deleted file mode 100644 index 6bf6835..0000000 --- a/tools/soc/link2bin_ambz.py +++ /dev/null @@ -1,69 +0,0 @@ -# Copyright (c) Kuba Szczodrzyński 2022-06-14. - -from os.path import basename -from typing import IO, Tuple - -from tools.util.fileio import chname, isnewer, readbin -from tools.util.intbin import inttole32 -from tools.util.models import Family - - -def elf2bin_ambz( - soc, - family: Family, - board: dict, - input: str, - ota_idx: int = 1, -) -> Tuple[int, str]: - def write_header(f: IO[bytes], start: int, end: int): - f.write(b"81958711") - f.write(inttole32(end - start)) - f.write(inttole32(start)) - f.write(b"\xff" * 16) - - sections_ram = [ - ".ram_image2.entry", - ".ram_image2.data", - ".ram_image2.bss", - ".ram_image2.skb.bss", - ".ram_heap.data", - ] - sections_xip = [".xip_image2.text"] - sections_rdp = [".ram_rdp.text"] - nmap = soc.nm(input) - ram_start = nmap["__ram_image2_text_start__"] - ram_end = nmap["__ram_image2_text_end__"] - xip_start = nmap["__flash_text_start__"] - 0x8000020 - # build output name - output = chname(input, f"image_0x{xip_start:06X}.ota{ota_idx}.bin") - out_ram = chname(input, f"ota{ota_idx}.ram_2.r.bin") - out_xip = chname(input, f"ota{ota_idx}.xip_image2.bin") - out_rdp = chname(input, f"ota{ota_idx}.rdp.bin") - # print graph element - print(f"| |-- {basename(output)}") - # objcopy required images - ram = soc.objcopy(input, out_ram, sections_ram) - xip = soc.objcopy(input, out_xip, sections_xip) - soc.objcopy(input, out_rdp, sections_rdp) - # return if images are up to date - if not isnewer(ram, output) and not isnewer(xip, output): - return (xip_start, output) - - # read and trim RAM image - ram = readbin(ram).rstrip(b"\x00") - # read XIP image - xip = readbin(xip) - # align images to 4 bytes - ram += b"\x00" * (((((len(ram) - 1) // 4) + 1) * 4) - len(ram)) - xip += b"\x00" * (((((len(xip) - 1) // 4) + 1) * 4) - len(xip)) - # write output file - with open(output, "wb") as f: - # write XIP header - write_header(f, 0, len(xip)) - # write XIP image - f.write(xip) - # write RAM header - write_header(f, ram_start, ram_end) - # write RAM image - f.write(ram) - return (xip_start, output) diff --git a/tools/soc/link2bin_bk72xx.py b/tools/soc/link2bin_bk72xx.py deleted file mode 100644 index 8d9a6ce..0000000 --- a/tools/soc/link2bin_bk72xx.py +++ /dev/null @@ -1,95 +0,0 @@ -# Copyright (c) Kuba Szczodrzyński 2022-06-14. - -from datetime import datetime -from os import stat -from os.path import basename -from typing import Tuple - -from tools.util.bkutil import RBL, BekenBinary, DataType -from tools.util.fileio import chext, chname, isnewer, writebin, writejson -from tools.util.models import Family -from tools.util.obj import get - - -def calc_offset(addr: int) -> int: - return int(addr + (addr // 32) * 2) - - -def elf2bin_bk72xx( - soc, - family: Family, - board: dict, - input: str, - ota_idx: int = 1, -) -> Tuple[int, str]: - mcu = get(board, "build.mcu") - coeffs = get(board, "build.bkcrypt_coeffs") or ("0" * 32) - rbl_size = get(board, "build.bkrbl_size_app") - version = datetime.now().strftime("%y.%m.%d") - - nmap = soc.nm(input) - app_addr = nmap["_vector_start"] - app_offs = calc_offset(app_addr) - app_size = int(rbl_size, 16) - rbl_offs = app_offs - - # build output name - output = chname(input, f"{mcu}_app_0x{app_offs:06X}.rbl") - fw_bin = chext(input, "bin") - # print graph element - print(f"| |-- {basename(output)}") - # objcopy ELF -> raw BIN - soc.objcopy(input, fw_bin) - # return if images are up to date - if not isnewer(fw_bin, output): - return (app_offs, output) - - bk = BekenBinary(coeffs) - rbl = RBL( - name="app", - version=f"{version}-{mcu}", - container_size=app_size, - ) - - fw_size = stat(fw_bin).st_size - raw = open(fw_bin, "rb") - out = open(output, "wb") - - # open encrypted+CRC binary output - out_crc = chname(input, f"{mcu}_app_0x{app_offs:06X}.crc") - print(f"| |-- {basename(out_crc)}") - crc = open(out_crc, "wb") - - # get partial (type, bytes) data generator - package_gen = bk.package(raw, app_addr, fw_size, rbl, partial=True) - - # write all BINARY blocks - for data_type, data in package_gen: - if data_type != DataType.BINARY: - break - out.write(data) - crc.write(data) - rbl_offs += len(data) - - # skip PADDING_SIZE bytes for RBL header, write it to main output - if data_type == DataType.PADDING_SIZE: - out.write(b"\xff" * data) - rbl_offs += data - - # open RBL header output - out_rblh = chname(input, f"{mcu}_app_0x{rbl_offs:06X}.rblh") - print(f"| |-- {basename(out_rblh)}") - rblh = open(out_rblh, "wb") - - # write all RBL blocks - for data_type, data in package_gen: - if data_type != DataType.RBL: - break - out.write(data) - rblh.write(data) - - # close all files - raw.close() - out.close() - crc.close() - rblh.close() diff --git a/tools/soc/uf2_bk72xx.py b/tools/soc/uf2_bk72xx.py deleted file mode 100644 index cfef700..0000000 --- a/tools/soc/uf2_bk72xx.py +++ /dev/null @@ -1,59 +0,0 @@ -# Copyright (c) Kuba Szczodrzyński 2022-06-23. - -import sys - -try: - from platformio.package.manager.tool import ToolPackageManager - - manager = ToolPackageManager() - pkg = manager.get_package("tool-bk7231tools") - sys.path.append(pkg.path) - from bk7231tools.serial import BK7231Serial -except (ImportError, AttributeError): - print("You need PlatformIO and tool-bk7231tools package to run this program.") - exit(1) - - -from tools.upload.ctx import UploadContext - - -def upload_uart( - ctx: UploadContext, - port: str, - baud: int = None, - **kwargs, -) -> bool: - prefix = "| |--" - # connect to chip - bk = BK7231Serial(port=port, baudrate=baud or ctx.baudrate or 115200) - - # collect continuous blocks of data - parts = ctx.collect(ota_idx=1) - # write blocks to flash - for offs, data in parts.items(): - length = len(data.getvalue()) - data.seek(0) - print(prefix, f"Writing {length} bytes to 0x{offs:06x}") - try: - bk.program_flash( - data, - length, - offs, - verbose=False, - crc_check=True, - dry_run=False, - really_erase=True, - ) - except ValueError as e: - print(prefix, f"Writing failed: {e.args[0]}") - return False - # reboot the chip - bk.reboot_chip() - return True - - -def upload(ctx: UploadContext, protocol: str, **kwargs) -> bool: - if protocol == "uart": - return upload_uart(ctx, **kwargs) - print(f"Unknown upload protocol - {protocol}") - return False diff --git a/tools/soc/uf2_rtltool.py b/tools/soc/uf2_rtltool.py deleted file mode 100644 index 8d3e9db..0000000 --- a/tools/soc/uf2_rtltool.py +++ /dev/null @@ -1,67 +0,0 @@ -# Copyright (c) Kuba Szczodrzyński 2022-06-02. - -from io import BytesIO - -from tools.upload.ctx import UploadContext -from tools.upload.rtltool import RTLXMD -from tools.util.intbin import letoint - - -def upload_uart( - ctx: UploadContext, - port: str, - baud: int = None, - **kwargs, -) -> bool: - prefix = "| |--" - rtl = RTLXMD(port=port) - print(prefix, f"Connecting to {port}...") - if not rtl.connect(): - print(prefix, f"Failed to connect on port {port}") - return False - - # read system data to get active OTA index - io = BytesIO() - if not rtl.ReadBlockFlash(io, offset=0x9000, size=256): - print(prefix, "Failed to read from 0x9000") - return False - # get as bytes - system = io.getvalue() - if len(system) != 256: - print(prefix, f"Length invalid while reading from 0x9000 - {len(system)}") - return False - # read OTA switch value - ota_switch = bin(letoint(system[4:8]))[2:] - # count 0-bits - ota_idx = 1 + (ota_switch.count("0") % 2) - # validate OTA2 address in system data - if ota_idx == 2: - ota2_addr = letoint(system[0:4]) & 0xFFFFFF - part_addr = ctx.get_offset("ota2", 0) - if ota2_addr != part_addr: - print( - prefix, - f"Invalid OTA2 address on chip - found {ota2_addr}, expected {part_addr}", - ) - return False - - print(prefix, f"Flashing image to OTA {ota_idx}...") - # collect continuous blocks of data - parts = ctx.collect(ota_idx=ota_idx) - # write blocks to flash - for offs, data in parts.items(): - offs |= 0x8000000 - length = len(data.getvalue()) - data.seek(0) - print(prefix, f"Writing {length} bytes to 0x{offs:06x}") - if not rtl.WriteBlockFlash(data, offs, length): - print(prefix, f"Writing failed at 0x{offs:x}") - return False - return True - - -def upload(ctx: UploadContext, protocol: str, **kwargs) -> bool: - if protocol == "uart": - return upload_uart(ctx, **kwargs) - print(f"Unknown upload protocol - {protocol}") - return False diff --git a/tools/uf2ota/.gitignore b/tools/uf2ota/.gitignore deleted file mode 100644 index 811d126..0000000 --- a/tools/uf2ota/.gitignore +++ /dev/null @@ -1,2 +0,0 @@ -*.uf2 -*.bin diff --git a/tools/uf2ota/dump.py b/tools/uf2ota/dump.py deleted file mode 100644 index 1d85589..0000000 --- a/tools/uf2ota/dump.py +++ /dev/null @@ -1,100 +0,0 @@ -# Copyright (c) Kuba Szczodrzyński 2022-05-28. - - -from io import BytesIO, FileIO -from os import makedirs -from os.path import join -from typing import Dict, Tuple - -from models import Opcode, Tag -from uf2 import UF2 - -from tools.util.intbin import inttole32, letoint, letosint - -fs: Dict[str, Tuple[int, FileIO]] = {} -output_dir = "" -output_basename = "" -part1 = "" -part2 = "" - - -def write(part: str, offs: int, data: bytes): - global fs - - if part not in fs or fs[part][0] != offs: - path = join(output_dir, output_basename + part + f"_0x{offs:x}.bin") - f = open(path, "wb") - if part in fs: - fs[part][1].close() - else: - f = fs[part][1] - fs[part] = (offs + f.write(data), f) - - -def update_parts(tags: Dict[Tag, bytes]): - global part1, part2 - if Tag.LT_PART_1 in tags: - part1 = tags[Tag.LT_PART_1].decode() - part1 = ("1_" + part1) if part1 else None - if Tag.LT_PART_2 in tags: - part2 = tags[Tag.LT_PART_2].decode() - part2 = ("2_" + part2) if part2 else None - - -def uf2_dump(uf2: UF2, outdir: str): - global output_dir, output_basename - - makedirs(outdir, exist_ok=True) - if Tag.LT_VERSION not in uf2.tags: - raise RuntimeError("Can only dump LibreTuya firmware images") - - output_dir = outdir - output_basename = "_".join( - filter( - None, - [ - uf2.tags.get(Tag.FIRMWARE, b"").decode(), - uf2.tags.get(Tag.VERSION, b"").decode(), - "lt" + uf2.tags[Tag.LT_VERSION].decode(), - uf2.tags.get(Tag.BOARD, b"").decode(), - ], - ) - ) - output_basename += "_" - - update_parts(uf2.tags) - for block in uf2.data: - # update target partition info - update_parts(block.tags) - # skip empty blocks - if not block.length: - continue - - data1 = block.data if part1 else None - data2 = block.data if part2 else None - - if Tag.LT_BINPATCH in block.tags: - # type 5, 6 - data2 = bytearray(data2) - tag = block.tags[Tag.LT_BINPATCH] - binpatch = BytesIO(tag) - while binpatch.tell() < len(tag): - opcode = Opcode(binpatch.read(1)[0]) - length = binpatch.read(1)[0] - data = binpatch.read(length) - if opcode == Opcode.DIFF32: - value = letosint(data[0:4]) - for offs in data[4:]: - chunk = data2[offs : offs + 4] - chunk = letoint(chunk) - chunk += value - chunk = inttole32(chunk) - data2[offs : offs + 4] = chunk - data2 = bytes(data2) - - if data1: - # types 1, 3, 4 - write(part1, block.address, data1) - if data2: - # types 2, 3, 4 - write(part2, block.address, data2) diff --git a/tools/uf2ota/models.py b/tools/uf2ota/models.py deleted file mode 100644 index a90748e..0000000 --- a/tools/uf2ota/models.py +++ /dev/null @@ -1,137 +0,0 @@ -# Copyright (c) Kuba Szczodrzyński 2022-05-27. - -from enum import IntEnum - - -class Tag(IntEnum): - VERSION = 0x9FC7BC # version of firmware file - UTF8 semver string - PAGE_SIZE = 0x0BE9F7 # page size of target device (32 bit unsigned number) - SHA2 = 0xB46DB0 # SHA-2 checksum of firmware (can be of various size) - DEVICE = 0x650D9D # description of device (UTF8) - DEVICE_ID = 0xC8A729 # device type identifier - # LibreTuya custom tags - OTA_VERSION = 0x5D57D0 # format version - BOARD = 0xCA25C8 # board name (lowercase code) - FIRMWARE = 0x00DE43 # firmware description / name - BUILD_DATE = 0x822F30 # build date/time as Unix timestamp - LT_VERSION = 0x59563D # LT version (semver) - LT_PART_1 = 0x805946 # OTA1 partition name - LT_PART_2 = 0xA1E4D7 # OTA2 partition name - LT_HAS_OTA1 = 0xBBD965 # image has any data for OTA1 - LT_HAS_OTA2 = 0x92280E # image has any data for OTA2 - LT_BINPATCH = 0xB948DE # binary patch to convert OTA1->OTA2 - - -class Opcode(IntEnum): - DIFF32 = 0xFE # difference between 32-bit values - - -class Flags: - not_main_flash: bool = False - file_container: bool = False - has_family_id: bool = False - has_md5: bool = False - has_tags: bool = False - - def encode(self) -> int: - val = 0 - if self.not_main_flash: - val |= 0x00000001 - if self.file_container: - val |= 0x00001000 - if self.has_family_id: - val |= 0x00002000 - if self.has_md5: - val |= 0x00004000 - if self.has_tags: - val |= 0x00008000 - return val - - def decode(self, data: int): - self.not_main_flash = (data & 0x00000001) != 0 - self.file_container = (data & 0x00001000) != 0 - self.has_family_id = (data & 0x00002000) != 0 - self.has_md5 = (data & 0x00004000) != 0 - self.has_tags = (data & 0x00008000) != 0 - - def __str__(self) -> str: - flags = [] - if self.not_main_flash: - flags.append("NMF") - if self.file_container: - flags.append("FC") - if self.has_family_id: - flags.append("FID") - if self.has_md5: - flags.append("MD5") - if self.has_tags: - flags.append("TAG") - return ",".join(flags) - - -class Input: - ota1_part: str = None - ota1_offs: int = 0 - ota1_file: str = None - ota2_part: str = None - ota2_offs: int = 0 - ota2_file: str = None - - def __init__(self, input: str) -> None: - input = input.split(";") - n = len(input) - if n not in [2, 4]: - raise ValueError( - "Incorrect input format - should be part+offs;file[;part+offs;file]" - ) - # just spread the same image twice for single-OTA scheme - if n == 2: - input += input - - if input[0] and input[1]: - if "+" in input[0]: - (self.ota1_part, self.ota1_offs) = input[0].split("+") - self.ota1_offs = int(self.ota1_offs, 0) - else: - self.ota1_part = input[0] - self.ota1_file = input[1] - if input[2] and input[3]: - if "+" in input[2]: - (self.ota2_part, self.ota2_offs) = input[2].split("+") - self.ota2_offs = int(self.ota2_offs, 0) - else: - self.ota2_part = input[2] - self.ota2_file = input[3] - - if self.ota1_file and self.ota2_file and self.ota1_offs != self.ota2_offs: - # currently, offsets cannot differ when storing images - # (this would require to actually store it twice) - raise ValueError(f"Offsets cannot differ ({self.ota1_file})") - - @property - def is_single(self) -> bool: - return self.ota1_part == self.ota2_part and self.ota1_file == self.ota2_file - - @property - def single_part(self) -> str: - return self.ota1_part or self.ota2_part - - @property - def single_offs(self) -> int: - return self.ota1_offs or self.ota2_offs - - @property - def single_file(self) -> str: - return self.ota1_file or self.ota2_file - - @property - def has_ota1(self) -> bool: - return not not (self.ota1_part and self.ota1_file) - - @property - def has_ota2(self) -> bool: - return not not (self.ota2_part and self.ota2_file) - - @property - def is_simple(self) -> bool: - return self.ota1_file == self.ota2_file or not (self.has_ota1 and self.has_ota2) diff --git a/tools/uf2ota/pyproject.toml b/tools/uf2ota/pyproject.toml deleted file mode 100644 index dc407ce..0000000 --- a/tools/uf2ota/pyproject.toml +++ /dev/null @@ -1,17 +0,0 @@ -[tool.poetry] -name = "uf2ota" -version = "0.1.0" -description = "UF2 OTA update format" -authors = ["Kuba Szczodrzyński "] -license = "MIT" - -[tool.poetry.dependencies] -python = "^3.7" - -[tool.poetry.dev-dependencies] -black = "^22.3.0" -isort = "^5.10.1" - -[build-system] -requires = ["poetry-core>=1.0.0"] -build-backend = "poetry.core.masonry.api" diff --git a/tools/uf2ota/uf2.py b/tools/uf2ota/uf2.py deleted file mode 100644 index 8e44c56..0000000 --- a/tools/uf2ota/uf2.py +++ /dev/null @@ -1,152 +0,0 @@ -# Copyright (c) Kuba Szczodrzyński 2022-05-27. - -from io import BytesIO, FileIO -from typing import Dict, List - -from models import Tag -from uf2_block import Block - -from tools.util.intbin import align_down, align_up, intto8, inttole16, inttole32 -from tools.util.models import Family - - -class UF2: - f: FileIO - seq: int = 0 - - family: Family = None - tags: Dict[Tag, bytes] = {} - data: List[Block] = [] - - def __init__(self, f: FileIO) -> None: - self.f = f - - def store( - self, - address: int, - data: bytes, - tags: Dict[Tag, bytes] = {}, - block_size: int = 256, - ): - if len(data) <= block_size: - block = Block(self.family) - block.tags = tags - block.address = address - block.data = data - block.length = len(data) - self.data.append(block) - return - for offs in range(0, len(data), block_size): - block = Block(self.family) - block.tags = tags - data_part = data[offs : offs + block_size] - block.address = address + offs - block.data = data_part - block.length = len(data_part) - self.data.append(block) - tags = {} - - def put_str(self, tag: Tag, value: str): - self.tags[tag] = value.encode("utf-8") - - def put_int32le(self, tag: Tag, value: int): - self.tags[tag] = inttole32(value) - - def put_int16le(self, tag: Tag, value: int): - self.tags[tag] = inttole16(value) - - def put_int8(self, tag: Tag, value: int): - self.tags[tag] = intto8(value) - - def read(self, block_tags: bool = True) -> bool: - while True: - data = self.f.read(512) - if len(data) not in [0, 512]: - print(f"Block size invalid ({len(data)})") - return False - if not len(data): - break - block = Block() - if not block.decode(data): - return False - - if self.family and self.family != block.family: - print(f"Mismatched family ({self.family} != {block.family})") - return False - self.family = block.family - - if block.block_seq != self.seq: - print(f"Mismatched sequence number ({self.seq} != {block.block_seq}") - return False - self.seq += 1 - - if block_tags or not block.length: - self.tags.update(block.tags) - if block.length and not block.flags.not_main_flash: - self.data.append(block) - return True - - def dump(self): - print(f"Family: {self.family.short_name} / {self.family.description}") - print(f"Tags:") - for k, v in self.tags.items(): - if "\\x" not in str(v): - v = v.decode() - else: - v = v.hex() - print(f" - {k.name}: {v}") - print(f"Data chunks: {len(self.data)}") - print(f"Total binary size: {sum(bl.length for bl in self.data)}") - - @property - def block_count(self) -> int: - cnt = len(self.data) - if self.tags: - cnt += 1 - return cnt - - def write_header(self): - comment = "Hi! Please visit https://kuba2k2.github.io/libretuya/ to read specifications of this file format." - bl = Block(self.family) - bl.flags.has_tags = True - bl.flags.not_main_flash = True - bl.block_seq = 0 - bl.block_count = self.block_count - bl.tags = self.tags - - data = bl.encode() - # add comment in the unused space - tags_len = align_up(Block.get_tags_length(bl.tags), 16) - comment_len = len(comment) - if 476 - 16 >= tags_len + comment_len: - space = 476 - 16 - tags_len - start = (space - comment_len) / 2 - start = align_down(start, 16) - padding1 = b"\x00" * start - padding2 = b"\x00" * (476 - tags_len - comment_len - start) - data = ( - data[0 : 32 + tags_len] - + padding1 - + comment.encode() - + padding2 - + data[-4:] - ) - - self.f.write(data) - - def write(self): - if self.tags and self.seq == 0: - self.write_header() - self.seq += 1 - - bio = BytesIO() - for bl in self.data: - bl.block_count = self.block_count - bl.block_seq = self.seq - bio.write(bl.encode()) - if self.seq % 128 == 0: - # write the buffer every 64 KiB - self.f.write(bio.getvalue()) - bio = BytesIO() - self.seq += 1 - self.f.write(bio.getvalue()) diff --git a/tools/uf2ota/uf2_block.py b/tools/uf2ota/uf2_block.py deleted file mode 100644 index 1ed1ce4..0000000 --- a/tools/uf2ota/uf2_block.py +++ /dev/null @@ -1,139 +0,0 @@ -# Copyright (c) Kuba Szczodrzyński 2022-05-27. - -from math import ceil -from typing import Dict - -from models import Flags, Tag - -from tools.util.intbin import align_up, intto8, inttole24, inttole32, letoint -from tools.util.models import Family -from tools.util.platform import get_family - - -class Block: - flags: Flags - - address: int = 0 - length: int = 0 - - block_seq: int = 0 - block_count: int = 0 - - file_size: int = 0 - family: Family - - data: bytes = None - md5_data: bytes = None - tags: Dict[Tag, bytes] = {} - - def __init__(self, family: Family = None) -> None: - self.flags = Flags() - self.family = family - self.flags.has_family_id = not not self.family - - def encode(self) -> bytes: - self.flags.has_tags = not not self.tags - # UF2 magic 1 and 2 - data = b"\x55\x46\x32\x0A\x57\x51\x5D\x9E" - # encode integer variables - data += inttole32(self.flags.encode()) - data += inttole32(self.address) - data += inttole32(self.length) - data += inttole32(self.block_seq) - data += inttole32(self.block_count) - if self.flags.file_container: - data += inttole32(self.file_size) - elif self.flags.has_family_id: - data += inttole32(self.family.id) - else: - data += b"\x00\x00\x00\x00" - if not self.data: - self.data = b"" - # append tags - tags = b"" - if self.flags.has_tags: - for k, v in self.tags.items(): - tag_size = 4 + len(v) - tags += intto8(tag_size) - tags += inttole24(k.value) - tags += v - tag_size %= 4 - if tag_size: - tags += b"\x00" * (4 - tag_size) - # append block data with padding - data += self.data - data += tags - data += b"\x00" * (476 - len(self.data) - len(tags)) - data += b"\x30\x6F\xB1\x0A" # magic 3 - return data - - def decode(self, data: bytes) -> bool: - # check block size - if len(data) != 512: - print(f"Invalid block size ({len(data)})") - return False - # check Magic 1 - if letoint(data[0:4]) != 0x0A324655: - print(f"Invalid Magic 1 ({data[0:4]})") - return False - # check Magic 2 - if letoint(data[4:8]) != 0x9E5D5157: - print(f"Invalid Magic 2 ({data[4:8]})") - return False - # check Magic 3 - if letoint(data[508:512]) != 0x0AB16F30: - print(f"Invalid Magic 13({data[508:512]})") - return False - - self.flags.decode(letoint(data[8:12])) - self.address = letoint(data[12:16]) - self.length = letoint(data[16:20]) - self.block_seq = letoint(data[20:24]) - self.block_count = letoint(data[24:28]) - if self.flags.file_container: - self.file_size = letoint(data[28:32]) - if self.flags.has_family_id: - self.family = get_family(id=letoint(data[28:32])) - - if self.flags.has_md5: - self.md5_data = data[484:508] # last 24 bytes of data[] - - # decode tags - self.tags = {} - if self.flags.has_tags: - tags = data[32 + self.length :] - i = 0 - while i < len(tags): - length = tags[i] - if not length: - break - tag_type = letoint(tags[i + 1 : i + 4]) - tag_data = tags[i + 4 : i + length] - self.tags[Tag(tag_type)] = tag_data - i += length - i = int(ceil(i / 4) * 4) - - self.data = data[32 : 32 + self.length] - return True - - @staticmethod - def get_tags_length(tags: Dict[Tag, bytes]) -> int: - out = 0 - # add tag headers - out += 4 * len(tags) - # add all tag lengths, padded to 4 bytes - out += sum(align_up(l, 4) for l in map(len, tags.values())) - # add final 0x00 tag - out += 4 - return out - - def __str__(self) -> str: - flags = self.flags - address = hex(self.address) - length = hex(self.length) - block_seq = self.block_seq - block_count = self.block_count - file_size = self.file_size - family = self.family.short_name - tags = [(k.name, v) for k, v in self.tags.items()] - return f"Block[{block_seq}/{block_count}](flags={flags}, address={address}, length={length}, file_size={file_size}, family={family}, tags={tags})" diff --git a/tools/uf2ota/uf2ota.py b/tools/uf2ota/uf2ota.py deleted file mode 100644 index fd557b7..0000000 --- a/tools/uf2ota/uf2ota.py +++ /dev/null @@ -1,145 +0,0 @@ -# Copyright (c) Kuba Szczodrzyński 2022-05-27. - -import sys -from os.path import dirname, join - -sys.path.append(join(dirname(__file__), "..", "..")) - -from argparse import ArgumentParser -from datetime import datetime -from zlib import crc32 - -from dump import uf2_dump -from models import Input, Tag -from uf2 import UF2 -from uf2_block import Block -from utils import binpatch32 - -from tools.util.platform import get_family - -BLOCK_SIZE = 256 - - -def cli(): - parser = ArgumentParser("uf2ota", description="UF2 OTA update format") - parser.add_argument("action", choices=["info", "dump", "write"]) - parser.add_argument("inputs", nargs="+", type=str) - parser.add_argument("--output", help="Output .uf2 binary", type=str) - parser.add_argument("--family", help="Family name", type=str) - parser.add_argument("--board", help="Board name/code", type=str) - parser.add_argument("--version", help="LibreTuya core version", type=str) - parser.add_argument("--fw", help="Firmware name:version", type=str) - parser.add_argument("--date", help="Build date (Unix, default now)", type=int) - args = parser.parse_args() - - if args.action == "info": - with open(args.inputs[0], "rb") as f: - uf2 = UF2(f) - if not uf2.read(): - raise RuntimeError("Reading UF2 failed") - uf2.dump() - return - - if args.action == "dump": - input = args.inputs[0] - outdir = input + "_dump" - with open(input, "rb") as f: - uf2 = UF2(f) - if not uf2.read(block_tags=False): - raise RuntimeError("Reading UF2 failed") - uf2_dump(uf2, outdir) - return - - out = args.output or "out.uf2" - with open(out, "wb") as f: - uf2 = UF2(f) - - uf2.family = get_family(args.family) - - # store global tags (for entire file) - if args.board: - uf2.put_str(Tag.BOARD, args.board.lower()) - key = f"LibreTuya {args.board.lower()}" - uf2.put_int32le(Tag.DEVICE_ID, crc32(key.encode())) - - if args.version: - uf2.put_str(Tag.LT_VERSION, args.version) - - if args.fw: - if ":" in args.fw: - (fw_name, fw_ver) = args.fw.split(":") - uf2.put_str(Tag.FIRMWARE, fw_name) - uf2.put_str(Tag.VERSION, fw_ver) - else: - uf2.put_str(Tag.FIRMWARE, args.fw) - - uf2.put_int8(Tag.OTA_VERSION, 1) - uf2.put_str(Tag.DEVICE, "LibreTuya") - uf2.put_int32le(Tag.BUILD_DATE, args.date or int(datetime.now().timestamp())) - - any_ota1 = False - any_ota2 = False - - for input in args.inputs: - input = Input(input) - - any_ota1 = any_ota1 or input.has_ota1 - any_ota2 = any_ota2 or input.has_ota2 - - # store local tags (for this image only) - tags = { - Tag.LT_PART_1: input.ota1_part.encode() if input.has_ota1 else b"", - Tag.LT_PART_2: input.ota2_part.encode() if input.has_ota2 else b"", - } - - if input.is_simple: - # single input image: - # - same image and partition (2 args) - # - same image but different partitions (4 args) - # - only OTA1 image - # - only OTA2 image - with open(input.single_file, "rb") as f: - data = f.read() - uf2.store(input.single_offs, data, tags, block_size=BLOCK_SIZE) - continue - - # different images and partitions for both OTA schemes - with open(input.ota1_file, "rb") as f: - data1 = f.read() - with open(input.ota2_file, "rb") as f: - data2 = f.read() - - if len(data1) != len(data2): - raise RuntimeError( - f"Images must have same lengths ({len(data1)} vs {len(data2)})" - ) - - for i in range(0, len(data1), 256): - block1 = data1[i : i + 256] - block2 = data2[i : i + 256] - if block1 == block2: - # blocks are identical, simply store them - uf2.store( - input.single_offs + i, block1, tags, block_size=BLOCK_SIZE - ) - tags = {} - continue - # calculate max binpatch length (incl. existing tags and binpatch tag header) - max_length = 476 - BLOCK_SIZE - Block.get_tags_length(tags) - 4 - # try 32-bit binpatch for best space optimization - binpatch = binpatch32(block1, block2, bladdr=i) - if len(binpatch) > max_length: - raise RuntimeError( - f"Binary patch too long - {len(binpatch)} > {max_length}" - ) - tags[Tag.LT_BINPATCH] = binpatch - uf2.store(input.single_offs + i, block1, tags, block_size=BLOCK_SIZE) - tags = {} - - uf2.put_int8(Tag.LT_HAS_OTA1, any_ota1 * 1) - uf2.put_int8(Tag.LT_HAS_OTA2, any_ota2 * 1) - uf2.write() - - -if __name__ == "__main__": - cli() diff --git a/tools/uf2ota/utils.py b/tools/uf2ota/utils.py deleted file mode 100644 index 33aae39..0000000 --- a/tools/uf2ota/utils.py +++ /dev/null @@ -1,72 +0,0 @@ -# Copyright (c) Kuba Szczodrzyński 2022-05-27. - - -from typing import Dict, List, Tuple - -from models import Opcode - -from tools.util.intbin import intto8, letoint, sinttole32 - - -def bindiff( - data1: bytes, data2: bytes, width: int = 1, single: bool = False -) -> Dict[int, Tuple[bytes, bytes]]: - out: Dict[int, Tuple[bytes, bytes]] = {} - offs = -1 - diff1 = b"" - diff2 = b"" - for i in range(0, len(data1), width): - block1 = data1[i : i + width] - block2 = data2[i : i + width] - if block1 == block2: - # blocks are equal again - if offs != -1: - # store and reset current difference - out[offs] = (diff1, diff2) - offs = -1 - diff1 = b"" - diff2 = b"" - continue - # blocks still differ - if single: - # single block per difference, so just store it - out[i] = (block1, block2) - else: - if offs == -1: - # difference starts here - offs = i - diff1 += block1 - diff2 += block2 - return out - - -def binpatch32(block1: bytes, block2: bytes, bladdr: int = 0) -> bytes: - # compare blocks: - # - in 4 byte (32 bit) chunks - # - report a single chunk in each difference - diffs = bindiff(block1, block2, width=4, single=True) - binpatch: Dict[int, List[int]] = {} - - # gather all repeating differences (i.e. memory offsets for OTA1/OTA2) - for offs, diff in diffs.items(): - (diff1, diff2) = diff - diff1 = letoint(diff1) - diff2 = letoint(diff2) - diff = diff2 - diff1 - if diff in binpatch: - # difference already in this binpatch, add the offset - binpatch[diff].append(offs) - else: - # a new difference value - binpatch[diff] = [offs] - # print(f"Block at 0x{bladdr:x}+{offs:02x} -> {diff1:08x} - {diff2:08x} = {diff2-diff1:x}") - # print(f"Block at 0x{bladdr:x}: {len(binpatch)} difference(s) at {sum(len(v) for v in binpatch.values())} offsets") - - # write binary patches - out = b"" - for diff, offs in binpatch.items(): - out += intto8(Opcode.DIFF32.value) - out += intto8(len(offs) + 4) - out += sinttole32(diff) - out += bytes(offs) - return out diff --git a/tools/upload/binpatch.py b/tools/upload/binpatch.py deleted file mode 100644 index 4a2b1a4..0000000 --- a/tools/upload/binpatch.py +++ /dev/null @@ -1,26 +0,0 @@ -# Copyright (c) Kuba Szczodrzyński 2022-06-02. - -from io import BytesIO - -from tools.uf2ota.models import Opcode -from tools.util.intbin import inttole32, letoint, letosint - - -def binpatch_diff32(data: bytearray, patch: bytes) -> bytearray: - diff = letosint(patch[0:4]) - for offs in patch[4:]: - value = letoint(data[offs : offs + 4]) - value += diff - data[offs : offs + 4] = inttole32(value) - return data - - -def binpatch_apply(data: bytearray, binpatch: bytes) -> bytearray: - io = BytesIO(binpatch) - while io.tell() < len(binpatch): - opcode = io.read(1)[0] - length = io.read(1)[0] - bpdata = io.read(length) - if opcode == Opcode.DIFF32: - data = binpatch_diff32(data, bpdata) - return data diff --git a/tools/upload/ctx.py b/tools/upload/ctx.py deleted file mode 100644 index 8c3f2f3..0000000 --- a/tools/upload/ctx.py +++ /dev/null @@ -1,164 +0,0 @@ -# Copyright (c) Kuba Szczodrzyński 2022-06-02. - -from datetime import datetime -from io import BytesIO -from typing import Dict, Tuple - -from tools.uf2ota.models import Tag -from tools.uf2ota.uf2 import UF2 -from tools.upload.binpatch import binpatch_apply -from tools.util.intbin import letoint -from tools.util.obj import get -from tools.util.platform import get_board_manifest - - -class UploadContext: - - uf2: UF2 - - seq: int = 0 - - part1: str = None - part2: str = None - - has_ota1: bool - has_ota2: bool - - board_manifest: dict = None - - def __init__(self, uf2: UF2) -> None: - self.uf2 = uf2 - self.has_ota1 = uf2.tags.get(Tag.LT_HAS_OTA1, None) == b"\x01" - self.has_ota2 = uf2.tags.get(Tag.LT_HAS_OTA2, None) == b"\x01" - - @property - def fw_name(self) -> str: - return self.uf2.tags.get(Tag.FIRMWARE, b"").decode() - - @property - def fw_version(self) -> str: - return self.uf2.tags.get(Tag.VERSION, b"").decode() - - @property - def lt_version(self) -> str: - return self.uf2.tags.get(Tag.LT_VERSION, b"").decode() - - @property - def board(self) -> str: - return self.uf2.tags.get(Tag.BOARD, b"").decode() - - @property - def build_date(self) -> datetime: - if Tag.BUILD_DATE not in self.uf2.tags: - return None - return datetime.fromtimestamp(letoint(self.uf2.tags[Tag.BUILD_DATE])) - - @property - def baudrate(self) -> int: - if not self.board_manifest: - self.board_manifest = get_board_manifest(self.board) - return get(self.board_manifest, "upload.speed") - - def get_offset(self, part: str, offs: int) -> int: - if not self.board_manifest: - self.board_manifest = get_board_manifest(self.board) - part = get(self.board_manifest, f"flash.{part}") - (offset, length) = map(lambda x: int(x, 16), part.split("+")) - if offs >= length: - return None - return offset + offs - - def read(self, ota_idx: int = 1) -> Tuple[str, int, bytes]: - """Read next available data block for the specified OTA scheme. - - Returns: - Tuple[str, int, bytes]: target partition, relative offset, data block - """ - - if ota_idx not in [1, 2]: - print(f"Invalid OTA index - {ota_idx}") - return None - - if ota_idx == 1 and not self.has_ota1: - print(f"No data for OTA index - {ota_idx}") - return None - if ota_idx == 2 and not self.has_ota2: - print(f"No data for OTA index - {ota_idx}") - return None - - for _ in range(self.seq, len(self.uf2.data)): - block = self.uf2.data[self.seq] - self.seq += 1 - - part1 = block.tags.get(Tag.LT_PART_1, None) - part2 = block.tags.get(Tag.LT_PART_2, None) - - if part1 is not None and part2 is not None: - # decode empty tags too - self.part1 = part1.decode() - self.part2 = part2.decode() - elif part1 or part2: - print(f"Only one target partition specified - {part1} / {part2}") - return None - - if not block.data: - continue - - part = None - if ota_idx == 1: - part = self.part1 - elif ota_idx == 2: - part = self.part2 - if not part: - continue - - # got data and target partition - offs = block.address - data = block.data - - if ota_idx == 2 and Tag.LT_BINPATCH in block.tags: - binpatch = block.tags[Tag.LT_BINPATCH] - data = bytearray(data) - data = binpatch_apply(data, binpatch) - data = bytes(data) - - return (part, offs, data) - return (None, 0, None) - - def collect(self, ota_idx: int = 1) -> Dict[int, BytesIO]: - """Read all UF2 blocks. Gather continuous data parts into sections - and their flashing offsets. - - Returns: - Dict[int, BytesIO]: map of flash offsets to streams with data - """ - - out: Dict[int, BytesIO] = {} - while True: - ret = self.read(ota_idx) - if not ret: - return False - (part, offs, data) = ret - if not data: - break - offs = self.get_offset(part, offs) - if offs is None: - return False - - # find BytesIO in the dict - for io_offs, io_data in out.items(): - if io_offs + len(io_data.getvalue()) == offs: - io_data.write(data) - offs = 0 - break - if offs == 0: - continue - - # create BytesIO at specified offset - io = BytesIO() - io.write(data) - out[offs] = io - # rewind BytesIO back to start - for io in out.values(): - io.seek(0) - return out diff --git a/tools/upload/rtltool.py b/tools/upload/rtltool.py deleted file mode 100644 index edc72fc..0000000 --- a/tools/upload/rtltool.py +++ /dev/null @@ -1,506 +0,0 @@ -#!/usr/bin/env python -# RTL871xBx ROM Bootloader Utility Ver 12.01.2018 -# Created on: 10.10.2017 -# Author: pvvx -# - -import argparse -import io -import os -import platform -import struct -import sys -import time - -import serial - -# Protocol bytes -SOH = b"\x01" -STX = b"\x02" -EOT = b"\x04" -ACK = b"\x06" -DLE = b"\x10" -NAK = b"\x15" -CAN = b"\x18" - -CMD_USB = b"\x05" # UART Set Baud -CMD_XMD = b"\x07" # Go xmodem mode (write RAM/Flash mode) -CMD_EFS = b"\x17" # Erase Flash Sectors -CMD_RBF = b"\x19" # Read Block Flash -CMD_ABRT = b"\x1B" # End xmodem mode (write RAM/Flash mode) -CMD_GFS = b"\x21" # FLASH Get Status -CMD_SFS = b"\x26" # FLASH Set Status - -# Protocol Mode -MODE_RTL = 0 # Rtl mode -MODE_XMD = 1 # xmodem mode -MODE_UNK1 = 3 # Unknown mode, test 1 -MODE_UNK2 = 4 # Unknown mode, test 2 - -# Default baudrate -RTL_ROM_BAUD = 1500000 - -RTL_READ_BLOCK_SIZE = 1024 -RTL_FLASH_SECTOR_SIZE = 4096 - - -class RTLXMD: - def __init__(self, port=0, baud=RTL_ROM_BAUD, timeout=1): - self.mode = MODE_UNK1 - try: - self._port = serial.Serial(port, baud) - self._port.timeout = timeout - except: - # raise Exception('Error open %s, %d baud' % (port, baud)) - print("Error: Open %s, %d baud!" % (port, baud)) - sys.exit(-1) - - def writecmd(self, cmd, ok=ACK): - if self._port.write(cmd): - char = self._port.read(1) - if char: - if char == ok: - return True - return False - - def WaitNAK(self): - chr_count = 128 - while 1: - char = self._port.read(1) - if char: - if char == NAK: - return True - else: - return None - chr_count -= 1 - if chr_count == 0: - return False - - # return False - - def sync(self, mode=MODE_RTL, flush=True, ready=7): - if flush: - self._port.flushOutput() - self._port.flushInput() - error_count = 0 - cancel = 0 - while True: - char = self._port.read(1) - if char: - if char == b"\x00": - continue - elif char == NAK: - # standard checksum requested (NAK) - if mode != self.mode: - if self.mode < MODE_UNK1: - if mode == MODE_RTL: - if self.writecmd(CMD_ABRT, CAN): - self.mode = MODE_RTL - # return True - break - elif mode == MODE_XMD: - if self.writecmd(CMD_XMD): - self.mode = MODE_XMD - break - else: - if mode == MODE_XMD: - if self.writecmd(CMD_XMD): - self.mode = MODE_XMD - break - self.mode = MODE_RTL - break - elif char == CAN: - # received CAN - if cancel: - # Transmission canceled: received 2xCAN at start-sequence - return False - else: - # Cancellation at start sequence - cancel = 1 - # else: - # send error: expected NAK, or CAN - # print 'Not NAK or CAN: %02x' % (ord(char)) - else: - if self.mode == MODE_UNK1: - if self.writecmd(CMD_XMD): - self.mode = MODE_XMD - if mode == MODE_XMD: - return True - if self.writecmd(CMD_ABRT, CAN): - self.mode = MODE_RTL - return True - self.mode = MODE_UNK2 - elif self.mode == MODE_UNK2: - if self.writecmd(CMD_ABRT, CAN): - self.mode = MODE_RTL - if mode == MODE_RTL: - return True - if self.writecmd(CMD_XMD): - self.mode = MODE_XMD - return True - self.mode = MODE_UNK1 - error_count += 1 - if error_count > ready: - if self.mode == MODE_XMD: - # send error: error_count reached 15, aborting. - self._port.write(CAN) - self._port.write(CAN) - return False - return True - - def ModeXmodem(self): - if self.sync(): - ret = self.writecmd(CMD_XMD) - if ret == True: - self.mode = 1 - return ret - return None - - def RtlMode(self): - if self.sync(): - ret = self.writecmd(CMD_ABRT, CAN) - if ret == True: - self.mode = 0 - return ret - return None - - def GetFlashStatus(self): - if self.sync(): - self._port.write(CMD_GFS) - return self._port.read(1) - return None - - def SetFlashStatus(self, status): - if self.sync(): - if self.writecmd([CMD_SFS, status]): - return self.GetFlashStatus() - return None - - def ReadBlockFlash(self, stream, offset=0, size=0x200000): - # Read sectors size: 4 block 1024 bytes, else not set ACK! - count = int((size + RTL_FLASH_SECTOR_SIZE - 1) / RTL_FLASH_SECTOR_SIZE) - offset &= 0xFFFFFF - if count > 0 and count < 0x10000 and offset >= 0: # 1 byte .. 16 Mbytes - ret = self.sync() - if ret: - ret = self._port.write( - struct.pack( - " RTL_READ_BLOCK_SIZE: - stream.write(data) - elif size > 0: - stream.write(data[:size]) - else: - return ret - else: - return False - size -= RTL_READ_BLOCK_SIZE - if size <= 0: - ret = self.sync() - else: - ret = False - return ret - - def connect(self): - # issue reset-to-bootloader: - # RTS = either RESET (both active low = chip in reset) - # DTR = GPIOA_30 (active low = boot to flasher) - self._port.setDTR(False) - self._port.setRTS(True) - time.sleep(0.05) - self._port.setDTR(True) - self._port.setRTS(False) - time.sleep(0.05) - self._port.setDTR(False) - return self.GetFlashStatus() - - def EraseSectorsFlash(self, offset=0, size=0x200000): - count = int((size + RTL_FLASH_SECTOR_SIZE - 1) / RTL_FLASH_SECTOR_SIZE) - offset &= 0xFFF000 - if count > 0 and count < 0x10000 and offset >= 0: # 1 byte .. 16 Mbytes - for i in range(count): - ret = self.sync() - if ret: - # print '\r%d' % i - ret = self.writecmd( - struct.pack( - "= ("3", "0", "0"): - return (sum(data) + checksum) % 256 - else: - return (sum(map(ord, data)) + checksum) % 256 - - def send_xmodem(self, stream, offset, size, retry=3): - ret = self.sync(MODE_XMD) - if ret: - sequence = 1 - while size > 0: - if size <= 128: - packet_size = 128 - cmd = SOH - else: - packet_size = 1024 - cmd = STX - rdsize = packet_size - if size < rdsize: - rdsize = size - data = stream.read(rdsize) - if not data: # end of stream - print("send: at EOF") - return False - data = data.ljust(packet_size, b"\xFF") - pkt = ( - struct.pack(" retry: - return False - - ret = self.writecmd(EOT) # if write SRAM -> (*0x10002000)() - self.mode = MODE_RTL - return ret - - def WriteBlockSRAM(self, stream, offset=0x10002000, size=0x1000, retry=3): - offset &= 0x00FFFFFF - offset |= 0x10000000 - return self.send_xmodem(stream, offset, size, retry) - - def WriteBlockFlash(self, stream, offset=0x10010000, size=0x1000, retry=3): - offset &= 0x00FFFFFF - offset |= 0x08000000 - return self.send_xmodem(stream, offset, size, retry) - - -def arg_auto_int(x): - return int(x, 0) - - -if __name__ == "__main__": - parser = argparse.ArgumentParser( - description="RT871xBx ROM Bootloader Utility", prog="rtltool" - ) - - parser.add_argument("--port", "-p", help="Serial port device", required=True) - parser.add_argument( - "--go", "-g", action="store_true", help="Run after performing the operation" - ) - - subparsers = parser.add_subparsers( - dest="operation", - help="Run rtltool {command} -h for additional help", - required=True, - ) - - parser_read_flash = subparsers.add_parser( - "rf", help="Read Flash data to binary file" - ) - parser_read_flash.add_argument("address", help="Start address", type=arg_auto_int) - parser_read_flash.add_argument("size", help="Size of region", type=arg_auto_int) - parser_read_flash.add_argument("filename", help="Name of binary file") - - parser_write_flash = subparsers.add_parser( - "wf", help="Write a binary file to Flash data" - ) - parser_write_flash.add_argument("address", help="Start address", type=arg_auto_int) - parser_write_flash.add_argument("filename", help="Name of binary file") - - parser_write_mem = subparsers.add_parser( - "wm", help="Write a binary file to SRAM memory" - ) - parser_write_mem.add_argument("address", help="Start address", type=arg_auto_int) - # parser_write_mem.add_argument('size', help='Size of region', type=arg_auto_int) - parser_write_mem.add_argument("filename", help="Name of binary file") - - parser_erase_flash = subparsers.add_parser("es", help="Erase Sectors Flash") - parser_erase_flash.add_argument("address", help="Start address", type=arg_auto_int) - parser_erase_flash.add_argument("size", help="Size of region", type=arg_auto_int) - - parser_get_status_flash = subparsers.add_parser( - "gf", help="Get Flash Status register" - ) - - parser_set_status_flash = subparsers.add_parser( - "sf", help="Set Flash Status register" - ) - parser_boot_flash = subparsers.add_parser("bf", help="Start boot flash") - parser_set_status_flash = subparsers.add_parser("gm", help="Go ROM Monitor") - - args = parser.parse_args() - rtl = RTLXMD(args.port) - print("Connecting...") - if rtl.connect(): - if args.operation == "wf": - stream = open(args.filename, "rb") - size = os.path.getsize(args.filename) - if size < 1: - stream.close - print("Error: File size = 0!") - sys.exit(-1) - offset = args.address & 0x00FFFFFF - offset |= 0x08000000 - print( - "Write Flash data 0x%08x to 0x%08x from file: %s ..." - % (offset, offset + size, args.filename) - ) - if not rtl.WriteBlockFlash(stream, args.address, size): - stream.close - print("Error: Write Flash!") - sys.exit(-2) - stream.close - # print 'Done!' - # sys.exit(0) - - elif args.operation == "rf": - print( - "Read Flash data from 0x%08x to 0x%08x in file: %s ..." - % (args.address, args.address + args.size, args.filename) - ) - stream = open(args.filename, "wb") - if not rtl.ReadBlockFlash(stream, args.address, args.size): - stream.close - print("Error!") - sys.exit(-2) - stream.close - # print 'Done!' - # sys.exit(0) - - elif args.operation == "wm": - stream = open(args.filename, "rb") - size = os.path.getsize(args.filename) - if size < 1: - stream.close - print("Error: File size = 0!") - sys.exit(-1) - offset = args.address & 0x00FFFFFF - offset |= 0x10000000 - print( - "Write SRAM at 0x%08x to 0x%08x from file: %s ..." - % (args.address, args.address + size, args.filename) - ) - if not rtl.WriteBlockSRAM(stream, args.address, size): - stream.close - print("Error: Write Flash!") - sys.exit(-2) - stream.close - print("Done!") - sys.exit(0) - - elif args.operation == "es": - count = (args.size + RTL_FLASH_SECTOR_SIZE - 1) / RTL_FLASH_SECTOR_SIZE - size = count * RTL_FLASH_SECTOR_SIZE - offset = args.address & 0xFFF000 - print( - "Erase Flash %d sectors, data from 0x%08x to 0x%08x ..." - % (count, offset, offset + size) - ) - if rtl.EraseSectorsFlash(offset, size): - print("Done!") - sys.exit(0) - print("Error: Erase Flash sectors!") - sys.exit(-2) - - elif args.operation == "gf": - fsta = rtl.GetFlashStatus() - if fsta: - print("Flash Status value: 0x%02x" % (ord(fsta))) - sys.exit(0) - print("Error: Get Flash Status!") - sys.exit(-2) - - elif args.operation == "sf": - print("Set Flash Status value: 0x%02x" % (args.value & 0xFF)) - if rtl.SetFlashStatus(args.value & 0xFF): - sys.exit(0) - print("Error: Set Flash Status!") - sys.exit(-2) - - elif args.operation == "bf": - print("BOOT_ROM_FromFlash()...") # ROM-Call:00005404 - stream = io.BytesIO(b"\x05\x54\x00\x00") - if not rtl.WriteBlockSRAM( - stream, 0x10002000, 4 - ): # [0x10002000] = 0x00005405 - stream.close - print("Error!") - sys.exit(-2) - print("Done!") - rtl._port.close() - rtl._port.baudrate = 115200 - rtl._port.open() - rtl._port.timeout = 1 - sio = io.TextIOWrapper(io.BufferedRWPair(rtl._port, rtl._port)) - print( - sio.readline(), - sio.readline(), - sio.readline(), - sio.readline(), - sio.readline(), - ) - sys.exit(0) - - elif args.operation == "gm": - stream = io.BytesIO( - b"\x19\x20\x00\x10\x19\x20\x00\x10\x19\x20\x00\x10\x19\x20\x00\x10\x19\x20\x00\x10\x00\x00\x00\x00\x08\xb5\x02\x4c\x4f\xf4\x7a\x70\xa0\x47\xfb\xe7\x05\x22\x00\x00" - ) - if not rtl.WriteBlockSRAM(stream, 0x10002000, 40): # [0x10002000] = ... - stream.close - print("Error!") - sys.exit(-2) - print("Done!") - sys.exit(0) - else: - print("Failed to connect device on", args.port, "!") - sys.exit(-2) - - if args.go: - if not rtl.WaitNAK() or rtl.writecmd(CMD_GFS, 0) == None: - print("Error: Sync!") - sys.exit(-2) - print("BOOT FromFlash...") # ROM-Call:00005404 - stream = io.BytesIO(b"\x05\x54\x00\x00") - if not rtl.WriteBlockSRAM(stream, 0x10002000, 4): # [0x10002000] = 0x00005405 - stream.close - print("Error!") - sys.exit(-2) - print("Done!") - sys.exit(0) diff --git a/tools/upload/uf2upload.py b/tools/upload/uf2upload.py deleted file mode 100644 index 65cec5a..0000000 --- a/tools/upload/uf2upload.py +++ /dev/null @@ -1,58 +0,0 @@ -# Copyright (c) Kuba Szczodrzyński 2022-06-02. - -import sys -from os.path import dirname, join -from time import time - -sys.path.append(join(dirname(__file__), "..", "..")) -sys.path.append(join(dirname(__file__), "..", "uf2ota")) - -from argparse import ArgumentParser, FileType - -from tools.uf2ota.uf2 import UF2 -from tools.upload.ctx import UploadContext - -# TODO document this tool - -if __name__ == "__main__": - parser = ArgumentParser("uf2upload", description="UF2 uploader") - parser.add_argument("file", type=FileType("rb"), help=".uf2 file") - - subp = parser.add_subparsers(dest="protocol", help="Upload protocol", required=True) - - parser_uart = subp.add_parser("uart", help="UART uploader") - parser_uart.add_argument("port", type=str, help="Serial port device") - parser_uart.add_argument("-b", "--baud", type=int, help="Serial baudrate") - - args = parser.parse_args() - - uf2 = UF2(args.file) - if not uf2.read(block_tags=False): - exit(1) - - ctx = UploadContext(uf2) - - print( - f"|-- {ctx.fw_name} {ctx.fw_version} @ {ctx.build_date} -> {ctx.board} via {args.protocol}" - ) - - start = time() - - args = dict(args._get_kwargs()) - if uf2.family.code == "ambz": - from tools.soc.uf2_rtltool import upload - - if not upload(ctx, **args): - exit(1) - elif uf2.family.parent_code == "bk72xx": - from tools.soc.uf2_bk72xx import upload - - if not upload(ctx, **args): - exit(1) - else: - print(f"Unsupported upload family - {uf2.family.name}") - exit(1) - - duration = time() - start - print(f"|-- Finished in {duration:.3f} s") - exit(0) diff --git a/tools/util/bitint.py b/tools/util/bitint.py deleted file mode 100644 index fde11aa..0000000 --- a/tools/util/bitint.py +++ /dev/null @@ -1,80 +0,0 @@ -# Copyright (c) Kuba Szczodrzyński 2022-06-10. - -from typing import List, Tuple, Union - -from tools.util.intbin import uintmax -from tools.util.obj import SliceLike, slice2int - - -def bitcat(*vars: Tuple[Union["BitInt", int], SliceLike]) -> int: - """Concat all 'vars' denoted in a (value, slice) format into a bitstring.""" - out = 0 - for val, sl in vars: - if not isinstance(val, BitInt): - val = BitInt(val) - (start, stop) = slice2int(sl) - out <<= start - stop + 1 - out |= val[start:stop] - return out - - -def bitcatraw(*vars: Tuple[int, int]) -> int: - """Concat all 'vars' denoted in a (value, bitwidth) format into a bitstring.""" - out = 0 - for val, bits in vars: - out <<= bits - out |= val - return out - - -class BitInt(int): - """ - Wrapper for int supporting slice reading and assignment of - individual bits (counting from LSB to MSB, like '7:0'). - """ - - value: int = None - - def __init__(self, value: int) -> None: - self.value = value - - def __getitem__(self, key): - if self.value is None: - self.value = self - # for best performance, slice2int() type checking was disabled - if isinstance(key, int): - return (self.value >> key) % 2 - # (start, stop) = slice2int(key) - return (self.value >> key.stop) & uintmax(key.start - key.stop + 1) - - def __setitem__(self, key, value): - if self.value is None: - self.value = self - (start, stop) = slice2int(key) - - if value > uintmax(start - stop + 1): - raise ValueError("value is too big") - - tmp = self.value & ~uintmax(start + 1) - tmp |= self.value & uintmax(stop) - tmp |= value << stop - self.value = tmp - - def rep(self, n: int, sl: Union[SliceLike, List[SliceLike]]) -> int: - """Construct a bitstring from 'sl' (being a single slice or a list) - repeated 'n' times.""" - if isinstance(sl, list): - return self.cat(*(sl * n)) - return self.cat(*([sl] * n)) - - def cat(self, *slices: SliceLike) -> int: - """Construct a bitstring from this BitInt's parts denoted by 'slices'.""" - out = 0 - for sl in slices: - (start, stop) = slice2int(sl) - out <<= start - stop + 1 - out |= self[start:stop] - return out - - def __int__(self) -> int: - return self.value or self diff --git a/tools/util/bkcrypto.py b/tools/util/bkcrypto.py deleted file mode 100644 index 8b90658..0000000 --- a/tools/util/bkcrypto.py +++ /dev/null @@ -1,167 +0,0 @@ -# Copyright (c) Kuba Szczodrzyński 2022-06-10. - -from typing import List, Tuple - -from tools.util.bitint import BitInt, bitcatraw - - -def pn15(addr: int) -> int: - # wire [15:0] pn_tmp = {addr[6:0], addr[15:7]} ^ {16'h6371 & {4{addr[8:5]}}}; - a = ((addr % 0x80) * 0x200) + ((addr // 0x80) % 0x200) - b = (addr // 0x20) % 0x10 - c = 0x6371 & (b * 0x1111) - return a ^ c - - -def pn16(addr: int) -> int: - # wire [16:0] pn_tmp = {addr[9:0], addr[16:10]} ^ {17'h13659 & {addr[4],{4{addr[1],addr[5],addr[9],addr[13]}}}}; - a = ((addr % 0x400) * 0x80) + ((addr // 0x400) % 0x80) - b = (addr // 0x2000) % 2 - b += ((addr // 0x200) % 2) * 2 - b += ((addr // 0x20) % 2) * 4 - b += ((addr // 0x2) % 2) * 8 - c = (addr // 0x10) % 2 - d = 0x13659 & (c * 0x10000 + b * 0x1111) - return a ^ d - - -def pn32(addr: int) -> int: - # wire [31:0] pn_tmp = {addr[14:0], addr[31:15]} ^ {32'hE519A4F1 & {8{addr[5:2]}}}; - a = ((addr % 0x8000) * 0x20000) + ((addr // 0x8000) % 0x20000) - b = (addr // 0x4) % 0x10 - c = 0xE519A4F1 & (b * 0x11111111) - return a ^ c - - -class BekenCrypto: - # translated from https://github.com/ghsecuritylab/tysdk_for_bk7231/blob/master/toolchain/encrypt_crc/abc.c - coef0: BitInt - coef1_mix: int - coef1_hi16: int - bypass: bool = False - pn15_args: List[slice] = None - pn16_args: slice = None - pn32_args: Tuple[int, int] = None - random: int = 0 - - def __init__(self, coeffs: List[BitInt]) -> None: - (self.coef0, coef1, coef2, coef3) = coeffs - - # wire g_bypass = (coef3[31:24] == 8'hFF) | (coef3[31:24] == 8'h00); - self.bypass = coef3[31:24] in [0x00, 0xFF] - if self.bypass: - return - - # wire pn16_bit = coef3[4]; - # wire[16:0] pn16_addr = pn16_A ^ {coef1[15:8], pn16_bit, coef1[7:0]}; - self.coef1_mix = bitcatraw((coef1[15:8], 8), (coef3[4], 1), (coef1[7:0], 8)) - self.coef1_hi16 = coef1[31:16] - - # wire pn15_bps = g_bypass | coef3[0]; - pn15_bps = coef3[0] - # wire pn16_bps = g_bypass | coef3[1]; - pn16_bps = coef3[1] - # wire pn32_bps = g_bypass | coef3[2]; - pn32_bps = coef3[2] - # wire rand_bps = g_bypass | coef3[3]; - rand_bps = coef3[3] - - if coef3[3:0] == 0xF: - self.bypass = True - return - - if not pn15_bps: - # wire[1:0] pn15_sel = coef3[ 6: 5]; - pn15_sel = coef3[6:5] - # wire[15:0] pn15_A = (pn15_sel == 0) ? ({addr[31:24], addr[23:16]} ^ {addr[15:8], addr[ 7:0]}) : - # (pn15_sel == 1) ? ({addr[31:24], addr[23:16]} ^ {addr[ 7:0], addr[15:8]}) : - # (pn15_sel == 2) ? ({addr[23:16], addr[31:24]} ^ {addr[15:8], addr[ 7:0]}) : - # ({addr[23:16], addr[31:24]} ^ {addr[ 7:0], addr[15:8]}); - if pn15_sel == 0: - self.pn15_args = [ - slice(31, 24), - slice(23, 16), - slice(15, 8), - slice(7, 0), - ] - elif pn15_sel == 1: - self.pn15_args = [ - slice(31, 24), - slice(23, 16), - slice(7, 0), - slice(15, 8), - ] - elif pn15_sel == 2: - self.pn15_args = [ - slice(23, 16), - slice(31, 24), - slice(15, 8), - slice(7, 0), - ] - else: - self.pn15_args = [ - slice(23, 16), - slice(31, 24), - slice(7, 0), - slice(15, 8), - ] - - if not pn16_bps: - # wire[1:0] pn16_sel = coef3[ 9: 8]; - pn16_sel = coef3[9:8] - # wire[16:0] pn16_A = (pn16_sel == 0) ? addr[16:0] : - # (pn16_sel == 1) ? addr[17:1] : - # (pn16_sel == 2) ? addr[18:2] : - # addr[19:3]; - self.pn16_args = slice(16 + pn16_sel, pn16_sel) - - if not pn32_bps: - # wire[1:0] pn32_sel = coef3[12:11]; - pn32_sel = coef3[12:11] - # wire[31:0] pn32_A = (pn32_sel == 0) ? addr[31:0] : - # (pn32_sel == 1) ? {addr[ 7:0], addr[31: 8]} : - # (pn32_sel == 2) ? {addr[15:0], addr[31:16]} : - # {addr[23:0], addr[31:24]}; - PN32_SHIFTS = ( - (0, 0), - (2**8, 2**24), - (2**16, 2**16), - (2**24, 2**8), - ) - self.pn32_args = PN32_SHIFTS[pn32_sel] - - # wire[31:0] random = rand_bps ? 32'h00000000 : coef2[31:0]; - self.random = 0 if rand_bps else coef2 - - def encrypt_u32(self, addr: int, data: int) -> int: - if self.bypass: - return data - addr = BitInt(addr) - - pn15_v = 0 - pn16_v = 0 - pn32_v = 0 - - if self.pn15_args: - pn15_a = (addr[self.pn15_args[0]] * 0x100) + addr[self.pn15_args[1]] - pn15_b = (addr[self.pn15_args[2]] * 0x100) + addr[self.pn15_args[3]] - pn15_A = pn15_a ^ pn15_b - # wire[15:0] pn15_addr = pn15_A ^ coef1[31:16]; - pn15_addr = pn15_A ^ self.coef1_hi16 - pn15_v = pn15(pn15_addr) - - if self.pn16_args: - pn16_A = addr[self.pn16_args] - # wire[16:0] pn16_addr = pn16_A ^ {coef1[15:8], pn16_bit, coef1[7:0]}; - pn16_addr = pn16_A ^ self.coef1_mix - pn16_v = pn16(pn16_addr) - - if self.pn32_args: - pn32_A = (addr // self.pn32_args[0]) + (addr * self.pn32_args[1]) - # wire[31:0] pn32_addr = pn32_A ^ coef0[31:0]; - pn32_addr = pn32_A ^ self.coef0 - pn32_v = pn32(pn32_addr) - - # assign pnout = pn32[31:0] ^ {pn15[15:0], pn16[15:0]} ^ random[31:0]; - pnout = pn32_v ^ ((pn15_v * 0x10000) + (pn16_v % 0x10000)) ^ self.random - return data ^ pnout diff --git a/tools/util/bkutil.py b/tools/util/bkutil.py deleted file mode 100644 index 8792187..0000000 --- a/tools/util/bkutil.py +++ /dev/null @@ -1,352 +0,0 @@ -# Copyright (c) Kuba Szczodrzyński 2022-06-10. - -import sys -from os.path import dirname, join - -sys.path.append(join(dirname(__file__), "..", "..")) - -from argparse import ArgumentParser, FileType -from binascii import crc32 -from dataclasses import dataclass, field -from enum import Enum, IntFlag -from io import SEEK_SET, FileIO -from os import stat -from struct import Struct -from time import time -from typing import Generator, Tuple, Union - -from tools.util.bitint import BitInt -from tools.util.bkcrypto import BekenCrypto -from tools.util.crc16 import CRC16 -from tools.util.fileio import readbin, writebin -from tools.util.intbin import ( - ByteGenerator, - align_up, - betoint, - biniter, - fileiter, - geniter, - inttobe16, - inttole32, - letoint, - pad_data, - pad_up, -) - - -class DataType(Enum): - BINARY = "BINARY" - PADDING_SIZE = "PADDING_SIZE" - RBL = "RBL" - - -DataTuple = Tuple[DataType, Union[bytes, int]] -DataUnion = Union[bytes, DataTuple] -DataGenerator = Generator[DataUnion, None, None] - - -class OTAAlgorithm(IntFlag): - NONE = 0 - CRYPT_XOR = 1 - CRYPT_AES256 = 2 - COMPRESS_GZIP = 256 - COMPRESS_QUICKLZ = 512 - COMPRESS_FASTLZ = 768 - - -@dataclass -class RBL: - ota_algo: OTAAlgorithm = OTAAlgorithm.NONE - timestamp: float = field(default_factory=time) - name: Union[str, bytes] = "app" - version: Union[str, bytes] = "1.00" - sn: Union[str, bytes] = "0" * 23 - data_crc: int = 0 - data_hash: int = 0x811C9DC5 # https://github.com/znerol/py-fnvhash/blob/master/fnvhash/__init__.py - raw_size: int = 0 - data_size: int = 0 - container_size: int = 0 - has_part_table: bool = False - - @property - def container_size_crc(self) -> int: - return int(self.container_size + (self.container_size // 32) * 2) - - def update(self, data: bytes): - self.data_crc = crc32(data, self.data_crc) - for byte in data: - if self.data_size < self.raw_size: - self.data_hash ^= byte - self.data_hash *= 0x01000193 - self.data_hash %= 0x100000000 - self.data_size += 1 - - def serialize(self) -> bytes: - if isinstance(self.name, str): - self.name = self.name.encode() - if isinstance(self.version, str): - self.version = self.version.encode() - if isinstance(self.sn, str): - self.sn = self.sn.encode() - # based on https://github.com/khalednassar/bk7231tools/blob/main/bk7231tools/analysis/rbl.py - struct = Struct("<4sII16s24s24sIIII") # without header CRC - rbl = struct.pack( - b"RBL\x00", - self.ota_algo, - int(self.timestamp), - pad_data(self.name, 16, 0x00), - pad_data(self.version, 24, 0x00), - pad_data(self.sn, 24, 0x00), - self.data_crc, - self.data_hash, - self.raw_size, - self.data_size, - ) - return rbl + inttole32(crc32(rbl)) - - @classmethod - def deserialize(cls, data: bytes) -> "RBL": - crc_found = letoint(data[-4:]) - data = data[:-4] - crc_expected = crc32(data) - if crc_expected != crc_found: - raise ValueError( - f"Invalid RBL CRC (expected {crc_expected:X}, found {crc_found:X})" - ) - struct = Struct(" None: - if coeffs: - if isinstance(coeffs, str): - coeffs = bytes.fromhex(coeffs) - if len(coeffs) != 16: - raise ValueError( - f"Invalid length of encryption coefficients: {len(coeffs)}" - ) - coeffs = list(map(BitInt, map(betoint, biniter(coeffs, 4)))) - self.crypto = BekenCrypto(coeffs) - - def crc(self, data: ByteGenerator, type: DataType = None) -> DataGenerator: - for block in geniter(data, 32): - crc = CRC16.CMS.calc(block) - block += inttobe16(crc) - if type: - yield (type, block) - else: - yield block - - def uncrc(self, data: ByteGenerator, check: bool = True) -> ByteGenerator: - for block in geniter(data, 34): - if check: - crc = CRC16.CMS.calc(block[0:32]) - crc_found = betoint(block[32:34]) - if crc != crc_found: - print(f"CRC invalid: expected={crc:X}, found={crc_found:X}") - return - yield block[0:32] - - def crypt(self, addr: int, data: ByteGenerator) -> ByteGenerator: - for word in geniter(data, 4): - word = letoint(word) - word = self.crypto.encrypt_u32(addr, word) - word = inttole32(word) - yield word - addr += 4 - - def package( - self, - f: FileIO, - addr: int, - size: int, - rbl: RBL, - partial: bool = False, - ) -> DataGenerator: - if not rbl.container_size: - raise ValueError("RBL must have a total size when packaging") - crc_total = 0 - - # yield all data as (type, bytes) tuples, if partial mode enabled - type_binary = DataType.BINARY if partial else None - type_padding = DataType.PADDING_SIZE if partial else None - type_rbl = DataType.RBL if partial else None - - # when to stop reading input data - data_end = size - if rbl.has_part_table: - data_end = size - 0xC0 # do not encrypt the partition table - - # set RBL size including one 16-byte padding - rbl.raw_size = align_up(size + 16, 32) + 16 - - # encrypt the input file, padded to 32 bytes - data_crypt_gen = self.crypt( - addr, fileiter(f, size=32, padding=0xFF, count=data_end) - ) - # iterate over encrypted 32-byte blocks - for block in geniter(data_crypt_gen, 32): - # add CRC16 and yield - yield from self.crc(block, type_binary) - crc_total += 2 - rbl.update(block) - - # temporary buffer for small-size operations - buf = b"\xff" * 16 # add 16 bytes of padding - - if rbl.has_part_table: - # add an unencrypted partition table - buf += f.read(0xC0) - - # update RBL - rbl.update(buf) - # add last padding with different values - rbl.update(b"\x10" * 16) - - # add last padding with normal values - buf += b"\xff" * 16 - # yield the temporary buffer - yield from self.crc(buf, type_binary) - crc_total += 2 * (len(buf) // 32) - - # pad the entire container with 0xFF, excluding RBL and its CRC16 - pad_size = pad_up(rbl.data_size + crc_total, rbl.container_size_crc) - 102 - if type_padding: - yield (type_padding, pad_size) - else: - for _ in range(pad_size): - yield b"\xff" - - # yield RBL with CRC16 - yield from self.crc(rbl.serialize(), type_rbl) - - -def auto_int(x): - return int(x, 0) - - -def add_common_args(parser): - parser.add_argument( - "coeffs", type=str, help="Encryption coefficients (hex string, 32 chars)" - ) - parser.add_argument("input", type=FileType("rb"), help="Input file") - parser.add_argument("output", type=FileType("wb"), help="Output file") - parser.add_argument("addr", type=auto_int, help="Memory address (dec/hex)") - - -if __name__ == "__main__": - parser = ArgumentParser(description="Encrypt/decrypt Beken firmware binaries") - sub = parser.add_subparsers(dest="action", required=True) - - encrypt = sub.add_parser("encrypt", help="Encrypt binary files without packaging") - add_common_args(encrypt) - encrypt.add_argument("-c", "--crc", help="Include CRC16", action="store_true") - - decrypt = sub.add_parser("decrypt", description="Decrypt unpackaged binary files") - add_common_args(decrypt) - decrypt.add_argument( - "-C", - "--no-crc-check", - help="Do not check CRC16 (if present)", - action="store_true", - ) - - package = sub.add_parser( - "package", description="Package raw binary files as RBL containers" - ) - add_common_args(package) - package.add_argument( - "size", type=auto_int, help="RBL total size (excl. CRC) (dec/hex)" - ) - package.add_argument( - "-n", - "--name", - type=str, - help="Firmware name (default: app)", - default="app", - required=False, - ) - package.add_argument( - "-v", - "--version", - type=str, - help="Firmware version (default: 1.00)", - default="1.00", - required=False, - ) - - unpackage = sub.add_parser( - "unpackage", description="Unpackage a single RBL container" - ) - add_common_args(unpackage) - unpackage.add_argument( - "offset", type=auto_int, help="Offset in input file (dec/hex)" - ) - unpackage.add_argument( - "size", type=auto_int, help="Container total size (incl. CRC) (dec/hex)" - ) - - args = parser.parse_args() - bk = BekenBinary(args.coeffs) - f: FileIO = args.input - size = stat(args.input.name).st_size - start = time() - - if args.action == "encrypt": - print(f"Encrypting '{f.name}' ({size} bytes)") - if args.crc: - print(f" - calculating 32-byte block CRC16...") - gen = bk.crc(bk.crypt(args.addr, f)) - else: - print(f" - as raw binary, without CRC16...") - gen = bk.crypt(args.addr, f) - - if args.action == "decrypt": - print(f"Decrypting '{f.name}' ({size} bytes)") - if size % 34 == 0: - if args.no_crc_check: - print(f" - has CRC16, skipping checks...") - else: - print(f" - has CRC16, checking...") - gen = bk.crypt(args.addr, bk.uncrc(f, check=not args.no_crc_check)) - elif size % 4 != 0: - raise ValueError("Input file has invalid length") - else: - print(f" - raw binary, no CRC") - gen = bk.crypt(args.addr, f) - - if args.action == "package": - print(f"Packaging {args.name} '{f.name}' for memory address 0x{args.addr:X}") - rbl = RBL(name=args.name, version=args.version) - if args.name == "bootloader": - rbl.has_part_table = True - print(f" - in bootloader mode; partition table unencrypted") - rbl.container_size = args.size - print(f" - container size (excl. CRC): 0x{rbl.container_size:X}") - print(f" - container size (incl. CRC): 0x{rbl.container_size_crc:X}") - gen = bk.package(f, args.addr, size, rbl) - - if args.action == "unpackage": - print(f"Unpackaging '{f.name}' (at 0x{args.offset:X}, size 0x{args.size:X})") - f.seek(args.offset + args.size - 102, SEEK_SET) - rbl = f.read(102) - rbl = b"".join(bk.uncrc(rbl)) - rbl = RBL.deserialize(rbl) - print(f" - found '{rbl.name}' ({rbl.version}), size {rbl.data_size}") - f.seek(0, SEEK_SET) - crc_size = (rbl.data_size - 16) // 32 * 34 - gen = bk.crypt(args.addr, bk.uncrc(fileiter(f, 32, 0xFF, crc_size))) - - written = 0 - for data in gen: - args.output.write(data) - written += len(data) - print(f" - wrote {written} bytes in {time()-start:.3f} s") diff --git a/tools/util/crc16.py b/tools/util/crc16.py deleted file mode 100644 index f9a2841..0000000 --- a/tools/util/crc16.py +++ /dev/null @@ -1,133 +0,0 @@ -# Copyright (c) Kuba Szczodrzyński 2022-06-02. - -from enum import Enum -from typing import List - - -class CRC16(Enum): - # based on https://crccalc.com/ and https://reveng.sourceforge.io/crc-catalogue/16.htm - ANSI = dict(poly=0x8005, init=0x0000, ref=False, out=0x0000) - ARC = dict(poly=0x8005, init=0x0000, ref=True, out=0x0000) - AUG_CCITT = dict(poly=0x1021, init=0x1D0F, ref=False, out=0x0000) - AUTOSAR = dict(poly=0x1021, init=0xFFFF, ref=False, out=0x0000) - BUYPASS = dict(poly=0x8005, init=0x0000, ref=False, out=0x0000) - CCITT = dict(poly=0x1021, init=0x0000, ref=True, out=0x0000) - CCITT_FALSE = dict(poly=0x1021, init=0xFFFF, ref=False, out=0x0000) - CCITT_TRUE = dict(poly=0x1021, init=0x0000, ref=True, out=0x0000) - CDMA2000 = dict(poly=0xC867, init=0xFFFF, ref=False, out=0x0000) - CMS = dict(poly=0x8005, init=0xFFFF, ref=False, out=0x0000) - CRC_A = dict(poly=0x1021, init=0xC6C6, ref=True, out=0x0000) - CRC_B = dict(poly=0x1021, init=0xFFFF, ref=True, out=0xFFFF) - DARC = dict(poly=0x1021, init=0xFFFF, ref=False, out=0xFFFF) - DDS_110 = dict(poly=0x8005, init=0x800D, ref=False, out=0x0000) - DECT_R = dict(poly=0x0589, init=0x0000, ref=False, out=0x0001) - DECT_X = dict(poly=0x0589, init=0x0000, ref=False, out=0x0000) - DNP = dict(poly=0x3D65, init=0x0000, ref=True, out=0xFFFF) - EN_13757 = dict(poly=0x3D65, init=0x0000, ref=False, out=0xFFFF) - EPC = dict(poly=0x1021, init=0xFFFF, ref=False, out=0xFFFF) - EPC_C1G2 = dict(poly=0x1021, init=0xFFFF, ref=False, out=0xFFFF) - GENIBUS = dict(poly=0x1021, init=0xFFFF, ref=False, out=0xFFFF) - GSM = dict(poly=0x1021, init=0x0000, ref=False, out=0xFFFF) - I_CODE = dict(poly=0x1021, init=0xFFFF, ref=False, out=0xFFFF) - IBM = dict(poly=0x8005, init=0x0000, ref=False, out=0x0000) - IBM_3740 = dict(poly=0x1021, init=0xFFFF, ref=False, out=0x0000) - IBM_SDLC = dict(poly=0x1021, init=0xFFFF, ref=True, out=0xFFFF) - IEC_61158_2 = dict(poly=0x1DCF, init=0xFFFF, ref=False, out=0xFFFF) - ISO_14443_3_A = dict(poly=0x1021, init=0xC6C6, ref=True, out=0x0000) - ISO_14443_3_B = dict(poly=0x1021, init=0xFFFF, ref=True, out=0xFFFF) - ISO_HDLC = dict(poly=0x1021, init=0xFFFF, ref=True, out=0xFFFF) - KERMIT = dict(poly=0x1021, init=0x0000, ref=True, out=0x0000) - LHA = dict(poly=0x8005, init=0x0000, ref=True, out=0x0000) - LJ1200 = dict(poly=0x6F63, init=0x0000, ref=False, out=0x0000) - M17 = dict(poly=0x5935, init=0xFFFF, ref=False, out=0x0000) - MAXIM = dict(poly=0x8005, init=0x0000, ref=True, out=0xFFFF) - MCRF4XX = dict(poly=0x1021, init=0xFFFF, ref=True, out=0x0000) - MODBUS = dict(poly=0x8005, init=0xFFFF, ref=True, out=0x0000) - NRSC_5 = dict(poly=0x080B, init=0xFFFF, ref=True, out=0x0000) - OPENSAFETY_A = dict(poly=0x5935, init=0x0000, ref=False, out=0x0000) - OPENSAFETY_B = dict(poly=0x755B, init=0x0000, ref=False, out=0x0000) - PROFIBUS = dict(poly=0x1DCF, init=0xFFFF, ref=False, out=0xFFFF) - RIELLO = dict(poly=0x1021, init=0xB2AA, ref=True, out=0x0000) - SPI_FUJITSU = dict(poly=0x1021, init=0x1D0F, ref=False, out=0x0000) - T10_DIF = dict(poly=0x8BB7, init=0x0000, ref=False, out=0x0000) - TELEDISK = dict(poly=0xA097, init=0x0000, ref=False, out=0x0000) - TMS37157 = dict(poly=0x1021, init=0x89EC, ref=True, out=0x0000) - UMTS = dict(poly=0x8005, init=0x0000, ref=False, out=0x0000) - USB = dict(poly=0x8005, init=0xFFFF, ref=True, out=0xFFFF) - V_41_LSB = dict(poly=0x1021, init=0x0000, ref=True, out=0x0000) - VERIFONE = dict(poly=0x8005, init=0x0000, ref=False, out=0x0000) - X_25 = dict(poly=0x1021, init=0xFFFF, ref=True, out=0xFFFF) - XMODEM = dict(poly=0x1021, init=0x0000, ref=False, out=0x0000) - - poly: int - init: int - ref: bool - out: int - table: List[int] - - def __init__(self, params: dict) -> None: - super().__init__() - self.poly = params["poly"] - self.init = params["init"] - self.ref = params["ref"] - self.out = params["out"] - self.table = None - if self.ref: - self.poly = self.reverse16(self.poly) - self.init = self.reverse16(self.init) - - @staticmethod - def reverse16(num: int) -> int: - out = 0 - for i in range(16): - out |= ((num & (1 << i)) >> i) << (15 - i) - return out - - def calc(self, data: bytes) -> int: - if self.ref: - self._init_ref() - return self._calc_ref(data) - self._init_std() - return self._calc_std(data) - - def _init_std(self): - if self.table: - return - self.table = [] - for b in range(256): - crc = b << 8 - for _ in range(8): - if crc & 0x8000: - crc <<= 1 - crc ^= self.poly - else: - crc <<= 1 - self.table.append(crc & 0xFFFF) - - def _init_ref(self): - if self.table: - return - self.table = [] - for b in range(256): - crc = b - for _ in range(8): - if crc & 0x0001: - crc >>= 1 - crc ^= self.poly - else: - crc >>= 1 - self.table.append(crc) - - def _calc_std(self, data: bytes) -> int: - crc = self.init - for b in data: - b ^= crc // 256 - crc = self.table[b] ^ (crc * 256 % 0x10000) - return crc ^ self.out - - def _calc_ref(self, data: bytes) -> int: - crc = self.init - for b in data: - b ^= crc % 256 - crc = self.table[b] ^ (crc // 256) - return crc ^ self.out diff --git a/tools/util/fileio.py b/tools/util/fileio.py deleted file mode 100644 index 566fa3e..0000000 --- a/tools/util/fileio.py +++ /dev/null @@ -1,78 +0,0 @@ -# Copyright (c) Kuba Szczodrzyński 2022-06-10. - -import json -from io import BytesIO -from os.path import dirname, getmtime, isfile, join -from typing import List, Union - - -def chname(path: str, name: str) -> str: - """Change the basename of 'path' to 'name'.""" - return join(dirname(path), name) - - -def chext(path: str, ext: str) -> str: - """Change the file extension of 'path' to 'ext' (without the dot).""" - return path.rpartition(".")[0] + "." + ext - - -def isnewer(what: str, than: str) -> bool: - """Check if 'what' is newer than 'than'. - - Returns False if 'what' is not a file. - - Returns True if 'than' is not a file. - """ - if not isfile(what): - return False - if not isfile(than): - return True - return getmtime(what) > getmtime(than) - - -def readbin(file: str) -> bytes: - """Read a binary file into a bytes object.""" - with open(file, "rb") as f: - data = f.read() - return data - - -def writebin(file: str, data: Union[bytes, BytesIO]): - """Write data into a binary file.""" - with open(file, "wb") as f: - if isinstance(data, BytesIO): - f.write(data.getvalue()) - else: - f.write(data) - - -# same as load_json -def readjson(file: str) -> Union[dict, list]: - """Read a JSON file into a dict or list.""" - with open(file, "r", encoding="utf-8") as f: - return json.load(f) - - -def writejson(file: str, data: Union[dict, list]): - """Write a dict or list to a JSON file.""" - with open(file, "w", encoding="utf-8") as f: - json.dump(data, f) - - -def readtext(file: str) -> str: - """Read a text file into a string.""" - with open(file, "r", encoding="utf-8") as f: - data = f.read() - return data - - -def writetext(file: str, data: Union[str, bytes, List[str]]): - """Write data into a text file.""" - with open(file, "w", encoding="utf-8") as f: - if isinstance(data, bytes): - f.write(data.decode()) - elif isinstance(data, list): - f.write("\n".join(data)) - f.write("\n") - else: - f.write(data) diff --git a/tools/util/intbin.py b/tools/util/intbin.py deleted file mode 100644 index 209e865..0000000 --- a/tools/util/intbin.py +++ /dev/null @@ -1,204 +0,0 @@ -# Copyright (c) Kuba Szczodrzyński 2022-06-02. - -from io import FileIO -from typing import IO, Generator, Union - -ByteGenerator = Generator[bytes, None, None] - - -def bswap(data: bytes) -> bytes: - """Reverse the byte array (big-endian <-> little-endian).""" - return bytes(reversed(data)) - - -def betoint(data: bytes) -> int: - """Convert bytes to big-endian unsigned integer.""" - return int.from_bytes(data, byteorder="big") - - -def letoint(data: bytes) -> int: - """Convert bytes to little-endian unsigned integer.""" - return int.from_bytes(data, byteorder="little") - - -def betosint(data: bytes) -> int: - """Convert bytes to big-endian signed integer.""" - return int.from_bytes(data, byteorder="big", signed=True) - - -def letosint(data: bytes) -> int: - """Convert bytes to little-endian signed integer.""" - return int.from_bytes(data, byteorder="little", signed=True) - - -def inttobe32(data: int) -> bytes: - """Convert unsigned integer to 32 bits, big-endian.""" - return data.to_bytes(length=4, byteorder="big") - - -def inttole32(data: int) -> bytes: - """Convert unsigned integer to 32 bits, little-endian.""" - return data.to_bytes(length=4, byteorder="little") - - -def inttobe24(data: int) -> bytes: - """Convert unsigned integer to 24 bits, big-endian.""" - return data.to_bytes(length=3, byteorder="big") - - -def inttole24(data: int) -> bytes: - """Convert unsigned integer to 24 bits, little-endian.""" - return data.to_bytes(length=3, byteorder="little") - - -def inttobe16(data: int) -> bytes: - """Convert unsigned integer to 16 bits, big-endian.""" - return data.to_bytes(length=2, byteorder="big") - - -def inttole16(data: int) -> bytes: - """Convert unsigned integer to 16 bits, little-endian.""" - return data.to_bytes(length=2, byteorder="little") - - -def intto8(data: int) -> bytes: - """Convert unsigned integer to 8 bits.""" - return data.to_bytes(length=1, byteorder="big") - - -def sinttobe32(data: int) -> bytes: - """Convert signed integer to 32 bits, big-endian.""" - return data.to_bytes(length=4, byteorder="big", signed=True) - - -def sinttole32(data: int) -> bytes: - """Convert signed integer to 32 bits, little-endian.""" - return data.to_bytes(length=4, byteorder="little", signed=True) - - -def sinttobe24(data: int) -> bytes: - """Convert signed integer to 24 bits, big-endian.""" - return data.to_bytes(length=3, byteorder="big", signed=True) - - -def sinttole24(data: int) -> bytes: - """Convert signed integer to 24 bits, little-endian.""" - return data.to_bytes(length=3, byteorder="little", signed=True) - - -def sinttobe16(data: int) -> bytes: - """Convert signed integer to 16 bits, big-endian.""" - return data.to_bytes(length=2, byteorder="big", signed=True) - - -def sinttole16(data: int) -> bytes: - """Convert signed integer to 16 bits, little-endian.""" - return data.to_bytes(length=2, byteorder="little", signed=True) - - -def sintto8(data: int) -> bytes: - """Convert signed integer to 8 bits.""" - return data.to_bytes(length=1, byteorder="little", signed=True) - - -def align_up(x: int, n: int) -> int: - """Return x aligned up to block size of n.""" - return int((x - 1) // n + 1) * n - - -def align_down(x: int, n: int) -> int: - """Return 'x' aligned down to block size of 'n'.""" - return int(x // n) * n - - -def pad_up(x: int, n: int) -> int: - """Return how many bytes of padding is needed to align 'x' - up to block size of 'n'.""" - return n - (x % n) - - -def pad_data(data: bytes, n: int, char: int) -> bytes: - """Add 'char'-filled padding to 'data' to align to a 'n'-sized block.""" - if len(data) % n == 0: - return data - return data + (bytes([char]) * pad_up(len(data), n)) - - -def uint8(val): - """Get only the least-significant 8 bits of the value.""" - return val & 0xFF - - -def uint16(val): - """Get only the least-significant 16 bits of the value.""" - return val & 0xFFFF - - -def uint32(val): - """Get only the least-significant 32 bits of the value.""" - return val & 0xFFFFFFFF - - -def uintmax(bits: int) -> int: - """Get maximum integer size for given bit width.""" - return (2**bits) - 1 - - -def biniter(data: bytes, size: int) -> ByteGenerator: - """Iterate over 'data' in 'size'-bytes long chunks, returning - a generator.""" - if len(data) % size != 0: - raise ValueError( - f"Data length must be a multiple of block size ({len(data)} % {size})" - ) - for i in range(0, len(data), size): - yield data[i : i + size] - - -def geniter(gen: Union[ByteGenerator, bytes, IO], size: int) -> ByteGenerator: - """ - Take data from 'gen' and generate 'size'-bytes long chunks. - - If 'gen' is a bytes or IO object, it is wrapped using - biniter() or fileiter(). - """ - if isinstance(gen, bytes): - yield from biniter(gen, size) - return - if isinstance(gen, IO): - yield from fileiter(gen, size) - return - buf = b"" - for part in gen: - if not buf and len(part) == size: - yield part - continue - buf += part - while len(buf) >= size: - yield buf[0:size] - buf = buf[size:] - - -def fileiter( - f: FileIO, size: int, padding: int = 0x00, count: int = 0 -) -> ByteGenerator: - """ - Read data from 'f' and generate 'size'-bytes long chunks. - - Pad incomplete chunks with 'padding' character. - - Read up to 'count' bytes from 'f', if specified. Data is padded - if not on chunk boundary. - """ - read = 0 - while True: - if count and read + size >= count: - yield pad_data(f.read(count % size), size, padding) - return - data = f.read(size) - read += len(data) - if len(data) < size: - # got only part of the block - yield pad_data(data, size, padding) - return - yield data diff --git a/tools/util/models.py b/tools/util/models.py deleted file mode 100644 index 0365a52..0000000 --- a/tools/util/models.py +++ /dev/null @@ -1,60 +0,0 @@ -# Copyright (c) Kuba Szczodrzyński 2022-06-02. - -from os.path import dirname, isdir, join -from typing import List - - -class Family: - id: int - short_name: str - description: str - name: str = None - parent: str = None - code: str = None - parent_code: str = None - url: str = None - sdk: str = None - framework: str = None - mcus: List[str] = [] - - def __init__(self, data: dict): - for key, value in data.items(): - if key == "id": - self.id = int(value, 16) - else: - setattr(self, key, value) - - @property - def sdk_name(self) -> str: - return self.sdk.rpartition("/")[2] if self.sdk else None - - @property - def has_arduino_core(self) -> bool: - if not self.name: - return False - if isdir(join(dirname(__file__), "..", "..", "arduino", self.name)): - return True - if not self.parent: - return False - if isdir(join(dirname(__file__), "..", "..", "arduino", self.parent)): - return True - return False - - def dict(self) -> dict: - return dict( - FAMILY=self.short_name, - FAMILY_ID=self.id, - FAMILY_NAME=self.name, - FAMILY_PARENT=self.parent, - FAMILY_CODE=self.code, - FAMILY_PARENT_CODE=self.parent_code, - ) - - def __eq__(self, __o: object) -> bool: - return isinstance(__o, Family) and self.id == __o.id - - def __iter__(self): - return iter(self.dict().items()) - - def __repr__(self) -> str: - return f"" diff --git a/tools/util/obj.py b/tools/util/obj.py deleted file mode 100644 index 243887e..0000000 --- a/tools/util/obj.py +++ /dev/null @@ -1,67 +0,0 @@ -# Copyright (c) Kuba Szczodrzyński 2022-06-02. - -import json -from typing import Tuple, Union - -SliceLike = Union[slice, str, int] - - -def merge_dicts(d1, d2): - if d1 is not None and type(d1) != type(d2): - raise TypeError("d1 and d2 are different types") - if isinstance(d2, list): - if d1 is None: - d1 = [] - d1.extend(merge_dicts(None, item) for item in d2) - elif isinstance(d2, dict): - if d1 is None: - d1 = {} - for key in d2: - d1[key] = merge_dicts(d1.get(key, None), d2[key]) - else: - d1 = d2 - return d1 - - -def load_json(file: str) -> Union[dict, list]: - with open(file, "r", encoding="utf-8") as f: - return json.load(f) - - -def get(data: dict, path: str): - if not isinstance(data, dict) or not path: - return None - if "." not in path: - return data.get(path, None) - key, _, path = path.partition(".") - return get(data.get(key, None), path) - - -def slice2int(val: SliceLike) -> Tuple[int, int]: - """Convert a slice-like value (slice, string '7:0' or '3', int '3') - to a tuple of (start, stop).""" - if isinstance(val, int): - return (val, val) - if isinstance(val, slice): - if val.step: - raise ValueError("value must be a slice without step") - if val.start < val.stop: - raise ValueError("start must not be less than stop") - return (val.start, val.stop) - if isinstance(val, str): - if ":" in val: - val = val.split(":") - if len(val) == 2: - return tuple(map(int, val)) - elif val.isnumeric(): - return (int(val), int(val)) - raise ValueError(f"invalid slice format: {val}") - - -# https://stackoverflow.com/a/1094933/9438331 -def sizeof(num: int, suffix="iB", base=1024.0) -> str: - for unit in ["", "K", "M", "G", "T", "P", "E", "Z"]: - if abs(num) < base: - return f"{num:.1f} {unit}{suffix}".replace(".0 ", " ") - num /= base - return f"{num:.1f} Y{suffix}".replace(".0 ", " ") diff --git a/tools/util/platform.py b/tools/util/platform.py deleted file mode 100644 index 2b28cd7..0000000 --- a/tools/util/platform.py +++ /dev/null @@ -1,75 +0,0 @@ -# Copyright (c) Kuba Szczodrzyński 2022-06-02. - -from glob import glob -from os.path import basename, dirname, isfile, join -from typing import Dict, List, Union - -from tools.util.models import Family -from tools.util.obj import load_json, merge_dicts - -boards_base: Dict[str, dict] = {} -families: List[Family] = [] - - -def get_board_list() -> List[str]: - boards_glob = join(dirname(__file__), "..", "..", "boards", "*.json") - return [basename(file)[:-5] for file in glob(boards_glob)] - - -def get_board_manifest(board: Union[str, dict]) -> dict: - boards_dir = join(dirname(__file__), "..", "..", "boards") - if not isinstance(board, dict): - if not isfile(board): - board = join(boards_dir, f"{board}.json") - board = load_json(board) - if "_base" in board: - base = board["_base"] - if not isinstance(base, list): - base = [base] - result = {} - for base_name in base: - if base_name not in boards_base: - file = join(boards_dir, "_base", f"{base_name}.json") - boards_base[base_name] = load_json(file) - merge_dicts(result, boards_base[base_name]) - merge_dicts(result, board) - board = result - return board - - -def get_families() -> List[Family]: - global families - if families: - return families - file = join(dirname(__file__), "..", "..", "families.json") - families = [Family(f) for f in load_json(file)] - return families - - -def get_family( - any: str = None, - id: Union[str, int] = None, - short_name: str = None, - name: str = None, - code: str = None, -) -> Family: - if any: - id = any - short_name = any - name = any - code = any - if id and isinstance(id, str) and id.startswith("0x"): - id = int(id, 16) - for family in get_families(): - if id and family.id == id: - return family - if short_name and family.short_name == short_name.upper(): - return family - if name and family.name == name.lower(): - return family - if code and family.code == code.lower(): - return family - if any: - raise ValueError(f"Family not found - {any}") - text = ", ".join(filter(None, [id, short_name, name, code])) - raise ValueError(f"Family not found - {text}")