Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion msgq/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# must be built with scons
from msgq.ipc_pyx import Context, Poller, SubSocket, PubSocket, SocketEventHandle, toggle_fake_events, \
set_fake_prefix, get_fake_prefix, delete_fake_prefix, wait_for_one_event
set_fake_prefix, get_fake_prefix, delete_fake_prefix, wait_for_one_event, \
context_is_zmq
from msgq.ipc_pyx import MultiplePublishersError, IpcError

from typing import Optional, List, Union
Expand All @@ -12,6 +13,7 @@
assert get_fake_prefix
assert delete_fake_prefix
assert wait_for_one_event
assert context_is_zmq

NO_TRAVERSAL_LIMIT = 2**64-1

Expand Down
14 changes: 14 additions & 0 deletions msgq/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import os
import pytest
import msgq

@pytest.fixture(params=[False, True], ids=["msgq", "zmq"], autouse=True)
def zmq_mode(request):
if request.param:
os.environ["ZMQ"] = "1"
else:
os.environ.pop("ZMQ", None)
msgq.context = msgq.Context()
assert msgq.context_is_zmq() == request.param
yield request.param
os.environ.pop("ZMQ", None)
2 changes: 2 additions & 0 deletions msgq/ipc.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ cdef extern from "msgq/impl_fake.h":


cdef extern from "msgq/ipc.h":
bool messaging_use_zmq()

cdef cppclass Context:
@staticmethod
Context * create()
Expand Down
5 changes: 5 additions & 0 deletions msgq/ipc_pyx.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ from libc.string cimport strerror
from cython.operator import dereference


from .ipc cimport messaging_use_zmq
from .ipc cimport Context as cppContext
from .ipc cimport SubSocket as cppSubSocket
from .ipc cimport PubSocket as cppPubSocket
Expand All @@ -18,6 +19,10 @@ from .ipc cimport Message as cppMessage
from .ipc cimport Event as cppEvent, SocketEventHandle as cppSocketEventHandle


def context_is_zmq():
return messaging_use_zmq()


class IpcError(Exception):
def __init__(self, endpoint=None):
suffix = f"with {endpoint.decode('utf-8')}" if endpoint else ""
Expand Down
26 changes: 24 additions & 2 deletions msgq/msgq.cc
Original file line number Diff line number Diff line change
Expand Up @@ -446,9 +446,18 @@ int msgq_poll(msgq_pollitem_t * items, size_t nitems, int timeout){
}

int ms = (timeout == -1) ? 100 : timeout;

#ifdef __APPLE__
// On macOS, signals can't interrupt nanosleep, so poll more frequently
int poll_ms = std::min(ms, 10);
int remaining_ms = ms;
#else
int poll_ms = ms;
#endif

struct timespec ts;
ts.tv_sec = ms / 1000;
ts.tv_nsec = (ms % 1000) * 1000 * 1000;
ts.tv_sec = poll_ms / 1000;
ts.tv_nsec = (poll_ms % 1000) * 1000 * 1000;


while (num == 0) {
Expand All @@ -464,10 +473,23 @@ int msgq_poll(msgq_pollitem_t * items, size_t nitems, int timeout){
}
}

#ifdef __APPLE__
// exit if we had a timeout and we've exhausted it
if (timeout != -1 && ret == 0){
remaining_ms -= poll_ms;
if (remaining_ms <= 0){
break;
}
poll_ms = std::min(remaining_ms, 10);
ts.tv_sec = poll_ms / 1000;
ts.tv_nsec = (poll_ms % 1000) * 1000 * 1000;
}
#else
// exit if we had a timeout and the sleep finished
if (timeout != -1 && ret == 0){
break;
}
#endif
}

return num;
Expand Down
3 changes: 2 additions & 1 deletion msgq/tests/test_fake.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,13 @@ def test_wait_zero_timeout(self):


@pytest.mark.skipif(condition=platform.system() == "Darwin", reason="FakeSockets not supported on macOS")
@pytest.mark.skipif(condition="ZMQ" in os.environ, reason="FakeSockets not supported on ZMQ")
@parameterized_class([{"prefix": None}, {"prefix": "test"}])
class TestFakeSockets:
prefix: Optional[str] = None

def setup_method(self):
if "ZMQ" in os.environ:
pytest.skip("FakeSockets not supported on ZMQ")
msgq.toggle_fake_events(True)
if self.prefix is not None:
msgq.set_fake_prefix(self.prefix)
Expand Down
4 changes: 2 additions & 2 deletions msgq/tests/test_messaging.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,12 @@ def test_conflate(self):
@pytest.mark.flaky(retries=3, delay=1)
def test_receive_timeout(self):
sock = random_sock()
for _ in range(10):
for _ in range(5):
timeout = random.randrange(200)
sub_sock = msgq.sub_sock(sock, timeout=timeout)
zmq_sleep()

start_time = time.monotonic()
recvd = sub_sock.receive()
assert (time.monotonic() - start_time) < 0.2
assert (time.monotonic() - start_time) < (timeout + 0.1)
assert recvd is None
2 changes: 0 additions & 2 deletions setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ PLATFORM=$(uname -s)

echo "installing dependencies"
if [[ $PLATFORM == "Darwin" ]]; then
export ZMQ=1

export HOMEBREW_NO_AUTO_UPDATE=1
brew install python3 zeromq
elif [[ $PLATFORM == "Linux" ]]; then
Expand Down
Loading