package org.opentrafficsim.sim0mq.swing;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import javax.naming.NamingException;
import org.djunits.unit.DurationUnit;
import org.djunits.unit.TimeUnit;
import org.djunits.value.vdouble.scalar.Duration;
import org.djunits.value.vdouble.scalar.Time;
import org.djutils.io.URLResource;
import org.djutils.serialization.SerializationException;
import org.junit.Test;
import org.opentrafficsim.draw.core.OTSDrawingException;
import org.opentrafficsim.sim0mq.publisher.SubscriptionHandler;
import org.sim0mq.Sim0MQException;
import org.sim0mq.message.Sim0MQMessage;
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import nl.tudelft.simulation.dsol.SimRuntimeException;
import nl.tudelft.simulation.language.DSOLException;
/**
* Unit tests. This requires half of OTS in the imports because it sets up a simulation and runs that for a couple of seconds.
*
* Copyright (c) 2020-2020 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved.
* BSD-style license. See OpenTrafficSim License.
*
* $LastChangedDate: 2020-02-13 11:08:16 +0100 (Thu, 13 Feb 2020) $, @version $Revision: 6383 $, by $Author: pknoppers $,
* @author Peter Knoppers
*/
public class Sim0MQPublisherTest
{
/**
 * Verify an ACK or a NACK message.
 * <p>
 * The message is decoded and must consist of exactly 10 fields; fields 8 and 9 are the payload. The number of fields is
 * checked before any individual field is inspected, so that a truncated message is reported as a failed assertion and not
 * as an <code>ArrayIndexOutOfBoundsException</code>.
 * @param got byte[]; the not-yet-decoded message that is expected to decode into an ACK or a NACK
 * @param field5 String; expected content for the message type id field
 * @param field6 int; expected content for the message id field
 * @param expectedValue Boolean; expected Boolean value for the first payload field (field 8)
 * @param expectedDescription String; expected start of the String value of the second and last payload field (field 9)
 * @throws Sim0MQException when that happens, this test has failed
 * @throws SerializationException when that happens this test has failed
 */
public void verifyAckNack(final byte[] got, final String field5, final int field6, final Boolean expectedValue,
        final String expectedDescription) throws Sim0MQException, SerializationException
{
    Object[] objects = Sim0MQMessage.decodeToArray(got);
    // Check the size first; all index expressions below rely on it
    assertEquals("Response has 2 field payload", 10, objects.length);
    assertEquals("Field 5 of message echos the command", field5, objects[5]);
    assertEquals("conversation id (field 6) matches", field6, objects[6]);
    assertTrue("First payload field is a boolean", objects[8] instanceof Boolean);
    assertEquals("First payload field has the expected value", expectedValue, objects[8]);
    assertTrue("Second (and last) payload field is a String", objects[9] instanceof String);
    String description = (String) objects[9];
    // Only the start of the description is compared; the sender may append details
    if (!description.startsWith(expectedDescription))
    {
        fail("Description of ACK/NACK does not start with \"" + expectedDescription + "\" instead it contains \""
                + objects[9] + "\"");
    }
}
/**
 * Wait for an incoming message and verify that it is an ACK or a NACK.
 * <p>
 * Exactly one message must be present after the wait. That message is verified with <code>verifyAckNack</code> and then
 * the list is cleared, so that the next command starts with an empty list.
 * @param receivedMessages List&lt;byte[]&gt;; the list where incoming messages should appear
 * @param maximumSeconds double; maximum time to wait
 * @param field5 String; expected content for the message type id field
 * @param field6 int; expected content for the message id field
 * @param expectedValue Boolean; expected Boolean value for the first payload field (field 8)
 * @param expectedDescription String; expected String value for the second and last payload field (field 9)
 * @throws Sim0MQException when that happens, this test has failed
 * @throws SerializationException when that happens this test has failed
 * @throws InterruptedException when that happens this test has failed
 */
public void waitAndVerifyAckNack(final List<byte[]> receivedMessages, final double maximumSeconds, final String field5,
        final int field6, final Boolean expectedValue, final String expectedDescription)
        throws Sim0MQException, SerializationException, InterruptedException
{
    waitForReceivedMessages(receivedMessages, maximumSeconds);
    assertEquals("Should have received one message", 1, receivedMessages.size());
    // The element type must be byte[]; with a raw List this call does not compile
    byte[] reply = receivedMessages.get(0);
    verifyAckNack(reply, field5, field6, expectedValue, expectedDescription);
    receivedMessages.clear();
}
/**
 * Test code.
 * <p>
 * Starts a thread that collects everything the publisher sends and a thread that runs the publisher, and then drives the
 * publisher through its control socket. The steps are: commands that must be rejected because no simulation is loaded,
 * loading a simulation from /network.xml, discovery of the available services and their commands, subscriptions with bad
 * and with good addresses, running the simulation in steps, and finally the DIE command, after which both threads are
 * joined.
 * @throws IOException if that happens uncaught; this test has failed
 * @throws NamingException if that happens uncaught; this test has failed
 * @throws SimRuntimeException if that happens uncaught; this test has failed
 * @throws DSOLException if that happens uncaught; this test has failed
 * @throws OTSDrawingException if that happens uncaught; this test has failed
 * @throws SerializationException if that happens uncaught; this test has failed
 * @throws Sim0MQException if that happens uncaught; this test has failed
 * @throws InterruptedException if that happens uncaught; this test has failed
 * @throws URISyntaxException if network.xml file not found
 */
@Test
public void testSim0MQPublisher() throws IOException, SimRuntimeException, NamingException, DSOLException,
        OTSDrawingException, Sim0MQException, SerializationException, InterruptedException, URISyntaxException
{
    ZContext zContext = new ZContext(5); // 5 IO threads - how many is reasonable? It actually works with 1 IO thread.
    // Decode explicitly as UTF-8 so the result does not depend on the platform default charset
    // NOTE(review): assumes network.xml is stored as UTF-8 (the default encoding for XML) - confirm
    networkXML = new String(Files.readAllBytes(Paths.get(URLResource.getResource("/network.xml").toURI())),
            java.nio.charset.StandardCharsets.UTF_8);
    // Filled by the read message thread and inspected by this thread; one synchronized wrapper is sufficient
    List<byte[]> receivedMessages = Collections.synchronizedList(new ArrayList<>());
    ReadMessageThread readMessageThread = new ReadMessageThread(zContext, receivedMessages);
    readMessageThread.start();
    PublisherThread publisherThread = new PublisherThread(zContext);
    publisherThread.start();
    ZMQ.Socket publisherControlSocket = zContext.createSocket(SocketType.PUSH);
    publisherControlSocket.connect("inproc://publisherControl");

    int conversationId = 100; // Number the commands starting with something that is very different from 0
    String badCommand = "THIS_IS_NOT_A_SUPPORTED_COMMAND";
    sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", badCommand, ++conversationId));
    waitAndVerifyAckNack(receivedMessages, 1.0, badCommand, conversationId, false, "Don't know how to handle message:");
    sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
            ++conversationId, new Object[] { new Time(10, TimeUnit.BASE_SECOND) }));
    waitAndVerifyAckNack(receivedMessages, 1.0, "SIMULATEUNTIL", conversationId, false, "No network loaded");
    badCommand = "GTUs in network|SUBSCRIBE_TO_ADD";
    sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", badCommand, ++conversationId));
    waitAndVerifyAckNack(receivedMessages, 1.0, "GTUs in network", conversationId, false,
            "No simulation loaded; cannot execute command GTUs in network|SUBSCRIBE_TO_ADD");
    sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "NEWSIMULATION",
            ++conversationId, networkXML, new Duration(60, DurationUnit.SECOND), Duration.ZERO, 123456L));
    waitAndVerifyAckNack(receivedMessages, 10.0, "NEWSIMULATION", conversationId, true, "OK");

    // Discover what services and commands are available
    sendCommand(publisherControlSocket,
            Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "|GET_LIST", ++conversationId));
    waitForReceivedMessages(receivedMessages, 1);
    assertEquals("Should have received one message", 1, receivedMessages.size());
    Object[] commands = Sim0MQMessage.decodeToArray(receivedMessages.get(0));
    assertTrue("message decodes into more than 8 fields", commands.length > 8);
    // The payload of a decoded message starts at field 8
    for (int index = 8; index < commands.length; index++)
    {
        receivedMessages.clear();
        assertTrue("A service is identified by a String", commands[index] instanceof String);
        String service = (String) commands[index];
        System.out.println("Service " + service);
        sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
                service + "|" + SubscriptionHandler.Command.GET_COMMANDS, ++conversationId));
        waitForReceivedMessages(receivedMessages, 1.0);
        if (receivedMessages.size() > 0)
        {
            Object[] result = Sim0MQMessage.decodeToArray(receivedMessages.get(0));
            assertTrue("result of GET_COMMANDS should be at least 8 long", result.length >= 8);
            for (int i = 8; i < result.length; i++)
            {
                String command = (String) result[i];
                receivedMessages.clear();
                // System.out.println("trying command " + service + "|" + command);
                sendCommand(publisherControlSocket,
                        Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", service + "|" + command, ++conversationId));
                waitForReceivedMessages(receivedMessages, 1.0);
                if (receivedMessages.size() > 0)
                {
                    // This iterates over received messages (not over message fields), so it has to start at 0
                    for (int ii = 0; ii < receivedMessages.size(); ii++)
                    {
                        System.out.println(Sim0MQMessage.print(Sim0MQMessage.decodeToArray(receivedMessages.get(ii))));
                    }
                }
                else
                {
                    System.out.println("Received no reply");
                }
                System.out.print(""); // Good for a breakpoint
            }
        }
        else
        {
            System.out.println("Received no reply to GET_COMMANDS request");
        }
    }

    sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTU move|SUBSCRIBE_TO_CHANGE",
            ++conversationId, "2", "BAD")); // Too many fields
    waitAndEatMessagesUntilConversationId(receivedMessages, 1.0, conversationId);
    waitAndVerifyAckNack(receivedMessages, 1.0, "GTU move", conversationId, false, "Bad address");
    sendCommand(publisherControlSocket,
            Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTU move|SUBSCRIBE_TO_CHANGE", ++conversationId));
    // Too few fields
    waitAndVerifyAckNack(receivedMessages, 1.0, "GTU move", conversationId, false,
            "Bad address: Address for GTU Id has wrong length");
    sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTU move|SUBSCRIBE_TO_CHANGE",
            ++conversationId, "NON EXISTING GTU ID")); // GTU id is not (currently) in use
    waitAndVerifyAckNack(receivedMessages, 1.0, "GTU move", conversationId, false, "No GTU with id");

    sendCommand(publisherControlSocket,
            Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", ++conversationId));
    sendCommand(publisherControlSocket,
            Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|SUBSCRIBE_TO_ADD", ++conversationId));
    sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
            ++conversationId, new Object[] { new Time(10, TimeUnit.BASE_SECOND) }));
    sendCommand(publisherControlSocket,
            Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", ++conversationId));
    int conversationIdForSubscribeToAdd = ++conversationId; // We need that to unsubscribe later
    sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
            "GTUs in network|SUBSCRIBE_TO_ADD", conversationIdForSubscribeToAdd));
    waitAndEatMessagesUntilConversationId(receivedMessages, 1.0, conversationIdForSubscribeToAdd);
    waitAndVerifyAckNack(receivedMessages, 1.0, "GTUs in network", conversationIdForSubscribeToAdd, true,
            "Subscription created");
    int conversationIdForGTU2Move = ++conversationId;
    sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTU move|SUBSCRIBE_TO_CHANGE",
            conversationIdForGTU2Move, "2")); // Subscribe to move events of GTU 2
    sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
            ++conversationId, new Object[] { new Time(20, TimeUnit.BASE_SECOND) }));
    sendCommand(publisherControlSocket,
            Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", ++conversationId));
    waitAndEatMessagesUntilConversationId(receivedMessages, 1.0, conversationId);
    // unsubscribe from GTU ADD events using the previously saved conversationId
    sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
            "GTUs in network|UNSUBSCRIBE_FROM_ADD", conversationIdForSubscribeToAdd));
    waitAndEatMessagesUntilConversationId(receivedMessages, 1.0, conversationIdForSubscribeToAdd);
    waitAndVerifyAckNack(receivedMessages, 1.0, "GTUs in network", conversationIdForSubscribeToAdd, true,
            "Subscription removed");
    sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
            "GTU move|UNSUBSCRIBE_FROM_CHANGE", conversationIdForGTU2Move, "2")); // Unsubscribe from move events of GTU 2
    sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
            ++conversationId, new Object[] { new Time(30, TimeUnit.BASE_SECOND) }));
    sendCommand(publisherControlSocket,
            Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", ++conversationId));
    sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
            "GTUs in network|GET_ADDRESS_META_DATA", ++conversationId));
    sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
            ++conversationId, new Object[] { new Time(60, TimeUnit.BASE_SECOND) }));
    // The simulation was created with a run length of 60 s, so simulating until 70 s must be refused
    sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
            ++conversationId, new Object[] { new Time(70, TimeUnit.BASE_SECOND) }));
    waitAndEatMessagesUntilConversationId(receivedMessages, 1.0, conversationId);
    waitAndVerifyAckNack(receivedMessages, 1.0, "SIMULATEUNTIL", conversationId, false,
            "Simulation is already at end of simulation time");

    sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "DIE", ++conversationId));
    System.out.println("Master has sent last command; Publisher should be busy for a while and then die");
    System.out.println("Master joining publisher thread (this should block until publisher has died)");
    publisherThread.join();
    System.out.println("Master has joined publisher thread");
    System.out.println("Master interrupts read message thread");
    readMessageThread.interrupt();
    System.out.println("Master has interrupted read message thread; joining ...");
    readMessageThread.join();
    System.out.println("Master has joined read message thread");
    System.out.println("Master exits");
}
/**
 * Wait for incoming messages up to one that has a specified conversation id, or until 1000 times time out.
 * <p>
 * On each attempt all but the most recently received message are discarded without being inspected. The remaining message
 * is kept (and the method returns) when its conversation id (field 6) matches; otherwise it is discarded as well. When
 * all attempts are used up this method returns silently; callers must check the list themselves.
 * @param receivedMessages List&lt;byte[]&gt;; the list to monitor
 * @param maximumSeconds double; how long to wait (in seconds) for a message on each attempt
 * @param conversationId int; the conversation id to wait for
 * @throws Sim0MQException when that happens, this test has failed
 * @throws SerializationException when that happens, this test has failed
 * @throws InterruptedException when that happens, this test has failed
 */
