mirror of
https://github.com/nekorevend/esphome-emporia-vue-utility.git
synced 2026-01-08 20:40:39 -07:00
Add a tool I used during my reverse engineering.
This commit is contained in:
48
tools/README.md
Normal file
48
tools/README.md
Normal file
@@ -0,0 +1,48 @@
|
||||
# Tools
|
||||
|
||||
During my investigation, I wanted to see the claimed wattage that corresponds to a given payload, but in an automated way. Since Emporia lets you export CSV data, I made a quick and dirty script to match up the `*1SEC.csv` file to the data I sniffed over UART.
|
||||
|
||||
## emporia_sniff_parsing.py
|
||||
|
||||
This script was crafted to deal with the firmware V7's payload, but you can adjust it to parse other kinds of payloads.
|
||||
|
||||
### How to run
|
||||
|
||||
`python3 .\emporia_sniff_parsing.py --sniffed_data capture_from_MGM111.txt --emporia_csv emporia_export_1SEC.csv`
|
||||
|
||||
### What it expects
|
||||
|
||||
`capture_from_MGM111.txt` should have contents like this:
|
||||
```
|
||||
"2023-12-08 09:05:11",2401722C18480100000025877D16090000010000251A14970000000103002201000002030022E803000004002AA704000D"2023-12-08 09:05:41",2401722C18490100000025907D16090000010000251A14970000000103002201000002030022E803000004002A8F04000D"2023-12-08 09:06:11",2401722C184A01000000259A7D16090000010000251A14970000000103002201000002030022E803000004002AA304000D"2023-12-08 09:06:41",2401722C184B0100000025A37D16090000010000251A14970000000103002201000002030022E803000004002A5404000D"2023-12-08 09:07:11",2401722C184C0100000025AC7D16090000010000251A14970000000103002201000002030022E803000004002A8E04000D"2023-12-08 09:07:41",2401722C184D0100000025B77D16090000010000251A14970000000103002201000002030022E803000004002AD104000D
|
||||
```
|
||||
|
||||
This was a format returned by [Realterm](https://realterm.sourceforge.io/), but you can adjust the code to match the format of your sniffed data.
|
||||
|
||||
`emporia_export_1SEC.csv` should have contents like this:
|
||||
```
|
||||
12/08/2023 08:55:40,1.3850
|
||||
12/08/2023 08:55:41,1.3850
|
||||
12/08/2023 08:55:42,1.3850
|
||||
12/08/2023 08:55:43,1.3850
|
||||
12/08/2023 08:55:44,1.3850
|
||||
12/08/2023 08:55:45,1.3850
|
||||
12/08/2023 08:55:46,1.4140
|
||||
12/08/2023 08:55:47,1.4140
|
||||
12/08/2023 08:55:48,1.4140
|
||||
```
|
||||
|
||||
### What to expect
|
||||
|
||||
```
|
||||
Hex3: C90600 Calculated W: 1737 Pairing: [('1.6560', '2401722C18210100000025A97B16090000010000251A14970000000103002201000002030022E803000004002AC906000D')]
|
||||
Hex3: 780600 Calculated W: 1656 Pairing: [('1.6790', '2401722C18230100000025C67B16090000010000251A14970000000103002201000002030022E803000004002A7806000D')]
|
||||
Hex3: 8F0600 Calculated W: 1679 Pairing: [('1.6820', '2401722C18250100000025E07B16090000010000251A14970000000103002201000002030022E803000004002A8F06000D')]
|
||||
Hex3: 7F0600 Calculated W: 1663 Pairing: [('1.6370', '2401722C18290100000025197C16090000010000251A14970000000103002201000002030022E803000004002A7F06000D')]
|
||||
Hex3: 650600 Calculated W: 1637 Pairing: [('1.5340', '2401722C182B0100000025347C16090000010000251A14970000000103002201000002030022E803000004002A6506000D')]
|
||||
Hex3: FE0500 Calculated W: 1534 Pairing: [('1.5260', '2401722C182D01000000254F7C16090000010000251A14970000000103002201000002030022E803000004002AFE05000D')]
|
||||
Hex3: F60500 Calculated W: 1526 Pairing: [('1.5820', '2401722C182F0100000025697C16090000010000251A14970000000103002201000002030022E803000004002AF605000D')]
|
||||
Hex3: 2E0600 Calculated W: 1582 Pairing: [('1.4740', '2401722C18310100000025827C16090000010000251A14970000000103002201000002030022E803000004002A2E06000D')]
|
||||
Hex3: C20500 Calculated W: 1474 Pairing: [('1.3850', '2401722C183301000000259B7C16090000010000251A14970000000103002201000002030022E803000004002AC205000D')]
|
||||
Hex3: 690500 Calculated W: 1385 Pairing: [('1.4140', '2401722C18350100000025B47C16090000010000251A14970000000103002201000002030022E803000004002A6905000D')]
|
||||
```
|
||||
142
tools/emporia_sniff_parsing.py
Normal file
142
tools/emporia_sniff_parsing.py
Normal file
@@ -0,0 +1,142 @@
|
||||
import argparse
|
||||
import csv
|
||||
from collections import defaultdict
|
||||
import re
|
||||
from datetime import datetime
|
||||
|
||||
def parse_sniffed_packet(csv_data):
|
||||
# Regular expression pattern for the timestamp
|
||||
pattern = r'("\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}",)'
|
||||
# Split the data using the pattern, keeping the delimiters
|
||||
entries = re.split(pattern, csv_data)[1:] # first element is always empty
|
||||
|
||||
parsed_data = {}
|
||||
for i in range(0, len(entries), 2):
|
||||
timestamp_str = entries[i].strip('",')
|
||||
value = entries[i + 1]
|
||||
|
||||
# Parse the timestamp string into a datetime object
|
||||
timestamp = datetime.strptime(timestamp_str, '%Y-%m-%d %H:%M:%S')
|
||||
|
||||
if timestamp in parsed_data:
|
||||
# Concatenate value if timestamp already exists
|
||||
parsed_data[timestamp] += value
|
||||
else:
|
||||
# Otherwise, just add the new entry
|
||||
parsed_data[timestamp] = value
|
||||
|
||||
return parsed_data
|
||||
|
||||
def parse_emporia(csv_data):
|
||||
parsed_data = {}
|
||||
last_value = None
|
||||
|
||||
reader = csv.reader(csv_data.splitlines())
|
||||
for row in reader:
|
||||
# print(row)
|
||||
# Skip blank lines
|
||||
if not row or len(row) < 2:
|
||||
continue
|
||||
|
||||
timestamp_str, value = row
|
||||
|
||||
# Skip if the value is the same as the last one
|
||||
if value == last_value:
|
||||
continue
|
||||
|
||||
# Parse the timestamp string into a datetime object
|
||||
try:
|
||||
timestamp = datetime.strptime(timestamp_str, '%m/%d/%Y %H:%M:%S')
|
||||
except ValueError:
|
||||
continue
|
||||
|
||||
# Update the last value and add the new entry
|
||||
last_value = value
|
||||
parsed_data[timestamp] = value
|
||||
|
||||
return parsed_data
|
||||
|
||||
def find_nearest_before(from_emporia, sniffed):
|
||||
result = []
|
||||
for keyA, valueA in from_emporia.items():
|
||||
# Initialize nearest datetime and its value
|
||||
nearest_time = None
|
||||
nearest_value = None
|
||||
# Go through each item in sniffed
|
||||
for keyB, valueB in sniffed.items():
|
||||
# Check if keyB is before keyA and within 1-5 seconds
|
||||
if 0 < (keyA - keyB).total_seconds() <= 5:
|
||||
# Update nearest_time if it's closer to keyA than the previous nearest_time
|
||||
if nearest_time is None or (keyA - keyB) < (keyA - nearest_time):
|
||||
nearest_time = keyB
|
||||
nearest_value = valueB
|
||||
# Append to result if a match is found
|
||||
if nearest_time:
|
||||
result.append((nearest_time, valueA, nearest_value))
|
||||
return result
|
||||
|
||||
def find_nearest_after(sniffed, from_emporia):
|
||||
result = []
|
||||
for keyA, valueA in sniffed.items():
|
||||
# Initialize nearest datetime and its value
|
||||
nearest_time = None
|
||||
nearest_value = None
|
||||
# Go through each item in dictB
|
||||
for keyB, valueB in from_emporia.items():
|
||||
# Check if keyB is after keyA and within 1-5 seconds
|
||||
if 1 < (keyB - keyA).total_seconds() <= 5:
|
||||
# Update nearest_time if it's closer to keyA than the previous nearest_time
|
||||
if nearest_time is None or (keyB - keyA) < (nearest_time - keyA):
|
||||
nearest_time = keyB
|
||||
nearest_value = valueB
|
||||
# Append to result if a match is found
|
||||
if nearest_time:
|
||||
result.append((keyA, valueA, nearest_value))
|
||||
else:
|
||||
print('have time', keyA, 'that doesnt have a nearest value')
|
||||
return result
|
||||
|
||||
def swap_endianness(hex_string):
|
||||
# Split the string into bytes (two characters each)
|
||||
bytes_list = [hex_string[i:i+2] for i in range(0, len(hex_string), 2)]
|
||||
# Reverse the list of bytes and join them back into a string
|
||||
reversed_hex = ''.join(reversed(bytes_list))
|
||||
return reversed_hex
|
||||
|
||||
def hex_to_decimal(hex_string):
|
||||
try:
|
||||
# First, swap the endianness
|
||||
swapped_hex = swap_endianness(hex_string)
|
||||
# Then, convert the hexadecimal string to decimal
|
||||
return int(swapped_hex, 16)
|
||||
except ValueError:
|
||||
return "Invalid hexadecimal number"
|
||||
|
||||
def read_file(file_path):
|
||||
with open(file_path, 'r') as file:
|
||||
return file.read()
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Merge the data.")
|
||||
parser.add_argument("--sniffed_data", required=True, help="Filepath to the sniffed UART traffic.")
|
||||
parser.add_argument("--emporia_csv", required=True, help="Filepath to the Emporia 1SEC CSV.")
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
sniffed_data = read_file(args.sniffed_data)
|
||||
parsed_sniff = parse_sniffed_packet(sniffed_data)
|
||||
|
||||
emporia_data = read_file(args.emporia_csv)
|
||||
parsed_emporia = parse_emporia(emporia_data)
|
||||
|
||||
hex3_to_watts = defaultdict(list)
|
||||
result = find_nearest_after(parsed_sniff, parsed_emporia)
|
||||
for timestamp, packet, watts in result:
|
||||
hex3 = packet[90:96]
|
||||
hex3_to_watts[hex3].append((watts, packet))
|
||||
|
||||
for hex3, watts in hex3_to_watts.items():
|
||||
print('Hex3:', hex3, 'Calculated W:', hex_to_decimal(hex3), 'Pairing:', watts)
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user