package org.opentrafficsim.sim0mq.swing;
import java.awt.BorderLayout;
import java.awt.Container;
import java.awt.Dimension;
import java.awt.Frame;
import java.awt.event.ActionEvent;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.Serializable;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.rmi.RemoteException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import javax.swing.JFrame;
import javax.swing.JPanel;
import javax.swing.JScrollPane;
import javax.xml.bind.JAXBException;
import javax.xml.parsers.ParserConfigurationException;
import org.djunits.value.vdouble.scalar.Duration;
import org.djunits.value.vdouble.scalar.Length;
import org.djunits.value.vdouble.scalar.Time;
import org.djutils.decoderdumper.HexDumper;
import org.djutils.immutablecollections.ImmutableMap;
import org.djutils.serialization.SerializationException;
import org.opentrafficsim.core.animation.gtu.colorer.DefaultSwitchableGTUColorer;
import org.opentrafficsim.core.dsol.AbstractOTSModel;
import org.opentrafficsim.core.dsol.OTSAnimator;
import org.opentrafficsim.core.dsol.OTSSimulatorInterface;
import org.opentrafficsim.core.geometry.OTSGeometryException;
import org.opentrafficsim.core.gtu.GTUException;
import org.opentrafficsim.core.gtu.GTUType;
import org.opentrafficsim.core.network.NetworkException;
import org.opentrafficsim.core.network.OTSNetwork;
import org.opentrafficsim.core.object.InvisibleObjectInterface;
import org.opentrafficsim.draw.factory.DefaultAnimationFactory;
import org.opentrafficsim.road.network.OTSRoadNetwork;
import org.opentrafficsim.road.network.factory.xml.XmlParserException;
import org.opentrafficsim.road.network.factory.xml.parser.XmlNetworkLaneParser;
import org.opentrafficsim.road.network.lane.conflict.ConflictBuilder;
import org.opentrafficsim.road.network.lane.conflict.LaneCombinationList;
import org.opentrafficsim.sim0mq.publisher.Publisher;
import org.opentrafficsim.sim0mq.publisher.ReturnWrapper;
import org.opentrafficsim.sim0mq.publisher.ReturnWrapperImpl;
import org.opentrafficsim.swing.gui.OTSAnimationPanel;
import org.opentrafficsim.swing.gui.OTSSimulationApplication;
import org.opentrafficsim.swing.gui.OTSSwingApplication;
import org.opentrafficsim.trafficcontrol.TrafficControlException;
import org.opentrafficsim.trafficcontrol.trafcod.TrafCOD;
import org.sim0mq.Sim0MQException;
import org.sim0mq.message.Sim0MQMessage;
import org.xml.sax.SAXException;
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import nl.tudelft.simulation.dsol.SimRuntimeException;
import nl.tudelft.simulation.dsol.formalisms.eventscheduling.SimEventInterface;
import nl.tudelft.simulation.dsol.simtime.SimTimeDoubleUnit;
import nl.tudelft.simulation.dsol.swing.gui.TabbedContentPane;
import nl.tudelft.simulation.jstats.streams.MersenneTwister;
import nl.tudelft.simulation.jstats.streams.StreamInterface;
/**
* Sim0MQPublisher - make many OTS simulation controls and observations available over Sim0MQ.
*
* Copyright (c) 2020-2020 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved.
* BSD-style license. See OpenTrafficSim License.
*
* @author Alexander Verbraeck
* @author Peter Knoppers
*/
public final class Sim0MQPublisher
{
/** The publisher. */
private Publisher publisher = null;
/** The ZContect. */
private final ZContext zContext;
/** The simulation model. */
private Sim0MQOTSModel model = null;
/** The OTS road network. */
private OTSRoadNetwork network = null;
/** The OTS animation panel. */
private OTSAnimationPanel animationPanel = null;
/**
* Create a new Sim0MQPublisher that is operated through //inproc sockets.
* @param zContext ZContext; needed to create the sockets
* @param controlInput String; PULL socket for control input
* @param resultOutput String; PUSH socket to output results
*/
public Sim0MQPublisher(final ZContext zContext, final String controlInput, final String resultOutput)
{
this.zContext = zContext;
ZMQ.Socket controlSocket = zContext.createSocket(SocketType.PULL);
controlSocket.bind("inproc://" + controlInput);
ZMQ.Socket resultOutputQueue = zContext.createSocket(SocketType.PUSH);
resultOutputQueue.connect("inproc://" + resultOutput);
pollingLoop(controlSocket, resultOutputQueue);
}
/**
* Create a new Sim0MQPublisher that uses TCP transport.
* @param port int; port number to bind to
*/
public Sim0MQPublisher(final int port)
{
this.zContext = new ZContext(5);
ZMQ.Socket socket = this.zContext.createSocket(SocketType.PAIR);
socket.bind("tcp://*:" + port);
pollingLoop(socket, socket);
}
/**
* Poller that receives the commands and ensures that various output sources can talk to the master.
* @param controlSocket ZMQ.Socket; PULL socket for commands from the master
* @param resultOutputQueue ZMQ.Socket; PULL socket for output that must be relayed to the master
*/
private void pollingLoop(final ZMQ.Socket controlSocket, final ZMQ.Socket resultOutputQueue)
{
System.out
.println("Publisher communication relay and simulation control thread id is " + Thread.currentThread().getId());
resultOutputQueue.setHWM(100000);
AtomicInteger packetsSent = new AtomicInteger(0);
Map socketMap = new HashMap<>();
ZMQ.Socket resultInputQueue = this.zContext.createSocket(SocketType.PULL);
resultInputQueue.bind("inproc://simulationEvents");
// Poll the two input sockets using ZMQ poller
ZMQ.Poller poller = this.zContext.createPoller(2);
// TODO ensure that this also handles a closed control socket gracefully
poller.register(resultInputQueue, ZMQ.Poller.POLLIN);
poller.register(controlSocket, ZMQ.Poller.POLLIN);
while (!Thread.currentThread().isInterrupted())
{
// System.out.println("Publisher calls Poller.poll()");
poller.poll();
if (poller.pollin(0))
{
byte[] data = resultInputQueue.recv();
// System.out.println("Publisher got outgoing result of " + data.length + " bytes");
byte[] fixedData = data;
int number = -1;
try
{
// Patch the sender field to include the packet counter value - this is bloody expensive...
Object[] messageFields = Sim0MQMessage.decode(data).createObjectArray();
Object[] newMessageFields = Arrays.copyOfRange(messageFields, 8, messageFields.length);
number = packetsSent.addAndGet(1);
fixedData = Sim0MQMessage.encodeUTF8(true, messageFields[2], String.format("slave_%05d", number),
messageFields[4], messageFields[5], messageFields[6], newMessageFields);
// System.out
// .println("Prepared message " + number + ", type is \"" + messageFields[5] + "\", " + messageFields[6]);
}
catch (Sim0MQException | SerializationException e)
{
e.printStackTrace();
}
resultOutputQueue.send(fixedData, 0);
// System.out.println("Outgoing result handed over to controlSocket");
continue; // Check for more results before checking the control input
}
if (poller.pollin(1))
{
byte[] data = controlSocket.recv();
// System.out.println("Publisher received a command of " + data.length + " bytes");
if (!handleCommand(data, socketMap))
{
break;
}
}
}
System.out.println("Exiting publisher polling loop");
}
/**
* Construct an OTS simulation experiment from an XML description.
* @param xml String; the XML encoded network
* @param simulationDuration Duration; total duration of the simulation
* @param warmupTime Duration; warm up time of the simulation
* @param seed Long; seed for the experiment
* @return String; null on success, description of the problem on error
*/
private String loadNetwork(final String xml, final Duration simulationDuration, final Duration warmupTime, final Long seed)
{
try
{
OTSAnimator animator = new OTSAnimator("OTS Animator");
this.network = new OTSRoadNetwork("OTS model for Sim0MQPublisher", true, animator);
this.model = new Sim0MQOTSModel("Remotely controlled OTS model", this.network, xml);
Map map = new LinkedHashMap<>();
map.put("generation", new MersenneTwister(seed));
animator.initialize(Time.ZERO, warmupTime, simulationDuration, this.model, map);
this.publisher = new Publisher(this.network);
this.animationPanel = new OTSAnimationPanel(this.model.getNetwork().getExtent(), new Dimension(1100, 1000),
animator, this.model, OTSSwingApplication.DEFAULT_COLORER, this.model.getNetwork());
new OTSSimulationApplication(this.model, this.animationPanel);
DefaultAnimationFactory.animateXmlNetwork(this.model.getNetwork(), new DefaultSwitchableGTUColorer());
JFrame frame = (JFrame) this.animationPanel.getParent().getParent().getParent();
frame.setExtendedState(Frame.NORMAL);
frame.setSize(new Dimension(1100, 1000));
frame.setBounds(0, 25, 1100, 1000);
animator.setSpeedFactor(Double.MAX_VALUE, true);
animator.setSpeedFactor(1000.0, true);
ImmutableMap invisibleObjectMap = this.model.getNetwork().getInvisibleObjectMap();
for (InvisibleObjectInterface ioi : invisibleObjectMap.values())
{
if (ioi instanceof TrafCOD)
{
TrafCOD trafCOD = (TrafCOD) ioi;
Container controllerDisplayPanel = trafCOD.getDisplayContainer();
if (null != controllerDisplayPanel)
{
JPanel wrapper = new JPanel(new BorderLayout());
wrapper.add(new JScrollPane(controllerDisplayPanel));
TabbedContentPane tabbedPane = this.animationPanel.getTabbedPane();
tabbedPane.addTab(tabbedPane.getTabCount() - 1, trafCOD.getId(), wrapper);
}
}
}
try
{
Thread.sleep(300);
}
catch (InterruptedException e)
{
e.printStackTrace();
}
this.animationPanel.actionPerformed(new ActionEvent(this, 0, "ZoomAll"));
}
catch (Exception e)
{
return e.getMessage();
}
return null;
}
/**
* Execute one remote control command.
* @param data byte[]; the SIM0MQ encoded command
* @param socketMap Map<Long, ZMQ.Socket>; cache of created sockets for returned messages
* @return boolean; true if another command can be processed after this one; false when no further commands can be processed
*/
private boolean handleCommand(final byte[] data, final Map socketMap)
{
boolean result = true;
try
{
Object[] message = Sim0MQMessage.decode(data).createObjectArray();
String resultMessage = "OK";
Boolean ackNack = null;
if (message.length >= 8 && message[5] instanceof String)
{
String command = (String) message[5];
System.out.println("Publisher thread decoded Sim0MQ command: " + command);
String[] parts = command.split("\\|");
if (parts.length == 2)
{
// This is a command for the embedded Publisher
ReturnWrapperImpl returnWrapper = new ReturnWrapperImpl(this.zContext,
new Object[] {"SIM01", true, message[2], message[3], message[4], parts[0], message[6], 0},
socketMap);
if (null == this.publisher)
{
returnWrapper.nack("No simulation loaded; cannot execute command " + command);
System.err.println("No publisher for command " + command);
return true;
}
Object[] payload = Arrays.copyOfRange(message, 8, message.length);
this.publisher.executeCommand(parts[0], parts[1], payload, returnWrapper);
return true;
}
else
{
switch (command)
{
case "NEWSIMULATION":
if (message.length == 12 && message[8] instanceof String && message[9] instanceof Duration
&& message[10] instanceof Duration && message[11] instanceof Long)
{
if (null != this.animationPanel)
{
for (Container container = this.animationPanel; container != null; container =
container.getParent())
{
if (container instanceof JFrame)
{
JFrame jFrame = (JFrame) container;
jFrame.dispose();
}
}
}
// System.out.println("xml length = " + ((String) message[8]).length());
resultMessage = loadNetwork((String) message[8], (Duration) message[9], (Duration) message[10],
(Long) message[11]);
ackNack = null == resultMessage;
if (ackNack)
{
resultMessage = "OK";
}
}
else
{
resultMessage =
"No network, warmupTime and/or runTime, or seed provided with NEWSIMULATION command";
ackNack = false;
}
break;
case "DIE":
for (Container container = this.animationPanel; container != null; container =
container.getParent())
{
// System.out.println("container is " + container);
if (container instanceof JFrame)
{
JFrame jFrame = (JFrame) container;
jFrame.dispose();
}
}
return false;
case "SIMULATEUNTIL":
if (message.length == 9 && message[8] instanceof Time)
{
System.out.println("Simulating up to " + message[8]);
if (null == this.network)
{
resultMessage = "No network loaded";
ackNack = false;
break;
}
OTSSimulatorInterface simulator = this.network.getSimulator();
if (simulator.getSimulatorTime()
.ge(simulator.getReplication().getExperiment().getTreatment().getEndTime()))
{
resultMessage = "Simulation is already at end of simulation time";
ackNack = false;
break;
}
if (simulator.isStartingOrRunning())
{
resultMessage = "Simulator is already running"; // cannot happen for now
ackNack = false;
break;
}
ReturnWrapper returnWrapper = new ReturnWrapperImpl(this.zContext, new Object[] {"SIM01", true,
message[2], message[3], message[4], message[5], message[6], 0}, socketMap);
returnWrapper.ack(resultMessage);
simulator.runUpTo((Time) message[8]);
int count = 0;
while (this.network.getSimulator().isStartingOrRunning())
{
System.out.print(".");
count++;
if (count > 1000) // Quit after 1000 attempts of 10 ms; 10 s
{
System.out.println("TIMEOUT - STOPPING SIMULATOR. TIME = "
+ this.network.getSimulator().getSimulatorTime());
this.network.getSimulator().stop();
Iterator> elIt =
this.network.getSimulator().getEventList().iterator();
while (elIt.hasNext())
{
System.out.println("EVENTLIST: " + elIt.next());
}
}
try
{
Thread.sleep(10); // 10 ms
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
// TODO Fix this (it is still needed - 2020-06-16)
try
{
Thread.sleep(100); // EXTRA STOP FOR SYNC REASONS - BUG IN DSOL!
}
catch (InterruptedException e)
{
e.printStackTrace();
}
return true; // ack has been sent when simulation started
}
else
{
resultMessage = "Bad or missing stop time";
ackNack = false;
}
break;
default:
resultMessage = "Don't know how to handle message:\n" + Sim0MQMessage.print(message);
ackNack = false;
break;
}
}
ReturnWrapper returnWrapper = new ReturnWrapperImpl(this.zContext,
new Object[] {"SIM01", true, message[2], message[3], message[4], message[5], message[6], 0}, socketMap);
if (ackNack)
{
returnWrapper.ack(resultMessage);
}
else
{
returnWrapper.nack(resultMessage);
}
}
else
{
// We cannot construct a ReturnWrapper because the request has too few fields.
System.err.println("Publisher expected Sim0MQ command but is has too few fields:");
System.err.println(HexDumper.hexDumper(data));
return true; // Do we really want to try again?
}
}
catch (Sim0MQException | SerializationException | RemoteException e)
{
System.err.println("Publisher thread could not decode command");
e.printStackTrace();
}
return result;
}
}
/**
* The Model.
*/
class Sim0MQOTSModel extends AbstractOTSModel
{
/** */
private static final long serialVersionUID = 20170419L;
/** The network. */
private final OTSRoadNetwork network;
/** The XML. */
private final String xml;
/**
* @param description String; the model description
* @param network OTSRoadNetwork; the network
* @param xml String; the XML description of the simulation model
*/
Sim0MQOTSModel(final String description, final OTSRoadNetwork network, final String xml)
{
super(network.getSimulator(), network.getId(), description);
this.network = network;
this.xml = xml;
}
/** {@inheritDoc} */
@Override
public void constructModel() throws SimRuntimeException
{
try
{
XmlNetworkLaneParser.build(new ByteArrayInputStream(this.xml.getBytes(StandardCharsets.UTF_8)), this.network,
false);
ConflictBuilder.buildConflictsParallel(this.network, this.network.getGtuType(GTUType.DEFAULTS.VEHICLE),
getSimulator(), new ConflictBuilder.FixedWidthGenerator(Length.instantiateSI(2.0)),
new LaneCombinationList(), new LaneCombinationList());
}
catch (NetworkException | OTSGeometryException | JAXBException | URISyntaxException | XmlParserException | SAXException
| ParserConfigurationException | GTUException | IOException | TrafficControlException exception)
{
exception.printStackTrace();
// Abusing the SimRuntimeException to propagate the message to the main method (the problem could actually be a
// parsing problem)
throw new SimRuntimeException(exception);
}
}
/** {@inheritDoc} */
@Override
public OTSNetwork getNetwork()
{
return this.network;
}
/** {@inheritDoc} */
@Override
public Serializable getSourceId()
{
return "Sim0MQPublisherModel";
}
}