public void waitAndEatMessagesUntilConversationId(final List<byte[]> receivedMessages, final double maximumSeconds,
        final int conversationId) throws Sim0MQException, SerializationException, InterruptedException
{
    final Integer wantedId = conversationId;
    for (int attempt = 0; attempt < 1000; attempt++)
    {
        // Honor the time out that the caller asked for
        waitForReceivedMessages(receivedMessages, maximumSeconds);
        // System.out.println("attempt = " + attempt + " received " + receivedMessages.size() + " message(s)");
        while (receivedMessages.size() > 1)
        {
            receivedMessages.remove(0);
        }
        if (receivedMessages.size() == 1)
        {
            Object[] objects = Sim0MQMessage.decodeToArray(receivedMessages.get(0));
            if (wantedId.equals(objects[6]))
            {
                break;
            }
            receivedMessages.remove(0);
        }
    }
}
/**
 * Sleep up to <code>maximumSeconds</code> seconds waiting for at least one message to be received.
 * <p>
 * The list is polled every 10 ms. Elapsed time is measured with <code>System.nanoTime()</code>, so the total wait does
 * not grow when individual sleeps take longer than requested. Returns immediately when the list is not empty or when
 * <code>maximumSeconds</code> is not positive.
 * @param receivedMessages List&lt;byte[]&gt;; the list to monitor
 * @param maximumSeconds double; how long to wait (in seconds)
 * @throws InterruptedException when that happens uncaught; this test has failed
 */
