diff --git a/tools/emporia_sniff_parsing.py b/tools/emporia_sniff_parsing.py index 950deb9..88dffbd 100644 --- a/tools/emporia_sniff_parsing.py +++ b/tools/emporia_sniff_parsing.py @@ -4,11 +4,12 @@ from collections import defaultdict import re from datetime import datetime + def parse_sniffed_packet(csv_data): # Regular expression pattern for the timestamp pattern = r'("\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}",)' # Split the data using the pattern, keeping the delimiters - entries = re.split(pattern, csv_data)[1:] # first element is always empty + entries = re.split(pattern, csv_data)[1:] # first element is always empty parsed_data = {} for i in range(0, len(entries), 2): @@ -16,7 +17,7 @@ def parse_sniffed_packet(csv_data): value = entries[i + 1] # Parse the timestamp string into a datetime object - timestamp = datetime.strptime(timestamp_str, '%Y-%m-%d %H:%M:%S') + timestamp = datetime.strptime(timestamp_str, "%Y-%m-%d %H:%M:%S") if timestamp in parsed_data: # Concatenate value if timestamp already exists @@ -27,6 +28,7 @@ def parse_sniffed_packet(csv_data): return parsed_data + def parse_emporia(csv_data): parsed_data = {} last_value = None @@ -46,7 +48,7 @@ def parse_emporia(csv_data): # Parse the timestamp string into a datetime object try: - timestamp = datetime.strptime(timestamp_str, '%m/%d/%Y %H:%M:%S') + timestamp = datetime.strptime(timestamp_str, "%m/%d/%Y %H:%M:%S") except ValueError: continue @@ -56,6 +58,7 @@ def parse_emporia(csv_data): return parsed_data + def find_nearest_before(from_emporia, sniffed): result = [] for keyA, valueA in from_emporia.items(): @@ -75,6 +78,7 @@ def find_nearest_before(from_emporia, sniffed): result.append((nearest_time, valueA, nearest_value)) return result + def find_nearest_after(sniffed, from_emporia): result = [] for keyA, valueA in sniffed.items(): @@ -93,16 +97,18 @@ def find_nearest_after(sniffed, from_emporia): if nearest_time: result.append((keyA, valueA, nearest_value)) else: - print('have time', keyA, 'that doesnt have a nearest value') + print("have time", keyA, "that doesnt have a nearest value") return result + def swap_endianness(hex_string): # Split the string into bytes (two characters each) - bytes_list = [hex_string[i:i+2] for i in range(0, len(hex_string), 2)] + bytes_list = [hex_string[i : i + 2] for i in range(0, len(hex_string), 2)] # Reverse the list of bytes and join them back into a string - reversed_hex = ''.join(reversed(bytes_list)) + reversed_hex = "".join(reversed(bytes_list)) return reversed_hex + def hex_to_decimal(hex_string): try: # First, swap the endianness @@ -112,14 +118,20 @@ def hex_to_decimal(hex_string): except ValueError: return "Invalid hexadecimal number" + def read_file(file_path): - with open(file_path, 'r') as file: + with open(file_path, "r") as file: return file.read() + def main(): parser = argparse.ArgumentParser(description="Merge the data.") - parser.add_argument("--sniffed_data", required=True, help="Filepath to the sniffed UART traffic.") - parser.add_argument("--emporia_csv", required=True, help="Filepath to the Emporia 1SEC CSV.") + parser.add_argument( + "--sniffed_data", required=True, help="Filepath to the sniffed UART traffic." + ) + parser.add_argument( + "--emporia_csv", required=True, help="Filepath to the Emporia 1SEC CSV." + ) args = parser.parse_args() @@ -136,7 +148,8 @@ def main(): hex3_to_watts[hex3].append((watts, packet)) for hex3, watts in hex3_to_watts.items(): - print('Hex3:', hex3, 'Calculated W:', hex_to_decimal(hex3), 'Pairing:', watts) + print("Hex3:", hex3, "Calculated W:", hex_to_decimal(hex3), "Pairing:", watts) + if __name__ == "__main__": - main() \ No newline at end of file + main()