package org.opentrafficsim.sim0mq.swing;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import javax.naming.NamingException;
import org.djunits.unit.DurationUnit;
import org.djunits.unit.TimeUnit;
import org.djunits.value.vdouble.scalar.Duration;
import org.djunits.value.vdouble.scalar.Time;
import org.djutils.serialization.SerializationException;
import org.junit.Test;
import org.opentrafficsim.draw.core.OTSDrawingException;
import org.opentrafficsim.sim0mq.publisher.SubscriptionHandler;
import org.sim0mq.Sim0MQException;
import org.sim0mq.message.Sim0MQMessage;
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import nl.tudelft.simulation.dsol.SimRuntimeException;
import nl.tudelft.simulation.language.DSOLException;
/**
 * Unit tests. This requires half of OTS in the imports because it sets up a simulation and runs that for a couple of seconds.
 * Copyright (c) 2020-2020 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved.
 * BSD-style license. See OpenTrafficSim License.
 * $LastChangedDate: 2020-02-13 11:08:16 +0100 (Thu, 13 Feb 2020) $, @version $Revision: 6383 $, by $Author: pknoppers $,
 * @author Peter Knoppers
 */
public class Sim0MQPublisherTest
* Verify an ACK or a NACK message.
* @param got byte[]; the not-yet-decoded message that is expected to decode into an ACK or a NACK
* @param field5 String; expected content for the message type id field
* @param field6 int; expected content for the message id field
* @param expectedValue Boolean; expected Boolean value for the first payload field (field 8)
* @param expectedDescription String; expected String value for the second and last payload field (field 9)
* @throws Sim0MQException when that happens, this test has failed
* @throws SerializationException when that happens this test has failed
public void verifyAckNack(final byte[] got, final String field5, final int field6, final Boolean expectedValue,
final String expectedDescription) throws Sim0MQException, SerializationException
Object[] objects = Sim0MQMessage.decodeToArray(got);
assertEquals("Field 5 of message echos the command", field5, objects[5]);
assertEquals("conversation id (field 6) matches", field6, objects[6]);
assertEquals("Response has 2 field payload", 10, objects.length);
assertTrue("First payload field is a boolean", objects[8] instanceof Boolean);
assertEquals("First payload field has the expected value", expectedValue, objects[8]);
assertTrue("Second (and last) payload field is a String", objects[9] instanceof String);
if (!((String) objects[9]).startsWith(expectedDescription))
fail("Description of ACK/NACK does not start with \"" + expectedDescription + "\" instead it contains \""
+ objects[9] + "\"");
* Wait for an incoming message and verify that it is an ACK or a NACK.
* @param receivedMessages List<byte[]>; the list where incoming messages should appear
* @param maximumSeconds double; maximum time to wait
* @param field5 String; expected content for the message type id field
* @param field6 int; expected content for the message id field
* @param expectedValue Boolean; expected Boolean value for the first payload field (field 8)
* @param expectedDescription String; expected String value for the second and last payload field (field 9)
* @throws Sim0MQException when that happens, this test has failed
* @throws SerializationException when that happens this test has failed
* @throws InterruptedException when that happens this test has failed
public void waitAndVerifyAckNack(final List receivedMessages, final double maximumSeconds, final String field5,
final int field6, final Boolean expectedValue, final String expectedDescription)
throws Sim0MQException, SerializationException, InterruptedException
waitForReceivedMessages(receivedMessages, maximumSeconds);
assertEquals("Should have received one message", 1, receivedMessages.size());
verifyAckNack(receivedMessages.get(0), field5, field6, expectedValue, expectedDescription);
/**
 * Test code.
 * @throws IOException if that happens uncaught; this test has failed
 * @throws NamingException if that happens uncaught; this test has failed
 * @throws SimRuntimeException if that happens uncaught; this test has failed
 * @throws DSOLException if that happens uncaught; this test has failed
 * @throws OTSDrawingException if that happens uncaught; this test has failed
 * @throws SerializationException if that happens uncaught; this test has failed
 * @throws Sim0MQException if that happens uncaught; this test has failed
 * @throws InterruptedException if that happens uncaught; this test has failed
 * @throws URISyntaxException if network.xml file not found
 */