static void waitForReceivedMessages(final List<byte[]> receivedMessages, final double maximumSeconds)
        throws InterruptedException
{
    final long budgetNanos = (long) (maximumSeconds * 1.0e9);
    final long startNanos = System.nanoTime();
    // Compare differences of nanoTime values; the absolute values have no meaning
    while (receivedMessages.isEmpty() && System.nanoTime() - startNanos < budgetNanos)
    {
        Thread.sleep(10);
    }
}
/**
 * Wrapper for ZMQ.Socket.send that may output some debugging information.
 * <p>
 * The result of the send operation is checked, so that a command that could not be sent is reported here and not much
 * later as a time out while waiting for the reply.
 * @param socket ZMQ.Socket; the socket to send onto
 * @param message byte[]; the message to transmit
 * @throws IllegalStateException when the socket reports that the message was not sent
 */
static void sendCommand(final ZMQ.Socket socket, final byte[] message)
{
    // Enable the code below to log every command that is sent
    // try
    // {
    // Object[] unpackedMessage = Sim0MQMessage.decodeToArray(message);
    // System.out.println("Master sending command " + unpackedMessage[5] + " conversation id " + unpackedMessage[6]);
    // }
    // catch (Sim0MQException | SerializationException e)
    // {
    // e.printStackTrace();
    // }
    if (!socket.send(message))
    {
        throw new IllegalStateException("Socket did not accept " + message.length + " byte command message");
    }
}
/**
 * Repeatedly try to read all available messages.
 * <p>
 * The thread binds a PULL socket to "inproc://publisherOutput" and appends every received message to the storage list
 * until the thread is interrupted.
 */
