Index: trunk/GnssCenter/thrift/bidi_java/BidiMessages.thrift
===================================================================
--- trunk/GnssCenter/thrift/bidi_java/BidiMessages.thrift	(revision 4940)
+++ trunk/GnssCenter/thrift/bidi_java/BidiMessages.thrift	(revision 4940)
@@ -0,0 +1,12 @@
+#!/usr/local/bin/thrift --gen java:beans:hashcode -O ../
+
+namespace java com.joelpm.bidiMessages.generated
+
+struct Message {
+  1: string clientName,
+  2: string message
+}
+
+service MessageService {
+  oneway void sendMessage(Message msg),
+}
Index: trunk/GnssCenter/thrift/bidi_java/client/Client.java
===================================================================
--- trunk/GnssCenter/thrift/bidi_java/client/Client.java	(revision 4940)
+++ trunk/GnssCenter/thrift/bidi_java/client/Client.java	(revision 4940)
@@ -0,0 +1,86 @@
+package com.joelpm.bidiMessages.client;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import com.joelpm.bidiMessages.generated.Message;
+import com.joelpm.bidiMessages.generated.MessageService;
+
+/**
+* Client that connects to the server and handles the sending and receiving
+* of message objects. Will also attempt to reconnect if the server disappears.
+*
+* @author Joel Meyer
+*/
+public class Client implements MessageService.Iface {
+  private final ConnectionStatusMonitor connectionMonitor;
+  private final MessageSender sender;
+  private final MessageReceiver receiver;
+  
+  private final String name;
+  
+  private final TTransport transport;
+  private final TProtocol protocol;
+  
+  private final List<MessageListener> listeners;
+  
+  public Client(String name, String server, int port, MessageService.Iface messageHandler) {
+    this.name = name;
+    this.transport = new TSocket(server, port);
+    this.protocol = new TBinaryProtocol(transport);
+   
+    this.connectionMonitor = new ConnectionStatusMonitor(transport);
+   
+    this.sender = new MessageSender(protocol, connectionMonitor);
+    this.receiver = new MessageReceiver(protocol, messageHandler, connectionMonitor);
+   
+    new Thread(sender).start();
+    new Thread(receiver).start();
+   
+    this.connectionMonitor.tryOpen();
+   
+    this.listeners = new ArrayList<MessageListener>();
+  }
+  
+  public void addListener(MessageListener listener) {
+    listeners.add(listener);
+  }
+  
+  public void sendMessageToServer(String msg) {
+    sender.send(new Message(name, msg));
+  }
+  
+  @Override
+  public void sendMessage(Message msg) throws TException {
+    for (MessageListener listener : listeners) {
+      listener.messageReceived(msg);
+    }
+  }
+  
+  /**
+* @param args
+*/
+  public static void main(String[] args) throws Exception {
+    MessageService.Iface handler = new MessageService.Iface() {
+      @Override
+      public void sendMessage(Message msg) throws TException {
+        System.out.println("Got msg: " + msg);
+      }
+    };
+
+    Client client = new Client(args[0], args[1], Integer.parseInt(args[2]), handler);
+    
+    client.sendMessageToServer("Hello there!");
+    
+    for (int i = 0; i < 100; i++) {
+      client.sendMessageToServer(String.format("Message %s", i));
+      Thread.sleep(1000);
+    }
+  }
+}
+
Index: trunk/GnssCenter/thrift/bidi_java/client/ConnectionRequiredRunnable.java
===================================================================
--- trunk/GnssCenter/thrift/bidi_java/client/ConnectionRequiredRunnable.java	(revision 4940)
+++ trunk/GnssCenter/thrift/bidi_java/client/ConnectionRequiredRunnable.java	(revision 4940)
@@ -0,0 +1,74 @@
+package com.joelpm.bidiMessages.client;
+
+import org.apache.log4j.Logger;
+
+/**
+* An abstract class that can be extended by tasks requiring a connection
+* to the server. Provides utility methods for a task to notify others that
+* the connection has been dropped as well as be paused until the connection
+* is resumed.
+*
+* @author Joel Meyer
+*/
+public abstract class ConnectionRequiredRunnable implements ConnectionStatusListener, Runnable {
+  private static final Logger LOGGER = Logger.getLogger(ConnectionRequiredRunnable.class);
+  protected final ConnectionStatusMonitor connectionMonitor;
+  protected final String threadName;
+  protected Thread executingThread;
+  
+  public ConnectionRequiredRunnable(ConnectionStatusMonitor connectionMonitor, String name) {
+    this.connectionMonitor = connectionMonitor;
+    this.connectionMonitor.addListener(this);
+    this.threadName = name;
+  }
+  
+  /**
+* Should be called if the task determines that the connection has
+* been dropped.
+*/
+  protected void disconnected() {
+    LOGGER.info(String.format("%s detected a disconnect from the server.", threadName));
+    connectionMonitor.disconnected(this);
+    connectWait();
+  }
+
+  /**
+* Can be called by the task upon startup to halt execution until
+* the connection to the server has been established.
+*/
+  protected synchronized void connectWait() {
+    executingThread = Thread.currentThread();
+    try {
+      LOGGER.info(String.format("%s waiting for connection to be established.", threadName));
+      wait();
+    } catch (InterruptedException e) {
+      LOGGER.debug(String.format("%s caught InterruptedException:", threadName));
+      LOGGER.debug(e);
+    }
+    LOGGER.info(String.format("%s notified of connection, resuming execution", threadName));
+  }
+  
+  /**
+* Interrupts the executing thread which is most likely blocked on
+* 1) socket read (in the case of the MessageReceiver)
+* 2) queue read (in the case of the MessageSender)
+* Regardless of what it's up to, when the thread is interrupted
+* it will wait until notified of reconnection, at which point it
+* will resume sending/receiving.
+*
+* @see com.joelpm.bidiMessages.client.ConnectionStatusListener#connectionLost()
+*/
+  public synchronized void connectionLost() {
+    executingThread.interrupt();
+  }
+  
+  /**
+* Notifies the waiting thread so that execution is resumed.
+*
+* @see com.joelpm.bidiMessages.client.ConnectionStatusListener#connectionEstablished()
+*/
+  public synchronized void connectionEstablished() {
+    notifyAll();
+  }
+}
+
Index: trunk/GnssCenter/thrift/bidi_java/client/ConnectionStatusListener.java
===================================================================
--- trunk/GnssCenter/thrift/bidi_java/client/ConnectionStatusListener.java	(revision 4940)
+++ trunk/GnssCenter/thrift/bidi_java/client/ConnectionStatusListener.java	(revision 4940)
@@ -0,0 +1,20 @@
+package com.joelpm.bidiMessages.client;
+
+/**
+* Interface implemented by classes that need to be notified
+* when the connection is lost or established.
+*
+* @author Joel Meyer
+*/
+public interface ConnectionStatusListener {
+  /**
+* Called when the connection has been lost.
+*/
+  public void connectionLost();
+  
+  /**
+* Called when the connection has been established.
+*/
+  public void connectionEstablished();
+}
+
Index: trunk/GnssCenter/thrift/bidi_java/client/ConnectionStatusMonitor.java
===================================================================
--- trunk/GnssCenter/thrift/bidi_java/client/ConnectionStatusMonitor.java	(revision 4940)
+++ trunk/GnssCenter/thrift/bidi_java/client/ConnectionStatusMonitor.java	(revision 4940)
@@ -0,0 +1,85 @@
+package com.joelpm.bidiMessages.client;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+/**
+* This class is responsible for notifying others when the
+* connection is lost or established and for attempting to
+* reconnect when the connection is lost.
+*
+* @author Joel Meyer
+*/
+public class ConnectionStatusMonitor {
+  /**
+* Simple task used to attempt a reconnect every few seconds.
+*/
+  private class RetryTask extends TimerTask {
+    @Override public void run() {
+      tryOpen();
+    }
+  }
+  
+  private final Timer timer;
+  
+  private final TTransport transport;
+  private final AtomicBoolean connected;
+  private final List<ConnectionStatusListener> listeners;
+  
+  public ConnectionStatusMonitor(TTransport transport) {
+    this.transport = transport;
+    this.connected = new AtomicBoolean(false);
+    this.listeners = new ArrayList<ConnectionStatusListener>();
+    
+    this.timer = new Timer();
+  }
+  
+  public void addListener(ConnectionStatusListener listener) {
+    listeners.add(listener);
+  }
+  
+  public void disconnected(ConnectionStatusListener noticer) {
+    if (connected.compareAndSet(true, false)) {
+      for (ConnectionStatusListener listener : listeners) {
+        // The thread running the noticer is our current execution thread. If we
+        // notify him he'll block and we'll be deadlocked. Since he noticed the
+        // disconnect he is responsible for initiating his own wait state.
+        if (listener == noticer) continue;
+        listener.connectionLost();
+      }
+      
+      // Try to reconnect in five seconds
+      timer.schedule(new RetryTask(), 5 * 1000);
+    }
+  }
+  
+  /**
+* Attempts to reconnect to the server.
+*/
+  public void tryOpen() {
+    if (connected.get()) return;
+    
+    // Make sure it's closed
+    transport.close();
+    
+    try {
+      transport.open();
+      connected.set(true);
+      for (ConnectionStatusListener listener : listeners) {
+        listener.connectionEstablished();
+      }
+      return;
+    } catch (TTransportException e) {
+      
+    }
+    
+    timer.schedule(new RetryTask(), 5 * 1000);
+  }
+}
+
Index: trunk/GnssCenter/thrift/bidi_java/client/MessageListener.java
===================================================================
--- trunk/GnssCenter/thrift/bidi_java/client/MessageListener.java	(revision 4940)
+++ trunk/GnssCenter/thrift/bidi_java/client/MessageListener.java	(revision 4940)
@@ -0,0 +1,18 @@
+package com.joelpm.bidiMessages.client;
+
+import com.joelpm.bidiMessages.generated.Message;
+
+/**
+* Interface implemented by classes that want to be notified when
+* new messages are received.
+*
+* @author Joel Meyer
+*/
+public interface MessageListener {
+  /**
+* Called when a new message is received.
+* @param msg The message that was received.
+*/
+  public void messageReceived(Message msg);
+}
+
Index: trunk/GnssCenter/thrift/bidi_java/client/MessageReceiver.java
===================================================================
--- trunk/GnssCenter/thrift/bidi_java/client/MessageReceiver.java	(revision 4940)
+++ trunk/GnssCenter/thrift/bidi_java/client/MessageReceiver.java	(revision 4940)
@@ -0,0 +1,38 @@
+package com.joelpm.bidiMessages.client;
+
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TProtocol;
+import com.joelpm.bidiMessages.generated.MessageService;
+
+/**
+* The class responsible for reading and deserializing incoming messages.
+* Should be run in its own thread.
+*
+* @author Joel Meyer
+*/
+public class MessageReceiver extends ConnectionRequiredRunnable {
+  private final MessageService.Processor processor;
+  private final TProtocol protocol;
+  
+  public MessageReceiver(
+      TProtocol protocol,
+      MessageService.Iface messageService,
+      ConnectionStatusMonitor connectionMonitor) {
+    super(connectionMonitor, "Message Receiver");
+    this.protocol = protocol;
+    this.processor = new MessageService.Processor(messageService);
+  }
+  
+  @Override
+  public void run() {
+    connectWait();
+    while (true) {
+      try {
+        while (processor.process(protocol, protocol) == true) { }
+      } catch (TException e) {
+        disconnected();
+      }
+    }
+  }
+}
+
Index: trunk/GnssCenter/thrift/bidi_java/client/MessageSender.java
===================================================================
--- trunk/GnssCenter/thrift/bidi_java/client/MessageSender.java	(revision 4940)
+++ trunk/GnssCenter/thrift/bidi_java/client/MessageSender.java	(revision 4940)
@@ -0,0 +1,54 @@
+package com.joelpm.bidiMessages.client;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TProtocol;
+import com.joelpm.bidiMessages.generated.Message;
+import com.joelpm.bidiMessages.generated.MessageService;
+
+/**
+* The class responsible for sending messages to the server.
+*
+* @author Joel Meyer
+*/
+public class MessageSender extends ConnectionRequiredRunnable {
+  private final MessageService.Client client;
+  private final BlockingQueue<Message> msgSendQueue;
+  
+  public MessageSender(
+      TProtocol protocol,
+      ConnectionStatusMonitor connectionMonitor) {
+    super(connectionMonitor, "Message Sender");
+    this.client = new MessageService.Client(protocol);
+    this.msgSendQueue = new LinkedBlockingQueue<Message>();
+  }
+  
+  public void send(Message msg) {
+    msgSendQueue.add(msg);
+  }
+  
+  @Override
+  public void run() {
+    connectWait();
+    while (true) {
+      try {
+        Message msg = msgSendQueue.take();
+        try {
+          client.sendMessage(msg);
+        } catch (TException e) {
+          // The message isn't lost, but it could end up being sent out of
+          // order - not ideal.
+          msgSendQueue.add(msg);
+          disconnected();
+        }
+      } catch (InterruptedException e) {
+        // Thread will be interrupted if connection is lost, we should wait
+        // for reconnection if that happens.
+        connectWait();
+      }
+    }
+  }
+}
+
Index: trunk/GnssCenter/thrift/bidi_java/server/MessageDistributor.java
===================================================================
--- trunk/GnssCenter/thrift/bidi_java/server/MessageDistributor.java	(revision 4940)
+++ trunk/GnssCenter/thrift/bidi_java/server/MessageDistributor.java	(revision 4940)
@@ -0,0 +1,69 @@
+package com.joelpm.bidiMessages.server;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.log4j.Logger;
+
+import org.apache.thrift.TException;
+import com.joelpm.bidiMessages.generated.Message;
+import com.joelpm.bidiMessages.generated.MessageService.Iface;
+
+/**
+* A simple class that uses a blocking queue to accept and publish
+* messages. This class should be run in its own thread to ensure
+* that message sending doesn't hijack the message receiving thread.
+*
+* @author Joel Meyer
+*/
+public class MessageDistributor implements Iface, Runnable {
+  private static final Logger LOGGER = Logger.getLogger(MessageDistributor.class);
+  
+  private final BlockingQueue<Message> messageQueue;
+  private final List<MessageServiceClient> clients;
+  
+  public MessageDistributor() {
+    this.messageQueue = new LinkedBlockingQueue<Message>();
+    this.clients = new ArrayList<MessageServiceClient>();
+  }
+  
+  public void addClient(MessageServiceClient client) {
+    // There should be some synchronization around this list
+    clients.add(client);
+    LOGGER.info(String.format("Added client at %s", client.getAddy()));
+  }
+  
+  @Override
+  public void run() {
+    while (true) {
+      try {
+        Message msg = messageQueue.take();
+
+        Iterator<MessageServiceClient> clientItr = clients.iterator();
+        while (clientItr.hasNext()) {
+          MessageServiceClient client = clientItr.next();
+          try {
+            client.sendMessage(msg);
+          } catch (TException te) {
+            // Most likely client disconnected, should remove it from the list
+            clientItr.remove();
+            LOGGER.info(String.format("Removing %s from client list.", client.getAddy()));
+            LOGGER.debug(te);
+          }
+        }
+      } catch (InterruptedException ie) {
+        LOGGER.debug(ie);
+      }
+    }
+  }
+
+  @Override
+  public void sendMessage(Message msg) throws TException {
+    messageQueue.add(msg);
+    LOGGER.info(String.format("Adding message to queue:\n%s", msg));
+  }
+}
+
Index: trunk/GnssCenter/thrift/bidi_java/server/MessageServiceClient.java
===================================================================
--- trunk/GnssCenter/thrift/bidi_java/server/MessageServiceClient.java	(revision 4940)
+++ trunk/GnssCenter/thrift/bidi_java/server/MessageServiceClient.java	(revision 4940)
@@ -0,0 +1,41 @@
+package com.joelpm.bidiMessages.server;
+
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import com.joelpm.bidiMessages.generated.Message;
+import com.joelpm.bidiMessages.generated.MessageService;
+import com.joelpm.bidiMessages.generated.MessageService.Iface;
+
+/**
+* This class is a stub that the server can use to send messages back
+* to the client.
+*
+* @author Joel Meyer
+*/
+public class MessageServiceClient implements Iface {
+  protected final TTransport transport;
+  protected final String addy;
+  protected final int port;
+  protected final MessageService.Client client;
+  
+  public MessageServiceClient(TTransport transport) {
+    TSocket tsocket = (TSocket)transport;
+    this.transport = transport;
+    
+    this.client = new MessageService.Client(new TBinaryProtocol(transport));
+    this.addy = tsocket.getSocket().getInetAddress().getHostAddress();
+    this.port = tsocket.getSocket().getPort();
+    
+  }
+  
+  public String getAddy() {
+    return addy;
+  }
+  
+  public void sendMessage(Message msg) throws TException {
+    this.client.sendMessage(msg);
+  }
+}
+
Index: trunk/GnssCenter/thrift/bidi_java/server/Server.java
===================================================================
--- trunk/GnssCenter/thrift/bidi_java/server/Server.java	(revision 4940)
+++ trunk/GnssCenter/thrift/bidi_java/server/Server.java	(revision 4940)
@@ -0,0 +1,46 @@
+package com.joelpm.bidiMessages.server;
+
+import org.apache.log4j.Logger;
+
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.TProcessorFactory;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TServerTransport;
+import org.apache.thrift.transport.TTransport;
+import com.joelpm.bidiMessages.generated.MessageService;
+
+/**
+* A simple server that accepts messages from clients and broadcasts
+* them out to all connected clients.
+*
+* @author Joel Meyer
+*/
+public class Server {
+  private static final Logger LOGGER = Logger.getLogger(Server.class);
+  
+  public static void main(String[] args) throws Exception {
+    final int port = Integer.parseInt(args[0]);
+    
+    final MessageDistributor messageDistributor = new MessageDistributor();
+
+    new Thread(messageDistributor).start();
+    
+    // Using our own TProcessorFactory gives us an opportunity to get
+    // access to the transport right after the client connection is
+    // accepted.
+    TProcessorFactory processorFactory = new TProcessorFactory(null) {
+      @Override
+      public TProcessor getProcessor(TTransport trans) {
+        messageDistributor.addClient(new MessageServiceClient(trans));
+        return new MessageService.Processor(messageDistributor);
+      }
+    };
+
+    TServerTransport serverTransport = new TServerSocket(port);
+    TServer server = new TThreadPoolServer(processorFactory, serverTransport);
+    LOGGER.info("Server started");
+    server.serve();
+  }
+}
