package org.sim0mq.test.dealerrouter;
import java.util.Random;
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZFrame;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Poller;
import org.zeromq.ZMQ.Socket;
import org.zeromq.ZMsg;
/**
* 0MQ DEALER-ROUTER example. Based on the 0MQ guide example at
* http://zguide.zeromq.org/java:asyncsrv, which is also available as the JeroMQ example at
*
* https://github.com/zeromq/jeromq/blob/master/src/test/java/guide/asyncsrv.java.
* In the 0MQ manual this example can be found at
*
* http://zguide.zeromq.org/page:all#The-Asynchronous-Client-Server-Pattern.
*
* Copyright (c) 2013-2017 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved.
* BSD-style license. See Sim0MQ License.
*
* @author Alexander Verbraeck
*/
public final class DealerRouter
{
/** Private constructor; the class only holds static members and nested task classes, so it is never instantiated. */
private DealerRouter()
{
// utility class
}
/**
 * Random stream shared by all tasks in this example, seeded with {@code System.nanoTime()}. The client task uses it to
 * build its socket identity; the server worker uses it for the number of replies and the delay between replies.
 */
@SuppressWarnings("checkstyle:visibilitymodifier")
static Random rand = new Random(System.nanoTime());
/**
 * This is our client task. It connects to the server on tcp://localhost:5570, and then sends a request once per second.
 * It collects responses as they arrive, and it prints them out. We will run several client tasks in parallel, each with
 * a different random ID.
 */
static class ClientTask implements Runnable
{
    /**
     * Runs the client loop until the current thread is interrupted. Each iteration polls the socket 100 times with a
     * 10 ms timeout (roughly one second), printing the last frame of every reply that arrived, and then sends the next
     * numbered request.
     */
    @Override
    public void run()
    {
        try (ZContext ctx = new ZContext())
        {
            Socket client = ctx.createSocket(SocketType.DEALER);

            // Set random identity to make tracing easier. Each half is bounded to 16 bits so that it really fits the
            // four hex digits of the format; an unbounded nextInt() would print up to eight digits per half.
            String identity = String.format("%04X-%04X", rand.nextInt(0x10000), rand.nextInt(0x10000));
            client.setIdentity(identity.getBytes(ZMQ.CHARSET));
            client.connect("tcp://localhost:5570");

            Poller poller = ctx.createPoller(1);
            poller.register(client, Poller.POLLIN);

            int requestNr = 0;
            while (!Thread.currentThread().isInterrupted())
            {
                // Tick once per second, pulling in arriving messages
                for (int centitick = 0; centitick < 100; centitick++)
                {
                    poller.poll(10);
                    if (poller.pollin(0))
                    {
                        ZMsg msg = ZMsg.recvMsg(client);
                        if (msg == null)
                        {
                            // recvMsg returns null when no message could be read (e.g. the receive was
                            // interrupted); there is nothing to print in that case
                            continue;
                        }
                        msg.getLast().print(identity);
                        msg.destroy();
                    }
                }
                client.send(String.format("request #%d", ++requestNr), 0);
            }
        }
    }
}
/**
 * The server task. It follows the multithreaded server model: a ROUTER socket accepts client requests over TCP, a DEALER
 * socket hands them to a pool of worker threads over inproc, and a proxy shuttles messages between the two. A worker
 * handles one request at a time, but a single client may be served by several workers at once.
 */
static class ServerTask implements Runnable
{
    /** {@inheritDoc} */
    @Override
    public void run()
    {
        try (ZContext context = new ZContext())
        {
            // Client-facing side: ROUTER socket bound on TCP port 5570
            Socket clientSide = context.createSocket(SocketType.ROUTER);
            clientSide.bind("tcp://*:5570");

            // Worker-facing side: DEALER socket bound on the inproc endpoint the workers connect to
            Socket workerSide = context.createSocket(SocketType.DEALER);
            workerSide.bind("inproc://backend");

            // Start the worker pool; the exact pool size is not critical
            int remaining = 5;
            while (remaining-- > 0)
            {
                Thread workerThread = new Thread(new ServerWorker(context));
                workerThread.start();
            }

            // Couple the client-facing and worker-facing sockets through a proxy
            ZMQ.proxy(clientSide, workerSide, null);
        }
    }
}
/**
 * Each worker task works on one request at a time and sends a random number of replies back, with random delays between
 * replies.
 */
private static class ServerWorker implements Runnable
{
    /** the 0MQ context handed in by the creator of this worker; the worker only creates its own socket in it. */
    private final ZContext ctx;

    /**
     * Construct a worker.
     * @param ctx the 0MQ context to use
     */
    ServerWorker(final ZContext ctx)
    {
        this.ctx = ctx;
    }

    /**
     * Connects a DEALER socket to inproc://backend and serves requests until the thread is interrupted or no further
     * message can be received. For every request, 0..4 copies of the content are sent back to the requesting address,
     * each after a random delay of 1..1000 ms. On exit only this worker's socket is released; the context is left
     * alone because it is passed in from outside and is not created by this worker.
     */
    @Override
    public void run()
    {
        Socket worker = this.ctx.createSocket(SocketType.DEALER);
        try
        {
            worker.connect("inproc://backend");
            while (!Thread.currentThread().isInterrupted())
            {
                // The DEALER socket gives us the address envelope and message
                ZMsg msg = ZMsg.recvMsg(worker);
                if (msg == null)
                {
                    // recvMsg returns null when the receive did not yield a message (e.g. interrupted); stop serving
                    break;
                }
                ZFrame address = msg.pop();
                ZFrame content = msg.pop();
                msg.destroy();
                if (content == null)
                {
                    // Message without an address + content pair: nothing to echo, drop it and wait for the next one
                    if (address != null)
                    {
                        address.destroy();
                    }
                    continue;
                }
                try
                {
                    sendReplies(worker, address, content);
                }
                finally
                {
                    address.destroy();
                    content.destroy();
                }
            }
        }
        finally
        {
            // Release only the socket this worker created
            this.ctx.destroySocket(worker);
        }
    }

    /**
     * Send 0..4 replies back, each preceded by a sleep of some fraction of a second. Stops early, with the interrupt
     * flag of the thread restored, when the thread is interrupted while sleeping.
     * @param worker the socket to send the replies on
     * @param address the address frame of the requesting client
     * @param content the content frame to echo back
     */
    private static void sendReplies(final Socket worker, final ZFrame address, final ZFrame content)
    {
        int replies = rand.nextInt(5);
        for (int reply = 0; reply < replies; reply++)
        {
            try
            {
                Thread.sleep(rand.nextInt(1000) + 1);
            }
            catch (InterruptedException e)
            {
                // Catching the exception clears the interrupt flag; set it again so the serving loop can see it
                Thread.currentThread().interrupt();
                return;
            }
            // REUSE keeps the frames valid so they can be sent again in the next iteration
            address.send(worker, ZFrame.REUSE + ZFrame.MORE);
            content.send(worker, ZFrame.REUSE);
        }
    }
}
/**
 * Starts three client tasks and one server task, each on its own thread, lets them run for 20 seconds and then
 * terminates the JVM.
 * @param args unused
 * @throws Exception on error
 */
public static void main(final String[] args) throws Exception
{
    // Clients first, then the server, in the same order as always
    for (int started = 0; started < 3; started++)
    {
        Thread clientThread = new Thread(new ClientTask());
        clientThread.start();
    }
    Thread serverThread = new Thread(new ServerTask());
    serverThread.start();

    // Let the example run for 20 seconds, then quit
    final long runMillis = 20_000L;
    Thread.sleep(runMillis);
    System.exit(0);
}
}