#include "ConnectionStatusMonitor.h" #include "ConnectionRequiredRunnable.h" #include using namespace apache::thrift::transport; using namespace apache::thrift::concurrency; ConnectionStatusMonitor::ConnectionStatusMonitor(boost::shared_ptr& transport, boost::shared_ptr& timeMgr) : listeners_(), transport_(transport), timeMgr_(timeMgr), connected_(0) { } ConnectionStatusMonitor::~ConnectionStatusMonitor() { } void ConnectionStatusMonitor::addListener(boost::shared_ptr listener) { listeners_.insert(listener); } bool ConnectionStatusMonitor::connected() { return (((long)connected_ == 0) ? false : true); //return connected_.load(); // if using C++0x } void ConnectionStatusMonitor::disconnected(const ConnectionRequiredRunnable* noticer) { bool expected=true; if ((long)connected_ != 0) { //if (connected_.compare_exchange_strong(expected,false)) // if using C++0x --connected_; std::set >::iterator listener; for (listener=listeners_.begin(); listener != listeners_.end(); ++listener) { //for (auto listener=listeners_.begin(); listener != listeners_.end(); ++listener) // if using C++0x // The thread running the noticer is our current execution thread. If we // notify it it'll block and we'll be deadlocked. Since it noticed the // disconnect it is responsible for initiating its own wait state. if (listener->get() == noticer) continue; (*listener)->connectionLost(); } } // Try to reconnect in five seconds timeMgr_->add(boost::shared_ptr(new Reconnect(this)), 5 * 1000); } void ConnectionStatusMonitor::tryOpen() { if ((long)connected_ != 0) //if (connected_.load()) // if using C++0x return; // make sure it's closed transport_->close(); try { transport_->open(); ++connected_; //connected_.store(true); // if using C++0x std::set >::iterator listener; for (listener=listeners_.begin(); listener != listeners_.end(); ++listener) { //for (auto listener = listeners_.begin(); listener != listeners_.end(); ++listener) { // if using C++0x (*listener)->connectionEstablished(); } return; } catch (TTransportException& e) { fprintf(stderr,"opening transport failed\n"); } // Try to reconnect in five seconds timeMgr_->add(boost::shared_ptr(new Reconnect(this)), 5 * 1000); } void ConnectionStatusMonitor::stop() { std::set >::iterator listener; for (listener=listeners_.begin(); listener != listeners_.end(); ++listener) { //for (auto listener = listeners_.begin(); listener != listeners_.end(); ++listener) // if using C++0x (*listener)->stop(); } listeners_.clear(); }