1 | #include "ConnectionStatusMonitor.h"
|
---|
2 | #include "ConnectionRequiredRunnable.h"
|
---|
3 |
|
---|
4 | #include <cstdio>
|
---|
5 |
|
---|
6 | using namespace apache::thrift::transport;
|
---|
7 | using namespace apache::thrift::concurrency;
|
---|
8 |
|
---|
9 | ConnectionStatusMonitor::ConnectionStatusMonitor(boost::shared_ptr<TTransport>& transport,
|
---|
10 | boost::shared_ptr<TimerManager>& timeMgr)
|
---|
11 | : listeners_(),
|
---|
12 | transport_(transport),
|
---|
13 | timeMgr_(timeMgr),
|
---|
14 | connected_(0)
|
---|
15 | {
|
---|
16 | }
|
---|
17 |
|
---|
18 | ConnectionStatusMonitor::~ConnectionStatusMonitor() {
|
---|
19 | }
|
---|
20 |
|
---|
21 | void
|
---|
22 | ConnectionStatusMonitor::addListener(boost::shared_ptr<ConnectionRequiredRunnable> listener) {
|
---|
23 | listeners_.insert(listener);
|
---|
24 | }
|
---|
25 |
|
---|
26 | bool
|
---|
27 | ConnectionStatusMonitor::connected() {
|
---|
28 | return (((long)connected_ == 0) ? false : true);
|
---|
29 | //return connected_.load(); // if using C++0x
|
---|
30 | }
|
---|
31 |
|
---|
32 | void
|
---|
33 | ConnectionStatusMonitor::disconnected(const ConnectionRequiredRunnable* noticer) {
|
---|
34 | bool expected=true;
|
---|
35 | if ((long)connected_ != 0) {
|
---|
36 | //if (connected_.compare_exchange_strong(expected,false)) // if using C++0x
|
---|
37 | --connected_;
|
---|
38 | std::set<boost::shared_ptr<ConnectionRequiredRunnable> >::iterator listener;
|
---|
39 | for (listener=listeners_.begin(); listener != listeners_.end(); ++listener) {
|
---|
40 | //for (auto listener=listeners_.begin(); listener != listeners_.end(); ++listener) // if using C++0x
|
---|
41 | // The thread running the noticer is our current execution thread. If we
|
---|
42 | // notify it it'll block and we'll be deadlocked. Since it noticed the
|
---|
43 | // disconnect it is responsible for initiating its own wait state.
|
---|
44 | if (listener->get() == noticer) continue;
|
---|
45 | (*listener)->connectionLost();
|
---|
46 | }
|
---|
47 | }
|
---|
48 | // Try to reconnect in five seconds
|
---|
49 | timeMgr_->add(boost::shared_ptr<Runnable>(new Reconnect(this)), 5 * 1000);
|
---|
50 | }
|
---|
51 |
|
---|
52 | void
|
---|
53 | ConnectionStatusMonitor::tryOpen() {
|
---|
54 | if ((long)connected_ != 0)
|
---|
55 | //if (connected_.load()) // if using C++0x
|
---|
56 | return;
|
---|
57 |
|
---|
58 | // make sure it's closed
|
---|
59 | transport_->close();
|
---|
60 | try {
|
---|
61 | transport_->open();
|
---|
62 | ++connected_;
|
---|
63 | //connected_.store(true); // if using C++0x
|
---|
64 | std::set<boost::shared_ptr<ConnectionRequiredRunnable> >::iterator listener;
|
---|
65 | for (listener=listeners_.begin(); listener != listeners_.end(); ++listener) {
|
---|
66 | //for (auto listener = listeners_.begin(); listener != listeners_.end(); ++listener) { // if using C++0x
|
---|
67 | (*listener)->connectionEstablished();
|
---|
68 | }
|
---|
69 | return;
|
---|
70 | } catch (TTransportException& e) {
|
---|
71 | fprintf(stderr,"opening transport failed\n");
|
---|
72 | }
|
---|
73 | // Try to reconnect in five seconds
|
---|
74 | timeMgr_->add(boost::shared_ptr<Runnable>(new Reconnect(this)), 5 * 1000);
|
---|
75 | }
|
---|
76 |
|
---|
77 | void
|
---|
78 | ConnectionStatusMonitor::stop() {
|
---|
79 | std::set<boost::shared_ptr<ConnectionRequiredRunnable> >::iterator listener;
|
---|
80 | for (listener=listeners_.begin(); listener != listeners_.end(); ++listener) {
|
---|
81 | //for (auto listener = listeners_.begin(); listener != listeners_.end(); ++listener) // if using C++0x
|
---|
82 | (*listener)->stop();
|
---|
83 | }
|
---|
84 | listeners_.clear();
|
---|
85 | }
|
---|
86 |
|
---|