package org.opentrafficsim.remotecontrol; import java.awt.BorderLayout; import java.awt.Dimension; import java.awt.EventQueue; import java.awt.FlowLayout; import java.awt.Font; import java.awt.event.ActionEvent; import java.awt.event.ActionListener; import java.awt.event.WindowEvent; import java.awt.event.WindowListener; import java.io.IOException; import java.io.PrintStream; import java.net.URL; import java.nio.charset.StandardCharsets; import java.util.Scanner; import javax.swing.JButton; import javax.swing.JComponent; import javax.swing.JFrame; import javax.swing.JPanel; import javax.swing.JScrollPane; import javax.swing.JTextArea; import javax.swing.WindowConstants; import javax.swing.border.EmptyBorder; import org.djunits.unit.DurationUnit; import org.djunits.unit.TimeUnit; import org.djunits.value.vdouble.scalar.Duration; import org.djunits.value.vdouble.scalar.Time; import org.djutils.cli.Checkable; import org.djutils.cli.CliUtil; import org.djutils.decoderdumper.HexDumper; import org.djutils.io.URLResource; import org.djutils.logger.CategoryLogger; import org.djutils.logger.LogCategory; import org.djutils.serialization.SerializationException; import org.pmw.tinylog.Level; import org.sim0mq.Sim0MQException; import org.sim0mq.message.Sim0MQMessage; import org.zeromq.SocketType; import org.zeromq.ZContext; import org.zeromq.ZMQ; import org.zeromq.ZMQException; import picocli.CommandLine.Command; import picocli.CommandLine.Option; /** * Remotely control OTS using Sim0MQ messages. *
* Copyright (c) 2013-2020 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved.
* BSD-style license. See OpenTrafficSim License.
*
* @version $Revision$, $LastChangedDate$, by $Author$, initial version Mar 4, 2020
* @author Alexander Verbraeck
* @author Peter Knoppers
* @author Wouter Schakel
*/
public class Sim0MQRemoteControllerNew extends JFrame implements WindowListener, ActionListener
{
/** ... */
private static final long serialVersionUID = 20200304L;
/**
* The command line options.
*/
@Command(description = "Test program for Remote Control OTS", name = "Remote Control OTS", mixinStandardHelpOptions = true,
version = "1.0")
public static class Options implements Checkable
{
/** The IP port. */
@Option(names = { "-p", "--port" }, description = "Internet port to use", defaultValue = "8888")
private int port;
/**
* Retrieve the port.
* @return int; the port
*/
public final int getPort()
{
return this.port;
}
/** The host name. */
@Option(names = { "-H", "--host" }, description = "Internet host to use", defaultValue = "localhost")
private String host;
/**
* Retrieve the host name.
* @return String; the host name
*/
public final String getHost()
{
return this.host;
}
@Override
public final void check() throws Exception
{
if (this.port <= 0 || this.port > 65535)
{
throw new Exception("Port should be between 1 and 65535");
}
}
}
/** The instance of the RemoteControl. */
@SuppressWarnings("checkstyle:visibilitymodifier")
static Sim0MQRemoteControllerNew gui = null;
/** Socket for sending messages that should be relayed to OTS. */
private ZMQ.Socket toOTS;
/**
* Start the OTS remote control program.
* @param args String[]; the command line arguments
*/
public static void main(final String[] args)
{
CategoryLogger.setAllLogLevel(Level.WARNING);
CategoryLogger.setLogCategories(LogCategory.ALL);
// Instantiate the RemoteControl GUI
try
{
EventQueue.invokeAndWait(new Runnable()
{
/** {@inheritDoc} */
@Override
public void run()
{
try
{
gui = new Sim0MQRemoteControllerNew();
gui.setVisible(true);
}
catch (Exception e)
{
e.printStackTrace();
System.exit(ERROR);
}
}
});
}
catch (Exception e)
{
e.printStackTrace();
System.exit(ERROR);
}
// We don't get here until the GUI is fully running.
Options options = new Options();
CliUtil.execute(options, args); // register Unit converters, parse the command line, etc..
gui.processArguments(options.getHost(), options.getPort());
}
/** ... */
private ZContext zContext = new ZContext(1);
/** Message relayer. */
private Thread pollerThread;
/**
* Poller thread for relaying messages between the remote OTS and local AWT.
*/
class PollerThread extends Thread
{
/** The ZContext. */
private final ZContext context;
/** The host that runs the OTS simulation. */
private final String slaveHost;
/** The port on which to connect to the OTS simulation. */
private final int slavePort;
/**
* Construct a new PollerThread for relaying messages.
* @param context ZContext; the ZMQ context
* @param slaveHost String; host name of the OTS server machine
* @param slavePort int; port number on which to connect to the OTS server machine
*/
PollerThread(final ZContext context, final String slaveHost, final int slavePort)
{
this.context = context;
this.slaveHost = slaveHost;
this.slavePort = slavePort;
}
@Override
public final void run()
{
int nextExpectedPacket = 0;
ZMQ.Socket slaveSocket = this.context.createSocket(SocketType.PAIR);
slaveSocket.setHWM(100000);
ZMQ.Socket awtSocketIn = this.context.createSocket(SocketType.PULL);
awtSocketIn.setHWM(100000);
ZMQ.Socket awtSocketOut = this.context.createSocket(SocketType.PUSH);
awtSocketOut.setHWM(100000);
slaveSocket.connect("tcp://" + this.slaveHost + ":" + this.slavePort);
awtSocketIn.bind("inproc://fromAWT");
awtSocketOut.bind("inproc://toAWT");
ZMQ.Poller items = this.context.createPoller(2);
items.register(slaveSocket, ZMQ.Poller.POLLIN);
items.register(awtSocketIn, ZMQ.Poller.POLLIN);
while (!Thread.currentThread().isInterrupted())
{
items.poll();
if (items.pollin(0))
{
byte[] message = slaveSocket.recv(0);
String expectedSenderField = String.format("slave_%05d", ++nextExpectedPacket);
try
{
Object[] messageFields = Sim0MQMessage.decode(message).createObjectArray();
String senderTag = (String) messageFields[3];
if (!senderTag.equals(expectedSenderField))
{
System.err.println("Got message " + senderTag + " , expected " + expectedSenderField
+ ", message is " + messageFields[5]);
}
}
catch (Sim0MQException | SerializationException e)
{
e.printStackTrace();
}
// System.out.println("poller has received a message on the slaveSocket; transmitting to AWT");
awtSocketOut.send(message);
}
if (items.pollin(1))
{
byte[] message = awtSocketIn.recv(0);
// System.out.println("poller has received a message on the fromAWT PULL socket; transmitting to OTS");
slaveSocket.send(message);
}
}
}
}
/**
* Open connections as specified on the command line, then start the message transfer threads.
* @param host String; host to connect to (listening OTS server should already be running)
* @param port int; port to connect to (listening OTS server should be listening on that port)
*/
public void processArguments(final String host, final int port)
{
this.output.println("host is " + host + ", port is " + port);
this.pollerThread = new PollerThread(this.zContext, host, port);
this.pollerThread.start();
this.toOTS = this.zContext.createSocket(SocketType.PUSH);
this.toOTS.setHWM(100000);
new OTS2AWT(this.zContext).start();
this.toOTS.connect("inproc://fromAWT");
}
/**
* Write something to the remote OTS.
* @param command String; the command to write
* @throws IOException when communication fails
*/
public void write(final String command) throws IOException
{
this.toOTS.send(command);
// output.println("Wrote " + command.getBytes().length + " bytes");
this.output.println("Sent string \"" + command + "\"");
}
/**
* Write something to the remote OTS.
* @param bytes byte[]; the bytes to write
* @throws IOException when communication fails
*/
public void write(final byte[] bytes) throws IOException
{
this.toOTS.send(bytes);
// output.println("Wrote " + command.getBytes().length + " bytes");
// output.println(HexDumper.hexDumper(bytes));
}
/** Step to button. */
private JButton stepTo;
/**
* Construct the GUI.
*/
Sim0MQRemoteControllerNew()
{
// Construct the GUI
setDefaultCloseOperation(WindowConstants.EXIT_ON_CLOSE);
setBounds(100, 100, 1000, 800);
JPanel panelAll = new JPanel();
panelAll.setBorder(new EmptyBorder(5, 5, 5, 5));
panelAll.setLayout(new BorderLayout(0, 0));
setContentPane(panelAll);
JPanel panelControls = new JPanel();
panelAll.add(panelControls, BorderLayout.PAGE_START);
JTextArea textArea = new JTextArea();
textArea.setFont(new Font("monospaced", Font.PLAIN, 12));
JScrollPane scrollPane = new JScrollPane(textArea);
scrollPane.setPreferredSize(new Dimension(800, 400));
panelAll.add(scrollPane, BorderLayout.PAGE_END);
this.output = new PrintStream(new TextAreaOutputStream(textArea), true);
JPanel controls = new JPanel();
controls.setLayout(new FlowLayout());
JButton sendNetwork = new JButton("Send network");
sendNetwork.setActionCommand("SendNetwork");
sendNetwork.addActionListener(this);
controls.add(sendNetwork);
this.stepTo = new JButton("Step to 10 s");
this.stepTo.setActionCommand("StepTo");
this.stepTo.addActionListener(this);
controls.add(this.stepTo);
JButton step100TimesTo = new JButton("Step 100 times 10 s");
step100TimesTo.setActionCommand("Step100To");
step100TimesTo.addActionListener(this);
controls.add(step100TimesTo);
JButton getGTUPositions = new JButton("Get all GTU positions");
getGTUPositions.setActionCommand("GetAllGTUPositions");
getGTUPositions.addActionListener(this);
controls.add(getGTUPositions);
JButton shutDown = new JButton("Shutdown server");
shutDown.setActionCommand("Send DIE command");
shutDown.addActionListener(this);
controls.add(shutDown);
panelAll.add(controls, BorderLayout.CENTER);
}
/** Debugging and other output goes here. */
@SuppressWarnings("checkstyle:visibilitymodifier")
PrintStream output = null;
/**
* Shut down this application.
*/
public void shutDown()
{
// Do we have to kill anything for a clean exit?
}
/** {@inheritDoc} */
@Override
public void windowOpened(final WindowEvent e)
{
// Do nothing
}
/** {@inheritDoc} */
@Override
public final void windowClosing(final WindowEvent e)
{
shutDown();
}
/** {@inheritDoc} */
@Override
public void windowClosed(final WindowEvent e)
{
// Do nothing
}
/** {@inheritDoc} */
@Override
public void windowIconified(final WindowEvent e)
{
// Do nothing
}
/** {@inheritDoc} */
@Override
public void windowDeiconified(final WindowEvent e)
{
// Do nothing
}
/** {@inheritDoc} */
@Override
public void windowActivated(final WindowEvent e)
{
// Do nothing
}
/** {@inheritDoc} */
@Override
public void windowDeactivated(final WindowEvent e)
{
// Do nothing
}
/**
* Thread that reads results from OTS and (for now) writes those to the textArea.
*/
class OTS2AWT extends Thread
{
/** Socket where the message from OTS will appear. */
private final ZMQ.Socket fromOTS;
/**
* Construct a new OTS2AWT thread.
* @param zContext ZContext; the ZContext that is needed to construct the PULL socket to read the messages
*/
OTS2AWT(final ZContext zContext)
{
this.fromOTS = zContext.createSocket(SocketType.PULL);
this.fromOTS.setHWM(100000);
this.fromOTS.connect("inproc://toAWT");
}
/**
* Interpret the ACK NACK field.
* @param o Object; the actual object that should be an ACK or a NACK
* @return String; textual representation of o
*/
private String ackNack(final Object o)
{
if (null == o)
{
return "null";
}
if (o instanceof Boolean)
{
return ((Boolean) o) ? "ACK" : "NACK";
}
return "????";
}
/** {@inheritDoc} */
@Override
public void run()
{
do
{
try
{
// Read from remotely controlled OTS
byte[] bytes = this.fromOTS.recv(0); /// XXX: this one is okay to block
// System.out.println("remote controller has received a message on the fromOTS PULL socket");
Object[] message = Sim0MQMessage.decode(bytes).createObjectArray();
if (message.length > 8 && message[5] instanceof String)
{
// System.out.println(Sim0MQMessage.print(message));
String command = (String) message[5];
switch (command)
{
case "GTU move":
Sim0MQRemoteControllerNew.this.output
.println(String.format("%10.10s (%s): location=%s heading=%s, v=%s, a=%s", message[8],
message[9], message[10], message[11], message[12], message[13]));
break;
case "NEWSIMULATION":
Sim0MQRemoteControllerNew.this.output
.println(String.format("NEWSIMULATION %s: %s", ackNack(message[8]), message[9]));
break;
case "SIMULATEUNTIL":
Sim0MQRemoteControllerNew.this.output
.println(String.format("SIMULATEUNTIL %s: %s", ackNack(message[8]), message[9]));
break;
case "GTUs in network":
StringBuilder listOfGTUIds = new StringBuilder();
listOfGTUIds.append(message[5] + ":");
for (int index = 8; index < message.length; index++)
{
listOfGTUIds.append(" " + message[index]);
}
Sim0MQRemoteControllerNew.this.output.println(listOfGTUIds.toString());
for (int index = 8; index < message.length; index++)
{
// Request detailed data
try
{
write(Sim0MQMessage.encodeUTF8(true, 0, "RemoteControl", "OTS", "GTU move|GET_CURRENT",
0, message[index]));
}
catch (IOException e)
{
e.printStackTrace();
}
}
break;
case "TIME_CHANGED_EVENT":
Sim0MQRemoteControllerNew.this.output.println(message[8]);
break;
case "TRAFFICCONTROL.CONTROLLER_WARNING":
Sim0MQRemoteControllerNew.this.output
.println(String.format("%s: warning %s", message[8], message[9]));
break;
case "TRAFFICCONTROL.CONTROLLER_EVALUATING":
Sim0MQRemoteControllerNew.this.output
.println(String.format("%s: evaluating %s", message[8], message[9]));
break;
case "TRAFFICCONTROL.CONFLICT_GROUP_CHANGED":
Sim0MQRemoteControllerNew.this.output.println(String.format(
"%s: conflict group changed from %s to %s", message[8], message[9], message[10]));
break;
case "NETWORK.GTU.ADD":
Sim0MQRemoteControllerNew.this.output.println(String.format("GTU added %s", message[8]));
break;
case "NETWORK.GTU.REMOVE":
Sim0MQRemoteControllerNew.this.output.println(String.format("GTU removed %s", message[8]));
break;
case "READY":
Sim0MQRemoteControllerNew.this.output.println("Slave is ready for the next command");
break;
default:
Sim0MQRemoteControllerNew.this.output.println("Unhandled reply: " + command);
Sim0MQRemoteControllerNew.this.output.println(HexDumper.hexDumper(bytes));
Sim0MQRemoteControllerNew.this.output.println("Received:");
Sim0MQRemoteControllerNew.this.output.println(Sim0MQMessage.print(message));
break;
}
}
else
{
Sim0MQRemoteControllerNew.this.output.println(HexDumper.hexDumper(bytes));
}
}
catch (ZMQException | Sim0MQException | SerializationException e)
{
e.printStackTrace();
return;
}
}
while (true);
}
}
/**
* Open an URL, read it and store the contents in a string. Adapted from
* https://stackoverflow.com/questions/4328711/read-url-to-string-in-few-lines-of-java-code
* @param url URL; the URL
* @return String
* @throws IOException when reading the file fails
*/
public static String readStringFromURL(final URL url) throws IOException
{
try (Scanner scanner = new Scanner(url.openStream(), StandardCharsets.UTF_8.toString()))
{
scanner.useDelimiter("\\A");
return scanner.hasNext() ? scanner.next() : "";
}
}
/** {@inheritDoc} */
@Override
public void actionPerformed(final ActionEvent e)
{
switch (e.getActionCommand())
{
case "SendNetwork":
{
String networkFile = "/TrafCODDemo2/TrafCODDemo2.xml";
Duration warmupDuration = Duration.ZERO;
Duration runDuration = new Duration(3600, DurationUnit.SECOND);
Long seed = 123456L;
URL url = URLResource.getResource(networkFile);
// System.out.println("url is " + url);
try
{
String xml = readStringFromURL(url);
// System.out.println("xml length = " + xml.length());
try
{
write(Sim0MQMessage.encodeUTF8(true, 0, "RemoteControl", "OTS", "NEWSIMULATION", 0, xml, runDuration,
warmupDuration, seed));
String caption = this.stepTo.getText();
int position;
for (position = 0; position < caption.length(); position++)
{
if (Character.isDigit(caption.charAt(position)))
{
break;
}
}
Time toTime = new Time(10.0, TimeUnit.BASE_SECOND);
this.stepTo.setText(caption.substring(0, position)
+ String.format("%.0f %s", toTime.getInUnit(), toTime.getDisplayUnit()));
}
catch (IOException e1)
{
this.output.println("Write failed; Caught IOException");
((JComponent) e.getSource()).setEnabled(false);
}
catch (Sim0MQException e1)
{
e1.printStackTrace();
}
catch (SerializationException e1)
{
e1.printStackTrace();
}
}
catch (IOException e2)
{
System.err.println("Could not load file " + networkFile);
e2.printStackTrace();
}
break;
}
case "StepTo":
{
String caption = this.stepTo.getText();
int position;
for (position = 0; position < caption.length(); position++)
{
if (Character.isDigit(caption.charAt(position)))
{
break;
}
}
Time toTime = Time.valueOf(caption.substring(position));
try
{
write(Sim0MQMessage.encodeUTF8(true, 0, "RemoteControl", "OTS", "SIMULATEUNTIL", 0, toTime));
toTime = toTime.plus(new Duration(10, DurationUnit.SECOND));
this.stepTo.setText(caption.substring(0, position)
+ String.format("%.0f %s", toTime.getInUnit(), toTime.getDisplayUnit()));
}
catch (IOException | Sim0MQException | SerializationException e1)
{
e1.printStackTrace();
}
break;
}
case "Step100To":
{
for (int i = 0; i < 100; i++)
{
actionPerformed(new ActionEvent(this.stepTo, 0, "StepTo"));
}
break;
}
case "GetAllGTUPositions":
{
try
{
write(Sim0MQMessage.encodeUTF8(true, 0, "RemoteControl", "OTS", "GTUs in network|GET_CURRENT", 0));
}
catch (IOException | Sim0MQException | SerializationException e1)
{
e1.printStackTrace();
}
break;
}
case "Send DIE command":
{
try
{
write(Sim0MQMessage.encodeUTF8(true, 0, "RemoteControl", "OTS", "DIE", 0));
}
catch (IOException | Sim0MQException | SerializationException e1)
{
e1.printStackTrace();
}
break;
}
default:
this.output.println("Oops: unhandled action command");
}
}
}