source: ntrip/trunk/GnssCenter/thrift/bidi_java/server/MessageDistributor.java@ 10460

Last change on this file since 10460 was 4940, checked in by mervart, 12 years ago
File size: 2.1 KB
RevLine 
[4940]1package com.joelpm.bidiMessages.server;
2
3import java.util.ArrayList;
4import java.util.Iterator;
5import java.util.List;
6import java.util.concurrent.BlockingQueue;
7import java.util.concurrent.LinkedBlockingQueue;
8
9import org.apache.log4j.Logger;
10
11import org.apache.thrift.TException;
12import com.joelpm.bidiMessages.generated.Message;
13import com.joelpm.bidiMessages.generated.MessageService.Iface;
14
15/**
16* A simple class that uses a blocking queue to accept and publish
17* messages. This class should be run in its own thread to ensure
18* that message sending doesn't hijack the message receiving thread.
19*
20* @author Joel Meyer
21*/
22public class MessageDistributor implements Iface, Runnable {
23 private static final Logger LOGGER = Logger.getLogger(MessageDistributor.class);
24
25 private final BlockingQueue<Message> messageQueue;
26 private final List<MessageServiceClient> clients;
27
28 public MessageDistributor() {
29 this.messageQueue = new LinkedBlockingQueue<Message>();
30 this.clients = new ArrayList<MessageServiceClient>();
31 }
32
33 public void addClient(MessageServiceClient client) {
34 // There should be some synchronization around this list
35 clients.add(client);
36 LOGGER.info(String.format("Added client at %s", client.getAddy()));
37 }
38
39 @Override
40 public void run() {
41 while (true) {
42 try {
43 Message msg = messageQueue.take();
44
45 Iterator<MessageServiceClient> clientItr = clients.iterator();
46 while (clientItr.hasNext()) {
47 MessageServiceClient client = clientItr.next();
48 try {
49 client.sendMessage(msg);
50 } catch (TException te) {
51 // Most likely client disconnected, should remove it from the list
52 clientItr.remove();
53 LOGGER.info(String.format("Removing %s from client list.", client.getAddy()));
54 LOGGER.debug(te);
55 }
56 }
57 } catch (InterruptedException ie) {
58 LOGGER.debug(ie);
59 }
60 }
61 }
62
63 @Override
64 public void sendMessage(Message msg) throws TException {
65 messageQueue.add(msg);
66 LOGGER.info(String.format("Adding message to queue:\n%s", msg));
67 }
68}
69
Note: See TracBrowser for help on using the repository browser.