[4941] | 1 | #include "ConnectionStatusMonitor.h"
|
---|
| 2 | #include "ConnectionRequiredRunnable.h"
|
---|
| 3 |
|
---|
| 4 | #include <cstdio>
|
---|
| 5 |
|
---|
| 6 | using namespace apache::thrift::transport;
|
---|
| 7 | using namespace apache::thrift::concurrency;
|
---|
| 8 |
|
---|
| 9 | ConnectionStatusMonitor::ConnectionStatusMonitor(boost::shared_ptr<TTransport>& transport,
|
---|
| 10 | boost::shared_ptr<TimerManager>& timeMgr)
|
---|
| 11 | : listeners_(),
|
---|
| 12 | transport_(transport),
|
---|
| 13 | timeMgr_(timeMgr),
|
---|
| 14 | connected_(0)
|
---|
| 15 | {
|
---|
| 16 | }
|
---|
| 17 |
|
---|
| 18 | ConnectionStatusMonitor::~ConnectionStatusMonitor() {
|
---|
| 19 | }
|
---|
| 20 |
|
---|
| 21 | void
|
---|
| 22 | ConnectionStatusMonitor::addListener(boost::shared_ptr<ConnectionRequiredRunnable> listener) {
|
---|
| 23 | listeners_.insert(listener);
|
---|
| 24 | }
|
---|
| 25 |
|
---|
| 26 | bool
|
---|
| 27 | ConnectionStatusMonitor::connected() {
|
---|
| 28 | return (((long)connected_ == 0) ? false : true);
|
---|
| 29 | //return connected_.load(); // if using C++0x
|
---|
| 30 | }
|
---|
| 31 |
|
---|
| 32 | void
|
---|
| 33 | ConnectionStatusMonitor::disconnected(const ConnectionRequiredRunnable* noticer) {
|
---|
| 34 | bool expected=true;
|
---|
| 35 | if ((long)connected_ != 0) {
|
---|
| 36 | //if (connected_.compare_exchange_strong(expected,false)) // if using C++0x
|
---|
| 37 | --connected_;
|
---|
| 38 | std::set<boost::shared_ptr<ConnectionRequiredRunnable> >::iterator listener;
|
---|
| 39 | for (listener=listeners_.begin(); listener != listeners_.end(); ++listener) {
|
---|
| 40 | //for (auto listener=listeners_.begin(); listener != listeners_.end(); ++listener) // if using C++0x
|
---|
| 41 | // The thread running the noticer is our current execution thread. If we
|
---|
| 42 | // notify it it'll block and we'll be deadlocked. Since it noticed the
|
---|
| 43 | // disconnect it is responsible for initiating its own wait state.
|
---|
| 44 | if (listener->get() == noticer) continue;
|
---|
| 45 | (*listener)->connectionLost();
|
---|
| 46 | }
|
---|
| 47 | }
|
---|
| 48 | // Try to reconnect in five seconds
|
---|
| 49 | timeMgr_->add(boost::shared_ptr<Runnable>(new Reconnect(this)), 5 * 1000);
|
---|
| 50 | }
|
---|
| 51 |
|
---|
| 52 | void
|
---|
| 53 | ConnectionStatusMonitor::tryOpen() {
|
---|
| 54 | if ((long)connected_ != 0)
|
---|
| 55 | //if (connected_.load()) // if using C++0x
|
---|
| 56 | return;
|
---|
| 57 |
|
---|
| 58 | // make sure it's closed
|
---|
| 59 | transport_->close();
|
---|
| 60 | try {
|
---|
| 61 | transport_->open();
|
---|
| 62 | ++connected_;
|
---|
| 63 | //connected_.store(true); // if using C++0x
|
---|
| 64 | std::set<boost::shared_ptr<ConnectionRequiredRunnable> >::iterator listener;
|
---|
| 65 | for (listener=listeners_.begin(); listener != listeners_.end(); ++listener) {
|
---|
| 66 | //for (auto listener = listeners_.begin(); listener != listeners_.end(); ++listener) { // if using C++0x
|
---|
| 67 | (*listener)->connectionEstablished();
|
---|
| 68 | }
|
---|
| 69 | return;
|
---|
| 70 | } catch (TTransportException& e) {
|
---|
| 71 | fprintf(stderr,"opening transport failed\n");
|
---|
| 72 | }
|
---|
| 73 | // Try to reconnect in five seconds
|
---|
| 74 | timeMgr_->add(boost::shared_ptr<Runnable>(new Reconnect(this)), 5 * 1000);
|
---|
| 75 | }
|
---|
| 76 |
|
---|
| 77 | void
|
---|
| 78 | ConnectionStatusMonitor::stop() {
|
---|
| 79 | std::set<boost::shared_ptr<ConnectionRequiredRunnable> >::iterator listener;
|
---|
| 80 | for (listener=listeners_.begin(); listener != listeners_.end(); ++listener) {
|
---|
| 81 | //for (auto listener = listeners_.begin(); listener != listeners_.end(); ++listener) // if using C++0x
|
---|
| 82 | (*listener)->stop();
|
---|
| 83 | }
|
---|
| 84 | listeners_.clear();
|
---|
| 85 | }
|
---|
| 86 |
|
---|