Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion broadlink/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from .cover import dooya, dooya2, wser
from .device import Device, ping, scan
from .hub import s3
from .light import lb1, lb2
from .light import lb1, lb2, fl1
from .remote import rm, rm4, rm4mini, rm4pro, rmmini, rmminib, rmpro
from .sensor import a1, a2
from .switch import bg1, ehc31, mp1, mp1s, sp1, sp2, sp2s, sp3, sp3s, sp4, sp4b
Expand Down Expand Up @@ -179,6 +179,9 @@
0xA5F7: ("LB27 R1", "Broadlink"),
0xA6EF: ("EFCF60WSMT", "Luceco"),
},
fl1: {
0x647A: {"Castra", "Luceco"},
},
S1C: {
0x2722: ("S2KIT", "Broadlink"),
},
Expand Down
128 changes: 96 additions & 32 deletions broadlink/light.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,42 @@
from .device import Device


class lb1(Device):
class lb1_base(Device):
"""Base for lb1 style devices."""

def get_state(self) -> dict:
"""Return the power state of the device.

Example: `{'red': 128, 'blue': 255, 'green': 128, 'pwr': 1, 'brightness': 75, 'colortemp': 2700, 'hue': 240, 'saturation': 50, 'transitionduration': 1500, 'maxworktime': 0, 'bulb_colormode': 1, 'bulb_scenes': '["@01686464,0,0,0", "#ffffff,10,0,#000000,190,0,0", "2700+100,0,0,0", "#ff0000,500,2500,#00FF00,500,2500,#0000FF,500,2500,0", "@01686464,100,2400,@01686401,100,2400,0", "@01686464,100,2400,@01686401,100,2400,@005a6464,100,2400,@005a6401,100,2400,0", "@01686464,10,0,@00000000,190,0,0", "@01686464,200,0,@005a6464,200,0,0"]', 'bulb_scene': '', 'bulb_sceneidx': 255}`
"""
packet = self._encode(1, {})
response = self.send_packet(0x6A, packet)
e.check_error(response[0x22:0x24])
return self._decode(response)

def _encode(self, flag: int, state: dict) -> bytes:
"""Encode a JSON packet."""
# flag: 1 for reading, 2 for writing.
packet = bytearray(14)
data = json.dumps(state, separators=(",", ":")).encode()
p_len = 12 + len(data)
struct.pack_into(
"<HHHHBBI", packet, 0, p_len, 0xA5A5, 0x5A5A, 0, flag, 0x0B, len(data)
)
packet.extend(data)
checksum = sum(packet[0x02:], 0xBEAF) & 0xFFFF
packet[0x06:0x08] = checksum.to_bytes(2, "little")
return packet

def _decode(self, response: bytes) -> dict:
"""Decode a JSON packet."""
payload = self.decrypt(response[0x38:])
js_len = struct.unpack_from("<I", payload, 0xA)[0]
state = json.loads(payload[0xE:0xE+js_len])
return state


class lb1(lb1_base):
"""Controls a Broadlink LB1."""

TYPE = "LB1"
Expand All @@ -21,16 +56,6 @@ class ColorMode(enum.IntEnum):
WHITE = 1
SCENE = 2

def get_state(self) -> dict:
"""Return the power state of the device.

Example: `{'red': 128, 'blue': 255, 'green': 128, 'pwr': 1, 'brightness': 75, 'colortemp': 2700, 'hue': 240, 'saturation': 50, 'transitionduration': 1500, 'maxworktime': 0, 'bulb_colormode': 1, 'bulb_scenes': '["@01686464,0,0,0", "#ffffff,10,0,#000000,190,0,0", "2700+100,0,0,0", "#ff0000,500,2500,#00FF00,500,2500,#0000FF,500,2500,0", "@01686464,100,2400,@01686401,100,2400,0", "@01686464,100,2400,@01686401,100,2400,@005a6464,100,2400,@005a6401,100,2400,0", "@01686464,10,0,@00000000,190,0,0", "@01686464,200,0,@005a6464,200,0,0"]', 'bulb_scene': '', 'bulb_sceneidx': 255}`
"""
packet = self._encode(1, {})
response = self.send_packet(0x6A, packet)
e.check_error(response[0x22:0x24])
return self._decode(response)

