source: ntrip/trunk/BNC/bnccaster.cpp@ 1225

Last change on this file since 1225 was 1225, checked in by mervart, 15 years ago

* empty log message *

File size: 14.5 KB
Line 
1// Part of BNC, a utility for retrieving decoding and
2// converting GNSS data streams from NTRIP broadcasters.
3//
4// Copyright (C) 2007
5// German Federal Agency for Cartography and Geodesy (BKG)
6// http://www.bkg.bund.de
7// Czech Technical University Prague, Department of Geodesy
8// http://www.fsv.cvut.cz
9//
10// Email: euref-ip@bkg.bund.de
11//
12// This program is free software; you can redistribute it and/or
13// modify it under the terms of the GNU General Public License
14// as published by the Free Software Foundation, version 2.
15//
16// This program is distributed in the hope that it will be useful,
17// but WITHOUT ANY WARRANTY; without even the implied warranty of
18// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19// GNU General Public License for more details.
20//
21// You should have received a copy of the GNU General Public License
22// along with this program; if not, write to the Free Software
23// Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
24
25/* -------------------------------------------------------------------------
26 * BKG NTRIP Client
27 * -------------------------------------------------------------------------
28 *
29 * Class: bncCaster
30 *
31 * Purpose: buffers and disseminates the data
32 *
33 * Author: L. Mervart
34 *
35 * Created: 24-Dec-2005
36 *
37 * Changes:
38 *
39 * -----------------------------------------------------------------------*/
40
41#include <math.h>
42#include <unistd.h>
43
44#include "bnccaster.h"
45#include "bncapp.h"
46#include "bncgetthread.h"
47#include "bncutils.h"
48#include "RTCM/GPSDecoder.h"
49
50// Constructor
51////////////////////////////////////////////////////////////////////////////
52bncCaster::bncCaster(const QString& outFileName, int port) {
53
54 QSettings settings;
55
56 if ( !outFileName.isEmpty() ) {
57 QString lName = outFileName;
58 expandEnvVar(lName);
59 _outFile = new QFile(lName);
60 if ( Qt::CheckState(settings.value("rnxAppend").toInt()) == Qt::Checked) {
61 _outFile->open(QIODevice::WriteOnly | QIODevice::Append);
62 }
63 else {
64 _outFile->open(QIODevice::WriteOnly);
65 }
66 _out = new QTextStream(_outFile);
67 _out->setRealNumberNotation(QTextStream::FixedNotation);
68 }
69 else {
70 _outFile = 0;
71 _out = 0;
72 }
73
74 _port = port;
75
76 if (_port != 0) {
77 _server = new QTcpServer;
78 _server->listen(QHostAddress::Any, _port);
79 connect(_server, SIGNAL(newConnection()), this, SLOT(slotNewConnection()));
80 _sockets = new QList<QTcpSocket*>;
81 }
82 else {
83 _server = 0;
84 _sockets = 0;
85 }
86
87 int uPort = settings.value("outUPort").toInt();
88 if (uPort != 0) {
89 _uServer = new QTcpServer;
90 _uServer->listen(QHostAddress::Any, uPort);
91 connect(_uServer, SIGNAL(newConnection()), this, SLOT(slotNewUConnection()));
92 _uSockets = new QList<QTcpSocket*>;
93 }
94 else {
95 _uServer = 0;
96 _uSockets = 0;
97 }
98
99 _epochs = new QMultiMap<long, p_obs>;
100
101 _lastDumpSec = 0;
102
103 _confTimer = 0;
104
105 connect(this, SIGNAL(newMessage(QByteArray)),
106 (bncApp*) qApp, SLOT(slotMessage(const QByteArray)));
107}
108
109// Destructor
110////////////////////////////////////////////////////////////////////////////
111bncCaster::~bncCaster() {
112 QListIterator<bncGetThread*> it(_threads);
113 while(it.hasNext()){
114 bncGetThread* thread = it.next();
115 thread->terminate();
116 thread->wait();
117 delete thread;
118 }
119 delete _out;
120 delete _outFile;
121 delete _server;
122 delete _sockets;
123 delete _uServer;
124 delete _uSockets;
125 if (_epochs) {
126 QListIterator<p_obs> it(_epochs->values());
127 while (it.hasNext()) {
128 delete it.next();
129 }
130 delete _epochs;
131 }
132}
133
134// New Observations
135////////////////////////////////////////////////////////////////////////////
136void bncCaster::newObs(const QByteArray staID, bool firstObs, p_obs obs) {
137
138 QMutexLocker locker(&_mutex);
139
140 obs->_status = t_obs::received;
141
142 long iSec = long(floor(obs->_o.GPSWeeks+0.5));
143 long newTime = obs->_o.GPSWeek * 7*24*3600 + iSec;
144
145 // Rename the Station
146 // ------------------
147 strncpy(obs->_o.StatID, staID.constData(),sizeof(obs->_o.StatID));
148 obs->_o.StatID[sizeof(obs->_o.StatID)-1] = '\0';
149
150 const char begObs[] = "BEGOBS";
151 const int begObsNBytes = sizeof(begObs) - 1;
152
153 // Output into the socket
154 // ----------------------
155 if (_uSockets) {
156 QMutableListIterator<QTcpSocket*> is(*_uSockets);
157 while (is.hasNext()) {
158 QTcpSocket* sock = is.next();
159 if (sock->state() == QAbstractSocket::ConnectedState) {
160 bool ok = true;
161 if (myWrite(sock, begObs, begObsNBytes) != begObsNBytes) {
162 ok = false;
163 }
164 int numBytes = sizeof(obs->_o);
165 if (myWrite(sock, (const char*)(&obs->_o), numBytes) != numBytes) {
166 ok = false;
167 }
168 if (!ok) {
169 delete sock;
170 is.remove();
171 }
172 }
173 else if (sock->state() != QAbstractSocket::ConnectingState) {
174 delete sock;
175 is.remove();
176 }
177 }
178 }
179
180 // First time, set the _lastDumpSec immediately
181 // --------------------------------------------
182 if (_lastDumpSec == 0) {
183 _lastDumpSec = newTime - 1;
184 }
185
186 // An old observation - throw it away
187 // ----------------------------------
188 if (newTime <= _lastDumpSec) {
189 if (firstObs) {
190 QSettings settings;
191 if ( !settings.value("outFile").toString().isEmpty() ||
192 !settings.value("outPort").toString().isEmpty() ) {
193 emit( newMessage(QString("Station %1: old epoch %2 thrown away")
194 .arg(staID.data()).arg(iSec).toAscii()) );
195 }
196 }
197 delete obs;
198 return;
199 }
200
201 // Save the observation
202 // --------------------
203 _epochs->insert(newTime, obs);
204
205 // Dump Epochs
206 // -----------
207 if (newTime - _waitTime > _lastDumpSec) {
208 dumpEpochs(_lastDumpSec + 1, newTime - _waitTime);
209 _lastDumpSec = newTime - _waitTime;
210 }
211}
212
213// New Connection
214////////////////////////////////////////////////////////////////////////////
215void bncCaster::slotNewConnection() {
216 _sockets->push_back( _server->nextPendingConnection() );
217 emit( newMessage(QString("New Connection # %1")
218 .arg(_sockets->size()).toAscii()) );
219}
220
221void bncCaster::slotNewUConnection() {
222 _uSockets->push_back( _uServer->nextPendingConnection() );
223 emit( newMessage(QString("New Connection (usync) # %1")
224 .arg(_uSockets->size()).toAscii()) );
225}
226
227// Add New Thread
228////////////////////////////////////////////////////////////////////////////
229void bncCaster::addGetThread(bncGetThread* getThread) {
230
231 qRegisterMetaType<p_obs>("p_obs");
232
233 connect(getThread, SIGNAL(newObs(QByteArray, bool, p_obs)),
234 this, SLOT(newObs(QByteArray, bool, p_obs)));
235
236 connect(getThread, SIGNAL(error(QByteArray)),
237 this, SLOT(slotGetThreadError(QByteArray)));
238
239 _staIDs.push_back(getThread->staID());
240 _threads.push_back(getThread);
241
242 getThread->start();
243}
244
245// Error in get thread
246////////////////////////////////////////////////////////////////////////////
247void bncCaster::slotGetThreadError(QByteArray staID) {
248 QMutexLocker locker(&_mutex);
249 _staIDs.removeAll(staID);
250 emit( newMessage(
251 QString("Mountpoint size %1").arg(_staIDs.size()).toAscii()) );
252 if (_staIDs.size() == 0) {
253 emit(newMessage("bncCaster:: last get thread terminated"));
254 emit getThreadErrors();
255 }
256}
257
258// Dump Complete Epochs
259////////////////////////////////////////////////////////////////////////////
260void bncCaster::dumpEpochs(long minTime, long maxTime) {
261
262 const char begEpoch[] = "BEGEPOCH";
263 const char endEpoch[] = "ENDEPOCH";
264
265 const int begEpochNBytes = sizeof(begEpoch) - 1;
266 const int endEpochNBytes = sizeof(endEpoch) - 1;
267
268 for (long sec = minTime; sec <= maxTime; sec++) {
269
270 bool first = true;
271 QList<p_obs> allObs = _epochs->values(sec);
272 QListIterator<p_obs> it(allObs);
273 while (it.hasNext()) {
274 p_obs obs = it.next();
275
276 if (_samplingRate == 0 || sec % _samplingRate == 0) {
277
278 // Output into the file
279 // --------------------
280 if (_out) {
281 if (first) {
282 _out->setFieldWidth(1); *_out << begEpoch << endl;;
283 }
284 _out->setFieldWidth(0); *_out << obs->_o.StatID;
285 _out->setFieldWidth(1); *_out << " " << obs->_o.satSys;
286 _out->setPadChar('0');
287 _out->setFieldWidth(2); *_out << obs->_o.satNum;
288 _out->setPadChar(' ');
289 _out->setFieldWidth(1); *_out << " ";
290 _out->setFieldWidth(4); *_out << obs->_o.GPSWeek;
291 _out->setFieldWidth(1); *_out << " ";
292 _out->setFieldWidth(14); _out->setRealNumberPrecision(7); *_out << obs->_o.GPSWeeks;
293 _out->setFieldWidth(1); *_out << " ";
294 _out->setFieldWidth(14); _out->setRealNumberPrecision(3); *_out << obs->_o.C1;
295 _out->setFieldWidth(1); *_out << " ";
296 _out->setFieldWidth(14); _out->setRealNumberPrecision(3); *_out << obs->_o.C2;
297 _out->setFieldWidth(1); *_out << " ";
298 _out->setFieldWidth(14); _out->setRealNumberPrecision(3); *_out << obs->_o.P1;
299 _out->setFieldWidth(1); *_out << " ";
300 _out->setFieldWidth(14); _out->setRealNumberPrecision(3); *_out << obs->_o.P2;
301 _out->setFieldWidth(1); *_out << " ";
302 _out->setFieldWidth(14); _out->setRealNumberPrecision(3); *_out << obs->_o.L1;
303 _out->setFieldWidth(1); *_out << " ";
304 _out->setFieldWidth(14); _out->setRealNumberPrecision(3); *_out << obs->_o.L2;
305 _out->setFieldWidth(1); *_out << " ";
306 _out->setFieldWidth(14); _out->setRealNumberPrecision(3); *_out << obs->_o.S1;
307 _out->setFieldWidth(1); *_out << " ";
308 _out->setFieldWidth(14); _out->setRealNumberPrecision(3); *_out << obs->_o.S2;
309 _out->setFieldWidth(1);
310 *_out << " " << obs->_o.SNR1 << " " << obs->_o.SNR2 << endl;
311 if (!it.hasNext()) {
312 _out->setFieldWidth(1); *_out << endEpoch << endl;
313 }
314 _out->flush();
315 }
316
317 // Output into the socket
318 // ----------------------
319 if (_sockets) {
320 QMutableListIterator<QTcpSocket*> is(*_sockets);
321 while (is.hasNext()) {
322 QTcpSocket* sock = is.next();
323 if (sock->state() == QAbstractSocket::ConnectedState) {
324 bool ok = true;
325 if (first) {
326 if (myWrite(sock, begEpoch, begEpochNBytes) != begEpochNBytes) {
327 ok = false;
328 }
329 }
330 int numBytes = sizeof(obs->_o);
331 if (myWrite(sock, (const char*)(&obs->_o), numBytes) != numBytes) {
332 ok = false;
333 }
334 if (!it.hasNext()) {
335 if (myWrite(sock, endEpoch, endEpochNBytes) != endEpochNBytes) {
336 ok = false;
337 }
338 }
339 if (!ok) {
340 delete sock;
341 is.remove();
342 }
343 }
344 else if (sock->state() != QAbstractSocket::ConnectingState) {
345 delete sock;
346 is.remove();
347 }
348 }
349 }
350 }
351
352 delete obs;
353 _epochs->remove(sec);
354 first = false;
355 }
356 }
357}
358
359// Reread configuration
360////////////////////////////////////////////////////////////////////////////
361void bncCaster::slotReadMountPoints() {
362
363 QSettings settings;
364
365 // Reread several options
366 // ----------------------
367 _samplingRate = settings.value("binSampl").toInt();
368 _waitTime = settings.value("waitTime").toInt();
369 if (_waitTime < 1) {
370 _waitTime = 1;
371 }
372
373 // Add new mountpoints
374 // -------------------
375 int iMount = -1;
376 QListIterator<QString> it(settings.value("mountPoints").toStringList());
377 while (it.hasNext()) {
378 ++iMount;
379 QStringList hlp = it.next().split(" ");
380 if (hlp.size() <= 1) continue;
381 QUrl url(hlp[0]);
382
383 // Does it already exist?
384 // ----------------------
385 bool existFlg = false;
386 QListIterator<bncGetThread*> iTh(_threads);
387 while (iTh.hasNext()) {
388 bncGetThread* thread = iTh.next();
389 if (thread->mountPoint() == url) {
390 existFlg = true;
391 break;
392 }
393 }
394
395 // New bncGetThread
396 // ----------------
397 if (!existFlg) {
398 QByteArray format = hlp[1].toAscii();
399 QByteArray latitude = hlp[2].toAscii();
400 QByteArray longitude = hlp[3].toAscii();
401 QByteArray nmea = hlp[4].toAscii();
402
403 bncGetThread* getThread = new bncGetThread(url, format, latitude,
404 longitude, nmea, iMount);
405 addGetThread(getThread);
406 }
407 }
408
409 // Remove mountpoints
410 // ------------------
411 QListIterator<bncGetThread*> iTh(_threads);
412 while (iTh.hasNext()) {
413 bncGetThread* thread = iTh.next();
414
415 bool existFlg = false;
416 QListIterator<QString> it(settings.value("mountPoints").toStringList());
417 while (it.hasNext()) {
418 QStringList hlp = it.next().split(" ");
419 if (hlp.size() <= 1) continue;
420 QUrl url(hlp[0]);
421
422 if (thread->mountPoint() == url) {
423 existFlg = true;
424 break;
425 }
426 }
427
428 if (!existFlg) {
429 disconnect(thread, 0, 0, 0);
430 _staIDs.removeAll(thread->staID());
431 _threads.removeAll(thread);
432 thread->terminate();
433 thread->wait();
434 delete thread;
435 }
436 }
437
438 emit mountPointsRead(_threads);
439 emit( newMessage(QString("Configuration reread: %1 stream(s)")
440 .arg(_threads.count()).toAscii()) );
441
442 // (Re-) Start the configuration timer
443 // -----------------------------------
444 int ms = 0;
445
446 if (_confTimer) {
447 ms = 1000 * _confInterval;
448 }
449 else {
450 _confTimer = new QTimer();
451 connect(_confTimer, SIGNAL(timeout()), this, SLOT(slotReadMountPoints()));
452
453 QTime currTime = currentDateAndTimeGPS().time();
454 QTime nextShotTime;
455
456 if (settings.value("onTheFlyInterval").toString() == "1 min") {
457 _confInterval = 60;
458 nextShotTime = QTime(currTime.hour(), currTime.minute()+1, 0);
459 }
460 else if (settings.value("onTheFlyInterval").toString() == "1 hour") {
461 _confInterval = 3600;
462 nextShotTime = QTime(currTime.hour()+1, 0, 0);
463 }
464 else {
465 _confInterval = 86400;
466 nextShotTime = QTime(23, 59, 59, 999);
467 }
468
469 ms = currTime.msecsTo(nextShotTime);
470 if (ms < 30000) {
471 ms = 30000;
472 }
473 }
474
475 _confTimer->start(ms);
476}
477
478//
479////////////////////////////////////////////////////////////////////////////
480int bncCaster::myWrite(QTcpSocket* sock, const char* buf, int bufLen) {
481 int bytesWritten = 0;
482 for (;;) {
483 int newBytes = sock->write(buf+bytesWritten, bufLen-bytesWritten);
484 if (newBytes < 0) {
485 return newBytes;
486 }
487 else {
488 bytesWritten += newBytes;
489 }
490 if (bytesWritten == bufLen) {
491 return bytesWritten;
492 }
493 }
494}
Note: See TracBrowser for help on using the repository browser.