source: ntrip/trunk/BNC/bncgetthread.cpp@ 349

Last change on this file since 349 was 349, checked in by mervart, 17 years ago

* empty log message *

File size: 10.3 KB
Line 
1// Part of BNC, a utility for retrieving decoding and
2// converting GNSS data streams from NTRIP broadcasters,
3// written by Leos Mervart.
4//
5// Copyright (C) 2006
6// German Federal Agency for Cartography and Geodesy (BKG)
7// http://www.bkg.bund.de
8// Czech Technical University Prague, Department of Advanced Geodesy
9// http://www.fsv.cvut.cz
10//
11// Email: euref-ip@bkg.bund.de
12//
13// This program is free software; you can redistribute it and/or
14// modify it under the terms of the GNU General Public License
15// as published by the Free Software Foundation, version 2.
16//
17// This program is distributed in the hope that it will be useful,
18// but WITHOUT ANY WARRANTY; without even the implied warranty of
19// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20// GNU General Public License for more details.
21//
22// You should have received a copy of the GNU General Public License
23// along with this program; if not, write to the Free Software
24// Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
25
26/* -------------------------------------------------------------------------
27 * BKG NTRIP Client
28 * -------------------------------------------------------------------------
29 *
30 * Class: bncGetThread
31 *
32 * Purpose: Thread that retrieves data from NTRIP caster
33 *
34 * Author: L. Mervart
35 *
36 * Created: 24-Dec-2005
37 *
38 * Changes:
39 *
40 * -----------------------------------------------------------------------*/
41
42#include <stdlib.h>
43
44#include <QFile>
45#include <QTextStream>
46#include <QtNetwork>
47
48#include "bncgetthread.h"
49#include "bnctabledlg.h"
50#include "bncapp.h"
51#include "bncutils.h"
52
53#include "RTCM/RTCM2Decoder.h"
54#include "RTCM3/RTCM3Decoder.h"
55#include "RTIGS/RTIGSDecoder.h"
56
57using namespace std;
58
59// Constructor
60////////////////////////////////////////////////////////////////////////////
61bncGetThread::bncGetThread(const QUrl& mountPoint,
62 const QByteArray& format, int iMount) {
63 _decoder = 0;
64 _mountPoint = mountPoint;
65 _staID = mountPoint.path().mid(1).toAscii();
66 _staID_orig = _staID;
67 _format = format;
68 _socket = 0;
69 _timeOut = 20*1000; // 20 seconds
70 _nextSleep = 1; // 1 second
71 _iMount = iMount; // index in mountpoints array
72 _rinexWriter = 0;
73
74 // Check name conflict
75 // -------------------
76 QSettings settings;
77 QListIterator<QString> it(settings.value("mountPoints").toStringList());
78 int num = 0;
79 int ind = -1;
80 while (it.hasNext()) {
81 ++ind;
82 QStringList hlp = it.next().split(" ");
83 if (hlp.size() <= 1) continue;
84 QUrl url(hlp[0]);
85 if (_mountPoint.path() == url.path()) {
86 if (_iMount > ind) {
87 ++num;
88 }
89 }
90 }
91
92 _samplingRate = settings.value("rnxSampl").toInt();
93
94 if (num > 0) {
95 _staID = _staID.left(_staID.length()-1) + QString("%1").arg(num).toAscii();
96 }
97// Start Ergaenzung Perlt
98 msleep(100); //sleep 0.1 sec
99//Ende Ergaenzung Perlt
100}
101
102// Destructor
103////////////////////////////////////////////////////////////////////////////
104bncGetThread::~bncGetThread() {
105 delete _socket;
106 delete _decoder;
107}
108
109// Connect to Caster, send the Request (static)
110////////////////////////////////////////////////////////////////////////////
111QTcpSocket* bncGetThread::request(const QUrl& mountPoint, int timeOut,
112 QString& msg) {
113
114 // Connect the Socket
115 // ------------------
116 QSettings settings;
117 QString proxyHost = settings.value("proxyHost").toString();
118 int proxyPort = settings.value("proxyPort").toInt();
119
120 QTcpSocket* socket = new QTcpSocket();
121 if ( proxyHost.isEmpty() ) {
122 socket->connectToHost(mountPoint.host(), mountPoint.port());
123 }
124 else {
125 socket->connectToHost(proxyHost, proxyPort);
126 }
127 if (!socket->waitForConnected(timeOut)) {
128 msg += "Connect timeout\n";
129 delete socket;
130 return 0;
131 }
132
133 // Send Request
134 // ------------
135 QByteArray userAndPwd = mountPoint.userName().toAscii() + ":" +
136 mountPoint.password().toAscii();
137
138 QUrl hlp;
139 hlp.setScheme("http");
140 hlp.setHost(mountPoint.host());
141 hlp.setPort(mountPoint.port());
142 hlp.setPath(mountPoint.path());
143
144 QByteArray reqStr;
145 if ( proxyHost.isEmpty() ) {
146 if (hlp.path().indexOf("/") != 0) hlp.setPath("/");
147 reqStr = "GET " + hlp.path().toAscii() +
148 " HTTP/1.0\r\n"
149 "User-Agent: NTRIP BNC 1.0b\r\n"
150 "Authorization: Basic " +
151 userAndPwd.toBase64() + "\r\n\r\n";
152 } else {
153 reqStr = "GET " + hlp.toEncoded() +
154 " HTTP/1.0\r\n"
155 "User-Agent: NTRIP BNC 1.0b\r\n"
156 "Authorization: Basic " +
157 userAndPwd.toBase64() + "\r\n\r\n";
158 }
159
160 msg += reqStr;
161
162 socket->write(reqStr, reqStr.length());
163
164 if (!socket->waitForBytesWritten(timeOut)) {
165 msg += "Write timeout\n";
166 delete socket;
167 return 0;
168 }
169
170 return socket;
171}
172
173// Init Run
174////////////////////////////////////////////////////////////////////////////
175t_irc bncGetThread::initRun() {
176
177 // Send the Request
178 // ----------------
179 QString msg;
180
181 _socket = bncGetThread::request(_mountPoint, _timeOut, msg);
182
183 //// emit(newMessage(msg.toAscii()));
184
185 if (!_socket) {
186 return failure;
187 }
188
189 // Read Caster Response
190 // --------------------
191 _socket->waitForReadyRead(_timeOut);
192 if (_socket->canReadLine()) {
193 QString line = _socket->readLine();
194 if (line.indexOf("Unauthorized") != -1) {
195 QStringList table;
196 bncTableDlg::getFullTable(_mountPoint.host(), _mountPoint.port(), table);
197 QString net;
198 QStringListIterator it(table);
199 while (it.hasNext()) {
200 QString line = it.next();
201 if (line.indexOf("STR") == 0) {
202 QStringList tags = line.split(";");
203 if (tags.at(1) == _staID_orig) {
204 net = tags.at(7);
205 break;
206 }
207 }
208 }
209
210 QString reg;
211 it.toFront();
212 while (it.hasNext()) {
213 QString line = it.next();
214 if (line.indexOf("NET") == 0) {
215 QStringList tags = line.split(";");
216 if (tags.at(1) == net) {
217 reg = tags.at(7);
218 break;
219 }
220 }
221 }
222 emit(newMessage((_staID + ": Caster Response: " + line +
223 " Adjust User-ID and Password Register, see"
224 "\n " + reg).toAscii()));
225 return fatal;
226 }
227 if (line.indexOf("ICY 200 OK") != 0) {
228 emit(newMessage((_staID + ": Wrong Caster Response:\n" + line).toAscii()));
229 return failure;
230 }
231 }
232 else {
233 emit(newMessage(_staID + ": Response Timeout"));
234 return failure;
235 }
236
237 // Instantiate the filter
238 // ----------------------
239 if (!_decoder) {
240 if (_format.indexOf("RTCM_2") != -1) {
241 emit(newMessage("Get Data: " + _staID + " in RTCM 2.x format"));
242 _decoder = new RTCM2Decoder();
243 }
244 else if (_format.indexOf("RTCM_3") != -1) {
245 emit(newMessage("Get Data: " + _staID + " in RTCM 3.0 format"));
246 _decoder = new RTCM3Decoder();
247 }
248 else if (_format.indexOf("RTIGS") != -1) {
249 emit(newMessage("Get Data: " + _staID + " in RTIGS format"));
250 _decoder = new RTIGSDecoder();
251 }
252 else {
253 emit(newMessage(_staID + ": Unknown data format " + _format));
254 return fatal;
255 }
256 }
257 return success;
258}
259
260// Run
261////////////////////////////////////////////////////////////////////////////
262void bncGetThread::run() {
263
264 t_irc irc = initRun();
265
266 if (irc == fatal) {
267 QThread::exit(1);
268 return;
269 }
270 else if (irc != success) {
271 emit(newMessage(_staID + ": initRun failed, reconnecting"));
272 tryReconnect();
273 }
274
275 // Read Incoming Data
276 // ------------------
277 while (true) {
278 try {
279 if (_socket->state() != QAbstractSocket::ConnectedState) {
280 emit(newMessage(_staID + ": Socket not connected, reconnecting"));
281 tryReconnect();
282 }
283
284
285 _socket->waitForReadyRead(_timeOut);
286 qint64 nBytes = _socket->bytesAvailable();
287 if (nBytes > 0) {
288 char* data = new char[nBytes];
289 _socket->read(data, nBytes);
290 _decoder->Decode(data, nBytes);
291 delete [] data;
292 for (list<Observation*>::iterator it = _decoder->_obsList.begin();
293 it != _decoder->_obsList.end(); it++) {
294
295 // Check observation epoch
296 // -----------------------
297 int week;
298 double sec;
299 currentGPSWeeks(week, sec);
300
301 const double secPerWeek = 7.0 * 24.0 * 3600.0;
302 const double maxDt = 600.0;
303
304 if (week < (*it)->GPSWeek) {
305 week += 1;
306 sec -= secPerWeek;
307 }
308 if (week > (*it)->GPSWeek) {
309 week -= 1;
310 sec += secPerWeek;
311 }
312 double dt = fabs(sec - (*it)->GPSWeeks);
313 if (week != (*it)->GPSWeek || dt > maxDt) {
314 emit( newMessage("Wrong observation epoch") );
315 delete (*it);
316 continue;
317 }
318
319 emit newObs(_staID, *it);
320 bool firstObs = (it == _decoder->_obsList.begin());
321 if ( _global_caster->newObs(_staID, firstObs, *it) == 0 ) {
322
323 if (_rinexWriter == 0) {
324 _rinexWriter = new bncRinex((*it)->StatID, _mountPoint, _format);
325 }
326
327 long iSec = long(floor((*it)->GPSWeeks+0.5));
328 long newTime = (*it)->GPSWeek * 7*24*3600 + iSec;
329
330 if (_samplingRate == 0 || iSec % _samplingRate == 0) {
331 _rinexWriter->deepCopy(*it);
332 }
333 _rinexWriter->dumpEpoch(newTime);
334 }
335
336
337 }
338 _decoder->_obsList.clear();
339 }
340 else {
341 emit(newMessage(_staID + ": Data Timeout, reconnecting"));
342 tryReconnect();
343 }
344 }
345 catch (const char* msg) {
346 emit(newMessage(_staID + msg));
347 tryReconnect();
348 }
349 }
350}
351
352// Exit
353////////////////////////////////////////////////////////////////////////////
354void bncGetThread::exit(int exitCode) {
355 if (exitCode!= 0) {
356 emit error(_staID);
357 }
358 QThread::exit(exitCode);
359 terminate();
360}
361
362// Try Re-Connect
363////////////////////////////////////////////////////////////////////////////
364void bncGetThread::tryReconnect() {
365 while (1) {
366 delete _socket; _socket = 0;
367 sleep(_nextSleep);
368 if ( initRun() == success ) {
369 break;
370 }
371 else {
372 _nextSleep *= 2;
373 if (_nextSleep > 128) {
374 _nextSleep = 128;
375 }
376 _nextSleep += rand() % 6;
377 }
378 }
379 _nextSleep = 1;
380}
Note: See TracBrowser for help on using the repository browser.