diff --git a/ct002/__init__.py b/ct002/__init__.py new file mode 100644 index 0000000..f6f2199 --- /dev/null +++ b/ct002/__init__.py @@ -0,0 +1 @@ +from .ct002 import CTEmulator \ No newline at end of file diff --git a/ct002/ct002.py b/ct002/ct002.py new file mode 100644 index 0000000..16c4d26 --- /dev/null +++ b/ct002/ct002.py @@ -0,0 +1,271 @@ +import socket +import threading +import time +import re + +SOH = 0x01 +STX = 0x02 +ETX = 0x03 +SEPARATOR = '|' +UDP_PORT = 12345 + +# Field labels for response (from reference) +RESPONSE_LABELS = [ + "meter_dev_type", "meter_mac_code", "hhm_dev_type", "hhm_mac_code", + "A_phase_power", "B_phase_power", "C_phase_power", "total_power", + "A_chrg_nb", "B_chrg_nb", "C_chrg_nb", "ABC_chrg_nb", "wifi_rssi", "info_idx", + "x_chrg_power", "A_chrg_power", "B_chrg_power", "C_chrg_power", "ABC_chrg_power", + "x_dchrg_power", "A_dchrg_power", "B_dchrg_power", "C_dchrg_power", "ABC_dchrg_power" +] + +class CTEmulator: + def __init__(self, device_type="HMG-50", battery_mac="001122334455", ct_mac="009c17abcdef", ct_type="HME-4", poll_interval=1, discovery_battery_macs=None, dedupe_time_window=10): + self.device_type = device_type + self.battery_mac = battery_mac + self.ct_mac = ct_mac + self.ct_type = ct_type + self.poll_interval = poll_interval + self._udp_thread = None + self._stop = False + self._value = [0, 0, 0] + self._value_mutex = threading.Lock() + if discovery_battery_macs is None: + self.discovery_battery_macs = ["001122334455"] + else: + self.discovery_battery_macs = discovery_battery_macs + self.dedupe_time_window = dedupe_time_window + self._last_response_time = {} + self.before_send = None + + @property + def value(self): + return self._value + + @value.setter + def value(self, value): + with self._value_mutex: + self._value = value + + def validate_mac(self, mac): + return re.fullmatch(r"[0-9a-fA-F]{12}", mac) is not None + + def calculate_checksum(self, data_bytes): + xor = 0 + for b in data_bytes: + xor ^= b + return xor + + def parse_ct002_request(self, data): + # Validate SOH, STX, ETX, length, checksum + if len(data) < 10: + return None, "Too short" + if data[0] != SOH or data[1] != STX: + return None, "Missing SOH/STX" + sep_index = data.find(b'|', 2) + if sep_index == -1: + return None, "No separator after length" + try: + length = int(data[2:sep_index].decode('ascii')) + except ValueError: + return None, "Invalid length field" + if len(data) != length: + return None, f"Length mismatch (expected {length}, got {len(data)})" + if data[-3] != ETX: + return None, "Missing ETX" + # Checksum + xor = 0 + for b in data[:length-2]: + xor ^= b + expected_checksum = f"{xor:02x}".encode('ascii') + actual_checksum = data[-2:] + # Accept both '03' and ' 3' as valid for 0x03 + if actual_checksum.lower() != expected_checksum: + if actual_checksum[0:1] == b' ' and actual_checksum[1:2] == expected_checksum[1:2]: + pass # Accept space-padded single digit + else: + return None, f"Checksum mismatch (expected {expected_checksum}, got {actual_checksum})" + # Parse fields + try: + message = data[4:-3].decode('ascii') + except UnicodeDecodeError: + return None, "Invalid ASCII encoding" + fields = message.split('|')[1:] # first char is '|' + return fields, None + + def build_ct002_response(self, request_fields): + values = self.value if self.value else [0, 0, 0] + # meter_dev_type and meter_mac_code from request, hhm_dev_type and hhm_mac_code from emulator config + response_fields = [ + request_fields[0], # meter_dev_type (from request) + request_fields[1], # meter_mac_code (from request) + self.ct_type, # hhm_dev_type (from emulator) + self.ct_mac, # hhm_mac_code (from emulator) + str(values[0]), # A_phase_power + str(values[1]), # B_phase_power + str(values[2]), # C_phase_power + str(sum(values)), # total_power + ] + ["0"] * (len(RESPONSE_LABELS) - 8) + message_str = SEPARATOR + SEPARATOR.join(response_fields) + message_bytes = message_str.encode('ascii') + base_size = 1 + 1 + len(message_bytes) + 1 + 2 + for length_digits in range(1, 5): + total_length = base_size + length_digits + if len(str(total_length)) == length_digits: + break + length_str = str(total_length).encode('ascii') + payload = bytearray([SOH, STX]) + length_str + message_bytes + bytearray([ETX]) + checksum = f"{self.calculate_checksum(payload):02x}".encode('ascii') + payload += checksum + return payload + + def format_ct_response_readable(self, data): + # Show control characters as labels, printable ASCII as-is + def safe_char(byte): + if byte == SOH: + return '' + elif byte == STX: + return '' + elif byte == ETX: + return '' + elif 32 <= byte <= 126: + return chr(byte) + else: + return f'<0x{byte:02X}>' + return ''.join(safe_char(b) for b in data) + + def udp_server(self): + udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + udp_sock.bind(("", UDP_PORT)) + print(f"CTEmulator UDP server is listening on port {UDP_PORT}...") + try: + while not self._stop: + udp_sock.settimeout(1.0) + try: + data, addr = udp_sock.recvfrom(1024) + except socket.timeout: + continue + # Try CT001 (plain ASCII 'hame') + try: + decoded = data.decode() + if decoded == "hame": + current_time = time.time() + if ( + addr not in self._last_response_time + or (current_time - self._last_response_time[addr]) > self.dedupe_time_window + ): + print(f"Received 'hame' from {addr}") + udp_sock.sendto(b"ack", addr) + self._last_response_time[addr] = current_time + print(f"Sent 'ack' to {addr}") + else: + print(f"Received 'hame' from {addr} but ignored due to dedupe window") + continue + except UnicodeDecodeError: + pass # Not a plain ASCII message + # Try CT002 protocol + fields, error = self.parse_ct002_request(data) + if error: + print(f"Invalid CT002 request from {addr}: {error}") + continue + # Discovery logic: only respond if CT MAC matches and battery MAC is valid + if len(fields) < 4: + print(f"CT002 request from {addr} does not have enough fields for discovery check") + continue + req_battery_mac = fields[1] + req_ct_mac = fields[3] + if req_ct_mac.lower() != self.ct_mac.lower() and req_ct_mac != "000000000000": + print(f"Ignoring CT002 request from {addr}: CT MAC mismatch (got {req_ct_mac}, expected {self.ct_mac} or 000000000000)") + continue + # Accept any battery MAC + # if req_battery_mac not in self.discovery_battery_macs and req_battery_mac != self.battery_mac: + # print(f"Ignoring CT002 request from {addr}: Battery MAC {req_battery_mac} not in discovery list or not our battery MAC") + # continue + print(f"Valid CT002 discovery/query request from {addr}: {fields}") + if self.before_send: + self.before_send(addr) + response = self.build_ct002_response(fields) + print(f"CT002 response to {addr}: {response.hex()} | {self.format_ct_response_readable(response)}") + udp_sock.sendto(response, addr) + print(f"Sent CT002 response to {addr}") + finally: + udp_sock.close() + + def handle_tcp_client(self, conn, addr): + print(f"TCP connection established with {addr}") + try: + data = conn.recv(1024) + decoded = data.decode() + if decoded == "hello": + print("Received 'hello'") + last_send_time = 0 + while not self._stop: + current_time = time.time() + time_since_last_send = current_time - last_send_time + if time_since_last_send >= self.poll_interval: + if self.before_send: + self.before_send(addr) + with self._value_mutex: + if self.value is None: + print(f"No value to send to {addr}") + break + value1, value2, value3 = self.value + value1 = round(value1) + value2 = round(value2) + value3 = round(value3) + message = f"HM:{value1}|{value2}|{value3}" + try: + conn.send(message.encode()) + last_send_time = current_time + print(f"Sent message to {addr}: {message}") + except BrokenPipeError: + print(f"Connection with {addr} broken. Waiting for a new connection.") + break + time.sleep(self.poll_interval) + else: + time.sleep(0.01) + else: + print(f"Received unknown TCP message: {decoded}") + finally: + conn.close() + print(f"Connection with {addr} closed") + + def tcp_server(self): + tcp_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + tcp_sock.bind(("", UDP_PORT)) + tcp_sock.listen(5) + print("CTEmulator TCP server is listening...") + try: + while not self._stop: + conn, addr = tcp_sock.accept() + client_thread = threading.Thread( + target=self.handle_tcp_client, args=(conn, addr) + ) + client_thread.start() + finally: + print("Stop listening for TCP connections") + tcp_sock.close() + + def start(self): + if self._udp_thread or hasattr(self, '_tcp_thread') and self._tcp_thread: + return + self._stop = False + self._udp_thread = threading.Thread(target=self.udp_server) + self._tcp_thread = threading.Thread(target=self.tcp_server) + self._udp_thread.start() + self._tcp_thread.start() + + def join(self): + if self._udp_thread: + self._udp_thread.join() + if hasattr(self, '_tcp_thread') and self._tcp_thread: + self._tcp_thread.join() + + def stop(self): + self._stop = True + if self._udp_thread: + self._udp_thread.join() + if hasattr(self, '_tcp_thread') and self._tcp_thread: + self._tcp_thread.join() + self._udp_thread = None + if hasattr(self, '_tcp_thread'): + self._tcp_thread = None \ No newline at end of file diff --git a/main.py b/main.py index c73513f..982eeb1 100644 --- a/main.py +++ b/main.py @@ -7,6 +7,7 @@ from powermeter import Powermeter from shelly import Shelly from collections import OrderedDict +from ct002 import CTEmulator def test_powermeter(powermeter, client_filter): @@ -84,6 +85,47 @@ def update_readings(addr): device.before_send = update_readings + elif device_type == "ct002": + # CT002-specific config (can be extended to use config/args) + device_type_str = cfg.get("GENERAL", "CT002_DEVICE_TYPE", fallback="HMG-50") + battery_mac = cfg.get("GENERAL", "CT002_BATTERY_MAC", fallback="001122334455") + ct_mac = cfg.get("GENERAL", "CT002_CT_MAC", fallback="009c17abcdef") + ct_type = cfg.get("GENERAL", "CT002_CT_TYPE", fallback="HME-4") + poll_interval = ( + args.poll_interval + if args.poll_interval is not None + else cfg.getint("GENERAL", "POLL_INTERVAL", fallback=1) + ) + print(f"CT002 Settings for {device_id}:") + print(f"Device Type: {device_type_str}") + print(f"Battery MAC: {battery_mac}") + print(f"CT MAC: {ct_mac}") + print(f"CT Type: {ct_type}") + print(f"Poll Interval: {poll_interval}") + device = CTEmulator( + device_type=device_type_str, + battery_mac=battery_mac, + ct_mac=ct_mac, + ct_type=ct_type, + poll_interval=poll_interval, + ) + def update_readings(addr): + powermeter = None + for pm, client_filter in powermeters: + if client_filter.matches(addr[0]): + powermeter = pm + break + if powermeter is None: + print(f"No powermeter found for client {addr[0]}") + device.value = None + return + values = powermeter.get_powermeter_watts() + value1 = values[0] if len(values) > 0 else 0 + value2 = values[1] if len(values) > 1 else 0 + value3 = values[2] if len(values) > 2 else 0 + device.value = [value1, value2, value3] + device.before_send = update_readings + elif device_type == "shellypro3em_old": print(f"Shelly Pro 3EM Settings:") print(f"Device ID: {device_id}") @@ -126,6 +168,7 @@ def main(): nargs="+", choices=[ "ct001", + "ct002", "shellypro3em", "shellyemg3", "shellyproem50",