From 0584ac2579fc3af842673666d83e700e5b639c01 Mon Sep 17 00:00:00 2001 From: Thomas Purchas Date: Sun, 1 Apr 2018 18:22:16 +0100 Subject: [PATCH 1/4] Port demo accessories to asyncio MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Not a complete port, unfortunately some of the dependent libraries don’t support asyncio at the moment. However as Accessories are still running in separate threads at the moment we can ignore that. Additionally subprocess can’t use asyncio until the main thread is converted. --- pyhap/accessories/AM2302.py | 14 ++++++++------ pyhap/accessories/BMP180.py | 7 +++++-- pyhap/accessories/DisplaySwitch.py | 10 ++++++---- pyhap/accessories/TSL2591.py | 7 +++++-- 4 files changed, 24 insertions(+), 14 deletions(-) diff --git a/pyhap/accessories/AM2302.py b/pyhap/accessories/AM2302.py index 9e3d41c5..0a6a51c2 100644 --- a/pyhap/accessories/AM2302.py +++ b/pyhap/accessories/AM2302.py @@ -4,6 +4,7 @@ The DHT22 module was taken from https://www.raspberrypi.org/forums/viewtopic.php?f=37&t=71336 """ +import asyncio import time import random @@ -19,7 +20,7 @@ class AM2302(Accessory): category = Category.SENSOR def __init__(self, *args, pin=4, **kwargs): - super(AM2302, self).__init__(*args, **kwargs) + super().__init__(*args, **kwargs) self.pin = pin self.temp_char = self.get_service("TemperatureSensor")\ @@ -31,14 +32,14 @@ def __init__(self, *args, pin=4, **kwargs): self.sensor = DHT22.sensor(pigpio.pi(), pin) def _set_services(self): - super(AM2302, self)._set_services() + super()._set_services() self.add_service( loader.get_serv_loader().get("TemperatureSensor")) self.add_service( loader.get_serv_loader().get("HumiditySensor")) def __getstate__(self): - state = super(AM2302, self).__getstate__() + state = super().__getstate__() state["sensor"] = None return state @@ -46,10 +47,11 @@ def __setstate__(self, state): self.__dict__.update(state) self.sensor = DHT22.sensor(pigpio.pi(), self.pin) - def run(self): - while not self.run_sentinel.wait(10): + async def run(self): + while not stop_event.is_set(): + await asyncio.sleep(10) self.sensor.trigger() - time.sleep(0.2) + await acyncio.sleep(0.2) t = self.sensor.temperature() h = self.sensor.humidity() self.temp_char.set_value(t) diff --git a/pyhap/accessories/BMP180.py b/pyhap/accessories/BMP180.py index 36a1a3e1..b416e0a0 100644 --- a/pyhap/accessories/BMP180.py +++ b/pyhap/accessories/BMP180.py @@ -3,6 +3,8 @@ # Assume you have a bmp module with BMP180 class with read() method. from sensors.bmp180 import BMP180 as sensor +import asyncio + from pyhap.accessory import Accessory, Category import pyhap.loader as loader @@ -33,7 +35,8 @@ def __setstate__(self, state): self.__dict__.update(state) self.sensor = sensor() - def run(self): - while not self.run_sentinel.wait(30): + async def run(self, stop_event, loop=None): + while not stop_event.is_set(): + await asyncio.sleep(10) temp, _pressure = self.sensor.read() self.temp_char.set_value(temp) diff --git a/pyhap/accessories/DisplaySwitch.py b/pyhap/accessories/DisplaySwitch.py index 37a992fa..e80525f4 100644 --- a/pyhap/accessories/DisplaySwitch.py +++ b/pyhap/accessories/DisplaySwitch.py @@ -1,4 +1,5 @@ # An Accessory for viewing/controlling the status of a Mac display. +import asyncio import subprocess from pyhap.accessory import Accessory, Category @@ -26,19 +27,20 @@ class DisplaySwitch(Accessory): category = Category.SWITCH def __init__(self, *args, **kwargs): - super(DisplaySwitch, self).__init__(*args, **kwargs) + super().__init__(*args, **kwargs) self.display = self.get_service("Switch")\ .get_characteristic("On") self.display.setter_callback = self.set_display def _set_services(self): - super(DisplaySwitch, self)._set_services() + super()._set_services() self.add_service( loader.get_serv_loader().get("Switch")) - def run(self): - while not self.run_sentinel.wait(1): + async def run(self, stop_event, loop=None): + while not stop_event.is_set(): + await asyncio.sleep(1) # We can't just use .set_value(state), because that will # trigger our listener. state = get_display_state() diff --git a/pyhap/accessories/TSL2591.py b/pyhap/accessories/TSL2591.py index 43430223..8fbcd057 100644 --- a/pyhap/accessories/TSL2591.py +++ b/pyhap/accessories/TSL2591.py @@ -3,6 +3,8 @@ import tsl2591 +import asyncio + from pyhap.accessory import Accessory, Category import pyhap.loader as loader @@ -32,8 +34,9 @@ def __setstate__(self, state): self.__dict__.update(state) self.tsl = tsl2591.Tsl2591() - def run(self): - while not self.run_sentinel.wait(10): + async def run(self, stop_event, loop=None): + while not stop_event.is_set(): + await asyncio.sleep(10) full, ir = self.tsl.get_full_luminosity() lux = min(max(0.001, self.tsl.calculate_lux(full, ir)), 10000) self.lux_char.set_value(lux) From b5f6ae065a0f73274412c68b701d6bf6f93bb8d2 Mon Sep 17 00:00:00 2001 From: Thomas Purchas Date: Sun, 1 Apr 2018 18:27:02 +0100 Subject: [PATCH 2/4] Fire stop event before task cancel --- pyhap/accessory_driver.py | 1 + 1 file changed, 1 insertion(+) diff --git a/pyhap/accessory_driver.py b/pyhap/accessory_driver.py index d7fe14f4..b373b4b7 100755 --- a/pyhap/accessory_driver.py +++ b/pyhap/accessory_driver.py @@ -123,6 +123,7 @@ def run(self): logger.info("Sucessfully stopped accessory event loop.") def stop(self): + self.stop_event.set() self.loop.call_soon_threadsafe(self.task.cancel) class AccessoryDriver(object): From 666c5fe6ec09eba3fdf9273e0b2a509289c3444e Mon Sep 17 00:00:00 2001 From: Thomas Purchas Date: Sun, 1 Apr 2018 23:29:20 +0100 Subject: [PATCH 3/4] Make sure each accessory thread gets its own loop This makes sure that each accessory thread get a complete event loop to run the accessory with. Also ensure that the event loop is shutdown cleanly. --- pyhap/accessory_driver.py | 32 ++++++++++++++++++++++---------- 1 file changed, 22 insertions(+), 10 deletions(-) diff --git a/pyhap/accessory_driver.py b/pyhap/accessory_driver.py index b373b4b7..1a13f9e8 100755 --- a/pyhap/accessory_driver.py +++ b/pyhap/accessory_driver.py @@ -108,23 +108,35 @@ class AIOThread(threading.Thread): def __init__(self, run_method): """ """ + self.run_method = run_method + super().__init__() + + def run(self): self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.loop) + self.stop_event = asyncio.Event() - self.task = self.loop.create_task(run_method(self.stop_event, self.loop)) - super(AIOThread, self).__init__(target=self.loop.run_until_complete, - args=(self.task,)) + self.task = self.loop.create_task(self.run_method(self.stop_event, self.loop)) - def run(self): + self.loop.run_forever() + self.loop.close() + logger.debug("Sucessfully stopped accessory event loop.") + + async def shutdown(self): + logger.debug("Shutting down accessory event loop") + self.stop_event.set() try: - super(AIOThread, self).run() - except CancelledError: + await asyncio.wait_for(self.task, timeout=5) + except asyncio.TimeoutError: + logger.info("Accessory task did not shutdown within 5 seconds") + finally: self.loop.stop() - self.loop.close() - logger.info("Sucessfully stopped accessory event loop.") + + def safe_shutdown(self): + task = self.loop.create_task(self.shutdown()) def stop(self): - self.stop_event.set() - self.loop.call_soon_threadsafe(self.task.cancel) + self.loop.call_soon_threadsafe(self.safe_shutdown) class AccessoryDriver(object): """ From b6a9b53c3170c4843748f2d413c3159a3d7befaa Mon Sep 17 00:00:00 2001 From: Thomas Purchas Date: Sun, 1 Apr 2018 23:30:04 +0100 Subject: [PATCH 4/4] Move the http bridge to use aiohttp --- pyhap/accessories/Http.py | 112 ++++++++++---------------------------- requirements_all.txt | 3 +- 2 files changed, 32 insertions(+), 83 deletions(-) diff --git a/pyhap/accessories/Http.py b/pyhap/accessories/Http.py index 0977ed74..b9c17655 100644 --- a/pyhap/accessories/Http.py +++ b/pyhap/accessories/Http.py @@ -6,73 +6,52 @@ import threading import logging from http.server import HTTPServer, BaseHTTPRequestHandler +from aiohttp import web from pyhap.accessory import Bridge, Category logger = logging.getLogger(__name__) -class HttpBridgeHandler(BaseHTTPRequestHandler): +class HttpBridgeHandler(web.Application): """Handles requests and passes value updates to an HttpAccessory. The POST request should contain json data with the format: - { "aid": + { "aid": , "services": { : { - : value, + : } } } Example: - { "aid": 2 + { "aid": 2, "services": { - TemperatureSensor" : { + "TemperatureSensor" : { "CurrentTemperature": 20 } } } """ - def __init__(self, http_accessory, sock, client_addr, server): + def __init__(self, http_accessory): """Create a handler that passes updates to the given HttpAccessory. """ + super().__init__() + self.http_accessory = http_accessory - super(HttpBridgeHandler, self).__init__(sock, client_addr, server) + self.add_routes([web.post('/', self.post_handler)]) - def respond_ok(self): - """Reply with code 200 (OK) and close the connection. - """ - self.send_response(200) - self.send_header("Content-Type", "text/html") - self.send_header("Content-Length", 0) - self.end_headers() - self.close_connection = 1 - - def respond_err(self): - """Reply with code 400 and close the connection. - """ - self.send_response(400) - self.send_header("Content-Type", "text/html") - self.send_header("Content-Length", 0) - self.end_headers() - self.close_connection = 1 - - def do_POST(self): - """Read the payload as json and update the state of the accessory. - """ - length = int(self.headers["Content-Length"]) + async def post_handler(self, request): try: - # The below decode is necessary only for python <3.6, because loads prior 3.6 - # doesn't know bytes/bytearray. - content = self.rfile.read(length).decode("utf-8") - data = json.loads(content) + data = await request.json() + await self.http_accessory.update_state(data) except Exception as e: logger.error("Bad POST request; Error was: %s", str(e)) - self.respond_err() - else: - self.http_accessory.update_state(data) - self.respond_ok() + return web.Response(text="Bad POST", status=400) + + return web.Response(text="OK") class HttpBridge(Bridge): @@ -127,7 +106,7 @@ class HttpBridge(Bridge): category = Category.OTHER - def __init__(self, address, *args, **kwargs): + def __init__(self, *args, address, **kwargs): """Initialise and add the given services. @param address: The address-port on which to listen for requests. @@ -135,40 +114,11 @@ def __init__(self, address, *args, **kwargs): @param accessories: """ - super(HttpBridge, self).__init__(*args, **kwargs) - - # For exclusive access to updates. Slight overkill... - self.update_lock = None - self.server_thread = None - self._set_server(address) + super().__init__(*args, **kwargs) - def _set_server(self, address): - """Set up a HTTPServer to listen on the given address. - """ - self.server = HTTPServer(address, lambda *a: HttpBridgeHandler(self, *a)) - self.server_thread = threading.Thread(target=self.server.serve_forever) - self.update_lock = threading.Lock() + self.address = address - def __getstate__(self): - """Return the state of this instance, less the server and server thread. - - Also add the server address. All this is because we cannot pickle such - objects and to allow to recover the server using the address. - """ - state = super(HttpBridge, self).__getstate__() - state["server"] = None - state["server_thread"] = None - state["update_lock"] = None - state["address"] = self.server.server_address - return state - - def __setstate__(self, state): - """Load the state and set up the server with the address in the state. - """ - self.__dict__.update(state) - self._set_server(state["address"]) - - def update_state(self, data): + async def update_state(self, data): """Update the characteristics from the received data. Expected to be called from HapHttpHandler. Updates are thread-safe. @@ -192,19 +142,17 @@ def update_state(self, data): service_obj = accessory.get_service(service) for char, value in char_data.items(): char_obj = service_obj.get_characteristic(char) - with self.update_lock: - char_obj.set_value(value) - - def stop(self): - """Stop the server. - """ - super(HttpBridge, self).stop() - logger.debug("Stopping HTTP bridge server.") - self.server.shutdown() - self.server.server_close() + char_obj.set_value(value) - def run(self): + async def run(self, stop_event, loop=None): """Start the server - can listen for requests. """ logger.debug("Starting HTTP bridge server.") - self.server_thread.start() + app = HttpBridgeHandler(self) + runner = web.AppRunner(app) + await runner.setup() + site = web.TCPSite(runner, self.address[0], self.address[1]) + await site.start() + + await stop_event.wait() + await runner.cleanup() diff --git a/requirements_all.txt b/requirements_all.txt index 3e1e072e..cbe5afce 100644 --- a/requirements_all.txt +++ b/requirements_all.txt @@ -4,4 +4,5 @@ ed25519 zeroconf curve25519-donna pyqrcode -base36 \ No newline at end of file +base36 +aiohttp==3.1.1