static class ReadMessageThread extends Thread
{
    /** The ZContext needed to create the socket. */
    private final ZContext zContext;

    /** Storage for the received messages. */
    private final List<byte[]> storage;

    /**
     * Repeatedly read all available messages.
     * @param zContext ZContext; the ZContext needed to create the read socket
     * @param storage List&lt;byte[]&gt;; storage for the received messages. This thread only appends to it. When another
     *            thread reads the list (as the test in this file does), the caller should provide a list that is safe
     *            for concurrent use
     */
    ReadMessageThread(final ZContext zContext, final List<byte[]> storage)
    {
        this.zContext = zContext;
        this.storage = storage;
    }

    @Override
    public void run()
    {
        System.out.println("Read message thread starting up");
        ZMQ.Socket socket = this.zContext.createSocket(SocketType.PULL);
        // The time out makes readMessages return regularly, so that the interrupt flag gets checked
        socket.setReceiveTimeOut(100);
        socket.bind("inproc://publisherOutput");
        while (!Thread.interrupted())
        {
            for (byte[] one : readMessages(socket))
            {
                this.storage.add(one);
            }
        }
        System.out.println("Read message thread exits due to interrupt");
    }
}
/**
 * Read as many messages from a ZMQ socket as are available and print a summary of each one.
 * <p>
 * Reading stops as soon as <code>recv()</code> returns <code>null</code>. The duration of that last call is determined
 * by the receive time out of the socket; ReadMessageThread sets it to 100 ms. NOTE(review): on a socket without a receive
 * time out this method presumably blocks until a message arrives - confirm before using it elsewhere.
 * @param socket ZMQ.Socket; the socket
 * @return byte[][]; the read messages in order of arrival; empty (never null) when no message was received
 */
