[4940] | 1 | package com.joelpm.bidiMessages.server;
|
---|
| 2 |
|
---|
| 3 | import java.util.ArrayList;
|
---|
| 4 | import java.util.Iterator;
|
---|
| 5 | import java.util.List;
|
---|
| 6 | import java.util.concurrent.BlockingQueue;
|
---|
| 7 | import java.util.concurrent.LinkedBlockingQueue;
|
---|
| 8 |
|
---|
| 9 | import org.apache.log4j.Logger;
|
---|
| 10 |
|
---|
| 11 | import org.apache.thrift.TException;
|
---|
| 12 | import com.joelpm.bidiMessages.generated.Message;
|
---|
| 13 | import com.joelpm.bidiMessages.generated.MessageService.Iface;
|
---|
| 14 |
|
---|
| 15 | /**
|
---|
| 16 | * A simple class that uses a blocking queue to accept and publish
|
---|
| 17 | * messages. This class should be run in its own thread to ensure
|
---|
| 18 | * that message sending doesn't hijack the message receiving thread.
|
---|
| 19 | *
|
---|
| 20 | * @author Joel Meyer
|
---|
| 21 | */
|
---|
| 22 | public class MessageDistributor implements Iface, Runnable {
|
---|
| 23 | private static final Logger LOGGER = Logger.getLogger(MessageDistributor.class);
|
---|
| 24 |
|
---|
| 25 | private final BlockingQueue<Message> messageQueue;
|
---|
| 26 | private final List<MessageServiceClient> clients;
|
---|
| 27 |
|
---|
| 28 | public MessageDistributor() {
|
---|
| 29 | this.messageQueue = new LinkedBlockingQueue<Message>();
|
---|
| 30 | this.clients = new ArrayList<MessageServiceClient>();
|
---|
| 31 | }
|
---|
| 32 |
|
---|
| 33 | public void addClient(MessageServiceClient client) {
|
---|
| 34 | // There should be some synchronization around this list
|
---|
| 35 | clients.add(client);
|
---|
| 36 | LOGGER.info(String.format("Added client at %s", client.getAddy()));
|
---|
| 37 | }
|
---|
| 38 |
|
---|
| 39 | @Override
|
---|
| 40 | public void run() {
|
---|
| 41 | while (true) {
|
---|
| 42 | try {
|
---|
| 43 | Message msg = messageQueue.take();
|
---|
| 44 |
|
---|
| 45 | Iterator<MessageServiceClient> clientItr = clients.iterator();
|
---|
| 46 | while (clientItr.hasNext()) {
|
---|
| 47 | MessageServiceClient client = clientItr.next();
|
---|
| 48 | try {
|
---|
| 49 | client.sendMessage(msg);
|
---|
| 50 | } catch (TException te) {
|
---|
| 51 | // Most likely client disconnected, should remove it from the list
|
---|
| 52 | clientItr.remove();
|
---|
| 53 | LOGGER.info(String.format("Removing %s from client list.", client.getAddy()));
|
---|
| 54 | LOGGER.debug(te);
|
---|
| 55 | }
|
---|
| 56 | }
|
---|
| 57 | } catch (InterruptedException ie) {
|
---|
| 58 | LOGGER.debug(ie);
|
---|
| 59 | }
|
---|
| 60 | }
|
---|
| 61 | }
|
---|
| 62 |
|
---|
| 63 | @Override
|
---|
| 64 | public void sendMessage(Message msg) throws TException {
|
---|
| 65 | messageQueue.add(msg);
|
---|
| 66 | LOGGER.info(String.format("Adding message to queue:\n%s", msg));
|
---|
| 67 | }
|
---|
| 68 | }
|
---|
| 69 |
|
---|