package org.opentrafficsim.remotecontrol; import java.awt.BorderLayout; import java.awt.Container; import java.awt.Dimension; import java.awt.Frame; import java.awt.event.ActionEvent; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.Serializable; import java.net.URISyntaxException; import java.nio.charset.StandardCharsets; import java.rmi.RemoteException; import java.util.Arrays; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import javax.naming.NamingException; import javax.swing.JFrame; import javax.swing.JPanel; import javax.swing.JScrollPane; import javax.xml.bind.JAXBException; import javax.xml.parsers.ParserConfigurationException; import org.djunits.value.ValueRuntimeException; import org.djunits.value.vdouble.scalar.Duration; import org.djunits.value.vdouble.scalar.Length; import org.djunits.value.vdouble.scalar.Time; import org.djutils.cli.Checkable; import org.djutils.cli.CliUtil; import org.djutils.decoderdumper.HexDumper; import org.djutils.event.EventInterface; import org.djutils.event.EventListenerInterface; import org.djutils.event.EventTypeInterface; import org.djutils.immutablecollections.ImmutableMap; import org.djutils.logger.CategoryLogger; import org.djutils.logger.LogCategory; import org.djutils.serialization.SerializationException; import org.opentrafficsim.base.parameters.ParameterException; import org.opentrafficsim.core.animation.gtu.colorer.DefaultSwitchableGTUColorer; import org.opentrafficsim.core.dsol.AbstractOTSModel; import org.opentrafficsim.core.dsol.OTSAnimator; import org.opentrafficsim.core.dsol.OTSModelInterface; import org.opentrafficsim.core.dsol.OTSSimulatorInterface; import org.opentrafficsim.core.geometry.OTSGeometryException; import org.opentrafficsim.core.gtu.GTU; import org.opentrafficsim.core.gtu.GTUException; import org.opentrafficsim.core.gtu.GTUType; import org.opentrafficsim.core.network.Network; import org.opentrafficsim.core.network.NetworkException; import org.opentrafficsim.core.network.OTSNetwork; import org.opentrafficsim.core.object.InvisibleObjectInterface; import org.opentrafficsim.draw.core.OTSDrawingException; import org.opentrafficsim.draw.factory.DefaultAnimationFactory; import org.opentrafficsim.road.network.OTSRoadNetwork; import org.opentrafficsim.road.network.factory.xml.XmlParserException; import org.opentrafficsim.road.network.factory.xml.parser.XmlNetworkLaneParser; import org.opentrafficsim.road.network.lane.conflict.ConflictBuilder; import org.opentrafficsim.road.network.lane.conflict.LaneCombinationList; import org.opentrafficsim.swing.gui.OTSAnimationPanel; import org.opentrafficsim.swing.gui.OTSSimulationApplication; import org.opentrafficsim.swing.gui.OTSSwingApplication; import org.opentrafficsim.trafficcontrol.TrafficControlException; import org.opentrafficsim.trafficcontrol.TrafficController; import org.opentrafficsim.trafficcontrol.trafcod.TrafCOD; import org.pmw.tinylog.Level; import org.sim0mq.Sim0MQException; import org.sim0mq.message.Sim0MQMessage; import org.xml.sax.SAXException; import org.zeromq.SocketType; import org.zeromq.ZContext; import org.zeromq.ZMQ; import nl.tudelft.simulation.dsol.SimRuntimeException; import nl.tudelft.simulation.dsol.formalisms.eventscheduling.SimEventInterface; import nl.tudelft.simulation.dsol.simtime.SimTimeDoubleUnit; import nl.tudelft.simulation.dsol.simulators.DEVSRealTimeClock; import nl.tudelft.simulation.dsol.simulators.SimulatorInterface; import nl.tudelft.simulation.dsol.swing.gui.TabbedContentPane; import nl.tudelft.simulation.jstats.streams.MersenneTwister; import nl.tudelft.simulation.jstats.streams.StreamInterface; import nl.tudelft.simulation.language.d3.DirectedPoint; import picocli.CommandLine.Command; import picocli.CommandLine.Option; /** * Sim0MQ controlled OTS *

* Copyright (c) 2013-2020 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved.
* BSD-style license. See OpenTrafficSim License. *

* @version $Revision$, $LastChangedDate$, by $Author$, initial version Apr 18, 2017
* @author Alexander Verbraeck * @author Peter Knoppers * @author Wouter Schakel */ public class Sim0MQControlledOTS implements EventListenerInterface { /** ... */ private static final long serialVersionUID = 20200317L; /** Currently active model. */ private Sim0MQOTSModel model = null; /** The ZContext of all the sockets. */ private final ZContext zContext; /** The port number of the listening socket. */ private final int port; /** Communication channel to the master. */ private final MasterCommunication masterCommunication = new MasterCommunication(); /** * Construct a new Sim0MQ controlled OTS. * @param zContext ZContext; the context of ZMQ * @param port int; the port number of the listening socket */ public Sim0MQControlledOTS(final ZContext zContext, final int port) { this.zContext = zContext; this.port = port; this.masterCommunication.start(); } /** * Thread that handles ALL reads and writes on the socket to the master. */ class MasterCommunication extends Thread { @Override public void run() { System.err.println("MasterCommunication thread id is " + Thread.currentThread().getId()); ZMQ.Socket remoteControllerSocket = zContext.createSocket(SocketType.PAIR); remoteControllerSocket.setHWM(100000); remoteControllerSocket.bind("tcp://*:" + port); ZMQ.Socket resultQueue = zContext.createSocket(SocketType.PULL); resultQueue.bind("inproc://results"); ZMQ.Socket toCommandLoop = zContext.createSocket(SocketType.PUSH); toCommandLoop.setHWM(1000); toCommandLoop.connect("inproc://commands"); /*- while (!Thread.interrupted()) { byte[] data; data = remoteControllerSocket.recv(ZMQ.DONTWAIT); if (null != data) { System.err.println("Got incoming command"); toCommandLoop.send(data, 0); System.err.println("Incoming command handed over to toCommandLoop socket"); continue; } data = resultQueue.recv(ZMQ.DONTWAIT); if (null != data) { System.err.println("Got outgoing result"); remoteControllerSocket.send(data, 0); System.err.println("Outgoing result handed over to remoteControllerSocket"); } try { Thread.sleep(1); } catch (InterruptedException e) { //e.printStackTrace(); } } */ ///*- ZMQ.Poller poller = zContext.createPoller(2); poller.register(remoteControllerSocket, ZMQ.Poller.POLLIN); poller.register(resultQueue, ZMQ.Poller.POLLIN); while (!Thread.currentThread().isInterrupted()) { poller.poll(); if (poller.pollin(0)) { System.err.println("Got incoming command"); byte[] data = remoteControllerSocket.recv(); toCommandLoop.send(data, 0); System.err.println("Incoming command handed over to toCommandLoop socket"); } else if (poller.pollin(1)) { System.err.println("Got outgoing result"); byte[] data = resultQueue.recv(); remoteControllerSocket.send(data, 0); System.err.println("Outgoing result handed over to remoteControllerSocket"); } } //*/ } } /** * The command line options. */ @Command(description = "Sim0MQ Remotely Controlled OTS", name = "Sim0MQOTS", mixinStandardHelpOptions = true, version = "1.0") public static class Options implements Checkable { /** The IP port. */ @Option(names = { "-p", "--port" }, description = "Internet port to use", defaultValue = "8888") private int port; /** * Retrieve the port. * @return int; the port */ public final int getPort() { return this.port; } @Override public final void check() throws Exception { if (this.port <= 0 || this.port > 65535) { throw new Exception("Port should be between 1 and 65535"); } } } /** * Program entry point. * @param args String[]; the command line arguments * @throws OTSGeometryException on error * @throws NetworkException on error * @throws NamingException on error * @throws ValueRuntimeException on error * @throws SimRuntimeException on error * @throws ParameterException on error * @throws SerializationException on error * @throws Sim0MQException on error * @throws IOException on error */ public static void main(final String[] args) throws NetworkException, OTSGeometryException, NamingException, ValueRuntimeException, ParameterException, SimRuntimeException, Sim0MQException, SerializationException, IOException { CategoryLogger.setAllLogLevel(Level.WARNING); CategoryLogger.setLogCategories(LogCategory.ALL); Options options = new Options(); CliUtil.execute(options, args); // register Unit converters, parse the command line, etc.. int port = options.getPort(); System.out.println("Creating OTS server listening on port " + port); ZContext context = new ZContext(10); Sim0MQControlledOTS slave = new Sim0MQControlledOTS(context, port); slave.commandLoop(); // Currently, there is no shutdown command; so the following code is never executed context.destroy(); context.close(); } /** * Construct an OTS simulation experiment from an XML description. * @param xml String; the XML encoded network * @param simulationDuration Duration; total duration of the simulation * @param warmupTime Duration; warm up time of the simulation * @param seed Long; seed for the experiment * @return String; null on success, description of the problem on error */ private String loadNetwork(final String xml, final Duration simulationDuration, final Duration warmupTime, final Long seed) { if (null != this.model) { return "Cannot create another network (yet)"; } else { try { OTSAnimator animator = new OTSAnimator("OTS Animator"); this.model = new Sim0MQOTSModel(animator, "OTS model", "Remotely controlled OTS model", xml); Map map = new LinkedHashMap<>(); map.put("generation", new MersenneTwister(seed)); animator.initialize(Time.ZERO, simulationDuration, warmupTime, this.model, map); this.model.getNetwork().addListener(this, Network.GTU_ADD_EVENT); this.model.getNetwork().addListener(this, Network.GTU_REMOVE_EVENT); OTSAnimationPanel animationPanel = new OTSAnimationPanel(this.model.getNetwork().getExtent(), new Dimension(1100, 1000), animator, this.model, OTSSwingApplication.DEFAULT_COLORER, this.model.getNetwork()); DefaultAnimationFactory.animateXmlNetwork(this.model.getNetwork(), new DefaultSwitchableGTUColorer()); new Sim0MQRemoteControlSwingApplication(this.model, animationPanel); JFrame frame = (JFrame) animationPanel.getParent().getParent().getParent(); frame.setExtendedState(Frame.NORMAL); frame.setSize(new Dimension(1100, 1000)); frame.setBounds(0, 25, 1100, 1000); animator.setSpeedFactor(Double.MAX_VALUE, true); animator.setSpeedFactor(1000.0, true); ImmutableMap invisibleObjectMap = this.model.getNetwork().getInvisibleObjectMap(); animator.addListener(this, DEVSRealTimeClock.CHANGE_SPEED_FACTOR_EVENT); animator.addListener(this, SimulatorInterface.TIME_CHANGED_EVENT); for (InvisibleObjectInterface ioi : invisibleObjectMap.values()) { if (ioi instanceof TrafCOD) { TrafCOD trafCOD = (TrafCOD) ioi; Container controllerDisplayPanel = trafCOD.getDisplayContainer(); if (null != controllerDisplayPanel) { JPanel wrapper = new JPanel(new BorderLayout()); wrapper.add(new JScrollPane(controllerDisplayPanel)); TabbedContentPane tabbedPane = animationPanel.getTabbedPane(); tabbedPane.addTab(tabbedPane.getTabCount() - 1, trafCOD.getId(), wrapper); } // trafCOD.addListener(this, // TrafficController.TRAFFICCONTROL_CONTROLLER_EVALUATING); trafCOD.addListener(this, TrafficController.TRAFFICCONTROL_CONTROLLER_WARNING); trafCOD.addListener(this, TrafficController.TRAFFICCONTROL_CONFLICT_GROUP_CHANGED); trafCOD.addListener(this, TrafficController.TRAFFICCONTROL_STATE_CHANGED); trafCOD.addListener(this, TrafficController.TRAFFICCONTROL_VARIABLE_CREATED); trafCOD.addListener(this, TrafficController.TRAFFICCONTROL_TRACED_VARIABLE_UPDATED); } } try { Thread.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); } animationPanel.actionPerformed(new ActionEvent(this, 0, "ZoomAll")); } catch (Exception e) { return e.getMessage(); } } return null; } /** Count transmitted messages. */ private AtomicInteger packetsSent = new AtomicInteger(0); /** * Read commands from the master, execute them and report the results. */ @SuppressWarnings("checkstyle:methodlength") public void commandLoop() { System.err.println("CommandLoop thread id is " + Thread.currentThread().getId()); ZMQ.Socket incomingCommands = this.zContext.createSocket(SocketType.PULL); incomingCommands.bind("inproc://commands"); while (!Thread.currentThread().isInterrupted()) { // Read the request from the client System.err.println("CommandLoop ready to read a command"); byte[] request = incomingCommands.recv(0); System.err.println("CommandLoop processing a command of " + request.length + " bytes"); Object[] message; String result = "At your command"; try { message = Sim0MQMessage.decode(request).createObjectArray(); System.out.println("Received Sim0MQ message:"); if (message.length >= 8 && message[5] instanceof String) { String command = (String) message[5]; System.out.println("Command is " + command); switch (command) { case "LOADNETWORK": if (message.length == 12 && message[8] instanceof String && message[9] instanceof Duration && message[10] instanceof Duration && message[11] instanceof Long) { System.out.println("xml length = " + ((String) message[8]).length()); String loadResult = loadNetwork((String) message[8], (Duration) message[9], (Duration) message[10], (Long) message[11]); if (null != loadResult) { result = loadResult; } } else { result = "no network, warmupTime and/or runTime provided with LOADNETWORK command"; } break; case "SIMULATEUNTIL": // XXX: the SimulateUntil is blocking this loop. Do we want that? if (null == this.model) { result = "No model loaded"; } else if (message.length == 9 && message[8] instanceof Time) { OTSSimulatorInterface simulator = this.model.getSimulator(); System.out.println("Simulating up to " + message[8]); simulator.runUpTo((Time) message[8]); int count = 0; while (simulator.isStartingOrRunning()) { System.out.print("."); count++; if (count > 1000) // 10 seconds { System.out.println("SIMULATOR DOES NOT STOP. TIME = " + simulator.getSimulatorTime()); Iterator> elIt = simulator.getEventList().iterator(); while (elIt.hasNext()) { System.out.println("EVENTLIST: " + elIt.next()); } simulator.stop(); } try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("Simulator has stopped at time " + simulator.getSimulatorTime()); try { Thread.sleep(100); // EXTRA STOP FOR SYNC REASONS - BUG IN DSOL! } catch (InterruptedException e) { e.printStackTrace(); } } else { result = "Bad or missing stop time"; } break; case "SENDALLGTUPOSITIONS": if (null == this.model) { result = "No model loaded"; } else if (message.length == 8) { for (GTU gtu : this.model.network.getGTUs()) { // Send information about one GTU to master try { DirectedPoint gtuPosition = gtu.getLocation(); Object[] gtuData = new Object[] { gtu.getId(), gtu.getGTUType().getId(), gtuPosition.x, gtuPosition.y, gtuPosition.z, gtuPosition.getRotZ(), gtu.getSpeed(), gtu.getAcceleration() }; sendToMaster(Sim0MQMessage.encodeUTF8(true, 0, "slave_XXXXX", "master", "GTUPOSITION", 0, gtuData)); } catch (Sim0MQException | SerializationException e) { e.printStackTrace(); break; // this is fatal } } } break; default: System.out.println("Don't know how to handle message:"); System.out.println(Sim0MQMessage.print(message)); result = "Unimplemented command " + command; break; } } else { System.out.println("Don't know how to handle message:"); System.out.println(HexDumper.hexDumper(request)); result = "Ignored message"; } } catch (Sim0MQException | SerializationException e) { e.printStackTrace(); result = "Could not decode command: " + e.getMessage(); } catch (RemoteException e) { e.printStackTrace(); result = "Caught RemoteException: " + e.getMessage(); } // Send reply to master try { sendToMaster(Sim0MQMessage.encodeUTF8(true, 0, "slave_XXXXX", "master", "READY", 0, result)); } catch (Sim0MQException | SerializationException e) { e.printStackTrace(); break; // this is fatal } } } /** In memory sockets to talk to the multiplexer. */ private Map socketMap = new LinkedHashMap<>(); /** * Safe - synchronized - portal to send a message to the remote controller. * @param data byte[]; the data to send */ public synchronized void sendToMaster(final byte[] data) { byte[] fixedData = data; int number = -1; try { // Patch the sender field to include the packet counter value. Object[] messageFields = Sim0MQMessage.decode(data).createObjectArray(); Object[] newMessageFields = Arrays.copyOfRange(messageFields, 8, messageFields.length); number = this.packetsSent.addAndGet(1); fixedData = Sim0MQMessage.encodeUTF8(true, messageFields[2], String.format("slave_%05d", number), messageFields[4], messageFields[5], messageFields[6], newMessageFields); System.err.println("Prepared message " + number + ", type is " + messageFields[5]); } catch (Sim0MQException | SerializationException e) { e.printStackTrace(); } Long threadId = Thread.currentThread().getId(); ZMQ.Socket socket = this.socketMap.get(threadId); while (null == socket) { System.out.println("Creating new internal socket for thread " + threadId); try { socket = this.zContext.createSocket(SocketType.PUSH); socket.setHWM(100000); socket.connect("inproc://results"); this.socketMap.put(threadId, socket); // System.out.println("Socket created"); } catch (Exception cbie) { System.err.println("Caught funny exception - probably related to DSOL animator start/stop code ... retrying"); try { Thread.sleep(100); } catch (InterruptedException e) { System.err.println("Sleep interrupted!"); } } } System.out.println("pre send"); // ZMQ.Socket socket = this.zContext.createSocket(SocketType.PUSH); // socket.setHWM(100000); // socket.connect("inproc://results"); socket.send(fixedData, 0); // socket.close(); //System.out.println("post send"); } /** {@inheritDoc} */ @Override public void notify(final EventInterface event) throws RemoteException { try { EventTypeInterface type = event.getType(); String eventTypeName = type.getName(); System.out.println("notify: start processing event " + eventTypeName); switch (eventTypeName) { case "TRAFFICCONTROL.CONTROLLER_EVALUATING": { Object[] payload = (Object[]) event.getContent(); CategoryLogger.always().info("{}: Evaluating at time {}", payload[0], payload[1]); sendToMaster(Sim0MQMessage.encodeUTF8(true, 0, "slave", "master", eventTypeName, 0, String.format("%s: Evaluating at time %s", payload[0], payload[1]))); break; } case "TRAFFICCONTROL.CONFLICT_GROUP_CHANGED": { Object[] payload = (Object[]) event.getContent(); CategoryLogger.always().info("{}: Conflict group changed from {} to {}", payload[0], payload[1], payload[2]); sendToMaster(Sim0MQMessage.encodeUTF8(true, 0, "slave", "master", eventTypeName, 0, payload)); break; } case "TRAFFICCONTROL.VARIABLE_UPDATED": { Object[] payload = (Object[]) event.getContent(); CategoryLogger.always().info("{}: Variable changed {} <- {} {}", payload[0], payload[1], payload[4], payload[5]); sendToMaster(Sim0MQMessage.encodeUTF8(true, 0, "slave", "master", eventTypeName, 0, payload)); break; } case "TRAFFICCONTROL.CONTROLLER_WARNING": { Object[] payload = (Object[]) event.getContent(); CategoryLogger.always().info("{}: Warning {}", payload[0], payload[1]); sendToMaster(Sim0MQMessage.encodeUTF8(true, 0, "slave", "master", eventTypeName, 0, payload)); break; } case "TIME_CHANGED_EVENT": { CategoryLogger.always().info("Time changed to {}", event.getContent()); sendToMaster(Sim0MQMessage.encodeUTF8(true, 0, "slave", "master", eventTypeName, 0, String.format("Time changed to %s", event.getContent()))); break; } case "NETWORK.GTU.ADD": { sendToMaster(Sim0MQMessage.encodeUTF8(true, 0, "slave", "master", eventTypeName, 0, event.getContent())); break; } case "NETWORK.GTU.REMOVE": { sendToMaster(Sim0MQMessage.encodeUTF8(true, 0, "slave", "master", eventTypeName, 0, event.getContent())); break; } default: { CategoryLogger.always().info("Event of unhandled type {} with payload {}", event.getType(), event.getContent()); sendToMaster(Sim0MQMessage.encodeUTF8(true, 0, "slave", "master", "Event of unhandled type", 0, String .format("%s: Event of unhandled type %s with payload {}", event.getType(), event.getContent()))); break; } } System.out.println("notify: finished processing event " + eventTypeName); } catch (Sim0MQException | SerializationException e) { e.printStackTrace(); } } /** * The application. */ class Sim0MQRemoteControlSwingApplication extends OTSSimulationApplication { /** */ private static final long serialVersionUID = 1L; /** * @param model OTSModelInterface; the model * @param panel OTSAnimationPanel; the panel of the main screen * @throws OTSDrawingException on animation error */ Sim0MQRemoteControlSwingApplication(final OTSModelInterface model, final OTSAnimationPanel panel) throws OTSDrawingException { super(model, panel); } } /** * The Model. */ class Sim0MQOTSModel extends AbstractOTSModel implements EventListenerInterface { /** */ private static final long serialVersionUID = 20170419L; /** The network. */ @SuppressWarnings("checkstyle:visibilitymodifier") OTSRoadNetwork network; /** The XML. */ private final String xml; /** * @param simulator OTSSimulatorInterface; the simulator * @param shortName String; the model name * @param description String; the model description * @param xml String; the XML description of the simulation model */ Sim0MQOTSModel(final OTSSimulatorInterface simulator, final String shortName, final String description, final String xml) { super(simulator, shortName, description); this.xml = xml; } /** {@inheritDoc} */ @Override public void notify(final EventInterface event) throws RemoteException { System.err.println("Received event " + event); } /** {@inheritDoc} */ @Override public void constructModel() throws SimRuntimeException { this.network = new OTSRoadNetwork(getShortName(), true, getSimulator()); try { XmlNetworkLaneParser.build(new ByteArrayInputStream(this.xml.getBytes(StandardCharsets.UTF_8)), this.network, false); LaneCombinationList ignoreList = new LaneCombinationList(); LaneCombinationList permittedList = new LaneCombinationList(); ConflictBuilder.buildConflictsParallel(this.network, this.network.getGtuType(GTUType.DEFAULTS.VEHICLE), getSimulator(), new ConflictBuilder.FixedWidthGenerator(Length.instantiateSI(2.0)), ignoreList, permittedList); } catch (NetworkException | OTSGeometryException | JAXBException | URISyntaxException | XmlParserException | SAXException | ParserConfigurationException | GTUException | IOException | TrafficControlException exception) { exception.printStackTrace(); // Abusing the SimRuntimeException to propagate the message to the main method (the problem could actually be a // parsing problem) throw new SimRuntimeException(exception); } } /** {@inheritDoc} */ @Override public OTSNetwork getNetwork() { return this.network; } /** {@inheritDoc} */ @Override public Serializable getSourceId() { return "Sim0MQOTSModel"; } } }