From afce48207a2c8ee9645474e1996ca084f483882f Mon Sep 17 00:00:00 2001 From: Piotr Paczynski Date: Sat, 13 Jul 2024 08:37:26 +0200 Subject: [PATCH] add support for LifaAir air purifier --- README.md | 14 ++++ broadlink/__init__.py | 4 ++ broadlink/purifier.py | 156 ++++++++++++++++++++++++++++++++++++++++++ cli/test_lifaair | 44 ++++++++++++ 4 files changed, 218 insertions(+) create mode 100644 broadlink/purifier.py create mode 100644 cli/test_lifaair diff --git a/README.md b/README.md index 81c6de5b..68c9a98c 100644 --- a/README.md +++ b/README.md @@ -13,6 +13,7 @@ A Python module and CLI for controlling Broadlink devices locally. The following - **Curtain motors**: Dooya DT360E-45/20 - **Thermostats**: Hysen HY02B05H - **Hubs**: S3 +- **Air purifiers**: LIFAair LM05 ## Installation @@ -216,6 +217,19 @@ devices[0].set_state(bulb_colormode=1) data = device.check_sensors() ``` +## Air purifiers + +### Fetching purifier state +```python3 +data = device.get_state() +``` + +### Controlling purifier fan +```python3 +device.set_fan_mode(FanMode.TURBO) +device.set_fan_speed(50) +``` + ## Hubs ### Discovering subdevices diff --git a/broadlink/__init__.py b/broadlink/__init__.py index d3135501..09c9ed81 100644 --- a/broadlink/__init__.py +++ b/broadlink/__init__.py @@ -14,6 +14,7 @@ from .remote import rm, rm4, rm4mini, rm4pro, rmmini, rmminib, rmpro from .sensor import a1, a2 from .switch import bg1, ehc31, mp1, mp1s, sp1, sp2, sp2s, sp3, sp3s, sp4, sp4b +from .purifier import lifaair SUPPORTED_TYPES = { sp1: { @@ -207,6 +208,9 @@ ehc31: { 0x6480: ("EHC31", "BG Electrical"), }, + lifaair: { + 0x4ec2: ("LM05", "LIFAair"), + } } diff --git a/broadlink/purifier.py b/broadlink/purifier.py new file mode 100644 index 00000000..bf27093f --- /dev/null +++ b/broadlink/purifier.py @@ -0,0 +1,156 @@ +"""Support for air purifiers.""" + +import enum + +from . import exceptions as e +from .device import Device + +@enum.unique +class FanMode(enum.IntEnum): + """Represents mode of the fan.""" + + OFF = 0 + AUTO = 1 + NIGHT = 2 + TURBO = 3 + ANTI_ALLERGY = 4 + MANUAL = 5 + + UNKNOWN = -1 + + +class lifaair(Device): + """Controls a broadcom-based LIFAair air purifier.""" + + TYPE = "LIFAAIR" + + FAN_STATE_TO_MODE = { + 0x81: FanMode.OFF, + 0xA5: FanMode.AUTO, + 0x95: FanMode.NIGHT, + 0x8D: FanMode.TURBO, + 0x85: FanMode.MANUAL, + 0x01: None, # fan is offline + } + + @enum.unique + class _Operation(enum.IntEnum): + SET_STATE = 1 + GET_STATE = 2 + + @enum.unique + class _Action(enum.IntEnum): + SET_FAN_SPEED = 1 + SET_FAN_MODE = 2 + + FAN_MODE_TO_ACTION_ARG = { + FanMode.OFF: 1, + FanMode.AUTO: 2, + FanMode.NIGHT: 6, + FanMode.TURBO: 7, + FanMode.ANTI_ALLERGY: 11, + } + + def set_fan_mode(self, fan_mode: FanMode) -> dict: + """Set mode of the fan. Returns updated state.""" + if fan_mode == FanMode.MANUAL: + return self.set_fan_speed(50) + + action_arg = self.FAN_MODE_TO_ACTION_ARG.get(fan_mode) + if action_arg is not None: + data = self._send( + self._Operation.SET_STATE, self._Action.SET_FAN_MODE, action_arg + ) + return self._decode_state(data) + + return self.get_state() + + def set_fan_speed(self, fan_speed: int) -> dict: + """Set fan speed (0-121). Returns updated state. Note that fan mode will be changed to MANUAL by the device.""" + data = self._send( + self._Operation.SET_STATE, self._Action.SET_FAN_SPEED, fan_speed + ) + return self._decode_state(data) + + def get_state(self) -> dict: + """ + Return the current state of the purifier as python dict. + + Note that the smart remote we're communicating with contains co2, tvoc and PM2.5 sensors, + while temperature, humidity and fan-state are fetched remotely from main unit which can be + offline (unplugged from mains, out of range) in which case those keys will be None. + + Format: + { + "temperature": 24.5, # float, deg C, can be None if main-unit offline + "humidity": 41, # int, %, can be None if main-unit offline + "co2": 425 # int, ppm + "tvoc": 150 # int, ug/m3 + "pm10": 9 # int, ug/m3 (unsure if this is PM10) + "pm2_5": 7 # int, ug/m3 (confirmed PM2.5) + "pm1": 5 # int, ug/m3 (unsure if this is PM1.0) + "fan_mode": FanMode.AUTO # FanMode enum, can be None if main-unit offline + "fan_speed": 50 # int, 0-121 + } + """ + data = self._send(self._Operation.GET_STATE) + return self._decode_state(data) + + def _decode_state(self, data: bytes) -> dict: + raw = self._decode_state_raw(data) + fan_mode = self._decode_fan_mode(raw["fan_state"], raw["fan_flags"]) + isOffline = fan_mode is None + return { + "temperature": None if isOffline else raw["temperature"] / 10.0, + "humidity": None if isOffline else raw["humidity"], + "co2": raw["co2"], + "tvoc": raw["tvoc"] * 10, + "pm10": raw["pm10"], + "pm2_5": raw["pm2_5"], + "pm1": raw["pm1"], + "fan_mode": fan_mode, + "fan_speed": raw["fan_speed"], + } + + def _decode_state_raw(self, data: bytes) -> dict: + return { + "temperature": data[27] + 256 * data[28], + "humidity": data[29], + "co2": data[31] + 256 * data[32], + "tvoc": data[35] + 256 * data[36], + "pm10": data[37] + 256 * data[38], + "pm2_5": data[39] + 256 * data[40], + "pm1": data[41] + 256 * data[42], + "fan_state": data[55], + "fan_speed": data[56], + "fan_flags": data[57], + } + + def _decode_fan_mode(self, fan_state: int, fan_flags: int) -> FanMode: + if fan_flags & 0x40 == 0: + return FanMode.ANTI_ALLERGY + return self.FAN_STATE_TO_MODE.get(fan_state, FanMode.UNKNOWN) + + def _send(self, operation: int, action: int = 0, action_arg: int = 0) -> bytes: + """Send a command to the device.""" + packet = bytearray(26) + packet[0x02] = 0xA5 + packet[0x03] = 0xA5 + packet[0x04] = 0x5A + packet[0x05] = 0x5A + packet[0x08] = operation & 0xFF + packet[0x0A] = 0x0C + packet[0x0E] = action & 0xFF + packet[0x0F] = action_arg & 0xFF + + checksum = sum(packet, 0xBEAF) & 0xFFFF + packet[0x06] = checksum & 0xFF + packet[0x07] = checksum >> 8 + + packet_len = len(packet) - 2 + packet[0x00] = packet_len & 0xFF + packet[0x01] = packet_len >> 8 + + resp = self.send_packet(0x6A, packet) + e.check_error(resp[0x22:0x24]) + return self.decrypt(resp[0x38:]) diff --git a/cli/test_lifaair b/cli/test_lifaair new file mode 100644 index 00000000..f2d361b7 --- /dev/null +++ b/cli/test_lifaair @@ -0,0 +1,44 @@ +import time +import broadlink +from broadlink.purifier import lifaair, FanMode + +def print_state(state: dict): + temperature = state["temperature"] + humidity = state["humidity"] + print("-> Temperature=%s Humidity=%s CO2=%4dppm TVOC=%4dug/m3 PM2.5=%2dug/m3 FanSpeed=%d (%s)" % ( + "%2.1fC" % temperature if temperature is not None else "-----", + "%2d%%" %humidity if humidity is not None else "---", + state["co2"], + state["tvoc"], + state["pm2_5"], + state["fan_speed"], + state["fan_mode"])) + +print("Searching for lifaair devices... ") +dev: lifaair = next(dev for dev in broadlink.discover() if isinstance(dev, lifaair)) +print("Found %s" % dev) + +print("Authenticating... ", end="") +dev.auth() +print("OK (id=%d, key=%s)" % (dev.id, dev.aes.algorithm.key.hex())) + +print("Getting firmware version... ", end="") +print(dev.get_fwversion()) + +print("Getting state...") +print_state(dev.get_state()) + +for mode in FanMode: + print("Setting fan mode to %s" % mode) + print_state(dev.set_fan_mode(mode)) + time.sleep(5) + +for speed in (0, 60, 121): + print("Setting fan speed to %d" % speed) + print_state(dev.set_fan_speed(speed)) + time.sleep(5) + +print("Monitoring state...") +while True: + print_state(dev.get_state()) + time.sleep(1)