package org.sim0mq.demo.mm1; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import org.djutils.serialization.SerializationException; import org.sim0mq.Sim0MQException; import org.sim0mq.federationmanager.ModelState; import org.sim0mq.message.Sim0MQMessage; import org.sim0mq.message.federatestarter.FS2FederateStartedMessage; import org.sim0mq.message.federatestarter.FS4FederateKilledMessage; import org.sim0mq.message.federationmanager.FM1StartFederateMessage; import org.sim0mq.message.federationmanager.FM2SimRunControlMessage; import org.sim0mq.message.federationmanager.FM3SetParameterMessage; import org.sim0mq.message.federationmanager.FM4SimStartMessage; import org.sim0mq.message.federationmanager.FM5RequestStatus; import org.sim0mq.message.federationmanager.FM6RequestStatisticsMessage; import org.sim0mq.message.federationmanager.FM8KillFederateMessage; import org.sim0mq.message.modelcontroller.MC1StatusMessage; import org.sim0mq.message.modelcontroller.MC2AckNakMessage; import org.sim0mq.message.modelcontroller.MC3StatisticsMessage; import org.sim0mq.message.modelcontroller.MC4StatisticsErrorMessage; import org.zeromq.SocketType; import org.zeromq.ZContext; import org.zeromq.ZMQ; /** * Example implementation of a FederationManager to start the MM1Queue41Application DSOL model. *

* Copyright (c) 2016-2020 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved.
* BSD-style license. See Sim0MQ License. *

* $LastChangedDate: 2015-07-24 02:58:59 +0200 (Fri, 24 Jul 2015) $, @version $Revision: 1147 $, by $Author: averbraeck $, * initial version April 10, 2017
* @author Alexander Verbraeck */ public final class MM1FederationManager20 { /** * @param args parameters for main * @throws Sim0MQException on error */ public static void main(final String[] args) throws Sim0MQException { if (args.length < 5) { System.err.println("Use as FederationManager federationName federationManagerPortNumber " + "federateStarterIPorName federateStarterPortNumber modelFolder"); System.exit(-1); } String federationName = args[0]; String fmsPort = args[1]; int fmPort = 0; try { fmPort = Integer.parseInt(fmsPort); } catch (NumberFormatException nfe) { System.err.println("Use as FederationManager fedName fmPort fsIP fsPort modelFolder, where fmPort is a number"); System.exit(-1); } if (fmPort == 0 || fmPort > 65535) { System.err.println("fmPort should be between 1 and 65535"); System.exit(-1); } String fsServerNameOrIP = args[2]; String fsPortString = args[3]; int fsPort = 0; try { fsPort = Integer.parseInt(fsPortString); } catch (NumberFormatException nfe) { System.err.println("Use as FederationManager fedName fmPort fsIP fsPort modelFolder, where fmPort is a number"); System.exit(-1); } if (fsPort == 0 || fsPort > 65535) { System.err.println("fsPort should be between 1 and 65535"); System.exit(-1); } String mm1ModelFolder = args[4]; new MM1FederationManager20(federationName, fmPort, fsServerNameOrIP, fsPort, mm1ModelFolder); } /** * Send an FM.1 message to the FederateStarter. * @param federationName the name of the federation * @param fmPort the port number to listen on * @param fsServerNameOrIP name or IP address of the federate starter we are using * @param fsPort the port where the federate starter can be reached * @param mm1ModelFolder location on the computer of the federate starter where the model can be found * @throws Sim0MQException on error */ private MM1FederationManager20(final String federationName, final int fmPort, final String fsServerNameOrIP, final int fsPort, final String mm1ModelFolder) throws Sim0MQException { AtomicLong messageCount = new AtomicLong(0L); AtomicInteger nrRunning = new AtomicInteger(); Map> statMap = Collections.synchronizedMap(new LinkedHashMap>()); for (int modelNr = 0; modelNr < 20; modelNr++) { new Thread() { @Override public void run() { final int nr = nrRunning.getAndIncrement(); StateMachine stateMachine = null; System.out.println("inc modelNr to " + nr); try { stateMachine = new StateMachine(messageCount, federationName, fsServerNameOrIP, fsPort, mm1ModelFolder, nr); } catch (Sim0MQException | SerializationException exception) { exception.printStackTrace(); } int decNr = nrRunning.decrementAndGet(); System.out.println("dec modelNr to " + decNr); synchronized (statMap) { statMap.put(nr, stateMachine.getStatistics()); } } }.start(); } while (nrRunning.get() > 0) { Thread.yield(); } synchronized (statMap) { for (int nr : statMap.keySet()) { Map stats = statMap.get(nr); StringBuilder s = new StringBuilder(); s.append(String.format("%2d ", nr)); for (String code : stats.keySet()) { s.append(String.format("%10s=%10.4f ", code, stats.get(code).doubleValue())); } System.out.println(s.toString()); } } } /** * State machine to run several models in parallel. *

* Copyright (c) 2013-2020 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. *
* BSD-style license. See Sim0MQ License. *

* $LastChangedDate: 2015-07-24 02:58:59 +0200 (Fri, 24 Jul 2015) $, @version $Revision: 1147 $, by $Author: averbraeck $, * initial version May 5, 2017
* @author Alexander Verbraeck */ static class StateMachine { /** the state of the started model. */ private ModelState state; /** the model socket. */ private ZMQ.Socket modelSocket; /** the model name. */ private String modelName; /** the federate starter socket. */ private ZMQ.Socket fsSocket; /** the context. */ private ZContext fmContext; /** the message counter. */ private AtomicLong messageCount; /** statistics. */ private Map statistics = new LinkedHashMap<>(); /** * @param messageCount AtomicLong; message counter * @param federationName the name of the federation * @param fsServerNameOrIP name or IP address of the federate starter we are using * @param fsPort the port where the federate starter can be reached * @param mm1ModelFolder location on the computer of the federate starter where the model can be found * @param modelNr sequence number of the model to run * @throws Sim0MQException on error * @throws SerializationException on serialization problem */ StateMachine(final AtomicLong messageCount, final String federationName, final String fsServerNameOrIP, final int fsPort, final String mm1ModelFolder, final int modelNr) throws Sim0MQException, SerializationException { this.fmContext = new ZContext(1); this.fsSocket = this.fmContext.createSocket(SocketType.REQ); this.fsSocket.setIdentity(UUID.randomUUID().toString().getBytes()); this.modelName = "MM1." + modelNr; this.messageCount = messageCount; this.modelSocket = this.fmContext.createSocket(SocketType.REQ); this.modelSocket.setIdentity(UUID.randomUUID().toString().getBytes()); this.state = ModelState.NOT_STARTED; boolean ready = false; while (!ready) { System.out.println(this.state); switch (this.state) { case NOT_STARTED: // federationName, fsServerNameOrIP, fsPort, mm1ModelFolder startModel(federationName, fsServerNameOrIP, fsPort, mm1ModelFolder); break; case STARTED: sendSimRunControl(federationName); break; case RUNCONTROL: setParameters(federationName); break; case PARAMETERS: sendSimStart(federationName); break; case SIMULATORSTARTED: waitForSimEnded(federationName); break; case SIMULATORENDED: requestStatistics(federationName); break; case STATISTICSGATHERED: killFederate(federationName); ready = true; break; case ERROR: killFederate(federationName); ready = true; break; default: break; } } this.fsSocket.close(); this.modelSocket.close(); this.fmContext.destroy(); this.fmContext.close(); } /** * Send the FM.1 message to the FederateStarter to start the MM1 model. * @param federationName the name of the federation * @param fsServerNameOrIP name or IP address of the federate starter we are using * @param fsPort the port where the federate starter can be reached * @param mm1ModelFolder location on the computer of the federate starter where the model can be found * @throws Sim0MQException on error * @throws SerializationException on serialization problem */ private void startModel(final String federationName, final String fsServerNameOrIP, final int fsPort, final String mm1ModelFolder) throws Sim0MQException, SerializationException { // Start model mmm1.jar byte[] fm1Message = new FM1StartFederateMessage(federationName, "FM", "FS", this.messageCount.getAndIncrement(), this.modelName, "java8+", "-jar", mm1ModelFolder + "/mm1.jar", this.modelName + " %PORT%", mm1ModelFolder, "", mm1ModelFolder + "/out_" + this.modelName + ".txt", mm1ModelFolder + "/err_" + this.modelName + ".txt", false, true, true).createByteArray(); this.fsSocket.connect("tcp://" + fsServerNameOrIP + ":" + fsPort); this.fsSocket.send(fm1Message); byte[] reply = this.fsSocket.recv(0); try { Object[] fs2Fields = Sim0MQMessage.decode(reply).createObjectArray(); System.out.println("Received\n" + Sim0MQMessage.print(fs2Fields)); FS2FederateStartedMessage message = new FS2FederateStartedMessage(fs2Fields); if (message.getStatus().toString().equals("started") && message.getInstanceId().equals(this.modelName)) { this.state = ModelState.STARTED; this.modelSocket.connect("tcp://" + fsServerNameOrIP + ":" + message.getModelPort()); } else { this.state = ModelState.ERROR; System.err.println("Model not started correctly -- state = " + message.getStatus()); System.err.println("Started model = " + message.getInstanceId() + " on port " + message.getModelPort()); System.err.println("Error message = " + message.getError()); } } catch (Exception exception) { this.state = ModelState.ERROR; System.err.println("Model not started correctly -- error = " + exception.getClass().getSimpleName()); System.err.println("Started instance of model = " + this.modelName); System.err.println("Error message = " + exception.getMessage()); } } /** * Send the SimRunControl message FM.2. * @param federationName the name of the federation * @throws Sim0MQException on error * @throws SerializationException on serialization problem */ private void sendSimRunControl(final String federationName) throws Sim0MQException, SerializationException { long messageNumber = this.messageCount.get(); byte[] fm2Message = new FM2SimRunControlMessage(federationName, "FM", this.modelName, this.messageCount.getAndIncrement(), 100.0, 0.0, 0.0, Double.POSITIVE_INFINITY, 1, 0, new HashMap()).createByteArray(); this.modelSocket.send(fm2Message); byte[] reply = this.modelSocket.recv(0); try { Object[] mc2Fields = Sim0MQMessage.decode(reply).createObjectArray(); System.out.println("Received\n" + Sim0MQMessage.print(mc2Fields)); MC2AckNakMessage message = new MC2AckNakMessage(mc2Fields); if (message.getStatus() && (Long) message.getReplyToId() == messageNumber) { this.state = ModelState.RUNCONTROL; } else { this.state = ModelState.ERROR; System.err.println("Model not started correctly -- state = " + message.getStatus()); System.err.println("Error message = " + message.getError()); } } catch (Exception exception) { this.state = ModelState.ERROR; System.err.println("Model not started correctly -- error = " + exception.getClass().getSimpleName()); System.err.println("Error message = " + exception.getMessage()); } } /** * Send the Parameters messages FM.3. * @param federationName the name of the federation * @throws Sim0MQException on error * @throws SerializationException on serialization problem */ private void setParameters(final String federationName) throws Sim0MQException, SerializationException { Map parameters = new LinkedHashMap<>(); parameters.put("iat", new Double(1.0)); parameters.put("servicetime", new Double(0.85)); parameters.put("seed", Math.abs(this.modelName.hashCode())); for (String parameterName : parameters.keySet()) { if (!this.state.isError()) { long messageNumber = this.messageCount.get(); byte[] fm3Message = new FM3SetParameterMessage(federationName, "FM", this.modelName, this.messageCount.getAndIncrement(), parameterName, parameters.get(parameterName)).createByteArray(); this.modelSocket.send(fm3Message); byte[] reply = this.modelSocket.recv(0); try { Object[] replyFields = Sim0MQMessage.decode(reply).createObjectArray(); System.out.println("Received\n" + Sim0MQMessage.print(replyFields)); MC2AckNakMessage message = new MC2AckNakMessage(replyFields); if (message.getStatus() && (Long) message.getReplyToId() == messageNumber) { this.state = ModelState.PARAMETERS; } else { this.state = ModelState.ERROR; System.err.println("Model parameter error -- status = " + message.getStatus()); System.err.println("Error message = " + message.getError()); } } catch (Exception exception) { this.state = ModelState.ERROR; System.err.println("Model parameter error = " + exception.getClass().getSimpleName()); System.err.println("Error message = " + exception.getMessage()); } } } if (!this.state.isError()) { this.state = ModelState.PARAMETERS; } } /** * Send the SimStart message FM.4. * @param federationName the name of the federation * @throws Sim0MQException on error * @throws SerializationException on serialization problem */ private void sendSimStart(final String federationName) throws Sim0MQException, SerializationException { long messageNumber = this.messageCount.get(); byte[] fm4Message = new FM4SimStartMessage(federationName, "FM", this.modelName, this.messageCount.getAndIncrement()).createByteArray(); this.modelSocket.send(fm4Message); byte[] reply = this.modelSocket.recv(0); try { Object[] replyFields = Sim0MQMessage.decode(reply).createObjectArray(); System.out.println("Received\n" + Sim0MQMessage.print(replyFields)); MC2AckNakMessage message = new MC2AckNakMessage(replyFields); if (message.getStatus() && (Long) message.getReplyToId() == messageNumber) { this.state = ModelState.SIMULATORSTARTED; } else { this.state = ModelState.ERROR; System.err.println("Model start error -- status = " + message.getStatus()); System.err.println("Error message = " + message.getError()); } } catch (Exception exception) { this.state = ModelState.ERROR; System.err.println("Model start error = " + exception.getClass().getSimpleName()); System.err.println("Error message = " + exception.getMessage()); } } /** * Wait for simulation to end using status polling with message FM.5. * @param federationName the name of the federation * @throws Sim0MQException on error * @throws SerializationException on serialization problem */ private void waitForSimEnded(final String federationName) throws Sim0MQException, SerializationException { while (!this.state.isSimulatorEnded() && !this.state.isError()) { long messageNumber = this.messageCount.get(); byte[] fm5Message = new FM5RequestStatus(federationName, "FM", this.modelName, this.messageCount.getAndIncrement()).createByteArray(); this.modelSocket.send(fm5Message); byte[] reply = this.modelSocket.recv(0); try { Object[] replyFields = Sim0MQMessage.decode(reply).createObjectArray(); System.out.println("Received\n" + Sim0MQMessage.print(replyFields)); MC1StatusMessage message = new MC1StatusMessage(replyFields); if (!message.getStatus().equals("error") && !message.getStatus().equals("started") && (Long) message.getReplyToId() == messageNumber) { if (message.getStatus().equals("ended")) { this.state = ModelState.SIMULATORENDED; } else { // wait a second try { Thread.sleep(1000); } catch (InterruptedException ie) { // ignore } } } else { this.state = ModelState.ERROR; System.err.println("Model poll status error -- status = " + message.getStatus()); System.err.println("Error message = " + message.getError()); } } catch (Exception exception) { this.state = ModelState.ERROR; System.err.println("Model poll status error = " + exception.getClass().getSimpleName()); System.err.println("Error message = " + exception.getMessage()); } } } /** * Request statistics with message FM.6. * @param federationName the name of the federation * @throws Sim0MQException on error * @throws SerializationException on serialization problem */ private void requestStatistics(final String federationName) throws Sim0MQException, SerializationException { List stats = new ArrayList<>(); stats.add("dN.average"); stats.add("qN.max"); stats.add("uN.average"); for (String statName : stats) { if (!this.state.isError()) { byte[] fm6Message = new FM6RequestStatisticsMessage(federationName, "FM", this.modelName, this.messageCount.getAndIncrement(), statName).createByteArray(); this.modelSocket.send(fm6Message); byte[] reply = this.modelSocket.recv(0); try { Object[] replyFields = Sim0MQMessage.decode(reply).createObjectArray(); System.out.println("Received\n" + Sim0MQMessage.print(replyFields)); if (replyFields[5].toString().equals("MC.3")) { MC3StatisticsMessage message = new MC3StatisticsMessage(replyFields); if (message.getVariableName().equals(statName)) { System.out.println( "Received statistic for " + statName + " = " + message.getVariableValue().toString()); this.statistics.put(statName, (Number) message.getVariableValue()); } else { this.state = ModelState.ERROR; System.err.println("Statistics Error: Stat variable expected = " + statName + ", got: " + message.getVariableName()); } } else if (replyFields[5].toString().equals("MC.4")) { MC4StatisticsErrorMessage message = new MC4StatisticsErrorMessage(replyFields); this.state = ModelState.ERROR; System.err.println("Statistics Error: Stat variable = " + message.getVariableName()); System.err.println("Error message = " + message.getError()); } } catch (Exception exception) { this.state = ModelState.ERROR; System.err.println("Model get statistics error = " + exception.getClass().getSimpleName()); System.err.println("Error message = " + exception.getMessage()); } } } if (!this.state.isError()) { this.state = ModelState.STATISTICSGATHERED; } } /** * Send the FM.8 message to the FederateStarter to kill the MM1 model. * @param federationName the name of the federation * @throws Sim0MQException on error * @throws SerializationException on serialization problem */ private void killFederate(final String federationName) throws Sim0MQException, SerializationException { byte[] fm8Message = new FM8KillFederateMessage(federationName, "FM", "FS", this.messageCount.getAndIncrement(), this.modelName) .createByteArray(); this.fsSocket.send(fm8Message); byte[] reply = this.fsSocket.recv(0); try { Object[] replyFields = Sim0MQMessage.decode(reply).createObjectArray(); System.out.println("Received\n" + Sim0MQMessage.print(replyFields)); FS4FederateKilledMessage message = new FS4FederateKilledMessage(replyFields); if (message.isStatus() && message.getInstanceId().equals(this.modelName)) { this.state = ModelState.TERMINATED; } else { this.state = ModelState.ERROR; System.err.println("Model not killed correctly, model = " + this.modelName); System.err.println("Error message = " + message.getError()); } } catch (Exception exception) { this.state = ModelState.ERROR; System.err.println("Model not killed correctly, error = " + exception.getClass().getSimpleName()); System.err.println("Error message = " + exception.getMessage()); } } /** * @return statistics */ public final Map getStatistics() { return this.statistics; } } }