package org.opentrafficsim.sim0mq.publisher; import java.rmi.RemoteException; import java.util.ArrayList; import java.util.EnumSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import org.djunits.Throw; import org.djutils.event.EventInterface; import org.djutils.event.EventListenerInterface; import org.djutils.event.EventProducerInterface; import org.djutils.event.TimedEvent; import org.djutils.event.TimedEventType; import org.djutils.metadata.MetaData; import org.djutils.metadata.ObjectDescriptor; import org.djutils.serialization.SerializationException; import org.sim0mq.Sim0MQException; /** * Data collection that can be listed and has subscription to change events. *
* Copyright (c) 2020-2020 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved.
* BSD-style license. See OpenTrafficSim License.
*
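* @param eventProducerForAddRemoveOrChange LookupEventProducerInterface; event producer for the
* {@code addedEventType}, {@code removedEventType}, or {@code changeEventType} events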
* @param addedEventType EventType; event type that signals that a new element has been added, should be null if there is no
* added event type for the data
* @param removedEventType EventType; event type that signals that an element has been removed, should be null if there is
* no removed event type for the data
* @param changeEventType EventType; event type that signals that an element has been changed, should be null if there is no
* change event type for the data
* @param elementSubscriptionHandler SubscriptionHandler; SubscriptionHandler for events produced by the underlying elements
*/
SubscriptionHandler(final String id, final TransceiverInterface listTransceiver,
final LookupEventProducerInterface eventProducerForAddRemoveOrChange, final TimedEventType addedEventType,
final TimedEventType removedEventType, final TimedEventType changeEventType,
final SubscriptionHandler elementSubscriptionHandler)
{
Throw.whenNull(id, "Id may not be null");
Throw.when(
null == eventProducerForAddRemoveOrChange
&& (addedEventType != null || removedEventType != null || changeEventType != null),
NullPointerException.class,
"eventProducerForAddRemoveOrChange may not be null when any of those events is non-null");
this.id = id;
this.listTransceiver = listTransceiver;
this.eventProducerForAddRemoveOrChange = eventProducerForAddRemoveOrChange;
this.addedEventType = addedEventType;
this.removedEventType = removedEventType;
this.changeEventType = changeEventType;
this.elementSubscriptionHandler = elementSubscriptionHandler;
}
/**
 * Describe the address that must be supplied to the list transceiver in order to retrieve data from it.
 * @return MetaData; the address fields reported by the list transceiver
 */
public MetaData listRequestMetaData()
{
    final MetaData addressFields = this.listTransceiver.getAddressFields();
    return addressFields;
}
/**
 * Describe the format of the result that the list transceiver produces.
 * @return MetaData; the result fields reported by the list transceiver
 */
public MetaData listResultMetaData()
{
    final MetaData resultFields = this.listTransceiver.getResultFields();
    return resultFields;
}
/**
 * Fetch the data at the given address from the list transceiver and send it back through the return wrapper.
 * @param address Object[]; address of the requested data collection
 * @param returnWrapper ReturnWrapper; used to send back the result
 * @throws RemoteException when communication fails
 * @throws SerializationException on context error
 * @throws Sim0MQException on DSOL error
 */
public void get(final Object[] address, final ReturnWrapper returnWrapper)
        throws RemoteException, Sim0MQException, SerializationException
{
    // Retrieve first, transmit second; whatever the transceiver returns is handed to sendResult unchanged.
    final Object[] collected = this.listTransceiver.get(address, returnWrapper);
    sendResult(collected, returnWrapper);
}
/**
 * Give access to the list transceiver of this SubscriptionHandler; intended for use in tests only.
 * @return TransceiverInterface; the list transceiver that was provided at construction time
 */
public TransceiverInterface getListTransceiver()
{
    final TransceiverInterface transceiver = this.listTransceiver;
    return transceiver;
}
/**
* Return the set of supported commands.
* @return EnumSet<Command>; the set of supported commands.
*/
public final EnumSetcommandString
is not a valid Command
*/
/**
 * Convert a command name into the corresponding Command. The match is exact (case sensitive). When no command
 * matches, a message is written to System.err and null is returned.
 * @param commandString String; name of the command; a null value is treated as an unknown command
 * @return Command; the corresponding Command, or null if the {@code commandString} is not a valid Command
 */
public static Command lookupCommand(final String commandString)
{
    // Guard explicitly: a switch on a null String would throw a NullPointerException, and the previous mix of
    // equals / contentEquals did exactly that for null input instead of reporting an unknown command.
    if (commandString != null)
    {
        switch (commandString)
        {
            case "GET_ADDRESS_META_DATA":
                return Command.GET_ADDRESS_META_DATA;
            case "GET_CURRENT":
                return Command.GET_CURRENT;
            case "GET_RESULT_META_DATA":
                return Command.GET_RESULT_META_DATA;
            case "SUBSCRIBE_TO_ADD":
                return Command.SUBSCRIBE_TO_ADD;
            case "SUBSCRIBE_TO_CHANGE":
                return Command.SUBSCRIBE_TO_CHANGE;
            case "SUBSCRIBE_TO_REMOVE":
                return Command.SUBSCRIBE_TO_REMOVE;
            case "UNSUBSCRIBE_FROM_ADD":
                return Command.UNSUBSCRIBE_FROM_ADD;
            case "UNSUBSCRIBE_FROM_REMOVE":
                return Command.UNSUBSCRIBE_FROM_REMOVE;
            case "UNSUBSCRIBE_FROM_CHANGE":
                return Command.UNSUBSCRIBE_FROM_CHANGE;
            case "GET_LIST":
                return Command.GET_LIST;
            case "GET_COMMANDS":
                return Command.GET_COMMANDS;
            default:
                // Unknown name: fall through to the error report below.
                break;
        }
    }
    System.err.println("Could not find command with name \"" + commandString + "\"");
    return null;
}
/**
* Execute one command.
* @param command Command; the command
* @param address Object[] the address of the object on which the command must be applied
* @param returnWrapper ReturnWrapper; envelope generator for replies
* @throws RemoteException on communication failure
* @throws SerializationException on illegal type in serialization
* @throws Sim0MQException on communication error
*/
public void executeCommand(final Command command, final Object[] address, final ReturnWrapper returnWrapper)
throws RemoteException, Sim0MQException, SerializationException
{
Throw.whenNull(command, "Command may not be null");
Throw.whenNull(returnWrapper, "ReturnWrapper may not be null");
switch (command)
{
case SUBSCRIBE_TO_ADD:
subscribeTo(address, this.addedEventType, returnWrapper);
break;
case SUBSCRIBE_TO_CHANGE:
subscribeTo(address, this.changeEventType, returnWrapper);
break;
case SUBSCRIBE_TO_REMOVE:
subscribeTo(address, this.removedEventType, returnWrapper);
break;
case UNSUBSCRIBE_FROM_ADD:
unsubscribeFrom(address, this.addedEventType, returnWrapper);
break;
case UNSUBSCRIBE_FROM_CHANGE:
unsubscribeFrom(address, this.changeEventType, returnWrapper);
break;
case UNSUBSCRIBE_FROM_REMOVE:
unsubscribeFrom(address, this.removedEventType, returnWrapper);
break;
case GET_CURRENT:
{
Object[] result = this.listTransceiver.get(address, returnWrapper);
if (null != result)
{
sendResult(result, returnWrapper);
}
// TODO else?
break;
}
case GET_ADDRESS_META_DATA:
sendResult(extractObjectDescriptorClassNames(this.listTransceiver.getAddressFields().getObjectDescriptors()),
returnWrapper);
break;
case GET_RESULT_META_DATA:
sendResult(extractObjectDescriptorClassNames(this.listTransceiver.getResultFields().getObjectDescriptors()),
returnWrapper);
break;
case GET_LIST:
{
if (this.listTransceiver.hasIdSource())
{
sendResult(this.listTransceiver.getIdSource(address.length, returnWrapper).get(null, returnWrapper),
returnWrapper);
}
else
{
sendResult(new Object[] {"No list transceiver exists in " + getId()}, returnWrapper);
}
break;
}
case GET_COMMANDS:
Listlookup
method.
* @return MetaData; to be used to verify the correctness of an address for the {@code lookup} method
*/
MetaData getAddressMetaData();
}
/**
* Handles one subscription.
*/
class Subscription implements EventListenerInterface
{
/** ... */
private static final long serialVersionUID = 20200428L;
/** Generates envelopes for the messages sent over Sim0MQ. */
private final ReturnWrapper returnWrapper;
/**
* Construct a new Subscription.
* @param returnWrapper ReturnWrapper; envelope generator for the messages
*/
Subscription(final ReturnWrapper returnWrapper)
{
this.returnWrapper = returnWrapper;
}
/** {@inheritDoc} */
@Override
public void notify(final EventInterface event) throws RemoteException
{
MetaData metaData = event.getType().getMetaData();
int additionalFields = event.getType() instanceof TimedEventType ? 1 : 0;
Object[] result = new Object[additionalFields + metaData.size()];
// result[0] = event.getType().getName();
if (additionalFields > 0)
{
result[0] = ((TimedEvent>) event).getTimeStamp();
}
Object payload = event.getContent();
if (payload instanceof Object[])
{
for (int index = 0; index < event.getType().getMetaData().size(); index++)
{
result[additionalFields + index] = ((Object[]) payload)[index];
}
}
else
{
result[additionalFields] = payload;
}
// TODO verify the composition of the result. Problem: no access to the metadata here
try
{
this.returnWrapper.encodeReplyAndTransmit(result);
}
catch (Sim0MQException | SerializationException e)
{
e.printStackTrace();
}
}
}