package com.joelpm.bidiMessages.client;

import java.util.concurrent.BlockingDeque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.thrift.TException;
import org.apache.thrift.protocol.TProtocol;

import com.joelpm.bidiMessages.generated.Message;
import com.joelpm.bidiMessages.generated.MessageService;

/**
 * The class responsible for sending messages to the server.
 *
 * <p>Messages handed to {@link #send(Message)} are buffered in an in-memory
 * deque and delivered one at a time by {@link #run()}, in the order they were
 * submitted. A message whose delivery fails with a {@link TException} is put
 * back at the head of the deque so that it is the next one retried.
 *
 * @author Joel Meyer
 */
public class MessageSender extends ConnectionRequiredRunnable {
  private final MessageService.Client client;

  /**
   * Pending outbound messages. The head is the next message to send; new
   * messages are appended at the tail and failed sends are re-inserted at
   * the head to preserve ordering.
   */
  private final BlockingDeque<Message> msgSendQueue;

  /**
   * Creates a sender that talks to the server over the given protocol.
   *
   * @param protocol the Thrift protocol used to build the service client
   * @param connectionMonitor passed through to the superclass constructor
   */
  public MessageSender(
      TProtocol protocol,
      ConnectionStatusMonitor connectionMonitor) {
    super(connectionMonitor, "Message Sender");
    this.client = new MessageService.Client(protocol);
    this.msgSendQueue = new LinkedBlockingDeque<Message>();
  }

  /**
   * Queues a message for delivery. Returns immediately; the message is sent
   * later by the thread executing {@link #run()}.
   *
   * @param msg the message to send; must not be {@code null} (the underlying
   *     deque rejects null elements)
   */
  public void send(Message msg) {
    msgSendQueue.add(msg);
  }

  /**
   * Delivery loop: waits for a connection, then repeatedly takes the next
   * queued message and sends it. Never returns normally.
   */
  @Override
  public void run() {
    // Inherited helper; presumably blocks until a connection is available.
    connectWait();
    while (true) {
      try {
        Message msg = msgSendQueue.take();
        try {
          client.sendMessage(msg);
        } catch (TException e) {
          // Put the message back at the head so it is retried before any
          // message queued after it; delivery order is preserved.
          msgSendQueue.addFirst(msg);
          // Inherited helper; presumably reports the lost connection.
          disconnected();
        }
      } catch (InterruptedException e) {
        // Thread will be interrupted if connection is lost, we should wait
        // for reconnection if that happens.
        connectWait();
      }
    }
  }
}