// End-to-end exercise of the publisher: commands are encoded with Sim0MQMessage.encodeUTF8, sent over a PUSH socket and
// the replies collected in receivedMessages are checked with waitAndVerifyAckNack.
// NOTE(review): several statements appear to be missing from this method as shown here (no declaration of badCommand,
// no start/join/interrupt calls on the two threads, no connect call on the control socket, and some encodeUTF8 lines
// close one more parenthesis than they open). Confirm against version control before relying on this listing.
public void testSim0MQPublisher() throws IOException, SimRuntimeException, NamingException, DSOLException,
OTSDrawingException, Sim0MQException, SerializationException, InterruptedException, URISyntaxException
ZContext zContext = new ZContext(5); // 5 IO threads - how many is reasonable? It actually works with 1 IO thread.
// Load the test network from the /network.xml resource into the static networkXML field.
// NOTE(review): new String(byte[]) decodes with the platform default charset - confirm that this is intended.
networkXML = new String(Files.readAllBytes(Paths.get(URLResource.getResource("/network.xml").toURI())));
// NOTE(review): receivedMessages is already a synchronized list; the next line wraps it a second time.
List receivedMessages = Collections.synchronizedList(new ArrayList<>());
List synchronizedReceivedMessages = Collections.synchronizedList(receivedMessages);
ReadMessageThread readMessageThread = new ReadMessageThread(zContext, synchronizedReceivedMessages);
PublisherThread publisherThread = new PublisherThread(zContext);
ZMQ.Socket publisherControlSocket = zContext.createSocket(SocketType.PUSH);
int conversationId = 100; // Number the commands starting with something that is very different from 0
// An unsupported command must be answered with a NACK.
sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", badCommand, ++conversationId));
waitAndVerifyAckNack(receivedMessages, 1.0, badCommand, conversationId, false, "Don't know how to handle message:");
// SIMULATEUNTIL before any network was loaded must be answered with a NACK.
sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
++conversationId, new Object[] { new Time(10, TimeUnit.BASE_SECOND) }));
waitAndVerifyAckNack(receivedMessages, 1.0, "SIMULATEUNTIL", conversationId, false, "No network loaded");
// Subscribing before a simulation was loaded must be answered with a NACK as well.
badCommand = "GTUs in network|SUBSCRIBE_TO_ADD";
sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", badCommand, ++conversationId));
waitAndVerifyAckNack(receivedMessages, 1.0, "GTUs in network", conversationId, false,
"No simulation loaded; cannot execute command GTUs in network|SUBSCRIBE_TO_ADD");
// Load the simulation; the reply is awaited for up to 10 seconds.
// NOTE(review): the payload after the XML is presumably run length, warm-up time and random seed - confirm.
sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "NEWSIMULATION",
++conversationId, networkXML, new Duration(60, DurationUnit.SECOND), Duration.ZERO, 123456L));
waitAndVerifyAckNack(receivedMessages, 10.0, "NEWSIMULATION", conversationId, true, "OK");
// Discover what services and commands are available
// NOTE(review): the next line has an unmatched closing parenthesis; presumably the opening
// "sendCommand(publisherControlSocket," was lost - confirm. The same pattern recurs further down.
Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "|GET_LIST", ++conversationId));
waitForReceivedMessages(receivedMessages, 1);
assertEquals("Should have received one message", 1, receivedMessages.size());
Object[] commands = Sim0MQMessage.decodeToArray(receivedMessages.get(0));
assertTrue("message decodes into more than 8 fields", commands.length > 8);
// Every field from index 8 onward is treated as the name of a service.
for (int index = 8; index < commands.length; index++)
assertTrue("A service is identified by a String", commands[index] instanceof String);
String service = (String) commands[index];
System.out.println("Service " + service);
// Ask the service which commands it supports.
sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
service + "|" + SubscriptionHandler.Command.GET_COMMANDS, ++conversationId));
waitForReceivedMessages(receivedMessages, 1.0);
if (receivedMessages.size() > 0)
Object[] result = Sim0MQMessage.decodeToArray(receivedMessages.get(0));
assertTrue("result of GET_COMMANDS should be at least 8 long", result.length >= 8);
// Try every command reported by the service.
for (int i = 8; i < result.length; i++)
String command = (String) result[i];
// System.out.println("trying command " + service + "|" + command);
Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", service + "|" + command, ++conversationId));
waitForReceivedMessages(receivedMessages, 1.0);
if (receivedMessages.size() > 0)
// NOTE(review): ii starts at 8 although it is bounded by the number of received messages (not by a field
// count); the loop body is not visible here - confirm that this start index is intended.
for (int ii = 8; ii < receivedMessages.size(); ii++)
System.out.println("Received no reply");
System.out.print(""); // Good for a breakpoint
System.out.println("Received no reply to GET_COMMANDS request");
sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTU move|SUBSCRIBE_TO_CHANGE",
++conversationId, "2", "BAD")); // Too many fields
waitAndEatMessagesUntilConversationId(receivedMessages, 1.0, conversationId);
waitAndVerifyAckNack(receivedMessages, 1.0, "GTU move", conversationId, false, "Bad address");
Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTU move|SUBSCRIBE_TO_CHANGE", ++conversationId));
// Too few fields
waitAndVerifyAckNack(receivedMessages, 1.0, "GTU move", conversationId, false,
"Bad address: Address for GTU Id has wrong length");
sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTU move|SUBSCRIBE_TO_CHANGE",
++conversationId, "NON EXISTING GTU ID")); // GTU id is not (currently) in use
waitAndVerifyAckNack(receivedMessages, 1.0, "GTU move", conversationId, false, "No GTU with id");
Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", ++conversationId));
Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|SUBSCRIBE_TO_ADD", ++conversationId));
// Advance the simulation to time 10.
sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
++conversationId, new Object[] { new Time(10, TimeUnit.BASE_SECOND) }));
Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", ++conversationId));
int conversationIdForSubscribeToAdd = ++conversationId; // We need that to unsubscribe later
sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
"GTUs in network|SUBSCRIBE_TO_ADD", conversationIdForSubscribeToAdd));
waitAndEatMessagesUntilConversationId(receivedMessages, 1.0, conversationIdForSubscribeToAdd);
waitAndVerifyAckNack(receivedMessages, 1.0, "GTUs in network", conversationIdForSubscribeToAdd, true,
"Subscription created");
int conversationIdForGTU2Move = ++conversationId;
sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTU move|SUBSCRIBE_TO_CHANGE",
conversationIdForGTU2Move, "2")); // Subscribe to move events of GTU 2
// Advance the simulation to time 20.
sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
++conversationId, new Object[] { new Time(20, TimeUnit.BASE_SECOND) }));
Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", ++conversationId));
waitAndEatMessagesUntilConversationId(receivedMessages, 1.0, conversationId);
// unsubscribe from GTU ADD events using the previously saved conversationId
sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
"GTUs in network|UNSUBSCRIBE_FROM_ADD", conversationIdForSubscribeToAdd));
waitAndEatMessagesUntilConversationId(receivedMessages, 1.0, conversationIdForSubscribeToAdd);
waitAndVerifyAckNack(receivedMessages, 1.0, "GTUs in network", conversationIdForSubscribeToAdd, true,
"Subscription removed");
sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
"GTU move|UNSUBSCRIBE_FROM_CHANGE", conversationIdForGTU2Move, "2")); // Unsubscribe from move events of GTU 2
// Advance the simulation to time 30.
sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
++conversationId, new Object[] { new Time(30, TimeUnit.BASE_SECOND) }));
Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", ++conversationId));
sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
"GTUs in network|GET_ADDRESS_META_DATA", ++conversationId));
// Advance to time 60 (the Duration passed with NEWSIMULATION above), then request time 70; the second request
// is expected to be refused with a NACK.
sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
++conversationId, new Object[] { new Time(60, TimeUnit.BASE_SECOND) }));
sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
++conversationId, new Object[] { new Time(70, TimeUnit.BASE_SECOND) }));
waitAndEatMessagesUntilConversationId(receivedMessages, 1.0, conversationId);
waitAndVerifyAckNack(receivedMessages, 1.0, "SIMULATEUNTIL", conversationId, false,
"Simulation is already at end of simulation time");
// Tell the publisher to terminate.
sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "DIE", ++conversationId));
// NOTE(review): the messages below announce join and interrupt operations on the two threads, but no such calls are
// visible between the print statements - confirm that they exist in the real file.
System.out.println("Master has sent last command; Publisher should be busy for a while and then die");
System.out.println("Master joining publisher thread (this should block until publisher has died)");
System.out.println("Master has joined publisher thread");
System.out.println("Master interrupts read message thread");
System.out.println("Master has interrupted read message thread; joining ...");
System.out.println("Master has joined read message thread");
System.out.println("Master exits");
* Wait for incoming messages up to one that has a specified conversation id, or until 1000 times time out.
* @param receivedMessages List<byte[]>; the list to monitor
* @param maximumSeconds double; how long to wait (in seconds)
* @param conversationId int; the conversation id to wait for
* @throws Sim0MQException when that happens, this test has failed
* @throws SerializationException when that happens, this test has failed
* @throws InterruptedException when that happens, this test has failed
public void waitAndEatMessagesUntilConversationId(final List receivedMessages, final double maximumSeconds,
final int conversationId) throws Sim0MQException, SerializationException, InterruptedException
for (int attempt = 0; attempt < 1000; attempt++)
waitForReceivedMessages(receivedMessages, 1.0);
// System.out.println("attempt = " + attempt + " received " + receivedMessages.size() + " message(s)");
while (receivedMessages.size() > 1)
if (receivedMessages.size() == 1)
Object[] objects = Sim0MQMessage.decodeToArray(receivedMessages.get(0));
if (objects[6].equals(conversationId))
* Sleep up to 1 second waiting for at least one message to be received.
* @param receivedMessages List<byte[]>; the list to monitor
* @param maximumSeconds double; how long to wait (in seconds)
* @throws InterruptedException when that happens uncaught; this test has failed
static void waitForReceivedMessages(final List receivedMessages, final double maximumSeconds)
throws InterruptedException
double timeWaited = 0;
while (receivedMessages.size() == 0 && timeWaited < maximumSeconds)
timeWaited += 0.01;
* Wrapper for ZMQ.Socket.send that may output some debugging information.
* @param socket ZMQ.Socket; the socket to send onto
* @param message byte[]; the message to transmit
static void sendCommand(final ZMQ.Socket socket, final byte[] message)
// try
// {
// Object[] unpackedMessage = Sim0MQMessage.decodeToArray(message);
// System.out.println("Master sending command " + unpackedMessage[5] + " conversation id " + unpackedMessage[6]);
// }
// catch (Sim0MQException | SerializationException e)
// {
// e.printStackTrace();
// }
/**
 * Repeatedly try to read all available messages.
 */
