diff --git a/msgq/__init__.py b/msgq/__init__.py index 574e100a8..b47b3db12 100644 --- a/msgq/__init__.py +++ b/msgq/__init__.py @@ -1,6 +1,7 @@ # must be built with scons from msgq.ipc_pyx import Context, Poller, SubSocket, PubSocket, SocketEventHandle, toggle_fake_events, \ - set_fake_prefix, get_fake_prefix, delete_fake_prefix, wait_for_one_event + set_fake_prefix, get_fake_prefix, delete_fake_prefix, wait_for_one_event, \ + context_is_zmq from msgq.ipc_pyx import MultiplePublishersError, IpcError from typing import Optional, List, Union @@ -12,6 +13,7 @@ assert get_fake_prefix assert delete_fake_prefix assert wait_for_one_event +assert context_is_zmq NO_TRAVERSAL_LIMIT = 2**64-1 diff --git a/msgq/conftest.py b/msgq/conftest.py new file mode 100644 index 000000000..bfd77925c --- /dev/null +++ b/msgq/conftest.py @@ -0,0 +1,14 @@ +import os +import pytest +import msgq + +@pytest.fixture(params=[False, True], ids=["msgq", "zmq"], autouse=True) +def zmq_mode(request): + if request.param: + os.environ["ZMQ"] = "1" + else: + os.environ.pop("ZMQ", None) + msgq.context = msgq.Context() + assert msgq.context_is_zmq() == request.param + yield request.param + os.environ.pop("ZMQ", None) diff --git a/msgq/ipc.pxd b/msgq/ipc.pxd index 0760561d2..171ddf6b6 100644 --- a/msgq/ipc.pxd +++ b/msgq/ipc.pxd @@ -35,6 +35,8 @@ cdef extern from "msgq/impl_fake.h": cdef extern from "msgq/ipc.h": + bool messaging_use_zmq() + cdef cppclass Context: @staticmethod Context * create() diff --git a/msgq/ipc_pyx.pyx b/msgq/ipc_pyx.pyx index 909262f4c..128f352ef 100644 --- a/msgq/ipc_pyx.pyx +++ b/msgq/ipc_pyx.pyx @@ -10,6 +10,7 @@ from libc.string cimport strerror from cython.operator import dereference +from .ipc cimport messaging_use_zmq from .ipc cimport Context as cppContext from .ipc cimport SubSocket as cppSubSocket from .ipc cimport PubSocket as cppPubSocket @@ -18,6 +19,10 @@ from .ipc cimport Message as cppMessage from .ipc cimport Event as cppEvent, SocketEventHandle as cppSocketEventHandle +def context_is_zmq(): + return messaging_use_zmq() + + class IpcError(Exception): def __init__(self, endpoint=None): suffix = f"with {endpoint.decode('utf-8')}" if endpoint else "" diff --git a/msgq/msgq.cc b/msgq/msgq.cc index 92f0ec1df..5bafc7055 100644 --- a/msgq/msgq.cc +++ b/msgq/msgq.cc @@ -446,9 +446,18 @@ int msgq_poll(msgq_pollitem_t * items, size_t nitems, int timeout){ } int ms = (timeout == -1) ? 100 : timeout; + +#ifdef __APPLE__ + // On macOS, signals can't interrupt nanosleep, so poll more frequently + int poll_ms = std::min(ms, 10); + int remaining_ms = ms; +#else + int poll_ms = ms; +#endif + struct timespec ts; - ts.tv_sec = ms / 1000; - ts.tv_nsec = (ms % 1000) * 1000 * 1000; + ts.tv_sec = poll_ms / 1000; + ts.tv_nsec = (poll_ms % 1000) * 1000 * 1000; while (num == 0) { @@ -464,10 +473,23 @@ int msgq_poll(msgq_pollitem_t * items, size_t nitems, int timeout){ } } +#ifdef __APPLE__ + // exit if we had a timeout and we've exhausted it + if (timeout != -1 && ret == 0){ + remaining_ms -= poll_ms; + if (remaining_ms <= 0){ + break; + } + poll_ms = std::min(remaining_ms, 10); + ts.tv_sec = poll_ms / 1000; + ts.tv_nsec = (poll_ms % 1000) * 1000 * 1000; + } +#else // exit if we had a timeout and the sleep finished if (timeout != -1 && ret == 0){ break; } +#endif } return num; diff --git a/msgq/tests/test_fake.py b/msgq/tests/test_fake.py index d2c51313c..4513973b2 100644 --- a/msgq/tests/test_fake.py +++ b/msgq/tests/test_fake.py @@ -64,12 +64,13 @@ def test_wait_zero_timeout(self): @pytest.mark.skipif(condition=platform.system() == "Darwin", reason="FakeSockets not supported on macOS") -@pytest.mark.skipif(condition="ZMQ" in os.environ, reason="FakeSockets not supported on ZMQ") @parameterized_class([{"prefix": None}, {"prefix": "test"}]) class TestFakeSockets: prefix: Optional[str] = None def setup_method(self): + if "ZMQ" in os.environ: + pytest.skip("FakeSockets not supported on ZMQ") msgq.toggle_fake_events(True) if self.prefix is not None: msgq.set_fake_prefix(self.prefix) diff --git a/msgq/tests/test_messaging.py b/msgq/tests/test_messaging.py index 8f9f65b2b..7c61cc5ed 100644 --- a/msgq/tests/test_messaging.py +++ b/msgq/tests/test_messaging.py @@ -61,12 +61,12 @@ def test_conflate(self): @pytest.mark.flaky(retries=3, delay=1) def test_receive_timeout(self): sock = random_sock() - for _ in range(10): + for _ in range(5): timeout = random.randrange(200) sub_sock = msgq.sub_sock(sock, timeout=timeout) zmq_sleep() start_time = time.monotonic() recvd = sub_sock.receive() - assert (time.monotonic() - start_time) < 0.2 + assert (time.monotonic() - start_time) < (timeout + 0.1) assert recvd is None diff --git a/setup.sh b/setup.sh index 8d0611628..abba95d05 100755 --- a/setup.sh +++ b/setup.sh @@ -8,8 +8,6 @@ PLATFORM=$(uname -s) echo "installing dependencies" if [[ $PLATFORM == "Darwin" ]]; then - export ZMQ=1 - export HOMEBREW_NO_AUTO_UPDATE=1 brew install python3 zeromq elif [[ $PLATFORM == "Linux" ]]; then