def set_state(
self,
pwr: Optional[bool] = None,
Expand Down Expand Up @@ -84,27 +109,6 @@ def set_state(
e.check_error(response[0x22:0x24])
return self._decode(response)

def _encode(self, flag: int, state: dict) -> bytes:
"""Encode a JSON packet."""
# flag: 1 for reading, 2 for writing.
packet = bytearray(14)
data = json.dumps(state, separators=(",", ":")).encode()
p_len = 12 + len(data)
struct.pack_into(
"<HHHHBBI", packet, 0, p_len, 0xA5A5, 0x5A5A, 0, flag, 0x0B, len(data)
)
packet.extend(data)
checksum = sum(packet[0x02:], 0xBEAF) & 0xFFFF
packet[0x06:0x08] = checksum.to_bytes(2, "little")
return packet

def _decode(self, response: bytes) -> dict:
"""Decode a JSON packet."""
payload = self.decrypt(response[0x38:])
js_len = struct.unpack_from("<I", payload, 0xA)[0]
state = json.loads(payload[0xE:0xE+js_len])
return state


class lb2(Device):
"""Controls a Broadlink LB26/LB27."""
Expand Down Expand Up @@ -198,3 +202,63 @@ def _decode(self, response: bytes) -> dict:
js_len = struct.unpack_from("<I", payload, 0x08)[0]
state = json.loads(payload[0x0C:0x0C+js_len])
return state


class fl1(lb1_base):
"""Controls a Luceco Castra Floodlight.

These set_state items control the floodlight
"mode_switch": 0 - detect motion; 5 - manual control; 6 - dusk to dawn; 7 - night glow
"brightness": 0 - 100 percent
"Delay time": 10 - 1800 seconds
"lightpwr": in manual mode, 0 - off; 1 - on
"Brightness control": 5 - 2000 set external brightness below which light turns on
"sensitivity_en": 0 - 3 set how much motion triggers light
"""

TYPE = "FL1"

def fix_state(self, state: dict) -> dict:
"""change state names that contain spaces """
if "Brightness control" in state:
state["brightness_control"] = state["Brightness control"]
del state["Brightness control"]
if "Delay time" in state:
state["delay_time"] = state["Delay time"]
del state["Delay time"]

return state

def get_state(self) -> dict:
return self.fix_state(super().get_state())

def set_state(
self,
mode_switch: Optional[int] = None,
lightpwr: Optional[bool] = None,
brightness: Optional[int] = None,
brightness_control: Optional[int] = None,
delay_time: Optional[int] = None,
sensitivity_en: Optional[int] = None,
) -> dict:
"""Set the power state of the device."""
state = {}
if mode_switch is not None:
state["mode_switch"] = int(mode_switch)
if lightpwr is not None:
state["lightpwr"] = int(bool(lightpwr))
state["mode_switch"] = 5
if brightness is not None:
state["brightness"] = int(brightness)
if brightness_control is not None:
state["Brightness control"] = int(brightness_control)
if delay_time is not None:
state["Delay time"] = int(delay_time)
if sensitivity_en is not None:
state["sensitivity_en"] = int(sensitivity_en)

packet = self._encode(2, state)
response = self.send_packet(0x6A, packet)
e.check_error(response[0x22:0x24])

return self.fix_state(self._decode(response))
25 changes: 25 additions & 0 deletions cli/broadlink_cli
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,14 @@ parser.add_argument("--device", help="device definition as 'type host mac'")
parser.add_argument("--type", type=auto_int, default=0x2712, help="type of device")
parser.add_argument("--host", help="host address")
parser.add_argument("--mac", help="mac address (hex reverse), as used by python-broadlink library")
parser.add_argument("--brightness", action="store", help="set lamp brightness")
parser.add_argument("--flmode", action="store", help="set floodlight mode (0, 5, 6, 7)")
parser.add_argument("--flon", action="store_true", help="turn floodlight on")
parser.add_argument("--floff", action="store_true", help="turn floodlight off")
parser.add_argument("--ontime", action="store", help="set floodlight on time (10 - 1800)")
parser.add_argument("--sensitivity", action="store", help="set motion sensitivity (0 - 3)")
parser.add_argument("--brightnesscontrol", action="store", help="set ambient light threshold (5 - 2000)")
parser.add_argument("--getstate", action="store_true", help="get current state")
parser.add_argument("--temperature", action="store_true", help="request temperature from device")
parser.add_argument("--humidity", action="store_true", help="request humidity from device")
parser.add_argument("--energy", action="store_true", help="request energy consumption from device")
Expand Down Expand Up @@ -74,6 +82,23 @@ if args.host or args.device:
if args.joinwifi:
broadlink.setup(args.joinwifi[0], args.joinwifi[1], 4)

if args.flmode:
dev.set_state(mode_switch=args.flmode)
if args.flon:
dev.set_state(lightpwr=1)
if args.floff:
dev.set_state(lightpwr=0)
if args.brightness:
dev.set_state(brightness=args.brightness)
if args.ontime:
dev.set_state(delay_time=args.ontime)
if args.sensitivity:
dev.set_state(sensitivity_en=args.sensitivity)
if args.brightnesscontrol:
dev.set_state(brightness_control=args.brightnesscontrol)
if args.getstate:
print(dev.get_state())

if args.convert:
data = bytearray.fromhex(''.join(args.data))
pulses = data_to_pulses(data)
Expand Down