package org.sim0mq.demo.mm1; import java.rmi.RemoteException; import javax.naming.NamingException; import org.djunits.unit.DurationUnit; import org.djunits.value.vdouble.scalar.Duration; import org.djutils.serialization.SerializationException; import org.sim0mq.Sim0MQException; import org.sim0mq.message.MessageStatus; import org.sim0mq.message.MessageUtil; import org.sim0mq.message.SimulationMessage; import org.zeromq.SocketType; import org.zeromq.ZContext; import org.zeromq.ZMQ; import nl.tudelft.simulation.dsol.SimRuntimeException; import nl.tudelft.simulation.dsol.experiment.Replication; import nl.tudelft.simulation.dsol.experiment.ReplicationMode; import nl.tudelft.simulation.dsol.simulators.DEVSSimulator; import nl.tudelft.simulation.dsol.simulators.DEVSSimulatorInterface; /** *
* Copyright (c) 2015-2019 Delft University of Technology, Jaffalaan 5, 2628 BX Delft, the Netherlands. All rights reserved. *
* See for project information www.simulation.tudelft.nl. *
* The DSOL project is distributed under a BSD-style license.
* @version Aug 15, 2014
* @author Alexander Verbraeck
*/
public class MM1Queue41Application
{
/** The simulator; created in the constructor and handed to the model. */
private DEVSSimulator.TimeDouble simulator;
/** The model; created in the constructor with {@link #simulator}. */
private MM1Queue41Model model;
/** the socket; created as a ROUTER socket and bound to the listening port in startListener. */
private ZMQ.Socket fsSocket;
/** the context from which {@link #fsSocket} is created. */
private ZContext fsContext;
/** federation run id; overwritten with field 1 of every message that is decoded in startListener. */
private Object federationRunId;
/** modelId unique Id of the model that is used as the sender/receiver when communicating. */
private String modelId;
/** runtime; set from the payload of an FM.2 message. */
private Duration runTime;
/** warmup; set from the payload of an FM.2 message. */
private Duration warmupTime;
/** message count; pre-incremented to provide the unique message id of each reply that is sent. */
private long messageCount = 0;
/**
* Construct a console application.
* @param modelId unique Id of the model that is used as the sender/receiver when communicating
* @param port the sim0mq port number on which the model listens
* @throws SimRuntimeException on error
* @throws RemoteException on error
* @throws NamingException on error
* @throws Sim0MQException on error
* @throws SerializationException on serialization problem
*/
protected MM1Queue41Application(final String modelId, final int port)
throws SimRuntimeException, RemoteException, NamingException, Sim0MQException, SerializationException
{
this.simulator = new DEVSSimulator.TimeDouble();
this.modelId = modelId.trim();
this.model = new MM1Queue41Model(this.simulator);
startListener(port);
}
/**
 * Bind a ROUTER socket to the given TCP port and handle incoming messages until the current thread is interrupted.
 * Every request is read as three frames: the identity of the requester, an (ignored) delimiter frame, and the encoded
 * message itself. Messages whose receiver id differs from the model id are reported on stderr and not processed;
 * all other messages are dispatched on their message type id (FS.1, FM.2 - FM.6, FS.3).
 * @param port the sim0mq port number on which the model listens
 * @throws Sim0MQException on error
 * @throws SerializationException on serialization problem
 */
protected void startListener(final int port) throws Sim0MQException, SerializationException
{
    this.fsContext = new ZContext(1);
    this.fsSocket = this.fsContext.createSocket(SocketType.ROUTER);
    final String endpoint = "tcp://*:" + port;
    this.fsSocket.bind(endpoint);
    System.out.println("Model started. Listening at port: " + port);
    System.out.flush();

    while (!Thread.currentThread().isInterrupted())
    {
        // frame 1: identity of the requester, frame 2: delimiter (discarded), frame 3: the message bytes
        final String clientIdentity = this.fsSocket.recvStr();
        this.fsSocket.recvStr();
        final byte[] rawMessage = this.fsSocket.recv(0);
        System.out.println(MessageUtil.printBytes(rawMessage));

        final Object[] decoded = SimulationMessage.decode(rawMessage);
        System.out.println("Received " + SimulationMessage.print(decoded));
        System.out.flush();

        // header fields of the decoded message
        this.federationRunId = decoded[1];
        final String sender = decoded[2].toString();
        final String addressee = decoded[3].toString();
        final String messageType = decoded[4].toString();
        final long messageNumber = (Long) decoded[5];

        if (!addressee.equals(this.modelId))
        {
            // wrong receiver
            System.err.println(
                    "Received message not intended for " + this.modelId + " but for " + addressee + " -- not processed: ");
            continue;
        }

        System.err.println("Received: " + messageType + ", payload = " + SimulationMessage.listPayload(decoded));
        switch (messageType)
        {
            case "FS.1":
            case "FM.5":
                processRequestStatus(clientIdentity, sender, messageNumber);
                break;

            case "FM.2":
                processSimRunControl(clientIdentity, sender, messageNumber, decoded);
                break;

            case "FM.3":
                processSetParameter(clientIdentity, sender, messageNumber, decoded);
                break;

            case "FM.4":
                processSimStart(clientIdentity, sender, messageNumber);
                break;

            case "FM.6":
                processRequestStatistics(clientIdentity, sender, messageNumber, decoded);
                break;

            case "FS.3":
                processKillFederate();
                break;

            default:
                // wrong message
                System.err.println("Received unknown message -- not processed: " + messageType);
        }
    }
}
/**
 * Process FS.1 or FM.5 message and send MC.1 message back. The status that is reported in the MC.1 payload is:
 * <ul>
 * <li>"running" when the simulator is running;</li>
 * <li>"ended" when the simulator is not running, a replication with a treatment is present, and the simulator time
 * has reached the end time of the treatment;</li>
 * <li>"error" when the simulator is not running, a replication with a treatment is present, and the simulator time
 * is still before the end time of the treatment;</li>
 * <li>"started" in all other cases.</li>
 * </ul>
 * @param identity reply id for REQ-ROUTER pattern
 * @param receiverId the receiver of the response
 * @param replyToMessageId the message to which this is the reply
 * @throws Sim0MQException on error
 * @throws SerializationException on serialization problem
 */
private void processRequestStatus(final String identity, final String receiverId, final long replyToMessageId)
        throws Sim0MQException, SerializationException
{
    String status = "started";
    if (this.simulator.isRunning())
    {
        status = "running";
    }
    else if (this.simulator.getSimulatorTime() != null && this.simulator.getReplication() != null
            && this.simulator.getReplication().getTreatment() != null)
    {
        if (this.simulator.getSimulatorTime() >= this.simulator.getReplication().getTreatment().getEndTime())
        {
            status = "ended";
        }
        else
        {
            status = "error";
        }
    }

    // Encode BEFORE the first frame is sent: when encoding throws, no identity/delimiter frames have been queued yet,
    // so the socket is not left with an unfinished multipart message that would be glued to the next reply.
    byte[] mc1Message = SimulationMessage.encodeUTF8(this.federationRunId, this.modelId, receiverId, "MC.1",
            ++this.messageCount, MessageStatus.NEW, replyToMessageId, status, "");

    // reply envelope: identity frame, empty delimiter frame, then the message itself
    this.fsSocket.sendMore(identity);
    this.fsSocket.sendMore("");
    this.fsSocket.send(mc1Message, 0);

    System.out.println("Sent MC.1");
    System.out.flush();
}
/**
 * Process FM.2 message and send MC.2 message back. The run time is read from field 8 of the message and the warmup
 * time from field 9; each may be given as a Number (interpreted as a duration in SI units) or as a Duration. When
 * reading either value fails, the MC.2 reply carries status false and a description of the problem; otherwise it
 * carries status true and an empty error string.
 * @param identity reply id for REQ-ROUTER pattern
 * @param receiverId the receiver of the response
 * @param replyToMessageId the message to which this is the reply
 * @param fields the message
 * @throws Sim0MQException on error
 * @throws SerializationException on serialization problem
 */
private void processSimRunControl(final String identity, final String receiverId, final long replyToMessageId,
        final Object[] fields) throws Sim0MQException, SerializationException
{
    boolean status = true;
    String error = "";
    try
    {
        this.runTime = decodeDurationField(fields[8], "runTimeField");
        // the warmup time is field 9; testing field 8 here would validate (and possibly store) the run time instead
        this.warmupTime = decodeDurationField(fields[9], "warmupField");
    }
    catch (Exception e)
    {
        // any problem with the payload is reported back to the sender rather than propagated
        status = false;
        // getMessage() can be null (e.g., for a NullPointerException); never put null in the reply payload
        error = e.getMessage() != null ? e.getMessage() : e.toString();
    }

    byte[] mc2Message = SimulationMessage.encodeUTF8(this.federationRunId, this.modelId, receiverId, "MC.2",
            ++this.messageCount, MessageStatus.NEW, replyToMessageId, status, error);

    // reply envelope: identity frame, empty delimiter frame, then the message itself
    this.fsSocket.sendMore(identity);
    this.fsSocket.sendMore("");
    this.fsSocket.send(mc2Message, 0);

    System.out.println("Sent MC.2");
    System.out.flush();
}

/**
 * Convert a payload field that holds either a Number (taken as a value in SI units) or a Duration into a Duration.
 * @param field the payload field to convert
 * @param fieldName the name of the field, used in the exception message
 * @return the field as a Duration
 * @throws Sim0MQException when the field is neither a Number nor a Duration (this includes null)
 */
private static Duration decodeDurationField(final Object field, final String fieldName) throws Sim0MQException
{
    if (field instanceof Number)
    {
        return new Duration(((Number) field).doubleValue(), DurationUnit.SI);
    }
    if (field instanceof Duration)
    {
        return (Duration) field;
    }
    throw new Sim0MQException(fieldName + " " + field + " neither Number nor Duration");
}
/**
* Process FM.4 message and send MC.2 message back.
* @param identity reply id for REQ-ROUTER pattern
* @param receiverId the receiver of the response
* @param replyToMessageId the message to which this is the reply
* @throws Sim0MQException on error
* @throws SerializationException on serialization problem
*/
private void processSimStart(final String identity, final String receiverId, final long replyToMessageId)
throws Sim0MQException, SerializationException
{
boolean status = true;
String error = "";
try
{
Replication.TimeDouble