source: ntrip/trunk/BNC/bncgetthread.cpp@ 352

Last change on this file since 352 was 352, checked in by mervart, 17 years ago

* empty log message *

File size: 9.8 KB
Line 
1// Part of BNC, a utility for retrieving decoding and
2// converting GNSS data streams from NTRIP broadcasters,
3// written by Leos Mervart.
4//
5// Copyright (C) 2006
6// German Federal Agency for Cartography and Geodesy (BKG)
7// http://www.bkg.bund.de
8// Czech Technical University Prague, Department of Advanced Geodesy
9// http://www.fsv.cvut.cz
10//
11// Email: euref-ip@bkg.bund.de
12//
13// This program is free software; you can redistribute it and/or
14// modify it under the terms of the GNU General Public License
15// as published by the Free Software Foundation, version 2.
16//
17// This program is distributed in the hope that it will be useful,
18// but WITHOUT ANY WARRANTY; without even the implied warranty of
19// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20// GNU General Public License for more details.
21//
22// You should have received a copy of the GNU General Public License
23// along with this program; if not, write to the Free Software
24// Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
25
26/* -------------------------------------------------------------------------
27 * BKG NTRIP Client
28 * -------------------------------------------------------------------------
29 *
30 * Class: bncGetThread
31 *
32 * Purpose: Thread that retrieves data from NTRIP caster
33 *
34 * Author: L. Mervart
35 *
36 * Created: 24-Dec-2005
37 *
38 * Changes:
39 *
40 * -----------------------------------------------------------------------*/
41
42#include <stdlib.h>
43
44#include <QFile>
45#include <QTextStream>
46#include <QtNetwork>
47
48#include "bncgetthread.h"
49#include "bnctabledlg.h"
50#include "bncapp.h"
51#include "bncutils.h"
52
53#include "RTCM/RTCM2Decoder.h"
54#include "RTCM3/RTCM3Decoder.h"
55#include "RTIGS/RTIGSDecoder.h"
56
57using namespace std;
58
59// Constructor
60////////////////////////////////////////////////////////////////////////////
61bncGetThread::bncGetThread(const QUrl& mountPoint,
62 const QByteArray& format, int iMount) {
63 _decoder = 0;
64 _mountPoint = mountPoint;
65 _staID = mountPoint.path().mid(1).toAscii();
66 _staID_orig = _staID;
67 _format = format;
68 _socket = 0;
69 _timeOut = 20*1000; // 20 seconds
70 _nextSleep = 1; // 1 second
71 _iMount = iMount; // index in mountpoints array
72
73 // Check name conflict
74 // -------------------
75 QSettings settings;
76 QListIterator<QString> it(settings.value("mountPoints").toStringList());
77 int num = 0;
78 int ind = -1;
79 while (it.hasNext()) {
80 ++ind;
81 QStringList hlp = it.next().split(" ");
82 if (hlp.size() <= 1) continue;
83 QUrl url(hlp[0]);
84 if (_mountPoint.path() == url.path()) {
85 if (_iMount > ind) {
86 ++num;
87 }
88 }
89 }
90
91 if (num > 0) {
92 _staID = _staID.left(_staID.length()-1) + QString("%1").arg(num).toAscii();
93 }
94// Start Ergaenzung Perlt
95 msleep(100); //sleep 0.1 sec
96//Ende Ergaenzung Perlt
97}
98
99// Destructor
100////////////////////////////////////////////////////////////////////////////
101bncGetThread::~bncGetThread() {
102 delete _socket;
103 delete _decoder;
104}
105
106// Connect to Caster, send the Request (static)
107////////////////////////////////////////////////////////////////////////////
108QTcpSocket* bncGetThread::request(const QUrl& mountPoint, int timeOut,
109 QString& msg) {
110
111 // Connect the Socket
112 // ------------------
113 QSettings settings;
114 QString proxyHost = settings.value("proxyHost").toString();
115 int proxyPort = settings.value("proxyPort").toInt();
116
117 QTcpSocket* socket = new QTcpSocket();
118 if ( proxyHost.isEmpty() ) {
119 socket->connectToHost(mountPoint.host(), mountPoint.port());
120 }
121 else {
122 socket->connectToHost(proxyHost, proxyPort);
123 }
124 if (!socket->waitForConnected(timeOut)) {
125 msg += "Connect timeout\n";
126 delete socket;
127 return 0;
128 }
129
130 // Send Request
131 // ------------
132 QByteArray userAndPwd = mountPoint.userName().toAscii() + ":" +
133 mountPoint.password().toAscii();
134
135 QUrl hlp;
136 hlp.setScheme("http");
137 hlp.setHost(mountPoint.host());
138 hlp.setPort(mountPoint.port());
139 hlp.setPath(mountPoint.path());
140
141 QByteArray reqStr;
142 if ( proxyHost.isEmpty() ) {
143 if (hlp.path().indexOf("/") != 0) hlp.setPath("/");
144 reqStr = "GET " + hlp.path().toAscii() +
145 " HTTP/1.0\r\n"
146 "User-Agent: NTRIP BNC 1.0b\r\n"
147 "Authorization: Basic " +
148 userAndPwd.toBase64() + "\r\n\r\n";
149 } else {
150 reqStr = "GET " + hlp.toEncoded() +
151 " HTTP/1.0\r\n"
152 "User-Agent: NTRIP BNC 1.0b\r\n"
153 "Authorization: Basic " +
154 userAndPwd.toBase64() + "\r\n\r\n";
155 }
156
157 msg += reqStr;
158
159 socket->write(reqStr, reqStr.length());
160
161 if (!socket->waitForBytesWritten(timeOut)) {
162 msg += "Write timeout\n";
163 delete socket;
164 return 0;
165 }
166
167 return socket;
168}
169
170// Init Run
171////////////////////////////////////////////////////////////////////////////
172t_irc bncGetThread::initRun() {
173
174 // Send the Request
175 // ----------------
176 QString msg;
177
178 _socket = bncGetThread::request(_mountPoint, _timeOut, msg);
179
180 //// emit(newMessage(msg.toAscii()));
181
182 if (!_socket) {
183 return failure;
184 }
185
186 // Read Caster Response
187 // --------------------
188 _socket->waitForReadyRead(_timeOut);
189 if (_socket->canReadLine()) {
190 QString line = _socket->readLine();
191 if (line.indexOf("Unauthorized") != -1) {
192 QStringList table;
193 bncTableDlg::getFullTable(_mountPoint.host(), _mountPoint.port(), table);
194 QString net;
195 QStringListIterator it(table);
196 while (it.hasNext()) {
197 QString line = it.next();
198 if (line.indexOf("STR") == 0) {
199 QStringList tags = line.split(";");
200 if (tags.at(1) == _staID_orig) {
201 net = tags.at(7);
202 break;
203 }
204 }
205 }
206
207 QString reg;
208 it.toFront();
209 while (it.hasNext()) {
210 QString line = it.next();
211 if (line.indexOf("NET") == 0) {
212 QStringList tags = line.split(";");
213 if (tags.at(1) == net) {
214 reg = tags.at(7);
215 break;
216 }
217 }
218 }
219 emit(newMessage((_staID + ": Caster Response: " + line +
220 " Adjust User-ID and Password Register, see"
221 "\n " + reg).toAscii()));
222 return fatal;
223 }
224 if (line.indexOf("ICY 200 OK") != 0) {
225 emit(newMessage((_staID + ": Wrong Caster Response:\n" + line).toAscii()));
226 return failure;
227 }
228 }
229 else {
230 emit(newMessage(_staID + ": Response Timeout"));
231 return failure;
232 }
233
234 // Instantiate the filter
235 // ----------------------
236 if (!_decoder) {
237 if (_format.indexOf("RTCM_2") != -1) {
238 emit(newMessage("Get Data: " + _staID + " in RTCM 2.x format"));
239 _decoder = new RTCM2Decoder();
240 }
241 else if (_format.indexOf("RTCM_3") != -1) {
242 emit(newMessage("Get Data: " + _staID + " in RTCM 3.0 format"));
243 _decoder = new RTCM3Decoder();
244 }
245 else if (_format.indexOf("RTIGS") != -1) {
246 emit(newMessage("Get Data: " + _staID + " in RTIGS format"));
247 _decoder = new RTIGSDecoder();
248 }
249 else {
250 emit(newMessage(_staID + ": Unknown data format " + _format));
251 return fatal;
252 }
253 }
254 return success;
255}
256
257// Run
258////////////////////////////////////////////////////////////////////////////
259void bncGetThread::run() {
260
261 t_irc irc = initRun();
262
263 if (irc == fatal) {
264 QThread::exit(1);
265 return;
266 }
267 else if (irc != success) {
268 emit(newMessage(_staID + ": initRun failed, reconnecting"));
269 tryReconnect();
270 }
271
272 // Read Incoming Data
273 // ------------------
274 while (true) {
275 try {
276 if (_socket->state() != QAbstractSocket::ConnectedState) {
277 emit(newMessage(_staID + ": Socket not connected, reconnecting"));
278 tryReconnect();
279 }
280
281
282 _socket->waitForReadyRead(_timeOut);
283 qint64 nBytes = _socket->bytesAvailable();
284 if (nBytes > 0) {
285 char* data = new char[nBytes];
286 _socket->read(data, nBytes);
287 _decoder->Decode(data, nBytes);
288 delete [] data;
289 for (list<Observation*>::iterator it = _decoder->_obsList.begin();
290 it != _decoder->_obsList.end(); it++) {
291
292 // Check observation epoch
293 // -----------------------
294 int week;
295 double sec;
296 currentGPSWeeks(week, sec);
297
298 const double secPerWeek = 7.0 * 24.0 * 3600.0;
299 const double maxDt = 600.0;
300
301 if (week < (*it)->GPSWeek) {
302 week += 1;
303 sec -= secPerWeek;
304 }
305 if (week > (*it)->GPSWeek) {
306 week -= 1;
307 sec += secPerWeek;
308 }
309 double dt = fabs(sec - (*it)->GPSWeeks);
310 if (week != (*it)->GPSWeek || dt > maxDt) {
311 emit( newMessage("Wrong observation epoch") );
312 delete (*it);
313 continue;
314 }
315
316 emit newObs(_staID, *it);
317 bool firstObs = (it == _decoder->_obsList.begin());
318 _global_caster->newObs(_staID, _mountPoint, firstObs, *it, _format);
319 }
320 _decoder->_obsList.clear();
321 }
322 else {
323 emit(newMessage(_staID + ": Data Timeout, reconnecting"));
324 tryReconnect();
325 }
326 }
327 catch (const char* msg) {
328 emit(newMessage(_staID + msg));
329 tryReconnect();
330 }
331 }
332}
333
334// Exit
335////////////////////////////////////////////////////////////////////////////
336void bncGetThread::exit(int exitCode) {
337 if (exitCode!= 0) {
338 emit error(_staID);
339 }
340 QThread::exit(exitCode);
341 terminate();
342}
343
344// Try Re-Connect
345////////////////////////////////////////////////////////////////////////////
346void bncGetThread::tryReconnect() {
347 while (1) {
348 delete _socket; _socket = 0;
349 sleep(_nextSleep);
350 if ( initRun() == success ) {
351 break;
352 }
353 else {
354 _nextSleep *= 2;
355 if (_nextSleep > 128) {
356 _nextSleep = 128;
357 }
358 _nextSleep += rand() % 6;
359 }
360 }
361 _nextSleep = 1;
362}
Note: See TracBrowser for help on using the repository browser.