package org.opentrafficsim.demo.web; import java.io.IOException; import java.net.URL; import javax.naming.NamingException; import org.djunits.unit.DurationUnit; import org.djunits.value.vdouble.scalar.Duration; import org.djunits.value.vdouble.scalar.Time; import org.djutils.cli.Checkable; import org.djutils.cli.CliUtil; import org.djutils.io.URLResource; import org.djutils.serialization.SerializationException; import org.opentrafficsim.core.dsol.OTSAnimator; import org.opentrafficsim.core.dsol.OTSModelInterface; import org.opentrafficsim.core.dsol.OTSReplication; import org.opentrafficsim.core.dsol.OTSSimulatorInterface; import org.opentrafficsim.demo.CircularRoadModel; import org.opentrafficsim.demo.CrossingTrafficLightsModel; import org.opentrafficsim.demo.NetworksModel; import org.opentrafficsim.demo.ShortMerge; import org.opentrafficsim.demo.StraightModel; import org.opentrafficsim.demo.conflict.BusStreetDemo; import org.opentrafficsim.demo.conflict.TJunctionDemo; import org.opentrafficsim.demo.conflict.TurboRoundaboutDemo; import org.opentrafficsim.demo.trafficcontrol.TrafCODDemo1; import org.opentrafficsim.demo.trafficcontrol.TrafCODDemo2; import org.sim0mq.Sim0MQException; import org.sim0mq.message.MessageUtil; import org.sim0mq.message.Sim0MQMessage; import org.sim0mq.message.federatestarter.FS1RequestStatusMessage; import org.sim0mq.message.federationmanager.FM2SimRunControlMessage; import org.sim0mq.message.federationmanager.FM3SetParameterMessage; import org.sim0mq.message.federationmanager.FM4SimStartMessage; import org.sim0mq.message.federationmanager.FM5RequestStatus; import org.sim0mq.message.federationmanager.FM6RequestStatisticsMessage; import org.sim0mq.message.modelcontroller.MC1StatusMessage; import org.sim0mq.message.modelcontroller.MC2AckNakMessage; import org.sim0mq.message.modelcontroller.MC3StatisticsMessage; import org.sim0mq.message.modelcontroller.MC4StatisticsErrorMessage; import org.zeromq.SocketType; import org.zeromq.ZContext; import org.zeromq.ZMQ; import nl.tudelft.simulation.dsol.SimRuntimeException; import nl.tudelft.simulation.dsol.experiment.ReplicationMode; import picocli.CommandLine.Option; /** *
* Copyright (c) 2002-2014 Delft University of Technology, Jaffalaan 5, 2628 BX Delft, the Netherlands. All rights reserved. *
* See for project information www.simulation.tudelft.nl. *
* The DSOL project is distributed under a BSD-style license.
* @version Aug 15, 2014
* @author Alexander Verbraeck
*/
public class SuperDemoWebApplication implements Checkable
{
/** */
private OTSSimulatorInterface simulator;
/** */
private OTSModelInterface model;
/** the socket. */
private ZMQ.Socket fsSocket;
/** the context. */
private ZContext fsContext;
/** federation run id. */
private Object federationRunId;
/** modelId unique Id of the model that is used as the sender/receiver when communicating. */
private String modelId;
/** runtime. */
private Duration runDuration;
/** warmup. */
private Duration warmupDuration;
/** message count. */
private long messageCount = 0;
/** home page for the web server. */
@SuppressWarnings("checkstyle:visibilitymodifier")
@Option(names = {"-m", "--modelId"}, description = "Id of the model to run", required = true)
String homePage;
/** internet port for the web server. */
@SuppressWarnings("checkstyle:visibilitymodifier")
@Option(names = {"-p", "--port"}, description = "Internet port to use", defaultValue = "8081")
int port;
/** {@inheritDoc} */
@Override
public void check() throws Exception
{
if (this.port < 1 || this.port > 65535)
{
throw new Exception("Port number should be between 1 and 65535");
}
}
/**
* Construct a console application.
* @throws SimRuntimeException on error
* @throws NamingException on error
* @throws Sim0MQException on error
* @throws SerializationException on serialization problem
* @throws IOException when TRAFCOD file cannot be found
*/
protected void init() throws SimRuntimeException, NamingException, Sim0MQException, SerializationException, IOException
{
this.simulator = new OTSAnimator("SuperDemoWebApplication");
this.modelId = this.modelId.trim();
if (this.modelId.toLowerCase().contains("circularroad"))
{
this.model = new CircularRoadModel(this.simulator);
}
else if (this.modelId.toLowerCase().contains("straight"))
{
this.model = new StraightModel(this.simulator);
}
else if (this.modelId.toLowerCase().contains("shortmerge"))
{
this.model = new ShortMerge.ShortMergeModel(this.simulator);
}
else if (this.modelId.toLowerCase().contains("networksdemo"))
{
this.model = new NetworksModel(this.simulator);
}
else if (this.modelId.toLowerCase().contains("crossingtrafficlights"))
{
this.model = new CrossingTrafficLightsModel(this.simulator);
}
else if (this.modelId.toLowerCase().contains("trafcoddemosimple"))
{
URL url = URLResource.getResource("/TrafCODDemo1/TrafCODDemo1.xml");
String xml = TrafCODDemo2.readStringFromURL(url);
this.model = new TrafCODDemo1.TrafCODModel(this.simulator, "TrafCODDemo1", "TrafCODDemo1", xml);
}
else if (this.modelId.toLowerCase().contains("trafcoddemocomplex"))
{
URL url = URLResource.getResource("/TrafCODDemo2/TrafCODDemo2.xml");
String xml = TrafCODDemo2.readStringFromURL(url);
this.model = new TrafCODDemo2.TrafCODModel(this.simulator, "TrafCODDemo2", "TrafCODDemo2", xml);
}
else if (this.modelId.toLowerCase().contains("tjunction"))
{
this.model = new TJunctionDemo.TJunctionModel(this.simulator);
}
else if (this.modelId.toLowerCase().contains("busstreet"))
{
this.model = new BusStreetDemo.BusStreetModel(this.simulator);
}
else if (this.modelId.toLowerCase().contains("turboroundabout"))
{
this.model = new TurboRoundaboutDemo.TurboRoundaboutModel(this.simulator);
}
if (this.model == null)
{
System.err.println("Could not find model " + this.modelId);
}
else
{
startListener();
}
}
/**
* Start listening on a port.
* @throws Sim0MQException on error
* @throws SerializationException on serialization problem
*/
protected void startListener() throws Sim0MQException, SerializationException
{
this.fsContext = new ZContext(1);
this.fsSocket = this.fsContext.createSocket(SocketType.ROUTER);
this.fsSocket.bind("tcp://*:" + this.port);
System.out.println("Model started. Listening at port: " + this.port);
System.out.flush();
while (!Thread.currentThread().isInterrupted())
{
// Wait for next request from the client -- first the identity (String) and the delimiter (#0)
String identity = this.fsSocket.recvStr();
this.fsSocket.recvStr();
byte[] request = this.fsSocket.recv(0);
System.out.println(MessageUtil.printBytes(request));
Object[] fields = Sim0MQMessage.decodeToArray(request);
Object receiverId = fields[4];
Object messageTypeId = fields[5];
System.out.println("Received " + Sim0MQMessage.print(fields));
System.out.flush();
if (receiverId.equals(this.modelId))
{
switch (messageTypeId.toString())
{
case "FS.1":
processRequestStatus(identity, new FS1RequestStatusMessage(fields));
break;
case "FM.5":
processRequestStatus(identity, new FM5RequestStatus(fields));
break;
case "FM.2":
processSimRunControl(identity, new FM2SimRunControlMessage(fields));
break;
case "FM.3":
processSetParameter(identity, new FM3SetParameterMessage(fields));
break;
case "FM.4":
processSimStart(identity, new FM4SimStartMessage(fields));
break;
case "FM.6":
processRequestStatistics(identity, new FM6RequestStatisticsMessage(fields));
break;
case "FS.3":
processKillFederate();
break;
default:
// wrong message
System.err.println("Received unknown message -- not processed: " + messageTypeId);
}
}
else
{
// wrong receiver
System.err.println(
"Received message not intended for " + this.modelId + " but for " + receiverId + " -- not processed: ");
}
}
}
/**
* Process FS.1 or FM.5 message and send MC.1 message back.
* @param identity reply id for REQ-ROUTER pattern
* @param message the message (0 payload fields)
* @throws Sim0MQException on error
* @throws SerializationException on serialization problem
*/
private void processRequestStatus(final String identity, final Sim0MQMessage message)
throws Sim0MQException, SerializationException
{
if (this.federationRunId == null)
{
this.federationRunId = message.getFederationId();
}
String status = "started";
if (this.simulator.isStartingOrRunning())
{
status = "running";
}
else if (this.simulator.getSimulatorTime() != null && this.simulator.getReplication() != null
&& this.simulator.getReplication().getTreatment() != null)
{
if (this.simulator.getSimulatorTime().ge(this.simulator.getReplication().getTreatment().getEndTime()))
{
status = "ended";
}
else
{
status = "error";
}
}
this.fsSocket.sendMore(identity);
this.fsSocket.sendMore("");
byte[] mc1Message = new MC1StatusMessage(this.federationRunId, this.modelId, message.getSenderId(), ++this.messageCount,
message.getMessageId(), status, "").createByteArray();
this.fsSocket.send(mc1Message, 0);
System.out.println("Sent MC.1");
System.out.flush();
}
/**
* Process FM.2 message and send MC.2 message back.
* @param identity reply id for REQ-ROUTER pattern
* @param message the FM.2 message
* @throws Sim0MQException on error
* @throws SerializationException on serialization problem
*/
private void processSimRunControl(final String identity, final FM2SimRunControlMessage message)
throws Sim0MQException, SerializationException
{
boolean status = true;
String error = "";
try
{
Object runDurationField = message.getRunDuration();
if (runDurationField instanceof Number)
{
this.runDuration = new Duration(((Number) runDurationField).doubleValue(), DurationUnit.SI);
}
else if (runDurationField instanceof Duration)
{
this.runDuration = (Duration) runDurationField;
}
else
{
throw new Sim0MQException("runTimeField " + runDurationField + " neither Number nor Duration");
}
Object warmupDurationField = message.getWarmupDuration();
if (warmupDurationField instanceof Number)
{
this.warmupDuration = new Duration(((Number) warmupDurationField).doubleValue(), DurationUnit.SI);
}
else if (warmupDurationField instanceof Duration)
{
this.warmupDuration = (Duration) warmupDurationField;
}
else
{
throw new Sim0MQException("warmupField " + warmupDurationField + " neither Number nor Duration");
}
}
catch (Exception e)
{
status = false;
error = e.getMessage();
}
byte[] mc2Message = new MC2AckNakMessage(this.federationRunId, this.modelId, message.getSenderId(), ++this.messageCount,
message.getMessageId(), status, error).createByteArray();
this.fsSocket.sendMore(identity);
this.fsSocket.sendMore("");
this.fsSocket.send(mc2Message, 0);
System.out.println("Sent MC.2");
System.out.flush();
}
/**
* Process FM.3 message and send MC.2 message back.
* @param identity reply id for REQ-ROUTER pattern
* @param message the FM3 message
* @throws Sim0MQException on error
* @throws SerializationException on serialization problem
*/
private void processSetParameter(final String identity, final FM3SetParameterMessage message)
throws Sim0MQException, SerializationException
{
boolean status = true;
String error = "";
try
{
// TODO: change for InputParameter
/*-
String parameterName = message.getParameterName();
Object parameterValueField = message.getParameterValue();
switch (parameterName)
{
case "seed":
this.model.seed = ((Number) parameterValueField).longValue();
break;
case "iat":
this.model.iat = ((Number) parameterValueField).doubleValue();
break;
case "servicetime":
this.model.serviceTime = ((Number) parameterValueField).doubleValue();
break;
default:
status = false;
error = "Parameter " + parameterName + " unknown";
break;
}
*/
}
catch (Exception e)
{
status = false;
error = e.getMessage();
}
byte[] mc2Message = new MC2AckNakMessage(this.federationRunId, this.modelId, message.getSenderId(), ++this.messageCount,
message.getMessageId(), status, error).createByteArray();
this.fsSocket.sendMore(identity);
this.fsSocket.sendMore("");
this.fsSocket.send(mc2Message, 0);
System.out.println("Sent MC.2");
System.out.flush();
}
/**
* Process FM.4 message and send MC.2 message back.
* @param identity reply id for REQ-ROUTER pattern
* @param message the FM.4 message
* @throws Sim0MQException on error
* @throws SerializationException on serialization problem
*/
private void processSimStart(final String identity, final FM4SimStartMessage message)
throws Sim0MQException, SerializationException
{
boolean status = true;
String error = "";
try
{
OTSReplication replication =
OTSReplication.create("rep1", Time.ZERO, this.warmupDuration, this.runDuration, this.model);
this.simulator.initialize(replication, ReplicationMode.TERMINATING);
// TODO: different... this.simulator.scheduleEventAbs(100.0, this, this, "terminate", null);
this.simulator.start();
}
catch (Exception e)
{
status = false;
error = e.getMessage();
}
byte[] mc2Message = new MC2AckNakMessage(this.federationRunId, this.modelId, message.getSenderId(), ++this.messageCount,
message.getMessageId(), status, error).createByteArray();
this.fsSocket.sendMore(identity);
this.fsSocket.sendMore("");
this.fsSocket.send(mc2Message, 0);
System.out.println("Sent MC.2");
System.out.flush();
}
/**
* Process FM.6 message and send MC.3 or MC.4 message back.
* @param identity reply id for REQ-ROUTER pattern
* @param message the FM.6 message
* @throws Sim0MQException on error
* @throws SerializationException on serialization problem
*/
private void processRequestStatistics(final String identity, final FM6RequestStatisticsMessage message)
throws Sim0MQException, SerializationException
{
boolean ok = true;
String error = "";
String variableName = message.getVariableName();
double variableValue = Double.NaN;
try
{
// TODO: change for real outputs of the model.
/*-
switch (variableName)
{
case "dN.average":
variableValue = this.model.dN.getSampleMean();
break;
case "uN.average":
variableValue = this.model.uN.getSampleMean();
break;
case "qN.max":
variableValue = this.model.qN.getMax();
break;
default:
ok = false;
error = "Parameter " + variableName + " unknown";
break;
}
*/
}
catch (Exception e)
{
ok = false;
error = e.getMessage();
}
if (Double.isNaN(variableValue))
{
ok = false;
error = "Parameter " + variableName + " not set to a value";
}
if (ok)
{
byte[] mc3Message = new MC3StatisticsMessage(this.federationRunId, this.modelId, message.getSenderId(),
++this.messageCount, variableName, variableValue).createByteArray();
this.fsSocket.sendMore(identity);
this.fsSocket.sendMore("");
this.fsSocket.send(mc3Message, 0);
System.out.println("Sent MC.3");
System.out.flush();
}
else
{
byte[] mc4Message = new MC4StatisticsErrorMessage(this.federationRunId, this.modelId, message.getSenderId(),
++this.messageCount, variableName, error).createByteArray();
this.fsSocket.sendMore(identity);
this.fsSocket.sendMore("");
this.fsSocket.send(mc4Message, 0);
System.out.println("Sent MC.4");
System.out.flush();
}
}
/**
* Process FS.3 message.
*/
private void processKillFederate()
{
this.fsSocket.close();
this.fsContext.destroy();
this.fsContext.close();
System.exit(0);
}
/** stop the simulation. */
protected final void terminate()
{
// TODO: This probably goes away or is replaced by metadata
/*-
System.out.println("average queue length = " + this.model.qN.getSampleMean());
System.out.println("average queue wait = " + this.model.dN.getSampleMean());
System.out.println("average utilization = " + this.model.uN.getSampleMean());
*/
}
/**
* @param args contain e.g., port number, and a model to run: SuperDemoWebpplication --port=8080 -m TJunctionDemo.
* @throws SimRuntimeException on error
* @throws NamingException on error
* @throws Sim0MQException on error
* @throws SerializationException on serialization problem
* @throws IOException when TRAFCODDEMO file cannot be found
*/
public static void main(final String[] args)
throws SimRuntimeException, NamingException, Sim0MQException, SerializationException, IOException
{
SuperDemoWebApplication webApp = new SuperDemoWebApplication();
CliUtil.execute(webApp, args);
webApp.init();
}
}