public static byte[][] readMessages(final ZMQ.Socket socket)
{
    List<byte[]> resultList = new ArrayList<>();
    while (true)
    {
        byte[] message = socket.recv();
        if (null == message)
        {
            break;
        }
        StringBuilder output = new StringBuilder();
        output.append("Master received ").append(message.length).append(" byte message: ");
        // System.out.println(SerialDataDumper.serialDataDumper(EndianUtil.BIG_ENDIAN, message));
        try
        {
            Object[] fields = Sim0MQMessage.decodeToArray(message);
            for (Object field : fields)
            {
                output.append("/").append(field);
            }
            output.append("/");
        }
        catch (Sim0MQException | SerializationException e)
        {
            // A message that cannot be decoded is still handed to the caller; only the summary stays incomplete
            e.printStackTrace();
        }
        System.out.println(output);
        resultList.add(message);
    }
    if (resultList.size() > 0)
    {
        System.out.println("Master picked up " + resultList.size() + " message" + (resultList.size() == 1 ? "" : "s"));
    }
    return resultList.toArray(new byte[resultList.size()][]);
}
/**
 * Thread that hosts a Sim0MQPublisher.
 */
static class PublisherThread extends Thread
{
    /** Context that is handed to the Sim0MQPublisher when this thread runs. */
    private final ZContext context;

    /**
     * Create a PublisherThread that uses a context supplied by the caller.
     * @param zContext ZContext; the context to pass to the Sim0MQPublisher
     */
    PublisherThread(final ZContext zContext)
    {
        this.context = zContext;
    }

    /**
     * Create a PublisherThread that uses its own, newly created, context with 5 IO threads.
     */
    PublisherThread()
    {
        this(new ZContext(5));
    }

    /**
     * Construct a Sim0MQPublisher on the "publisherControl" and "publisherOutput" end points and report when that
     * constructor has returned.
     */
    @Override
    public void run()
    {
        // NOTE(review): the constructor apparently does all the work and returns only when the publisher is done - confirm
        new Sim0MQPublisher(this.context, "publisherControl", "publisherOutput");
        System.out.println("Publisher thread exits");
    }
}
/**
 * The test network; the text of the /network.xml resource. It is loaded at the start of testSim0MQPublisher and sent to
 * the publisher as payload of the NEWSIMULATION command.
 */
private static String networkXML;
}