package nl.tno.imb; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.Socket; import java.net.SocketException; import java.util.Arrays; /** * The connection to the framework and starting point to use IMB. All further actions are started through a new object of this * class. Main actions are:
* subscribe to events to receive events
* publish events to send events (called signal). if autoPublish is true you start sending events immediately
* set global framework variables
* send streams over the framework
* set/update the current status for the connected client
* optionally define the owner (specific: connected model name and id)
* optionally set specific socket and connection options
*
* Subscribe and publish return TEventEntry objects that can be used to set handlers for receiving specific events.
* By default a reading thread is started to handle all socket reading. The thread calls registered event handlers for the
* received events.
* @author hans.cornelissen@tno.nl */ public class TConnection { // constructors /** * Create an IMB connection to the framework * @param aRemoteHost String; IP address or DNS name of the IMB hub to connect to * @param aRemotePort int; TCP port of the IMB hub to connect to * @param aOwnerName String; optional description of the connecting client * @param aOwnerID int; optional id of the connecting client * @param aFederation federation to connect with; this is default prefixed to subscribed and published event names */ public TConnection(String aRemoteHost, int aRemotePort, String aOwnerName, int aOwnerID, String aFederation) { this(aRemoteHost, aRemotePort, aOwnerName, aOwnerID, aFederation, true); } /** * Create an IMB connection to the framework * @param aRemoteHost String; IP address or DNS name of the IMB hub to connect to * @param aRemotePort int; TCP port of the IMB hub to connect to * @param aOwnerName String; optional description of the connecting client * @param aOwnerID int; optional id of the connecting client * @param aFederation federation to connect with; this is default prefixed to subscribed and published event names * @param aStartReadingThread boolean; use an internal reader thread for processing events and commands from the connected * hub */ public TConnection(String aRemoteHost, int aRemotePort, String aOwnerName, int aOwnerID, String aFederation, boolean aStartReadingThread) { this.ffederation = aFederation; this.fownerName = aOwnerName; this.fownerID = aOwnerID; open(aRemoteHost, aRemotePort, aStartReadingThread); } // destructor protected void finalize() { close(); } // internals/privates /** event id's on the hub are different from local id's. 
this list is used to translate incoming hub event id's */ private class TEventTranslation { public final static int INVALID_TRANSLATED_EVENT_ID = -1; private int[] feventTranslation; public TEventTranslation() { this.feventTranslation = new int[32]; // mark all entries as invalid for (int i = 0; i < this.feventTranslation.length; i++) this.feventTranslation[i] = INVALID_TRANSLATED_EVENT_ID; } public int getTranslateEventID(int aRxEventID) { if ((0 <= aRxEventID) && (aRxEventID < this.feventTranslation.length)) return this.feventTranslation[aRxEventID]; else return INVALID_TRANSLATED_EVENT_ID; } public void setEventTranslation(int aRxEventID, int aTxEventID) { if (aRxEventID >= 0) { // grow event translation list until it can contain the // requested id while (aRxEventID >= this.feventTranslation.length) { int FormerSize = this.feventTranslation.length; // resize event translation array to double the size this.feventTranslation = Arrays.copyOf(this.feventTranslation, this.feventTranslation.length * 2); // mark all new entries as invalid for (int i = FormerSize; i < this.feventTranslation.length; i++) this.feventTranslation[i] = INVALID_TRANSLATED_EVENT_ID; } this.feventTranslation[aRxEventID] = aTxEventID; } } } /** internal list of all local events */ private class TEventEntryList { TEventEntryList(int aInitialSize) { this.FCount = 0; this.fevents = new TEventEntry[aInitialSize]; } private TEventEntry[] fevents; private int FCount = 0; public TEventEntry getEventEntry(int aEventID) { if (0 <= aEventID && aEventID < this.FCount) return this.fevents[aEventID]; else return null; } public String getEventName(int aEventID) { if (0 <= aEventID && aEventID < this.FCount) { if (this.fevents[aEventID] != null) return this.fevents[aEventID].getEventName(); else return null; } else return ""; } public TEventEntry addEvent(TConnection aConnection, String aEventName) { this.FCount++; if (this.FCount > this.fevents.length) this.fevents = Arrays.copyOf(this.fevents, 
this.fevents.length * 2); this.fevents[this.FCount - 1] = new TEventEntry(aConnection, this.FCount - 1, aEventName); return this.fevents[this.FCount - 1]; } public TEventEntry getEventEntryOnName(String aEventName) { int i = this.FCount - 1; while (i >= 0 && !getEventName(i).equals(aEventName)) i--; if (i >= 0) return this.fevents[i]; else return null; } } // TODO: description public static final String EVENT_FILTER_POST_FIX = "*"; /** postfix of the variable name for the model status */ private static final String MODEL_Status_VAR_NAME = "ModelStatus"; /** separator char for the postfix of the variable name for the model status */ private static final String MODEL_STATUS_VAR_SEP_CHAR = "|"; // constants /** magic bytes to identify the start of a valid IMB packet */ public static final byte[] MAGIC_BYTES = new byte[] {0x2F, 0x47, 0x61, 0x71, (byte) 0x95, (byte) 0xAD, (byte) 0xC5, (byte) 0xFB}; // private static final long MagicBytesInt64 = 0xFBC5AD957161472FL; /** magic bytes to identify the end of the payload on a valid IMB packet (as 32 bit integer) */ private static final int MAGIC_STRING_CHECK_INT32 = 0x10F13467; /** magic bytes to identify the end of the payload on a valid IMB packet (as array of bytes) */ public static final byte[] MAGIC_STRING_CHECK = new byte[] {0x67, 0x34, (byte) 0xF1, 0x10}; // fields /** TCP Socket the connection is based on */ private Socket fsocket = null; /** output stream linked to the socket */ private OutputStream foutputStream = null; /** input stream linked to the socket */ private InputStream finputStream = null; /** address the socket is connected to */ private String fremoteHost = ""; /** TCP port the socket is connected to */ private int fremotePort = 0; /** optional reader thread */ private Thread freadingThread = null; /** event id's on the hub are different from local id's. 
this list is used to translate incoming hub event id's */ private TEventTranslation feventTranslation = new TEventTranslation(); /** internal list of all local events */ private TEventEntryList feventEntryList = new TEventEntryList(8); /** active federation */ private String ffederation = DEFAULT_FEDERATION; // standard event references private static final String FOCUS_EVENT_NAME = "Focus"; private TEventEntry ffocusEvent = null; private static final String CHANGE_FEDERATION_EVENT_NAME = "META_CurrentSession"; private TEventEntry fchangeFederationEvent = null; private TEventEntry flogEvent = null; // broker time // private long fbrokerAbsoluteTime = 0; // private int fbrokerTick = 0; // private int fbrokerTickDelta = 0; private int funiqueClientID = 0; private int fclientHandle = 0; private int fownerID = 0; private String fownerName = ""; private TEventEntry eventIDToEventL(int aEventID) { synchronized (this.feventEntryList) { return this.feventEntryList.getEventEntry(aEventID); } } private TEventEntry addEvent(String aEventName) { int EventID = 0; TEventEntry Event; while (EventID < this.feventEntryList.FCount && !this.feventEntryList.getEventEntry(EventID).isEmpty()) EventID += 1; if (EventID < this.feventEntryList.FCount) { Event = this.feventEntryList.getEventEntry(EventID); Event.feventName = aEventName; Event.fparent = null; } else Event = this.feventEntryList.addEvent(this, aEventName); return Event; } private TEventEntry addEventL(String aEventName) { TEventEntry Event; synchronized (this.feventEntryList) { Event = addEvent(aEventName); } return Event; } private TEventEntry findOrAddEventL(String aEventName) { synchronized (this.feventEntryList) { TEventEntry Event = this.feventEntryList.getEventEntryOnName(aEventName); if (Event == null) { int EventID = 0; while (EventID < this.feventEntryList.FCount && !this.feventEntryList.getEventEntry(EventID).isEmpty()) EventID += 1; if (EventID < this.feventEntryList.FCount) { Event = 
this.feventEntryList.getEventEntry(EventID); Event.feventName = aEventName; } else Event = this.feventEntryList.addEvent(this, aEventName); } return Event; } } private TEventEntry findEventL(String aEventName) { synchronized (this.feventEntryList) { return this.feventEntryList.getEventEntryOnName(aEventName); } } private TEventEntry findEventParentL(String aEventName) { String ParentEventName; String EventName; ParentEventName = ""; TEventEntry Event = null; synchronized (this.feventEntryList) { for (int EventID = 0; EventID < this.feventEntryList.FCount; EventID++) { EventName = this.feventEntryList.getEventName(EventID); if (EventName.endsWith(EVENT_FILTER_POST_FIX)) { EventName = EventName.substring(0, EventName.length() - 2); if (aEventName.startsWith(EventName)) { if (ParentEventName.length() < EventName.length()) { Event = this.feventEntryList.getEventEntry(EventID); ParentEventName = EventName; } } } } return Event; } } private TEventEntry findEventAutoPublishL(String aEventName) { TEventEntry Event = findEventL(aEventName); if (Event == null && this.autoPublish) Event = publish(aEventName, false); return Event; } private int readBytesFromNetStream(TByteBuffer aBuffer) { try { int Count = 0; int NumBytesRead = -1; while (aBuffer.getwriteAvailable() > 0 && NumBytesRead != 0) { NumBytesRead = this.finputStream.read(aBuffer.getBuffer(), aBuffer.getWriteCursor(), aBuffer.getwriteAvailable()); aBuffer.written(NumBytesRead); Count += NumBytesRead; } return Count; } catch (IOException ex) { return 0; // signal connection error } } // function returns payload of command, fills found command and returns // problems during read in result // commandmagic + command + payloadsize [ + payload + payloadmagic] private int readCommand(TByteBuffer aFixedCommandPart, TByteBuffer aPayload, TByteBuffer aPayloadCheck) throws IOException { aFixedCommandPart.clear(aFixedCommandPart.getLength()); int NumBytesRead = readBytesFromNetStream(aFixedCommandPart); // int NumBytesRead = 
finputStream.read(aFixedCommandPart.getBuffer(), 0, aFixedCommandPart.getLength()); if (NumBytesRead > 0) { while (!aFixedCommandPart.compare(MAGIC_BYTES, 0)) { int rbr = this.finputStream.read(); // skipped bytes because of invalid magic in read command if (rbr != -1) aFixedCommandPart.shiftLeftOneByte((byte) rbr); else return TEventEntry.IC_END_OF_SESSION; } // we found the magic in the stream int aCommand = aFixedCommandPart.peekInt32(MAGIC_BYTES.length); int PayloadSize = aFixedCommandPart.peekInt32(MAGIC_BYTES.length + TByteBuffer.SIZE_OF_INT32); if (PayloadSize <= MAX_PAYLOAD_SIZE) { aPayload.clear(PayloadSize); if (PayloadSize > 0) { int Len = readBytesFromNetStream(aPayload); if (Len == aPayload.getLength()) { // NumBytesRead = finputStream.read(aPayloadCheck.getBuffer(), 0, aPayloadCheck.getLength()); aPayloadCheck.clear(TByteBuffer.SIZE_OF_INT32); NumBytesRead = readBytesFromNetStream(aPayloadCheck); if (NumBytesRead == TByteBuffer.SIZE_OF_INT32 && aPayloadCheck.compare(MAGIC_STRING_CHECK, 0)) return aCommand; else return TEventEntry.IC_INVALID_COMMAND; } else // error, payload size mismatch return TEventEntry.IC_INVALID_COMMAND; } else return aCommand; // OK, no payload } else return TEventEntry.IC_INVALID_COMMAND; // error, payload is over max size } else return TEventEntry.IC_END_OF_SESSION; // error, no valid connection } /** place holder for the Lock for writing commands to the framework */ private Integer fwiteCommandLock = new Integer(0); /** * Write a single command to the framework * @param aCommand int; * @param aPayload byte[]; * @return see ICE_* constants */ protected int writeCommand(int aCommand, byte[] aPayload) { synchronized (this.fwiteCommandLock) { TByteBuffer Buffer = new TByteBuffer(); Buffer.prepare(MAGIC_BYTES); Buffer.prepare(aCommand); if ((aPayload != null) && (aPayload.length > 0)) { Buffer.prepare(aPayload.length); Buffer.prepare(aPayload); Buffer.prepare(MAGIC_STRING_CHECK_INT32); } else Buffer.prepare((int) 0); // payload 
size=0 Buffer.prepareApply(); Buffer.qWrite(MAGIC_BYTES); Buffer.qWrite(aCommand); if ((aPayload != null) && (aPayload.length > 0)) { Buffer.qWrite(aPayload.length); Buffer.qWrite(aPayload); Buffer.qWrite(MAGIC_STRING_CHECK_INT32); } else Buffer.qWrite((int) 0); // send buffer over socket if (isConnected()) { try { this.foutputStream.write(Buffer.getBuffer(), 0, Buffer.getLength()); return Buffer.getLength(); } catch (Exception ex) { close(); return ICE_CONNECTION_CLOSED; } } else { return ICE_CONNECTION_CLOSED; } } } protected String prefixFederation(String aName) { return prefixFederation(aName, true); } protected String prefixFederation(String aName, boolean aUseFederationPrefix) { if (!this.ffederation.equals("") && aUseFederationPrefix) return this.ffederation + "." + aName; else return aName; } /** * Main framework command dispatcher * @param aCommand int; * @param aPayload TByteBuffer; */ private void handleCommand(int aCommand, TByteBuffer aPayload) { switch (aCommand) { case TEventEntry.IC_EVENT: handleCommandEvent(aPayload); break; case TEventEntry.IC_SET_VARIABLE: handleCommandVariable(aPayload); break; case TEventEntry.IC_SET_EVENT_ID_TRANSLATION: this.feventTranslation.setEventTranslation(aPayload.peekInt32(0, TEventTranslation.INVALID_TRANSLATED_EVENT_ID), aPayload.peekInt32(TByteBuffer.SIZE_OF_INT32, TEventTranslation.INVALID_TRANSLATED_EVENT_ID)); break; case TEventEntry.IC_UNIQUE_CLIENT_ID: this.funiqueClientID = aPayload.readInt32(); this.fclientHandle = aPayload.readInt32(); break; /* * case icTimeStamp: // ignore for now, only when using and syncing local time (we trust hub time for now) * fbrokerAbsoluteTime = aPayload.ReadInt64(); fbrokerTick = aPayload.ReadInt32(); fbrokerTickDelta = * aPayload.ReadInt32(); break; */ case TEventEntry.IC_EVENT_NAMES: handleCommandEventNames(aPayload); break; case TEventEntry.IC_END_OF_SESSION: close(); break; case TEventEntry.IC_SUBSCRIBE: case TEventEntry.IC_PUBLISH: case TEventEntry.IC_UNSUBSCRIBE: case 
TEventEntry.IC_UNPUBLISH: handleSubAndPub(aCommand, aPayload); break; default: handleCommandOther(aCommand, aPayload); break; } } private void handleCommandEvent(TByteBuffer aPayload) { int TxEventID = this.feventTranslation.getTranslateEventID(aPayload.readInt32()); if (TxEventID != TEventTranslation.INVALID_TRANSLATED_EVENT_ID) eventIDToEventL(TxEventID).handleEvent(aPayload); } private void handleCommandVariable(TByteBuffer aPayload) { if (this.onVariable != null || this.onStatusUpdate != null) { String VarName = aPayload.readString(); // check if it is a status update // TODO: model name is prefixed by federation. Is this correct? if (VarName.toUpperCase().endsWith(MODEL_STATUS_VAR_SEP_CHAR + MODEL_Status_VAR_NAME.toUpperCase())) { VarName = VarName.substring(0, VarName.length() - (MODEL_STATUS_VAR_SEP_CHAR.length() + MODEL_Status_VAR_NAME.length())); String ModelName = VarName.substring(8, VarName.length()); String ModelUniqueClientID = VarName.substring(0, 8); aPayload.readInt32(); int Status = aPayload.readInt32(-1); int Progress = aPayload.readInt32(-1); if (this.onStatusUpdate != null) this.onStatusUpdate.dispatch(this, ModelUniqueClientID, ModelName, Progress, Status); } else { TByteBuffer VarValue = aPayload.readByteBuffer(); TByteBuffer PrevValue = new TByteBuffer(); if (this.onVariable != null) this.onVariable.dispatch(this, VarName, VarValue.getBuffer(), PrevValue.getBuffer()); } } } private void handleCommandEventNames(TByteBuffer aPayload) { if (this.onEventNames != null) { int ec = aPayload.readInt32(); TEventNameEntry[] EventNames = new TEventNameEntry[ec]; for (int en = 0; en < EventNames.length; en++) { EventNames[en] = new TEventNameEntry(); EventNames[en].eventName = aPayload.readString(); EventNames[en].publishers = aPayload.readInt32(); EventNames[en].subscribers = aPayload.readInt32(); EventNames[en].timers = aPayload.readInt32(); } this.onEventNames.dispatch(this, EventNames); } } private void handleSubAndPub(int aCommand, TByteBuffer 
aPayload) { String EventName; TEventEntry EE; TEventEntry EP; switch (aCommand) { case TEventEntry.IC_SUBSCRIBE: case TEventEntry.IC_PUBLISH: aPayload.readInt32(); // event id aPayload.readInt32(); // event entry type EventName = aPayload.readString(); EE = findEventL(EventName); if (EE == null) { // find parent EP = findEventParentL(EventName); if (EP != null) { EE = addEventL(EventName); EE.fparent = EP; EE.copyHandlersFrom(EP); } } else { if ((this.onSubAndPub != null) && !EE.isEmpty()) this.onSubAndPub.dispatch(this, aCommand, EventName); } if (EE != null) EE.handleSubAndPub(aCommand); break; case TEventEntry.IC_UNSUBSCRIBE: case TEventEntry.IC_UNPUBLISH: EventName = aPayload.readString(); if (this.onSubAndPub != null) this.onSubAndPub.dispatch(this, aCommand, EventName); EE = findEventL(EventName); if (EE != null) EE.handleSubAndPub(aCommand); break; } } protected void handleCommandOther(int aCommand, TByteBuffer aPayload) { // override to implement protocol extensions } private int requestUniqueClientID() { TByteBuffer Payload = new TByteBuffer(); Payload.prepare((int) 0); Payload.prepare((int) 0); Payload.prepareApply(); Payload.qWrite((int) 0); Payload.qWrite((int) 0); return writeCommand(TEventEntry.IC_UNIQUE_CLIENT_ID, Payload.getBuffer()); } private int setOwner() { if (isConnected()) { TByteBuffer Payload = new TByteBuffer(); Payload.prepare(this.fownerID); Payload.prepare(this.fownerName); Payload.prepareApply(); Payload.qWrite(this.fownerID); Payload.qWrite(this.fownerName); return writeCommand(TEventEntry.IC_SET_CLIENT_INFO, Payload.getBuffer()); } else return ICE_CONNECTION_CLOSED; } private void readCommands() { // TODO: more like Delphi code int Command = TEventEntry.IC_INVALID_COMMAND; // define once // magic + command + payload size TByteBuffer FixedCommandPart = new TByteBuffer(MAGIC_BYTES.length + TByteBuffer.SIZE_OF_INT32 + TByteBuffer.SIZE_OF_INT32); TByteBuffer Payload = new TByteBuffer(); TByteBuffer PayloadCheck = new 
TByteBuffer(TByteBuffer.SIZE_OF_INT32); do { try { try { Command = readCommand(FixedCommandPart, Payload, PayloadCheck); if (Command != TEventEntry.IC_INVALID_COMMAND) handleCommand(Command, Payload); } catch (ThreadDeath ex) { Command = TEventEntry.IC_END_OF_SESSION; } } catch (Exception ex) { if (isConnected()) { System.out.println("## Exception in ReadCommands loop: " + ex.getMessage()); ex.printStackTrace(); } } } while ((Command != TEventEntry.IC_END_OF_SESSION) && isConnected()); } protected enum TConnectionState { icsUninitialized(0), icsInitialized(1), icsClient(2), icsHub(3), icsEnded(4), // room for extensions .. // gateway values are used over network and should be same over all connected clients/brokers icsGateway(100), // equal icsGatewayClient(101), // this gateway acts as a client; subscribes are not received icsGatewayServer(102); // this gateway treats connected broker as client public final int value; TConnectionState(int aValue) { this.value = aValue; } } protected void setState(TConnectionState aState) { TByteBuffer Payload = new TByteBuffer(); Payload.prepare(aState.ordinal()); Payload.prepareApply(); Payload.qWrite(aState.ordinal()); writeCommand(TEventEntry.IC_SET_STATE, Payload.getBuffer()); } protected boolean open(String aHost, int aPort) { return open(aHost, aPort, true); } protected boolean open(String aHost, int aPort, boolean aStartReadingThread) { close(); try { this.fremoteHost = aHost; this.fremotePort = aPort; this.fsocket = new Socket(this.fremoteHost, this.fremotePort); if (this.fsocket.isConnected()) { this.foutputStream = this.fsocket.getOutputStream(); this.finputStream = this.fsocket.getInputStream(); // FClient.Connect(FRemoteHost, FRemotePort); // FNetStream = FClient.GetStream(); if (aStartReadingThread) { this.freadingThread = new Thread(new Runnable() { public void run() { readCommands(); } }); this.freadingThread.setName("imb command reader"); this.freadingThread.start(); } if (this.imb2Compatible) 
requestUniqueClientID(); // SetState(State.icsClient); setOwner(); // request all variables if delegates defined if (this.onVariable != null || this.onStatusUpdate != null) writeCommand(TEventEntry.IC_ALL_VARIABLES, null); } return this.fsocket.isConnected(); } catch (Exception ex) { return false; } } // publics /** returned object (array) on an event name list query */ public class TEventNameEntry { public String eventName; public int publishers; public int subscribers; public int timers; } public enum TVarPrefix { vpUniqueClientID, vpClientHandle } // constants /** The maximum size of the payload in a low level IMB command */ public static final int MAX_PAYLOAD_SIZE = 10 * 1024 * 1024; // in bytes /** value to be used when no specific remote server is used */ public static final String DEFAULT_HUB = "localhost"; /** value to be used when no specific port is used */ public static final int DEFAULT_PORT = 4000; /** value to be used when no specific federation is used */ public static final String DEFAULT_FEDERATION = "TNOdemo"; // command results /** command result: the connection is closed */ public static final int ICE_CONNECTION_CLOSED = -1; /** command result: the event was not published */ public static final int ICE_EVENT_NOT_PUBLISHED = -2; // fields /** Returns the current federation */ public String getFederation() { return this.ffederation; } /** * Set the current federation. 
All subscribed and published events are unsubscribed/unpublished, then the federation is
     * changed and all previously subscribed/published events are re-subscribed/re-published under the new
     * federation prefix. Only events whose name starts with the old federation followed by a dot are affected, and
     * only when connected and the old federation is not empty.
     * @param aFederation String; the new federation
     */
    public void setFederation(String aFederation) {
        String OldFederation = this.ffederation;
        // compare by content; a reference comparison with "" is true for any non-interned empty string
        boolean HadFederation = !"".equals(OldFederation);
        String OldPrefix = OldFederation + ".";
        TEventEntry Event;
        if (isConnected() && HadFederation) {
            // un-publish and un-subscribe all
            for (int i = 0; i < this.feventEntryList.FCount; i++) {
                String EventName = this.feventEntryList.getEventName(i);
                if (!EventName.equals("") && EventName.startsWith(OldPrefix)) {
                    Event = this.feventEntryList.getEventEntry(i);
                    if (Event.isSubscribed())
                        Event.unSubscribe(false);
                    if (Event.isPublished())
                        Event.unPublish(false);
                }
            }
        }
        this.ffederation = aFederation;
        if (isConnected() && HadFederation) {
            // publish and subscribe all
            for (int i = 0; i < this.feventEntryList.FCount; i++) {
                String EventName = this.feventEntryList.getEventName(i);
                if (!EventName.equals("") && EventName.startsWith(OldPrefix)) {
                    Event = this.feventEntryList.getEventEntry(i);
                    // keep everything after the old federation (the ".name" part) and put the new federation in front
                    Event.feventName = this.ffederation + Event.feventName.substring(OldFederation.length());
                    if (Event.isSubscribed())
                        Event.subscribe();
                    if (Event.isPublished())
                        Event.publish();
                }
            }
        }
    }

    /** when true events sent on not-published events are automatically published */
    public boolean autoPublish = true; // TODO: check what should be the default, for now backwards compatible? 
/** when true IMB2 features are used if possible to emulate IMB3 behavior */ public boolean imb2Compatible = true; // connection /** Returns the IP address or DNS name of the currently connected hub */ public String getRemoteHost() { return this.fremoteHost; } /** Returns the TCP port of the currently connected hub */ public int getRemotePort() { return this.fremotePort; } /** * Returns the state of the NAGLE algorithm on the connected socket * @return if true NAGLE is disabled (default false) * @throws SocketException */ public boolean getNoDelay() throws SocketException { if (isConnected()) return this.fsocket.getTcpNoDelay(); else return false; } /** * Sets the state of the NAGLE algorithm on the socket * @param aValue boolean; if true the NAGLE algorithm is DISABLED (default false) * @throws SocketException */ public void setNoDelay(boolean aValue) throws SocketException { if (isConnected()) this.fsocket.setTcpNoDelay(aValue); } /** * Returns the status of the linger option on the connected socket * @return if true the linger option is enabled * @throws SocketException */ public boolean getLinger() throws SocketException { if (isConnected()) return this.fsocket.getSoLinger() != -1; else return false; } /** * Sets the status of the linger option on the connected socket * @param aValue boolean; if true the linger option is enabled with a linger time of 2 seconds * @throws SocketException */ public void setLinger(boolean aValue) throws SocketException { if (isConnected()) this.fsocket.setSoLinger(aValue, 2); // set linger time to 2 seconds } /** Returns the connected state of the connection */ public boolean isConnected() { return (this.fsocket != null) && this.fsocket.isConnected(); } /** Closes the connection and cleans up socket, streams and thread */ public void close() { if ((this.fsocket != null) && this.fsocket.isConnected()) { if (this.onDisconnect != null) this.onDisconnect.dispatch(this); writeCommand(TEventEntry.IC_END_OF_SESSION, null); try { 
this.foutputStream.close(); this.foutputStream = null; this.finputStream.close(); this.finputStream = null; this.fsocket.close(); this.fsocket = null; this.freadingThread = null; } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } /** Override dispatch to implement a disconnect handler */ public interface TOnDisconnect { public void dispatch(TConnection aConnection); } /** Handler to be called on a disconnect */ public TOnDisconnect onDisconnect = null; /** * Throttle down buffer events send to this client if specific flags are set on events * @param aThrottle int; */ public void setThrottle(int aThrottle) { TByteBuffer Payload = new TByteBuffer(); Payload.prepare(aThrottle); Payload.prepareApply(); Payload.qWrite(aThrottle); writeCommand(TEventEntry.IC_SET_THROTTLE, Payload.getBuffer()); } /** * Manually reading commands when not using a reader thread. Commands are read until connection is idle. * @throws IOException */ public void readCommandsNonBlocking() throws IOException { if (this.finputStream.available() != 0) { int Command = TEventEntry.IC_INVALID_COMMAND; // define once // magic + command + payload size TByteBuffer FixedCommandPart = new TByteBuffer(MAGIC_BYTES.length + TByteBuffer.SIZE_OF_INT32 + TByteBuffer.SIZE_OF_INT32); TByteBuffer Payload = new TByteBuffer(); TByteBuffer PayloadCheck = new TByteBuffer(TByteBuffer.SIZE_OF_INT32); do { try { try { Command = readCommand(FixedCommandPart, Payload, PayloadCheck); if (Command != TEventEntry.IC_INVALID_COMMAND) handleCommand(Command, Payload); } catch (ThreadDeath ex) { Command = TEventEntry.IC_END_OF_SESSION; } } catch (Exception ex) { if (isConnected()) System.out.println("## Exception in ReadCommands loop: " + ex.getMessage()); } } while ((Command != TEventEntry.IC_END_OF_SESSION) && isConnected() && (this.finputStream.available() != 0)); } } /** * Manually reading commands when not using a reader thread. 
Commands are processed until the reading on the connection * times out * @param aTimeOut int; * @throws SocketException */ public void readCommandsNonThreaded(int aTimeOut) throws SocketException { this.fsocket.setSoTimeout(aTimeOut); int Command = TEventEntry.IC_INVALID_COMMAND; // define once // magic + command + payload size TByteBuffer FixedCommandPart = new TByteBuffer(MAGIC_BYTES.length + TByteBuffer.SIZE_OF_INT32 + TByteBuffer.SIZE_OF_INT32); TByteBuffer Payload = new TByteBuffer(); TByteBuffer PayloadCheck = new TByteBuffer(TByteBuffer.SIZE_OF_INT32); do { try { try { Command = readCommand(FixedCommandPart, Payload, PayloadCheck); if (Command != TEventEntry.IC_INVALID_COMMAND) handleCommand(Command, Payload); } catch (ThreadDeath ex) { Command = TEventEntry.IC_END_OF_SESSION; } } catch (Exception ex) { if (isConnected()) System.out.println("## Exception in ReadCommands loop: " + ex.getMessage()); } } while ((Command != TEventEntry.IC_END_OF_SESSION) && isConnected()); } // owner /** Returns the currently specified owner id */ public int getOwnerID() { return this.fownerID; } /** * Changes the owner id * @param aValue int; the new owner id */ public void setOwnerID(int aValue) { if (this.fownerID != aValue) { this.fownerID = aValue; setOwner(); } } /** Returns the currently specified owner name */ public String getOwnerName() { return this.fownerName; } /** * Changes the owner name * @param aValue String; the new owner name */ public void setOwnerName(String aValue) { if (this.fownerName != aValue) { this.fownerName = aValue; setOwner(); } } /** Returns the unique client id the hub assigned to this connection */ public int getUniqueClientID() { return this.funiqueClientID; } /** Returns the client handle the hub assigned to this connection */ public int getClientHandle() { return this.fclientHandle; } // subscribe/publish /** * Subscribe to the specified event * @param aEventName String; the event name to subscribe to (it will be prefixed with the current 
federation) * @return event entry that is to be used to assign the handler for the received events */ public TEventEntry subscribe(String aEventName) { return subscribe(aEventName, true); } /** * Subscribe to the specified event * @param aEventName String; the event name to subscribe to * @param aUseFederationPrefix boolean; if true the given event name will be prefixed with the current federation * @return event entry that is to be used to assign the handler for the received events */ public TEventEntry subscribe(String aEventName, boolean aUseFederationPrefix) { TEventEntry Event = findOrAddEventL(prefixFederation(aEventName, aUseFederationPrefix)); if (!Event.isSubscribed()) Event.subscribe(); return Event; } /** * Publishes on the specified event * @param aEventName String; the event name to publish on (it will be prefixed with the current federation) * @return event entry that is to be used to signal events on */ public TEventEntry publish(String aEventName) { return publish(aEventName, true); } /** * Publishes on the specified event * @param aEventName String; the event name to publish on * @param aUseFederationPrefix boolean; if true the given event name will be prefixed with the current federation * @return event entry that is to be used to signal events on */ public TEventEntry publish(String aEventName, boolean aUseFederationPrefix) { TEventEntry Event = findOrAddEventL(prefixFederation(aEventName, aUseFederationPrefix)); if (!Event.isPublished()) Event.publish(); return Event; } /** * Unsubscribe from the specified event * @param aEventName String; the event name to unsubscribe from (it will be prefixed with the current federation) */ public void unSubscribe(String aEventName) { unSubscribe(aEventName, true); } /** * Unsubscribe from the specified event * @param aEventName String; the event name to unsubscribe from * @param aUseFederationPrefix boolean; if true the given event name will be prefixed with the current federation */ public void 
unSubscribe(String aEventName, boolean aUseFederationPrefix) { TEventEntry Event = findEventL(prefixFederation(aEventName, aUseFederationPrefix)); if (Event != null && Event.isSubscribed()) Event.unSubscribe(true); } /** * Unpublish on the specified event. * @param aEventName String; the event name to unpublish on (it will be prefixed with the current federation) */ public void unPublish(String aEventName) { unPublish(aEventName, true); } /** * Unpublish on the specified event. * @param aEventName String; the event name to unpublish on * @param aUseFederationPrefix boolean; if true the given event name will be prefixed with the current federation */ public void unPublish(String aEventName, boolean aUseFederationPrefix) { TEventEntry Event = findEventL(prefixFederation(aEventName, aUseFederationPrefix)); if (Event != null && Event.isPublished()) Event.unPublish(true); } /** * Send an event to the framework. This is the simple way to send events. More performance can be gained by using the * returned event entry from publish(). * @param aEventName String; * @param aEventKind int; * @param aEventPayload TByteBuffer; * @return result of the command (see ICE_* constants) */ public int signalEvent(String aEventName, int aEventKind, TByteBuffer aEventPayload) { return signalEvent(aEventName, aEventKind, aEventPayload, true); } /** * Send an event to the framework. This is the simple way to send events. More performance can be gained by using the * returned event entry from publish(). 
* @param aEventName String; * @param aEventKind int; * @param aEventPayload TByteBuffer; * @param aUseFederationPrefix boolean; if true the given event name will be prefixed with the current federation * @return result of the command (see ICE_* constants) */ public int signalEvent(String aEventName, int aEventKind, TByteBuffer aEventPayload, boolean aUseFederationPrefix) { TEventEntry Event = findEventAutoPublishL(prefixFederation(aEventName, aUseFederationPrefix)); if (Event != null) return Event.signalEvent(aEventKind, aEventPayload.getBuffer()); else return ICE_EVENT_NOT_PUBLISHED; } /** * Send a buffer event to the framework. This is the simple way to send events. More performance can be gained by using the * returned event entry from publish(). * @param aEventName String; * @param aBufferID int; * @param aBuffer byte[]; * @return result of the command (see ICE_* constants) */ public int signalBuffer(String aEventName, int aBufferID, byte[] aBuffer) { return signalBuffer(aEventName, aBufferID, aBuffer, 0, true); } /** * Send a buffer event to the framework. This is the simple way to send events. More performance can be gained by using the * returned event entry from publish(). * @param aEventName String; * @param aBufferID int; * @param aBuffer byte[]; * @param aEventFlags int; * @param aUseFederationPrefix boolean; if true the given event name will be prefixed with the current federation * @return result of the command (see ICE_* constants) */ public int signalBuffer(String aEventName, int aBufferID, byte[] aBuffer, int aEventFlags, boolean aUseFederationPrefix) { TEventEntry Event = findEventAutoPublishL(prefixFederation(aEventName, aUseFederationPrefix)); if (Event != null) return Event.signalBuffer(aBufferID, aBuffer, aEventFlags); else return ICE_EVENT_NOT_PUBLISHED; } /** * Send a ChangeObject event to the framework This is the simple way to send events. More performance can be gained by using * the returned event entry from publish(). 
* @param aEventName String; * @param aAction int; * @param aObjectID int; * @param aAttribute String; * @return result of the command (see ICE_* constants) */ public int signalChangeObject(String aEventName, int aAction, int aObjectID, String aAttribute) { return signalChangeObject(aEventName, aAction, aObjectID, aAttribute, true); } /** * Send a ChangeObject event to the framework This is the simple way to send events. More performance can be gained by using * the returned event entry from publish(). * @param aEventName String; * @param aAction int; * @param aObjectID int; * @param aAttribute String; * @param aUseFederationPrefix boolean; if true the given event name will be prefixed with the current federation * @return result of the command (see ICE_* constants) */ public int signalChangeObject(String aEventName, int aAction, int aObjectID, String aAttribute, boolean aUseFederationPrefix) { TEventEntry Event = findEventAutoPublishL(prefixFederation(aEventName, aUseFederationPrefix)); if (Event != null) return Event.signalChangeObject(aAction, aObjectID, aAttribute); else return ICE_EVENT_NOT_PUBLISHED; } /** * Send a stream to the framework * @param aEventName String; * @param aStreamName String; name of the stream to identify the stream by the receiver * @param aStream InputStream; * @return result of the command (see ICE_* constants) */ public int signalStream(String aEventName, String aStreamName, InputStream aStream) { return signalStream(aEventName, aStreamName, aStream, true); } /** * Send a stream to the framework * @param aEventName String; * @param aStreamName String; * @param aStream InputStream; * @param aUseFederationPrefix boolean; * @return result of the command (see ICE_* constants) */ public int signalStream(String aEventName, String aStreamName, InputStream aStream, boolean aUseFederationPrefix) { TEventEntry Event = findEventAutoPublishL(prefixFederation(aEventName, aUseFederationPrefix)); if (Event != null) return 
Event.signalStream(aStreamName, aStream); else return ICE_EVENT_NOT_PUBLISHED; } // variables /** Override dispatch to implement a variable change handler */ public interface TOnVariable { public void dispatch(TConnection aConnection, String aVarName, byte[] aVarValue, byte[] aPrevValue); } // extra calls when adding a delegate to OnVariable: requestAllVariables(); /** Handler to be called on a variable change */ private TOnVariable onVariable = null; /** * Set the callback handler for framework variable changes * @param aValue TOnVariable; */ public void setOnVariable(TOnVariable aValue) { this.onVariable = aValue; requestAllVariables(); } /** Send a request to the framework to send all variables with their contents to this client */ protected void requestAllVariables() { writeCommand(TEventEntry.IC_ALL_VARIABLES, null); // request all variables for initial values } /** * Set the value of a global framework variable * @param aVarName String; * @param aVarValue String; */ public void setVariableValue(String aVarName, String aVarValue) { TByteBuffer Payload = new TByteBuffer(); Payload.prepare(aVarName); Payload.prepare(aVarValue); Payload.prepareApply(); Payload.qWrite(aVarName); Payload.qWrite(aVarValue); writeCommand(TEventEntry.IC_SET_VARIABLE, Payload.getBuffer()); } /** * Set the value of a global framework variable * @param aVarName String; * @param aVarValue TByteBuffer; */ public void setVariableValue(String aVarName, TByteBuffer aVarValue) { TByteBuffer Payload = new TByteBuffer(); Payload.prepare(aVarName); Payload.prepare(aVarValue); Payload.prepareApply(); Payload.qWrite(aVarName); Payload.qWrite(aVarValue); writeCommand(TEventEntry.IC_SET_VARIABLE, Payload.getBuffer()); } /** * Set the value of a global framework variable * @param aVarName String; * @param aVarValue String; * @param aVarPrefix TVarPrefix; */ public void setVariableValue(String aVarName, String aVarValue, TVarPrefix aVarPrefix) { TByteBuffer Payload = new TByteBuffer(); 
Payload.prepare(aVarPrefix.ordinal()); Payload.prepare(aVarName); Payload.prepare(aVarValue); Payload.prepareApply(); Payload.qWrite(aVarPrefix.ordinal()); Payload.qWrite(aVarName); Payload.qWrite(aVarValue); writeCommand(TEventEntry.IC_SET_VARIABLE_PREFIXED, Payload.getBuffer()); } /** * Set the value of a global framework variable * @param aVarName String; * @param aVarValue TByteBuffer; * @param aVarPrefix TVarPrefix; */ public void setVariableValue(String aVarName, TByteBuffer aVarValue, TVarPrefix aVarPrefix) { TByteBuffer Payload = new TByteBuffer(); Payload.prepare(aVarPrefix.ordinal()); Payload.prepare(aVarName); Payload.prepare(aVarValue); Payload.prepareApply(); Payload.qWrite(aVarPrefix.ordinal()); Payload.qWrite(aVarName); Payload.qWrite(aVarValue); writeCommand(TEventEntry.IC_SET_VARIABLE_PREFIXED, Payload.getBuffer()); } /** Override dispatch to implement a status change handler */ public interface TOnStatusUpdate { public void dispatch(TConnection aConnection, String aModelUniqueClientID, String aModelName, int aProgress, int aStatus); } // TODO: extra calls when adding a delegate to OnVariable: RequestAllVariables(); /** Handler to be called on a status update */ private TOnStatusUpdate onStatusUpdate = null; /** * Set the callback handler for status updates * @param aValue TOnStatusUpdate; */ public void setOnStatusUpdate(TOnStatusUpdate aValue) { this.onStatusUpdate = aValue; requestAllVariables(); } // status for UpdateStatus /** signal client status: ready (see updateStatus) */ public final static int STATUS_READY = 0; // R /** signal client status: calculating (see updateStatus) */ public final static int STATUS_CALCULATING = 1; // C /** signal client status: busy (see updateStatus) */ public final static int STATUS_BUSY = 2; // B /** * Update the central status for this client * @param aProgress int; the progress, if available, from 0 to 100 or counting down to 0 * @param aStatus int; the current status of the client (see STATUS_* constants) * 
@throws InterruptedException */ public void updateStatus(int aProgress, int aStatus) throws InterruptedException { TByteBuffer Payload = new TByteBuffer(); Payload.prepare(aStatus); Payload.prepare(aProgress); Payload.prepareApply(); Payload.qWrite(aStatus); Payload.qWrite(aProgress); if (this.imb2Compatible) { // wait for unique client id if (this.funiqueClientID == 0) { int SpinCount = 10; // 10*500 ms while (this.funiqueClientID == 0 && SpinCount > 0) { Thread.sleep(500); SpinCount--; } } // set variable using unique client id setVariableValue(Integer.toHexString(this.funiqueClientID) + prefixFederation(this.fownerName).toUpperCase() + MODEL_STATUS_VAR_SEP_CHAR + MODEL_Status_VAR_NAME, Payload); } else setVariableValue( prefixFederation(this.fownerName).toUpperCase() + MODEL_STATUS_VAR_SEP_CHAR + MODEL_Status_VAR_NAME, Payload, TVarPrefix.vpUniqueClientID); } /** Removes the current status for this client */ public void removeStatus() { if (this.imb2Compatible) setVariableValue(Integer.toHexString(this.funiqueClientID) + prefixFederation(this.fownerName) + MODEL_STATUS_VAR_SEP_CHAR + MODEL_Status_VAR_NAME, ""); else setVariableValue(prefixFederation(this.fownerName) + MODEL_STATUS_VAR_SEP_CHAR + MODEL_Status_VAR_NAME, "", TVarPrefix.vpUniqueClientID); } // TODO: delegates /** * Subscribe to focus events and registers the callback handler for these events. 
* @param aOnFocus TEventEntry.TOnFocus; callback event handler */ public void subscribeOnFocus(TEventEntry.TOnFocus aOnFocus) { if (this.ffocusEvent == null) this.ffocusEvent = subscribe(FOCUS_EVENT_NAME); this.ffocusEvent.onFocus = aOnFocus; } /** * Signal a new focus point to the framework * @param aX double; * @param aY double; * @return result of the command (see ICE_* constants) */ public int signalFocus(double aX, double aY) { if (this.ffocusEvent == null) this.ffocusEvent = findEventAutoPublishL(prefixFederation(FOCUS_EVENT_NAME)); if (this.ffocusEvent != null) { TByteBuffer Payload = new TByteBuffer(); Payload.prepare(aX); Payload.prepare(aY); Payload.prepareApply(); Payload.qWrite(aX); Payload.qWrite(aY); return this.ffocusEvent.signalEvent(TEventEntry.EK_CHANGE_OBJECT_EVENT, Payload.getBuffer()); } else return ICE_EVENT_NOT_PUBLISHED; } // IMB 2 change federation /** * Subscribe to federation change events and register the callback handler for these events * @param aOnChangeFederation TEventEntry.TOnChangeFederation; */ public void subscribeOnFederationChange(TEventEntry.TOnChangeFederation aOnChangeFederation) { if (this.fchangeFederationEvent == null) this.fchangeFederationEvent = subscribe(CHANGE_FEDERATION_EVENT_NAME); this.fchangeFederationEvent.onChangeFederation = aOnChangeFederation; } /** * Signal a new federation to the framework * @param aNewFederationID int; * @param aNewFederation String; * @return result of the command (see ICE_* constants) */ public int signalChangeFederation(int aNewFederationID, String aNewFederation) { if (this.fchangeFederationEvent == null) this.fchangeFederationEvent = findEventAutoPublishL(prefixFederation(CHANGE_FEDERATION_EVENT_NAME)); if (this.fchangeFederationEvent != null) return this.fchangeFederationEvent.signalChangeObject(TEventEntry.ACTION_CHANGE, aNewFederationID, aNewFederation); else return ICE_EVENT_NOT_PUBLISHED; } // log /** * Log an entry to the framework * @param aLogEventName String; * @param aLine 
String; * @param aLevel TEventEntry.TLogLevel; * @return result of the command (see ICE_* constants) */ public int logWriteLn(String aLogEventName, String aLine, TEventEntry.TLogLevel aLevel) { if (this.flogEvent == null) this.flogEvent = findEventAutoPublishL(prefixFederation(aLogEventName)); if (this.flogEvent != null) return this.flogEvent.logWriteLn(aLine, aLevel); else return ICE_EVENT_NOT_PUBLISHED; } // remote event info // TODO: delegates /** Override dispatch to implement an event names request callback handler */ public interface TOnEventnames { public void dispatch(TConnection aConnection, TEventNameEntry[] aEventNames); } /** Handler to be called on a event names request callback */ public TOnEventnames onEventNames = null; // TODO: description public interface TOnSubAndPub { public void dispatch(TConnection aConnection, int aCommand, String aEventName); } // TODO: description public TOnSubAndPub onSubAndPub = null; // event filters /** request event name filter: requesting publisher counts */ public static final int EF_PUBLISHERS = 1; /** request event name filter: requesting subscriber counts */ public static final int EF_SUBSCRIBERS = 2; /** request event name filter: requesting timer counts */ public static final int EF_TIMERS = 4; /** * Query the framework for registered event names * @param aEventNameFilter String; * @param aEventFilters int; * @return result of the command (see ICE_* constants) */ public int requestEventname(String aEventNameFilter, int aEventFilters) { TByteBuffer Payload = new TByteBuffer(); Payload.prepare(aEventNameFilter); Payload.prepare(aEventFilters); Payload.prepareApply(); Payload.qWrite(aEventNameFilter); Payload.qWrite(aEventFilters); return writeCommand(TEventEntry.IC_REQUEST_EVENT_NAMES, Payload.getBuffer()); } /** {@inheritDoc} */ @Override public final String toString() { return "TConnection [remoteHost=" + this.fremoteHost + ", remotePort=" + this.fremotePort + ", federation=" + this.ffederation + ", 
isConnected()=" + this.isConnected() + "]"; } }