Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 24 additions & 26 deletions msgq/tests/test_messaging.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,35 +38,33 @@ def test_conflate(self):
sock = random_sock()
pub_sock = msgq.pub_sock(sock)
for conflate in [True, False]:
for _ in range(10):
num_msgs = random.randint(3, 10)
sub_sock = msgq.sub_sock(sock, conflate=conflate, timeout=None)
zmq_sleep()
num_msgs = random.randint(3, 10)
sub_sock = msgq.sub_sock(sock, conflate=conflate, timeout=None)
zmq_sleep()

sent_msgs = []
for __ in range(num_msgs):
msg = random_bytes()
pub_sock.send(msg)
sent_msgs.append(msg)
time.sleep(0.1)
recvd_msgs = msgq.drain_sock_raw(sub_sock)
if conflate:
assert len(recvd_msgs) == 1
assert recvd_msgs[0] == sent_msgs[-1]
else:
assert len(recvd_msgs) == len(sent_msgs)
for rec_msg, sent_msg in zip(recvd_msgs, sent_msgs):
assert rec_msg == sent_msg
sent_msgs = []
for __ in range(num_msgs):
msg = random_bytes()
pub_sock.send(msg)
sent_msgs.append(msg)
time.sleep(0.1)
recvd_msgs = msgq.drain_sock_raw(sub_sock)
if conflate:
assert len(recvd_msgs) == 1
assert recvd_msgs[0] == sent_msgs[-1]
else:
assert len(recvd_msgs) == len(sent_msgs)
for rec_msg, sent_msg in zip(recvd_msgs, sent_msgs):
assert rec_msg == sent_msg

@pytest.mark.flaky(retries=3, delay=1)
def test_receive_timeout(self):
sock = random_sock()
for _ in range(5):
timeout = random.randrange(200)
sub_sock = msgq.sub_sock(sock, timeout=timeout)
zmq_sleep()
timeout = random.randrange(200)
sub_sock = msgq.sub_sock(sock, timeout=timeout)
zmq_sleep()

start_time = time.monotonic()
recvd = sub_sock.receive()
assert (time.monotonic() - start_time) < (timeout + 0.1)
assert recvd is None
start_time = time.monotonic()
recvd = sub_sock.receive()
assert (time.monotonic() - start_time) < (timeout + 0.1)
assert recvd is None
Loading