package org.sim0mq.test.dealerrouter; import java.util.Random; import org.zeromq.SocketType; import org.zeromq.ZContext; import org.zeromq.ZFrame; import org.zeromq.ZMQ; import org.zeromq.ZMQ.Poller; import org.zeromq.ZMQ.Socket; import org.zeromq.ZMsg; /** * 0MQ DEALER-ROUTER example. Based on * http://zguide.zeromq.org/java:asyncsrv or in the JeroMQ example at * * https://github.com/zeromq/jeromq/blob/master/src/test/java/guide/asyncsrv.java.
* In the 0MQ manual this example can be found at * * http://zguide.zeromq.org/page:all#The-Asynchronous-Client-Server-Pattern. *

* Copyright (c) 2013-2017 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved.
* BSD-style license. See Sim0MQ License. *

* @author Alexander Verbraeck */ public final class DealerRouter { /** */ private DealerRouter() { // utility class } /** random stream. */ @SuppressWarnings("checkstyle:visibilitymodifier") static Random rand = new Random(System.nanoTime()); /** * This is our client task. It connects to the server, and then sends a request once per second. It collects responses as * they arrive, and it prints them out. We will run several client tasks in parallel, each with a different random ID. */ static class ClientTask implements Runnable { /** {@inheritDoc} */ @Override public void run() { try (ZContext ctx = new ZContext()) { Socket client = ctx.createSocket(SocketType.DEALER); // Set random identity to make tracing easier String identity = String.format("%04X-%04X", rand.nextInt(), rand.nextInt()); client.setIdentity(identity.getBytes(ZMQ.CHARSET)); client.connect("tcp://localhost:5570"); Poller poller = ctx.createPoller(1); poller.register(client, Poller.POLLIN); int requestNr = 0; while (!Thread.currentThread().isInterrupted()) { // Tick once per second, pulling in arriving messages for (int centitick = 0; centitick < 100; centitick++) { poller.poll(10); if (poller.pollin(0)) { ZMsg msg = ZMsg.recvMsg(client); msg.getLast().print(identity); msg.destroy(); } } client.send(String.format("request #%d", ++requestNr), 0); } } } } /** * This is our server task. It uses the multithreaded server model to deal requests out to a pool of workers and route * replies back to clients. One worker can handle one request at a time but one client can talk to multiple workers at once. */ static class ServerTask implements Runnable { /** {@inheritDoc} */ @Override public void run() { try (ZContext ctx = new ZContext()) { // Frontend socket talks to clients over TCP Socket frontend = ctx.createSocket(SocketType.ROUTER); frontend.bind("tcp://*:5570"); // Backend socket talks to workers over inproc Socket backend = ctx.createSocket(SocketType.DEALER); backend.bind("inproc://backend"); // Launch pool of worker threads, precise number is not critical for (int threadNbr = 0; threadNbr < 5; threadNbr++) { new Thread(new ServerWorker(ctx)).start(); } // Connect backend to frontend via a proxy ZMQ.proxy(frontend, backend, null); } } } /** * Each worker task works on one request at a time and sends a random number of replies back, with random delays between * replies. */ private static class ServerWorker implements Runnable { /** context. */ private ZContext ctx; /** * Construct a worker. * @param ctx the 0MQ context to use */ ServerWorker(final ZContext ctx) { this.ctx = ctx; } /** {@inheritDoc} */ @Override public void run() { Socket worker = this.ctx.createSocket(SocketType.DEALER); worker.connect("inproc://backend"); while (!Thread.currentThread().isInterrupted()) { // The DEALER socket gives us the address envelope and message ZMsg msg = ZMsg.recvMsg(worker); ZFrame address = msg.pop(); ZFrame content = msg.pop(); assert (content != null); msg.destroy(); // Send 0..4 replies back int replies = rand.nextInt(5); for (int reply = 0; reply < replies; reply++) { // Sleep for some fraction of a second try { Thread.sleep(rand.nextInt(1000) + 1); } catch (InterruptedException e) { // ignore } address.send(worker, ZFrame.REUSE + ZFrame.MORE); content.send(worker, ZFrame.REUSE); } address.destroy(); content.destroy(); } this.ctx.destroy(); } } /** * The main thread simply starts several clients, and a server, and then waits for the server to finish. * @param args unused * @throws Exception on error */ public static void main(final String[] args) throws Exception { new Thread(new ClientTask()).start(); new Thread(new ClientTask()).start(); new Thread(new ClientTask()).start(); new Thread(new ServerTask()).start(); // Run for 20 seconds then quit Thread.sleep(20 * 1000); System.exit(0); } }