static class ReadMessageThread extends Thread
/** The ZContext needed to create the socket. */
private final ZContext zContext;

/** Storage for the received messages. */
private final List<byte[]> storage;

/**
 * Repeatedly read all available messages.
 * @param zContext ZContext; the ZContext needed to create the read socket
 * @param storage List&lt;byte[]&gt;; storage for the received messages
 */
ReadMessageThread(final ZContext zContext, final List<byte[]> storage)
{
    this.zContext = zContext;
    this.storage = storage;
}
// Thread body: creates a PULL socket on the ZContext of this thread and calls readMessages on it until this thread
// is interrupted.
// NOTE(review): no connect or bind call on the socket is visible, and nothing is done with the elements of "all"
// inside the for loop (presumably each one should be added to the storage list). Statements appear to be missing
// from this method as shown - confirm against version control.
public void run()
System.out.println("Read message thread starting up");
ZMQ.Socket socket = this.zContext.createSocket(SocketType.PULL);
// Thread.interrupted() also clears the interrupt flag of this thread.
while (!Thread.interrupted())
byte[][] all = readMessages(socket);
for (byte[] one : all)
System.out.println("Read message thread exits due to interrupt");
* Read as many messages from a ZMQ socket as are available. Do NOT block when there are no (more) messages to read.
* @param socket ZMQ.Socket; the socket
* @return byte[][]; the read messages
public static byte[][] readMessages(final ZMQ.Socket socket)
List resultList = new ArrayList<>();
while (true)
byte[] message = socket.recv();
StringBuilder output = new StringBuilder();
if (null != message)
output.append("Master received " + message.length + " byte message: ");
// System.out.println(SerialDataDumper.serialDataDumper(EndianUtil.BIG_ENDIAN, message));
Object[] fields = Sim0MQMessage.decodeToArray(message);
for (Object field : fields)
output.append("/" + field);
catch (Sim0MQException | SerializationException e)
if (resultList.size() > 0)
"Master picked up " + resultList.size() + " message" + (resultList.size() == 1 ? "" : "s"));
return resultList.toArray(new byte[resultList.size()][]);
* Thread that runs a PublisherExperiment.
static class PublisherThread extends Thread
/** Passed onto the constructor of PublisherExperimentUsingSockets. */
private final ZContext zContext;
* Construct a new PublisherThread.
* @param zContext ZContext; needed to construct the PublisherExperimentUsingSockets
PublisherThread(final ZContext zContext)
this.zContext = zContext;
* Construct a new PublisherThread.
this.zContext = new ZContext(5);
public void run()
new Sim0MQPublisher(this.zContext, "publisherControl", "publisherOutput");
System.out.println("Publisher thread exits");
/** The test network. */
private static String networkXML;