source: ntrip/trunk/BNC/bncgetthread.cpp@ 278

Last change on this file since 278 was 278, checked in by mervart, 17 years ago

* empty log message *

File size: 8.0 KB
Line 
1
2/* -------------------------------------------------------------------------
3 * BKG NTRIP Client
4 * -------------------------------------------------------------------------
5 *
6 * Class: bncGetThread
7 *
8 * Purpose: Thread that retrieves data from NTRIP caster
9 *
10 * Author: L. Mervart
11 *
12 * Created: 24-Dec-2005
13 *
14 * Changes:
15 *
16 * -----------------------------------------------------------------------*/
17
18#include <stdlib.h>
19
20#include <QFile>
21#include <QTextStream>
22#include <QtNetwork>
23
24#include "bncgetthread.h"
25#include "bnctabledlg.h"
26#include "bncapp.h"
27
28#include "RTCM/RTCM2Decoder.h"
29#include "RTCM3/rtcm3.h"
30#include "RTIGS/rtigs.h"
31
32using namespace std;
33
34// Constructor
35////////////////////////////////////////////////////////////////////////////
36bncGetThread::bncGetThread(const QUrl& mountPoint,
37 const QByteArray& format, int iMount) {
38 _decoder = 0;
39 _mountPoint = mountPoint;
40 _staID = mountPoint.path().mid(1).toAscii();
41 _staID_orig = _staID;
42 _format = format;
43 _socket = 0;
44 _timeOut = 20*1000; // 20 seconds
45 _nextSleep = 1; // 1 second
46 _iMount = iMount; // index in mountpoints array
47
48 // Check name conflict
49 // -------------------
50 QSettings settings;
51 QListIterator<QString> it(settings.value("mountPoints").toStringList());
52 int num = 0;
53 int ind = -1;
54 while (it.hasNext()) {
55 ++ind;
56 QStringList hlp = it.next().split(" ");
57 if (hlp.size() <= 1) continue;
58 QUrl url(hlp[0]);
59 if (_mountPoint.path() == url.path()) {
60 if (_iMount > ind) {
61 ++num;
62 }
63 }
64 }
65
66 if (num > 0) {
67 _staID = _staID.left(_staID.length()-1) + QString("%1").arg(num).toAscii();
68 }
69}
70
71// Destructor
72////////////////////////////////////////////////////////////////////////////
73bncGetThread::~bncGetThread() {
74 delete _socket;
75 delete _decoder;
76}
77
78// Connect to Caster, send the Request (static)
79////////////////////////////////////////////////////////////////////////////
80QTcpSocket* bncGetThread::request(const QUrl& mountPoint, int timeOut,
81 QString& msg) {
82
83 // Connect the Socket
84 // ------------------
85 QSettings settings;
86 QString proxyHost = settings.value("proxyHost").toString();
87 int proxyPort = settings.value("proxyPort").toInt();
88
89 QTcpSocket* socket = new QTcpSocket();
90 if ( proxyHost.isEmpty() ) {
91 socket->connectToHost(mountPoint.host(), mountPoint.port());
92 }
93 else {
94 socket->connectToHost(proxyHost, proxyPort);
95 }
96 if (!socket->waitForConnected(timeOut)) {
97 msg += "Connect timeout\n";
98 delete socket;
99 return 0;
100 }
101
102 // Send Request
103 // ------------
104 QByteArray userAndPwd = mountPoint.userName().toAscii() + ":" +
105 mountPoint.password().toAscii();
106
107 QUrl hlp;
108 hlp.setScheme("http");
109 hlp.setHost(mountPoint.host());
110 hlp.setPort(mountPoint.port());
111 hlp.setPath(mountPoint.path());
112
113 QByteArray reqStr;
114 if ( proxyHost.isEmpty() ) {
115 if (hlp.path().indexOf("/") != 0) hlp.setPath("/");
116 reqStr = "GET " + hlp.path().toAscii() +
117 " HTTP/1.0\r\n"
118 "User-Agent: NTRIP BNC 1.0b\r\n"
119 "Authorization: Basic " +
120 userAndPwd.toBase64() + "\r\n\r\n";
121 } else {
122 reqStr = "GET " + hlp.toEncoded() +
123 " HTTP/1.0\r\n"
124 "User-Agent: NTRIP BNC 1.0b\r\n"
125 "Authorization: Basic " +
126 userAndPwd.toBase64() + "\r\n\r\n";
127 }
128
129 msg += reqStr;
130
131 socket->write(reqStr, reqStr.length());
132
133 if (!socket->waitForBytesWritten(timeOut)) {
134 msg += "Write timeout\n";
135 delete socket;
136 return 0;
137 }
138
139 return socket;
140}
141
142// Init Run
143////////////////////////////////////////////////////////////////////////////
144t_irc bncGetThread::initRun() {
145
146 // Send the Request
147 // ----------------
148 QString msg;
149
150 _socket = bncGetThread::request(_mountPoint, _timeOut, msg);
151
152 //// emit(newMessage(msg.toAscii()));
153
154 if (!_socket) {
155 return failure;
156 }
157
158 // Read Caster Response
159 // --------------------
160 _socket->waitForReadyRead(_timeOut);
161 if (_socket->canReadLine()) {
162 QString line = _socket->readLine();
163 if (line.indexOf("Unauthorized") != -1) {
164 QStringList table;
165 bncTableDlg::getFullTable(_mountPoint.host(), _mountPoint.port(), table);
166 QString net;
167 QStringListIterator it(table);
168 while (it.hasNext()) {
169 QString line = it.next();
170 if (line.indexOf("STR") == 0) {
171 QStringList tags = line.split(";");
172 if (tags.at(1) == _staID_orig) {
173 net = tags.at(7);
174 break;
175 }
176 }
177 }
178
179 QString reg;
180 it.toFront();
181 while (it.hasNext()) {
182 QString line = it.next();
183 if (line.indexOf("NET") == 0) {
184 QStringList tags = line.split(";");
185 if (tags.at(1) == net) {
186 reg = tags.at(7);
187 break;
188 }
189 }
190 }
191 emit(newMessage((_staID + ": Caster Response: " + line +
192 " Adjust User-ID and Password Register, see"
193 "\n " + reg).toAscii()));
194 return fatal;
195 }
196 if (line.indexOf("ICY 200 OK") != 0) {
197 emit(newMessage((_staID + ": Wrong Caster Response:\n" + line).toAscii()));
198 return failure;
199 }
200 }
201 else {
202 emit(newMessage(_staID + ": Response Timeout"));
203 return failure;
204 }
205
206 // Instantiate the filter
207 // ----------------------
208 if (!_decoder) {
209 if (_format.indexOf("RTCM_2") != -1) {
210 emit(newMessage("Get Data: " + _staID + " in RTCM 2.x format"));
211 _decoder = new RTCM2Decoder();
212 }
213 else if (_format.indexOf("RTCM_3") != -1) {
214 emit(newMessage("Get Data: " + _staID + " in RTCM 3.0 format"));
215 _decoder = new rtcm3();
216 }
217 else if (_format.indexOf("RTIGS") != -1) {
218 emit(newMessage("Get Data: " + _staID + " in RTIGS format"));
219 _decoder = new rtigs();
220 }
221 else {
222 emit(newMessage(_staID + ": Unknown data format " + _format));
223 return fatal;
224 }
225 }
226 return success;
227}
228
229// Run
230////////////////////////////////////////////////////////////////////////////
231void bncGetThread::run() {
232
233 t_irc irc = initRun();
234
235 if (irc == fatal) {
236 QThread::exit(1);
237 return;
238 }
239 else if (irc != success) {
240 emit(newMessage(_staID + ": initRun failed, reconnecting"));
241 tryReconnect();
242 }
243
244 // Read Incoming Data
245 // ------------------
246 while (true) {
247 try {
248 if (_socket->state() != QAbstractSocket::ConnectedState) {
249 emit(newMessage(_staID + ": Socket not connected, reconnecting"));
250 tryReconnect();
251 }
252
253
254 _socket->waitForReadyRead(_timeOut);
255 qint64 nBytes = _socket->bytesAvailable();
256 if (nBytes > 0) {
257 char* data = new char[nBytes];
258 _socket->read(data, nBytes);
259 _decoder->Decode(data, nBytes);
260 delete data;
261 for (list<Observation*>::iterator it = _decoder->_obsList.begin();
262 it != _decoder->_obsList.end(); it++) {
263 emit newObs(_staID, *it);
264 bool firstObs = (it == _decoder->_obsList.begin());
265 _global_caster->newObs(_staID, _mountPoint, firstObs, *it);
266 }
267 _decoder->_obsList.clear();
268 }
269 else {
270 emit(newMessage(_staID + ": Data Timeout, reconnecting"));
271 tryReconnect();
272 }
273 }
274 catch (const char* msg) {
275 emit(newMessage(_staID + msg));
276 tryReconnect();
277 }
278 }
279}
280
281// Exit
282////////////////////////////////////////////////////////////////////////////
283void bncGetThread::exit(int exitCode) {
284 if (exitCode!= 0) {
285 emit error(_staID);
286 }
287 QThread::exit(exitCode);
288 terminate();
289}
290
291// Try Re-Connect
292////////////////////////////////////////////////////////////////////////////
293void bncGetThread::tryReconnect() {
294 while (1) {
295 delete _socket; _socket = 0;
296 sleep(_nextSleep);
297 if ( initRun() == success ) {
298 break;
299 }
300 else {
301 _nextSleep *= 2;
302 if (_nextSleep > 128) {
303 _nextSleep = 128;
304 }
305 _nextSleep += rand() % 6;
306 }
307 }
308 _nextSleep = 1;
309}
Note: See TracBrowser for help on using the repository browser.