package org.opentrafficsim.demo.web;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
/**
* The LoadBalancer start listening on the given port for messages. It is started with a map of network nodes where
* FederateStarters can start models, and a capacity of each node.
*
* The program is called as follows: java -jar LocalLoadBalancer.jar key1=value1 key2=value2 ... The following keys are defined:
*
* - port: the port on which the LoadBalancer listens
* - nodeFile: the file that contains the node information
*
* The nodeFile is expected to be tab-delimited with the following columns (and an example):
*
*
* nodeAddress fsPort maxLoad priority
* 10.0.0.121 5500 8 10
* 10.0.0.122 5500 8 10
* 10.0.0.123 5500 8 10
* 127.0.0.1 5500 8 5
*
*
* Where nodeAddress is the network name of the node or its IP address, fsPort is the port where the FederateStarter can be
* found on the node, maxLoad is an int indicating the maximum number of jobs the node can handle, and priority indicates which
* nodes should be scheduled first (nodes of lower priority are only scheduled when high priority nodes are full). The example
* above shows three nodes with priority 10, and the localhost with a lower priority. So only when 24 federates are running,
* localhost will be used.
*
* Copyright (c) 2016-2020 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved.
* BSD-style license. See OTS License.
*
* @author Alexander Verbraeck
*/
public class SimpleLoadBalancer
{
// /** the port number to listen on. */
// private final int lbPort;
// /** the nodes with federate starters that can be used. */
// private final Set federateStarterNodes;
/** the 0mq socket. */
private ZMQ.Socket lbSocket;
/** the 0mq context. */
private ZContext lbContext;
/** message count. */
private long messageCount = 0;
// /**
// * @param lbPort the port number to listen on
// * @param federateStarterNodes the nodes with federate starters that can be used
// * @throws Sim0MQException on error
// * @throws SerializationException on error
// */
// public SimpleLoadBalancer(final int lbPort, final Set federateStarterNodes)
// throws Sim0MQException, SerializationException
// {
// this.lbPort = lbPort;
// this.federateStarterNodes = federateStarterNodes;
//
// this.lbContext = new ZContext(1);
//
// this.lbSocket = this.lbContext.createSocket(SocketType.ROUTER);
// this.lbSocket.bind("tcp://*:" + this.lbPort);
//
// while (!Thread.currentThread().isInterrupted())
// {
// // Wait for next request from the [web] client -- first the identity (String) and the delimiter (#0)
// String identity = this.lbSocket.recvStr();
// this.lbSocket.recvStr();
//
// byte[] request = this.lbSocket.recv(0);
// Object[] fields = SimulationMessage.decode(request);
// String messageTypeId = fields[4].toString();
// String receiverId = fields[3].toString();
//
// System.out.println("Received " + SimulationMessage.print(fields));
//
// if (receiverId.equals("LB"))
// {
// switch (messageTypeId)
// {
// case "LB.1":
// processStartModel(identity, fields);
// break;
//
// case "LB.2":
// processModelKilled(identity, fields);
// break;
//
// default:
// // wrong message
// System.err.println("Received unknown message -- not processed: " + messageTypeId);
// }
// }
// else
// {
// // wrong receiver
// System.err.println("Received message not intended for LB but for " + receiverId + " -- not processed: ");
// }
// }
// this.lbSocket.close();
// this.lbContext.destroy();
// }
//
// /**
// * Process MC.X message and send LB.Y message back.
// * @param identity reply id for REQ-ROUTER pattern
// * @param fields the message
// * @throws Sim0MQException on error
// * @throws SerializationException on error
// */
// private void processStartFederateStarter(final String identity, final Object[] fields)
// throws Sim0MQException, SerializationException
// {
// StartFederateMessage startFederateMessage = StartFederateMessage.createMessage(fields, "LB");
// String error = "";
//
// int modelPort = findFreePortNumber();
//
// if (modelPort == -1)
// {
// error = "No free port number";
// }
//
// else
//
// {
// try
// {
// ProcessBuilder pb = new ProcessBuilder();
//
// Path workingPath = Files.createDirectories(Paths.get(startFederateMessage.getWorkingDirectory()));
// pb.directory(workingPath.toFile());
//
// String softwareCode = "";
// if (!this.softwareProperties.containsKey(startFederateMessage.getSoftwareCode()))
// {
// System.err.println("Could not find software alias " + startFederateMessage.getSoftwareCode()
// + " in software properties file");
// }
// else
// {
// softwareCode = this.softwareProperties.getProperty(startFederateMessage.getSoftwareCode());
//
// List pbArgs = new ArrayList<>();
// pbArgs.add(softwareCode);
// pbArgs.add(startFederateMessage.getArgsBefore());
// pbArgs.add(startFederateMessage.getModelPath());
// pbArgs.addAll(Arrays.asList(
// startFederateMessage.getArgsAfter().replaceAll("%PORT%", String.valueOf(modelPort)).split(" ")));
// pb.command(pbArgs);
//
// String stdIn = startFederateMessage.getRedirectStdin();
// String stdOut = startFederateMessage.getRedirectStdout();
// String stdErr = startFederateMessage.getRedirectStderr();
//
// if (stdIn.length() > 0)
// {
// // TODO working dir path if not absolute?
// File stdInFile = new File(stdIn);
// pb.redirectInput(stdInFile);
// }
//
// if (stdOut.length() > 0)
// {
// // TODO working dir path if not absolute?
// File stdOutFile = new File(stdOut);
// pb.redirectOutput(stdOutFile);
// }
//
// if (stdErr.length() > 0)
// {
// // TODO working dir path if not absolute?
// File stdErrFile = new File(stdErr);
// pb.redirectError(stdErrFile);
// }
//
// new Thread()
// {
// /** {@inheritDoc} */
// @Override
// public void run()
// {
// try
// {
// Process process = pb.start();
// SimpleLoadBalancer.this.runningProcessMap.put(startFederateMessage.getInstanceId(), process);
// System.err.println("Process started:" + process.isAlive());
// }
// catch (IOException exception)
// {
// exception.printStackTrace();
// }
// }
// }.start();
//
// this.modelPortMap.put(startFederateMessage.getInstanceId(), modelPort);
// this.startFederateMessages.put(startFederateMessage.getInstanceId(), startFederateMessage);
//
// // Thread.sleep(1000);
//
// // wait till the model is ready...
// error = waitForModelStarted(startFederateMessage.getSimulationRunId(), startFederateMessage.getInstanceId(),
// modelPort);
// }
// }
// catch (IOException exception)
// {
// exception.printStackTrace();
// error = exception.getMessage();
// }
// }
//
// System.out.println("SEND MESSAGE FS.2 ABOUT MODEL " + startFederateMessage.getInstanceId() + " @ port " + modelPort);
//
// // Send reply back to client
// this.lbSocket.sendMore(identity);
// this.lbSocket.sendMore("");
// //@formatter:off
// byte[] fs2Message = new FederateStartedMessage.Builder()
// .setSimulationRunId(startFederateMessage.getSimulationRunId())
// .setInstanceId(startFederateMessage.getInstanceId())
// .setSenderId("FS")
// .setReceiverId(startFederateMessage.getSenderId())
// .setMessageId(++this.messageCount)
// .setStatus(error.isEmpty() ? "started" : "error")
// .setError(error)
// .setModelPort(modelPort)
// .build()
// .createByteArray();
// this.lbSocket.send(fs2Message, 0);
// //@formatter:on
// }
//
// /**
// * Find a free port for the model.
// * @return the first free fort number in the range startPort - endPort, inclusive
// */
// private int findFreePortNumber()
// {
// for (int port = this.startPort; port <= this.endPort; port++)
// {
// if (!this.modelPortMap.containsValue(port))
// {
// // try if the port is really free
// ZMQ.Socket testSocket = null;
// try
// {
// testSocket = this.lbContext.createSocket(SocketType.REP);
// testSocket.bind("tcp://127.0.0.1:" + port);
// testSocket.unbind("tcp://127.0.0.1:" + port);
// testSocket.close();
// return port;
// }
// catch (Exception exception)
// {
// // port was not free
// if (testSocket != null)
// {
// try
// {
// testSocket.close();
// }
// catch (Exception e)
// {
// // ignore.
// }
// }
// }
// }
// }
// return -1;
// }
//
// /**
// * Wait for simulation to end using status polling with message FM.5.
// * @param federationRunId the name of the federation
// * @param modelId the String id of the model
// * @param modelPort port on which the model is listening
// * @return empty String for no error, filled String for error
// * @throws Sim0MQException on error
// * @throws SerializationException on error
// */
// private String waitForModelStarted(final Object federationRunId, final String modelId, final int modelPort)
// throws Sim0MQException, SerializationException
// {
// boolean ok = true;
// String error = "";
// ZMQ.Socket modelSocket = null;
// try
// {
// modelSocket = this.lbContext.createSocket(SocketType.REQ);
// modelSocket.setIdentity(UUID.randomUUID().toString().getBytes());
// modelSocket.connect("tcp://127.0.0.1:" + modelPort);
// }
// catch (Exception exception)
// {
// exception.printStackTrace();
// ok = false;
// error = exception.getMessage();
// }
//
// boolean started = false;
// while (ok && !started)
// {
// byte[] fs1Message = SimulationMessage.encodeUTF8(federationRunId, "FS", modelId, "FS.1", ++this.messageCount,
// MessageStatus.NEW);
// modelSocket.send(fs1Message, 0);
//
// byte[] reply = modelSocket.recv(0);
// Object[] replyMessage = SimulationMessage.decode(reply);
// System.out.println("Received\n" + SimulationMessage.print(replyMessage));
//
// if (replyMessage[4].toString().equals("MC.1") && !replyMessage[9].toString().equals("error")
// && !replyMessage[9].toString().equals("ended") && ((Long) replyMessage[8]).longValue() == this.messageCount)
// {
// if (replyMessage[9].toString().equals("started"))
// {
// started = true;
// }
// else
// {
// // wait a second
// try
// {
// Thread.sleep(100);
// }
// catch (InterruptedException ie)
// {
// // ignore
// }
// }
// }
// else
// {
// ok = false;
// error = replyMessage[10].toString();
// System.err.println("Simulation start error -- status = " + replyMessage[9]);
// System.err.println("Error message = " + replyMessage[10]);
// }
// }
//
// if (modelSocket != null)
// {
// modelSocket.close();
// }
//
// return error;
// }
//
// /**
// * Process FM.8 message and send FS.4 message back.
// * @param identity reply id for REQ-ROUTER pattern
// * @param fields the message
// * @throws Sim0MQException on error
// * @throws SerializationException on error
// */
// private void processKillFederateStarter(final String identity, final Object[] fields)
// throws Sim0MQException, SerializationException
// {
// boolean status = true;
// String error = "";
//
// Object federationRunId = fields[1];
// String senderId = fields[2].toString();
//
// String modelId = fields[8].toString();
// if (!this.modelPortMap.containsKey(modelId))
// {
// status = false;
// error = "model " + modelId + " unknown -- this model is unknown to the FederateStarter";
// }
// else
// {
// int modelPort = this.modelPortMap.remove(modelId);
// Process process = this.runningProcessMap.remove(modelId);
//
// try
// {
// try
// {
// ZMQ.Socket modelSocket = this.lbContext.createSocket(SocketType.REQ);
// modelSocket.setIdentity(UUID.randomUUID().toString().getBytes());
// modelSocket.connect("tcp://127.0.0.1:" + modelPort);
//
// byte[] fs3Message = SimulationMessage.encodeUTF8(federationRunId, "FS", modelId, "FS.3",
// ++this.messageCount, MessageStatus.NEW);
// modelSocket.send(fs3Message, 0);
//
// modelSocket.close();
// }
// catch (Exception exception)
// {
// exception.printStackTrace();
// status = true;
// error = exception.getMessage();
// }
//
// try
// {
// Thread.sleep(100);
// }
// catch (InterruptedException ie)
// {
// // ignore
// }
//
// if (process != null && process.isAlive())
// {
// process.destroyForcibly();
// }
//
// StartFederateMessage sfm = this.startFederateMessages.get(modelId);
// if (sfm.isDeleteStdout())
// {
// if (sfm.getRedirectStdout().length() > 0)
// {
// File stdOutFile = new File(sfm.getRedirectStdout());
// stdOutFile.delete();
// }
// }
//
// if (sfm.isDeleteStderr())
// {
// if (sfm.getRedirectStderr().length() > 0)
// {
// File stdErrFile = new File(sfm.getRedirectStderr());
// stdErrFile.delete();
// }
// }
//
// if (sfm.isDeleteWorkingDirectory())
// {
// File workingDir = new File(sfm.getWorkingDirectory());
// workingDir.delete();
// }
// }
// catch (Exception exception)
// {
// exception.printStackTrace();
// status = false;
// error = exception.getMessage();
// }
//
// byte[] fs4Message = SimulationMessage.encodeUTF8(federationRunId, "FS", senderId, "FS.4", ++this.messageCount,
// MessageStatus.NEW, modelId, status, error);
// this.lbSocket.sendMore(identity);
// this.lbSocket.sendMore("");
// this.lbSocket.send(fs4Message, 0);
// }
// }
//
// /**
// * Start listening on the given port for messages to start components. Report back via the call-back port on the status of
// * the started components. If necessary, the FederateStarter can also forcefully stop a started (sub)process.
// * @param args the federation name and port on which the FederateStarter is listening
// * @throws Sim0MQException on error
// * @throws SerializationException on error
// */
// public static void main(final String[] args) throws Sim0MQException, SerializationException
// {
// if (args.length < 4)
// {
// System.err.println("Use as FederateStarter portNumber software_properties_file startPort endPort");
// System.exit(-1);
// }
//
// String sPort = args[0];
// int port = 0;
// try
// {
// port = Integer.parseInt(sPort);
// }
// catch (NumberFormatException nfe)
// {
// System.err.println("Use as FederateStarter portNumber, where portNumber is a number");
// System.exit(-1);
// }
// if (port == 0 || port > 65535)
// {
// System.err.println("PortNumber should be between 1 and 65535");
// System.exit(-1);
// }
//
// String propertiesFile = args[1];
// Properties softwareProperties = new Properties();
// InputStream propertiesStream = URLResource.getResourceAsStream(propertiesFile);
// try
// {
// softwareProperties.load(propertiesStream);
// }
// catch (IOException | NullPointerException e)
// {
// System.err.println("Could not find or read software properties file " + propertiesFile);
// System.exit(-1);
// }
//
// String sStartPort = args[2];
// int startPort = 0;
// try
// {
// startPort = Integer.parseInt(sStartPort);
// }
// catch (NumberFormatException nfe)
// {
// System.err.println("Use as FederateStarter pn file startPort endPort, where startPort is a number");
// System.exit(-1);
// }
// if (startPort == 0 || startPort > 65535)
// {
// System.err.println("startPort should be between 1 and 65535");
// System.exit(-1);
// }
//
// String sEndPort = args[3];
// int endPort = 0;
// try
// {
// endPort = Integer.parseInt(sEndPort);
// }
// catch (NumberFormatException nfe)
// {
// System.err.println("Use as FederateStarter pn file startPort endPort, where endPort is a number");
// System.exit(-1);
// }
// if (endPort == 0 || endPort > 65535)
// {
// System.err.println("endPort should be between 1 and 65535");
// System.exit(-1);
// }
//
// new SimpleLoadBalancer(port, softwareProperties, startPort, endPort);
// }
//
// /** Record with information about the nodes that have a FederateStarter running. */
// static class FederateStarterNode implements Comparable
// {
// /** the node name or IP address where the Federate Starter resides. */
// private final String fsNodeName;
//
// /** the port of the Federate Starter. */
// private final int port;
//
// /** the maximum load on the node (e.g., the maximum number of concurrent models). */
// private final int maxLoad;
//
// /** the current load on the node. */
// private int currentLoad = 0;
//
// /** the priority of the node. Higher value is higher priority. */
// private int priority;
//
// /**
// * @param fsNodeName String; the node name or IP address where the Federate Starter resides
// * @param port int; the port of the Federate Starter
// * @param maxLoad int; the maximum load on the node (e.g., the maximum number of concurrent models)
// * @param priority int; the priority of the node. Higher value is higher priority.
// */
// FederateStarterNode(final String fsNodeName, final int port, final int maxLoad, final int priority)
// {
// this.fsNodeName = fsNodeName;
// this.port = port;
// this.maxLoad = maxLoad;
// this.priority = priority;
// }
//
// /**
// * @return currentLoad
// */
// public final int getCurrentLoad()
// {
// return this.currentLoad;
// }
//
// /**
// * @param currentLoad set currentLoad
// */
// public final void setCurrentLoad(final int currentLoad)
// {
// this.currentLoad = currentLoad;
// }
//
// /**
// * @return priority
// */
// public final int getPriority()
// {
// return this.priority;
// }
//
// /**
// * @param priority set priority
// */
// public final void setPriority(final int priority)
// {
// this.priority = priority;
// }
//
// /**
// * @return fsNodeName
// */
// public final String getFsNodeName()
// {
// return this.fsNodeName;
// }
//
// /**
// * @return port
// */
// public final int getPort()
// {
// return this.port;
// }
//
// /**
// * @return maxLoad
// */
// public final int getMaxLoad()
// {
// return this.maxLoad;
// }
//
// /** {@inheritDoc} */
// @Override
// public int hashCode()
// {
// final int prime = 31;
// int result = 1;
// result = prime * result + ((this.fsNodeName == null) ? 0 : this.fsNodeName.hashCode());
// result = prime * result + this.port;
// return result;
// }
//
// /** {@inheritDoc} */
// @SuppressWarnings("checkstyle:needbraces")
// @Override
// public boolean equals(final Object obj)
// {
// if (this == obj)
// return true;
// if (obj == null)
// return false;
// if (getClass() != obj.getClass())
// return false;
// FederateStarterNode other = (FederateStarterNode) obj;
// if (this.fsNodeName == null)
// {
// if (other.fsNodeName != null)
// return false;
// }
// else if (!this.fsNodeName.equals(other.fsNodeName))
// return false;
// if (this.port != other.port)
// return false;
// return true;
// }
//
// /** {@inheritDoc} */
// @SuppressWarnings("checkstyle:needbraces")
// @Override
// public int compareTo(final FederateStarterNode o)
// {
// // Compares this object with the specified object for order. Returns a
// // negative integer, zero, or a positive integer as this object is less
// // than, equal to, or greater than the specified object.
// if (this.equals(o))
// return 0;
//
// // higher priority number means higher priority
// if (this.priority > o.priority)
// return 1;
// if (this.priority < o.priority)
// return -1;
//
// // higher remaining load means higher priority
// if (this.maxLoad - this.currentLoad > o.maxLoad - o.currentLoad)
// return 1;
// if (this.maxLoad - this.currentLoad < o.maxLoad - o.currentLoad)
// return -1;
//
// if (this.hashCode() > o.hashCode())
// return 1;
// if (this.hashCode() < o.hashCode())
// return -1;
// //
// return 0;
// }
// }
}