package org.opentrafficsim.sim0mq.publisher; import java.rmi.RemoteException; import java.util.ArrayList; import java.util.EnumSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import org.djunits.Throw; import org.djutils.event.EventInterface; import org.djutils.event.EventListenerInterface; import org.djutils.event.EventProducerInterface; import org.djutils.event.TimedEvent; import org.djutils.event.TimedEventType; import org.djutils.metadata.MetaData; import org.djutils.metadata.ObjectDescriptor; import org.djutils.serialization.SerializationException; import org.sim0mq.Sim0MQException; /** * Data collection that can be listed and has subscription to change events. *
* Copyright (c) 2020-2022 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved.
* BSD-style license. See OpenTrafficSim License.
*
* $LastChangedDate: 2020-02-13 11:08:16 +0100 (Thu, 13 Feb 2020) $, @version $Revision: 6383 $, by $Author: pknoppers $,
* @author Alexander Verbraeck
* @author Peter Knoppers
*/
public class SubscriptionHandler
{
/** Id of this SubscriptionHandler. */
private final String id;
/** Transceiver to retrieve the data right now; e.g. GTUIdTransceiver. */
private final TransceiverInterface listTransceiver;
/** Event producer for add, remove, or change events; e.g. the OTSNetwork. */
private final LookupEventProducerInterface eventProducerForAddRemoveOrChange;
/** TimedEventType to subscribe to in order to receive creation of added object events; e.g. NETWORK.GTU_ADD_EVENT. */
private final TimedEventType addedEventType;
/** TimedEventType to subscribe to in order to receive removed object events; e.g. NETWORK.GTU_REMOVE_EVENT. */
private final TimedEventType removedEventType;
/** TimedEventType to subscript to in order to receive change of the collection, or object events. */
private final TimedEventType changeEventType;
/** SubscriptionHandler that handles subscriptions to individual objects; e.g. GTU.MOVE_EVENT. */
private final SubscriptionHandler elementSubscriptionHandler;
/** The currently active subscriptions. */
private final MapaddedEventType
, removedEventType
, or changeEventType
events
* @param addedEventType EventType; event type that signals that a new element has been added, should be null if there is no
* added event type for the data
* @param removedEventType EventType; event type that signals that an element has been removed, should be null if there is
* no removed event type for the data
* @param changeEventType EventType; event type that signals that an element has been changed, should be null if there is no
* change event type for the data
* @param elementSubscriptionHandler SubscriptionHandler; SubscriptionHandler for events produced by the underlying elements
*/
public SubscriptionHandler(final String id, final TransceiverInterface listTransceiver,
final LookupEventProducerInterface eventProducerForAddRemoveOrChange, final TimedEventType addedEventType,
final TimedEventType removedEventType, final TimedEventType changeEventType,
final SubscriptionHandler elementSubscriptionHandler)
{
Throw.whenNull(id, "Id may not be null");
Throw.when(
null == eventProducerForAddRemoveOrChange
&& (addedEventType != null || removedEventType != null || changeEventType != null),
NullPointerException.class,
"eventProducerForAddRemoveOrChange may not be null when any of those events is non-null");
this.id = id;
this.listTransceiver = listTransceiver;
this.eventProducerForAddRemoveOrChange = eventProducerForAddRemoveOrChange;
this.addedEventType = addedEventType;
this.removedEventType = removedEventType;
this.changeEventType = changeEventType;
this.elementSubscriptionHandler = elementSubscriptionHandler;
}
/**
* Report what payload is required to retrieve a list of all elements, or data and what format a result would have.
* @return MetaData; description of the payload required to retrieve a list of all elements, or data and what format a
* result would have
*/
public MetaData listRequestMetaData()
{
return this.listTransceiver.getAddressFields();
}
/**
* Report what the payload format of the result of the list transceiver.
* @return MetaData; the payload format of the result of the list transceiver
*/
public MetaData listResultMetaData()
{
return this.listTransceiver.getResultFields();
}
/**
* Retrieve a data collection.
* @param address Object[]; address of the requested data collection
* @param returnWrapper ReturnWrapper; to send back the result
* @throws RemoteException when communication fails
* @throws SerializationException on context error
* @throws Sim0MQException on DSOL error
*/
public void get(final Object[] address, final ReturnWrapper returnWrapper)
throws RemoteException, Sim0MQException, SerializationException
{
sendResult(this.listTransceiver.get(address, returnWrapper), returnWrapper);
}
/**
* Retrieve the list transceiver (only for testing).
* @return TransceiverInterface; the list transceiver
*/
public TransceiverInterface getListTransceiver()
{
return this.listTransceiver;
}
/**
* Return the set of supported commands.
* @return EnumSet<Command>; the set of supported commands.
*/
public final EnumSetcommandString
is not a valid Command
*/
public static Command lookupCommand(final String commandString)
{
if ("GET_ADDRESS_META_DATA".equals(commandString))
{
return Command.GET_ADDRESS_META_DATA;
}
else if ("GET_CURRENT".equals(commandString))
{
return Command.GET_CURRENT;
}
else if ("GET_RESULT_META_DATA".equals(commandString))
{
return Command.GET_RESULT_META_DATA;
}
else if ("GET_RESULT_META_DATA".equals(commandString))
{
return Command.GET_RESULT_META_DATA;
}
else if ("SUBSCRIBE_TO_ADD".equals(commandString))
{
return Command.SUBSCRIBE_TO_ADD;
}
else if ("SUBSCRIBE_TO_CHANGE".equals(commandString))
{
return Command.SUBSCRIBE_TO_CHANGE;
}
else if ("SUBSCRIBE_TO_REMOVE".equals(commandString))
{
return Command.SUBSCRIBE_TO_REMOVE;
}
else if ("UNSUBSCRIBE_FROM_ADD".equals(commandString))
{
return Command.UNSUBSCRIBE_FROM_ADD;
}
else if ("UNSUBSCRIBE_FROM_REMOVE".equals(commandString))
{
return Command.UNSUBSCRIBE_FROM_REMOVE;
}
else if ("UNSUBSCRIBE_FROM_CHANGE".equals(commandString))
{
return Command.UNSUBSCRIBE_FROM_CHANGE;
}
else if ("GET_LIST".contentEquals(commandString))
{
return Command.GET_LIST;
}
else if ("GET_COMMANDS".contentEquals(commandString))
{
return Command.GET_COMMANDS;
}
System.err.println("Could not find command with name \"" + commandString + "\"");
return null;
}
/**
* Execute one command.
* @param command Command; the command
* @param address Object[] the address of the object on which the command must be applied
* @param returnWrapper ReturnWrapper; envelope generator for replies
* @throws RemoteException on communication failure
* @throws SerializationException on illegal type in serialization
* @throws Sim0MQException on communication error
*/
public void executeCommand(final Command command, final Object[] address, final ReturnWrapper returnWrapper)
throws RemoteException, Sim0MQException, SerializationException
{
Throw.whenNull(command, "Command may not be null");
Throw.whenNull(returnWrapper, "ReturnWrapper may not be null");
switch (command)
{
case SUBSCRIBE_TO_ADD:
subscribeTo(address, this.addedEventType, returnWrapper);
break;
case SUBSCRIBE_TO_CHANGE:
subscribeTo(address, this.changeEventType, returnWrapper);
break;
case SUBSCRIBE_TO_REMOVE:
subscribeTo(address, this.removedEventType, returnWrapper);
break;
case UNSUBSCRIBE_FROM_ADD:
unsubscribeFrom(address, this.addedEventType, returnWrapper);
break;
case UNSUBSCRIBE_FROM_CHANGE:
unsubscribeFrom(address, this.changeEventType, returnWrapper);
break;
case UNSUBSCRIBE_FROM_REMOVE:
unsubscribeFrom(address, this.removedEventType, returnWrapper);
break;
case GET_CURRENT:
{
Object[] result = this.listTransceiver.get(address, returnWrapper);
if (null != result)
{
sendResult(result, returnWrapper);
}
// TODO else?
break;
}
case GET_ADDRESS_META_DATA:
if (null == this.listTransceiver)
{
returnWrapper.nack("The " + this.id + " SubscriptionHandler does not support immediate replies");
}
sendResult(extractObjectDescriptorClassNames(this.listTransceiver.getAddressFields().getObjectDescriptors()),
returnWrapper);
break;
case GET_RESULT_META_DATA:
if (null == this.listTransceiver)
{
returnWrapper.nack("The " + this.id + " SubscriptionHandler does not support immediate replies");
}
sendResult(extractObjectDescriptorClassNames(this.listTransceiver.getResultFields().getObjectDescriptors()),
returnWrapper);
break;
case GET_LIST:
{
if (this.listTransceiver.hasIdSource())
{
sendResult(this.listTransceiver.getIdSource(address.length, returnWrapper).get(null, returnWrapper),
returnWrapper);
}
else
{
sendResult(new Object[] {"No list transceiver exists in " + getId()}, returnWrapper);
}
break;
}
case GET_COMMANDS:
List