1 | package com.joelpm.bidiMessages.server;
|
---|
2 |
|
---|
3 | import java.util.ArrayList;
|
---|
4 | import java.util.Iterator;
|
---|
5 | import java.util.List;
|
---|
6 | import java.util.concurrent.BlockingQueue;
|
---|
7 | import java.util.concurrent.LinkedBlockingQueue;
|
---|
8 |
|
---|
9 | import org.apache.log4j.Logger;
|
---|
10 |
|
---|
11 | import org.apache.thrift.TException;
|
---|
12 | import com.joelpm.bidiMessages.generated.Message;
|
---|
13 | import com.joelpm.bidiMessages.generated.MessageService.Iface;
|
---|
14 |
|
---|
15 | /**
|
---|
16 | * A simple class that uses a blocking queue to accept and publish
|
---|
17 | * messages. This class should be run in its own thread to ensure
|
---|
18 | * that message sending doesn't hijack the message receiving thread.
|
---|
19 | *
|
---|
20 | * @author Joel Meyer
|
---|
21 | */
|
---|
22 | public class MessageDistributor implements Iface, Runnable {
|
---|
23 | private static final Logger LOGGER = Logger.getLogger(MessageDistributor.class);
|
---|
24 |
|
---|
25 | private final BlockingQueue<Message> messageQueue;
|
---|
26 | private final List<MessageServiceClient> clients;
|
---|
27 |
|
---|
28 | public MessageDistributor() {
|
---|
29 | this.messageQueue = new LinkedBlockingQueue<Message>();
|
---|
30 | this.clients = new ArrayList<MessageServiceClient>();
|
---|
31 | }
|
---|
32 |
|
---|
33 | public void addClient(MessageServiceClient client) {
|
---|
34 | // There should be some synchronization around this list
|
---|
35 | clients.add(client);
|
---|
36 | LOGGER.info(String.format("Added client at %s", client.getAddy()));
|
---|
37 | }
|
---|
38 |
|
---|
39 | @Override
|
---|
40 | public void run() {
|
---|
41 | while (true) {
|
---|
42 | try {
|
---|
43 | Message msg = messageQueue.take();
|
---|
44 |
|
---|
45 | Iterator<MessageServiceClient> clientItr = clients.iterator();
|
---|
46 | while (clientItr.hasNext()) {
|
---|
47 | MessageServiceClient client = clientItr.next();
|
---|
48 | try {
|
---|
49 | client.sendMessage(msg);
|
---|
50 | } catch (TException te) {
|
---|
51 | // Most likely client disconnected, should remove it from the list
|
---|
52 | clientItr.remove();
|
---|
53 | LOGGER.info(String.format("Removing %s from client list.", client.getAddy()));
|
---|
54 | LOGGER.debug(te);
|
---|
55 | }
|
---|
56 | }
|
---|
57 | } catch (InterruptedException ie) {
|
---|
58 | LOGGER.debug(ie);
|
---|
59 | }
|
---|
60 | }
|
---|
61 | }
|
---|
62 |
|
---|
63 | @Override
|
---|
64 | public void sendMessage(Message msg) throws TException {
|
---|
65 | messageQueue.add(msg);
|
---|
66 | LOGGER.info(String.format("Adding message to queue:\n%s", msg));
|
---|
67 | }
|
---|
68 | }
|
---|
69 |
|
---|