source: ntrip/trunk/BNC/src/bnccaster.cpp@ 6041

Last change on this file since 6041 was 5996, checked in by mervart, 10 years ago
File size: 17.6 KB
RevLine 
[280]1// Part of BNC, a utility for retrieving decoding and
[464]2// converting GNSS data streams from NTRIP broadcasters.
[280]3//
[464]4// Copyright (C) 2007
[280]5// German Federal Agency for Cartography and Geodesy (BKG)
6// http://www.bkg.bund.de
[464]7// Czech Technical University Prague, Department of Geodesy
[280]8// http://www.fsv.cvut.cz
9//
10// Email: euref-ip@bkg.bund.de
11//
12// This program is free software; you can redistribute it and/or
13// modify it under the terms of the GNU General Public License
14// as published by the Free Software Foundation, version 2.
15//
16// This program is distributed in the hope that it will be useful,
17// but WITHOUT ANY WARRANTY; without even the implied warranty of
18// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19// GNU General Public License for more details.
20//
21// You should have received a copy of the GNU General Public License
22// along with this program; if not, write to the Free Software
23// Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
[35]24
25/* -------------------------------------------------------------------------
[93]26 * BKG NTRIP Client
[35]27 * -------------------------------------------------------------------------
28 *
29 * Class: bncCaster
30 *
31 * Purpose: buffers and disseminates the data
32 *
33 * Author: L. Mervart
34 *
35 * Created: 24-Dec-2005
36 *
37 * Changes:
38 *
39 * -----------------------------------------------------------------------*/
40
[222]41#include <math.h>
[636]42#include <unistd.h>
[2831]43#include <iostream>
44#include <iomanip>
45#include <sstream>
[222]46
[35]47#include "bnccaster.h"
[2697]48#include "bncrinex.h"
[5070]49#include "bnccore.h"
[35]50#include "bncgetthread.h"
[134]51#include "bncutils.h"
[1535]52#include "bncsettings.h"
[5738]53#include "GPSDecoder.h"
[35]54
[2831]55using namespace std;
56
[35]57// Constructor
58////////////////////////////////////////////////////////////////////////////
[5528]59bncCaster::bncCaster() {
[35]60
[1535]61 bncSettings settings;
[275]62
[1299]63 connect(this, SIGNAL(newMessage(QByteArray,bool)),
[5068]64 BNC_CORE, SLOT(slotMessage(const QByteArray,bool)));
[1228]65
[5528]66 _outFile = 0;
67 _out = 0;
68 reopenOutFile();
[35]69
[5528]70 _port = settings.value("outPort").toInt();
[35]71
72 if (_port != 0) {
73 _server = new QTcpServer;
[1228]74 if ( !_server->listen(QHostAddress::Any, _port) ) {
[1450]75 emit newMessage("bncCaster: Cannot listen on sync port", true);
[1228]76 }
[35]77 connect(_server, SIGNAL(newConnection()), this, SLOT(slotNewConnection()));
78 _sockets = new QList<QTcpSocket*>;
79 }
80 else {
81 _server = 0;
82 _sockets = 0;
83 }
84
[1222]85 int uPort = settings.value("outUPort").toInt();
86 if (uPort != 0) {
87 _uServer = new QTcpServer;
[1228]88 if ( !_uServer->listen(QHostAddress::Any, uPort) ) {
[1450]89 emit newMessage("bncCaster: Cannot listen on usync port", true);
[1228]90 }
[1222]91 connect(_uServer, SIGNAL(newConnection()), this, SLOT(slotNewUConnection()));
92 _uSockets = new QList<QTcpSocket*>;
93 }
94 else {
95 _uServer = 0;
96 _uSockets = 0;
97 }
98
[5992]99 int nmeaPort = settings.value("PPP/nmeaPort").toInt();
[2183]100 if (nmeaPort != 0) {
101 _nmeaServer = new QTcpServer;
102 if ( !_nmeaServer->listen(QHostAddress::Any, nmeaPort) ) {
103 emit newMessage("bncCaster: Cannot listen on port", true);
104 }
105 connect(_nmeaServer, SIGNAL(newConnection()), this, SLOT(slotNewNMEAConnection()));
[5996]106 connect(BNC_CORE, SIGNAL(newNMEAstr(QByteArray, QByteArray)),
107 this, SLOT(slotNewNMEAstr(QByteArray, QByteArray)));
[2183]108 _nmeaSockets = new QList<QTcpSocket*>;
109 }
110 else {
111 _nmeaServer = 0;
112 _nmeaSockets = 0;
113 }
114
[2711]115 _epochs = new QMultiMap<long, t_obs>;
[35]116
[3543]117 _samplingRate = settings.value("binSampl").toInt();
118 _waitTime = settings.value("waitTime").toInt();
[3034]119 _lastDumpSec = 0;
[1705]120 _confInterval = -1;
[5647]121
[5649]122 // Miscellaneous output port
[5647]123 // -------------------------
124 _miscMount = settings.value("miscMount").toString();
125 _miscPort = settings.value("miscPort").toInt();
126 if (!_miscMount.isEmpty() && _miscPort != 0) {
127 _miscServer = new QTcpServer;
128 if ( !_miscServer->listen(QHostAddress::Any, _miscPort) ) {
129 emit newMessage("bncCaster: Cannot listen on Miscellaneous Output Port", true);
130 }
131 connect(_miscServer, SIGNAL(newConnection()), this, SLOT(slotNewMiscConnection()));
132 _miscSockets = new QList<QTcpSocket*>;
133 }
134 else {
135 _miscServer = 0;
136 _miscSockets = 0;
137 }
[35]138}
139
140// Destructor
141////////////////////////////////////////////////////////////////////////////
142bncCaster::~bncCaster() {
[2647]143
144 QMutexLocker locker(&_mutex);
145
[186]146 QListIterator<bncGetThread*> it(_threads);
147 while(it.hasNext()){
148 bncGetThread* thread = it.next();
[1525]149 thread->terminate();
[186]150 }
[35]151 delete _out;
152 delete _outFile;
[187]153 delete _server;
[631]154 delete _sockets;
[1222]155 delete _uServer;
156 delete _uSockets;
[2186]157 delete _nmeaServer;
158 delete _nmeaSockets;
[2711]159 delete _epochs;
[5647]160 delete _miscServer;
161 delete _miscSockets;
[35]162}
163
164// New Observations
165////////////////////////////////////////////////////////////////////////////
[5524]166void bncCaster::slotNewObs(const QByteArray staID, QList<t_obs> obsList) {
[35]167
[243]168 QMutexLocker locker(&_mutex);
169
[5528]170 reopenOutFile();
171
[5527]172 unsigned index = 0;
[5524]173 QMutableListIterator<t_obs> it(obsList);
174 while (it.hasNext()) {
[5527]175 ++index;
[5524]176 t_obs& obs = it.next();
[2833]177
[5524]178 long iSec = long(floor(obs.GPSWeeks+0.5));
[5527]179 long newTime = obs.GPSWeek * 7*24*3600 + iSec;
[5524]180
181 // Rename the Station
182 // ------------------
183 strncpy(obs.StatID, staID.constData(),sizeof(obs.StatID));
184 obs.StatID[sizeof(obs.StatID)-1] = '\0';
185
186 // Output into the socket
187 // ----------------------
188 if (_uSockets) {
189
190 ostringstream oStr;
191 oStr.setf(ios::showpoint | ios::fixed);
[5526]192 oStr << obs.StatID << " "
193 << setw(4) << obs.GPSWeek << " "
194 << setw(14) << setprecision(7) << obs.GPSWeeks << " "
[5524]195 << bncRinex::asciiSatLine(obs) << endl;
196
197 string hlpStr = oStr.str();
198
199 QMutableListIterator<QTcpSocket*> is(*_uSockets);
200 while (is.hasNext()) {
201 QTcpSocket* sock = is.next();
202 if (sock->state() == QAbstractSocket::ConnectedState) {
203 int numBytes = hlpStr.length();
204 if (myWrite(sock, hlpStr.c_str(), numBytes) != numBytes) {
205 delete sock;
206 is.remove();
207 }
208 }
209 else if (sock->state() != QAbstractSocket::ConnectingState) {
[1222]210 delete sock;
211 is.remove();
212 }
213 }
214 }
[5524]215
216 // First time, set the _lastDumpSec immediately
217 // --------------------------------------------
218 if (_lastDumpSec == 0) {
219 _lastDumpSec = newTime - 1;
220 }
221
222 // An old observation - throw it away
223 // ----------------------------------
224 if (newTime <= _lastDumpSec) {
[5527]225 if (index == 1) {
226 bncSettings settings;
227 if ( !settings.value("outFile").toString().isEmpty() ||
228 !settings.value("outPort").toString().isEmpty() ) {
[5524]229
[5527]230 QTime enomtime = QTime(0,0,0).addSecs(iSec);
[5524]231
[5527]232 emit( newMessage(QString("%1: Old epoch %2 (%3) thrown away")
233 .arg(staID.data()).arg(iSec)
234 .arg(enomtime.toString("HH:mm:ss"))
235 .toAscii(), true) );
236 }
[257]237 }
[5527]238 continue;
[181]239 }
[5524]240
241 // Save the observation
242 // --------------------
243 _epochs->insert(newTime, obs);
[35]244
[5527]245 // Dump Epochs
246 // -----------
247 if (newTime - _waitTime > _lastDumpSec) {
248 dumpEpochs(_lastDumpSec + 1, newTime - _waitTime);
249 _lastDumpSec = newTime - _waitTime;
250 }
[252]251 }
[35]252}
253
254// New Connection
255////////////////////////////////////////////////////////////////////////////
256void bncCaster::slotNewConnection() {
257 _sockets->push_back( _server->nextPendingConnection() );
[1450]258 emit( newMessage(QString("New client connection on sync port: # %1")
[1299]259 .arg(_sockets->size()).toAscii(), true) );
[35]260}
261
[1222]262void bncCaster::slotNewUConnection() {
263 _uSockets->push_back( _uServer->nextPendingConnection() );
[1450]264 emit( newMessage(QString("New client connection on usync port: # %1")
[1299]265 .arg(_uSockets->size()).toAscii(), true) );
[1222]266}
267
[2183]268void bncCaster::slotNewNMEAConnection() {
269 _nmeaSockets->push_back( _nmeaServer->nextPendingConnection() );
270 emit( newMessage(QString("New PPP client on port: # %1")
271 .arg(_nmeaSockets->size()).toAscii(), true) );
272}
273
[35]274// Add New Thread
275////////////////////////////////////////////////////////////////////////////
[2528]276void bncCaster::addGetThread(bncGetThread* getThread, bool noNewThread) {
[624]277
[2711]278 qRegisterMetaType<t_obs>("t_obs");
[5524]279 qRegisterMetaType< QList<t_obs> >("QList<t_obs>");
[2585]280 qRegisterMetaType<gpsephemeris>("gpsephemeris");
281 qRegisterMetaType<glonassephemeris>("glonassephemeris");
[3522]282 qRegisterMetaType<galileoephemeris>("galileoephemeris");
[624]283
[5524]284 connect(getThread, SIGNAL(newObs(QByteArray, QList<t_obs>)),
285 this, SLOT(slotNewObs(QByteArray, QList<t_obs>)));
[463]286
[5722]287 connect(getThread, SIGNAL(newObs(QByteArray, QList<t_obs>)),
[5724]288 this, SIGNAL(newObs(QByteArray, QList<t_obs>)));
[5722]289
[5648]290 connect(getThread, SIGNAL(newRawData(QByteArray, QByteArray)),
291 this, SLOT(slotNewRawData(QByteArray, QByteArray)));
292
[1556]293 connect(getThread, SIGNAL(getThreadFinished(QByteArray)),
294 this, SLOT(slotGetThreadFinished(QByteArray)));
[35]295
[88]296 _staIDs.push_back(getThread->staID());
[186]297 _threads.push_back(getThread);
[1170]298
[2528]299 if (noNewThread) {
300 getThread->run();
301 }
302 else {
303 getThread->start();
304 }
[35]305}
306
[1556]307// Get Thread destroyed
[35]308////////////////////////////////////////////////////////////////////////////
[1556]309void bncCaster::slotGetThreadFinished(QByteArray staID) {
[243]310 QMutexLocker locker(&_mutex);
[1560]311
312 QListIterator<bncGetThread*> it(_threads);
313 while (it.hasNext()) {
314 bncGetThread* thread = it.next();
315 if (thread->staID() == staID) {
316 _threads.removeOne(thread);
317 }
318 }
319
[88]320 _staIDs.removeAll(staID);
[82]321 emit( newMessage(
[1986]322 QString("Decoding %1 stream(s)").arg(_staIDs.size()).toAscii(), true) );
[88]323 if (_staIDs.size() == 0) {
[1450]324 emit(newMessage("bncCaster: Last get thread terminated", true));
[1556]325 emit getThreadsFinished();
[35]326 }
327}
328
329// Dump Complete Epochs
330////////////////////////////////////////////////////////////////////////////
[2316]331void bncCaster::dumpEpochs(long minTime, long maxTime) {
[35]332
[2316]333 for (long sec = minTime; sec <= maxTime; sec++) {
[140]334
[4990]335 if ( (_out || _sockets) &&
336 (_samplingRate == 0 || sec % _samplingRate == 0) ) {
[1999]337
[4990]338 QList<t_obs> allObs = _epochs->values(sec);
339
340 QListIterator<t_obs> it(allObs);
341 bool firstObs = true;
342 while (it.hasNext()) {
343 const t_obs& obs = it.next();
[4987]344
[4990]345 ostringstream oStr;
346 oStr.setf(ios::showpoint | ios::fixed);
347 if (firstObs) {
348 firstObs = false;
349 oStr << "> " << obs.GPSWeek << ' '
350 << setprecision(7) << obs.GPSWeeks << endl;;
351 }
352 oStr << obs.StatID << ' ' << bncRinex::asciiSatLine(obs) << endl;
353 if (!it.hasNext()) {
354 oStr << endl;
355 }
356 string hlpStr = oStr.str();
[35]357
[4990]358 // Output into the File
359 // --------------------
360 if (_out) {
361 *_out << hlpStr.c_str();
362 _out->flush();
363 }
[1810]364
[4990]365 // Output into the socket
366 // ----------------------
367 if (_sockets) {
368 QMutableListIterator<QTcpSocket*> is(*_sockets);
369 while (is.hasNext()) {
370 QTcpSocket* sock = is.next();
371 if (sock->state() == QAbstractSocket::ConnectedState) {
372 int numBytes = hlpStr.length();
373 if (myWrite(sock, hlpStr.c_str(), numBytes) != numBytes) {
[637]374 delete sock;
375 is.remove();
376 }
[140]377 }
[4990]378 else if (sock->state() != QAbstractSocket::ConnectingState) {
379 delete sock;
380 is.remove();
381 }
[140]382 }
383 }
[73]384 }
[35]385 }
[4990]386 _epochs->remove(sec);
[35]387 }
388}
[1170]389
[4250]390// Reread configuration (private slot)
[1170]391////////////////////////////////////////////////////////////////////////////
[1179]392void bncCaster::slotReadMountPoints() {
[1170]393
[4252]394 bncSettings settings;
395 settings.reRead();
396
[4250]397 readMountPoints();
398}
399
400// Read Mountpoints
401////////////////////////////////////////////////////////////////////////////
402void bncCaster::readMountPoints() {
403
[1535]404 bncSettings settings;
[1170]405
406 // Reread several options
407 // ----------------------
[2316]408 _samplingRate = settings.value("binSampl").toInt();
409 _waitTime = settings.value("waitTime").toInt();
410 if (_waitTime < 1) {
411 _waitTime = 1;
[1170]412 }
413
414 // Add new mountpoints
415 // -------------------
416 int iMount = -1;
417 QListIterator<QString> it(settings.value("mountPoints").toStringList());
418 while (it.hasNext()) {
419 ++iMount;
420 QStringList hlp = it.next().split(" ");
421 if (hlp.size() <= 1) continue;
422 QUrl url(hlp[0]);
423
424 // Does it already exist?
425 // ----------------------
426 bool existFlg = false;
427 QListIterator<bncGetThread*> iTh(_threads);
428 while (iTh.hasNext()) {
429 bncGetThread* thread = iTh.next();
430 if (thread->mountPoint() == url) {
431 existFlg = true;
432 break;
433 }
434 }
435
436 // New bncGetThread
437 // ----------------
438 if (!existFlg) {
439 QByteArray format = hlp[1].toAscii();
440 QByteArray latitude = hlp[2].toAscii();
441 QByteArray longitude = hlp[3].toAscii();
442 QByteArray nmea = hlp[4].toAscii();
[1353]443 QByteArray ntripVersion = hlp[5].toAscii();
[1170]444
445 bncGetThread* getThread = new bncGetThread(url, format, latitude,
[3003]446 longitude, nmea, ntripVersion);
[1170]447 addGetThread(getThread);
448 }
449 }
450
451 // Remove mountpoints
452 // ------------------
453 QListIterator<bncGetThread*> iTh(_threads);
454 while (iTh.hasNext()) {
455 bncGetThread* thread = iTh.next();
456
457 bool existFlg = false;
458 QListIterator<QString> it(settings.value("mountPoints").toStringList());
459 while (it.hasNext()) {
460 QStringList hlp = it.next().split(" ");
461 if (hlp.size() <= 1) continue;
462 QUrl url(hlp[0]);
463
464 if (thread->mountPoint() == url) {
465 existFlg = true;
466 break;
467 }
468 }
469
470 if (!existFlg) {
471 disconnect(thread, 0, 0, 0);
472 _staIDs.removeAll(thread->staID());
473 _threads.removeAll(thread);
474 thread->terminate();
475 }
476 }
477
[1179]478 emit mountPointsRead(_threads);
[1529]479 emit( newMessage(QString("Configuration read: "
[5068]480 + BNC_CORE->confFileName()
[1529]481 + ", %1 stream(s)")
[1299]482 .arg(_threads.count()).toAscii(), true) );
[1176]483
[1170]484 // (Re-) Start the configuration timer
485 // -----------------------------------
486 int ms = 0;
487
[1705]488 if (_confInterval != -1) {
[1170]489 ms = 1000 * _confInterval;
490 }
491 else {
492 QTime currTime = currentDateAndTimeGPS().time();
493 QTime nextShotTime;
494
495 if (settings.value("onTheFlyInterval").toString() == "1 min") {
496 _confInterval = 60;
497 nextShotTime = QTime(currTime.hour(), currTime.minute()+1, 0);
498 }
[4536]499 else if (settings.value("onTheFlyInterval").toString() == "5 min") {
500 _confInterval = 300;
501 nextShotTime = QTime(currTime.hour(), currTime.minute()+5, 0);
502 }
[1170]503 else if (settings.value("onTheFlyInterval").toString() == "1 hour") {
504 _confInterval = 3600;
505 nextShotTime = QTime(currTime.hour()+1, 0, 0);
506 }
507 else {
508 _confInterval = 86400;
509 nextShotTime = QTime(23, 59, 59, 999);
510 }
511
512 ms = currTime.msecsTo(nextShotTime);
[1176]513 if (ms < 30000) {
514 ms = 30000;
515 }
[1170]516 }
517
[1705]518 QTimer::singleShot(ms, this, SLOT(slotReadMountPoints()));
[1170]519}
[1182]520
521//
522////////////////////////////////////////////////////////////////////////////
523int bncCaster::myWrite(QTcpSocket* sock, const char* buf, int bufLen) {
[1229]524 sock->write(buf, bufLen);
525 for (int ii = 1; ii <= 10; ii++) {
526 if (sock->waitForBytesWritten(10)) { // wait 10 ms
527 return bufLen;
[1182]528 }
529 }
[1229]530 return -1;
[1182]531}
[2182]532
533//
534////////////////////////////////////////////////////////////////////////////
[5995]535void bncCaster::slotNewNMEAstr(QByteArray /* staID */, QByteArray str) {
[2184]536 if (_nmeaSockets) {
537 QMutableListIterator<QTcpSocket*> is(*_nmeaSockets);
538 while (is.hasNext()) {
539 QTcpSocket* sock = is.next();
540 if (sock->state() == QAbstractSocket::ConnectedState) {
541 sock->write(str);
542 }
543 else if (sock->state() != QAbstractSocket::ConnectingState) {
544 delete sock;
545 is.remove();
546 }
547 }
548 }
[2182]549}
[5528]550
551//
552////////////////////////////////////////////////////////////////////////////
553void bncCaster::reopenOutFile() {
554
555 bncSettings settings;
556
557 QString outFileName = settings.value("outFile").toString();
558 if ( !outFileName.isEmpty() ) {
559 expandEnvVar(outFileName);
560 if (!_outFile || _outFile->fileName() != outFileName) {
561 delete _out;
562 delete _outFile;
563 _outFile = new QFile(outFileName);
564 if ( Qt::CheckState(settings.value("rnxAppend").toInt()) == Qt::Checked) {
565 _outFile->open(QIODevice::WriteOnly | QIODevice::Append);
566 }
567 else {
568 _outFile->open(QIODevice::WriteOnly);
569 }
570 _out = new QTextStream(_outFile);
571 _out->setRealNumberNotation(QTextStream::FixedNotation);
572 }
573 }
574 else {
575 delete _out; _out = 0;
576 delete _outFile; _outFile = 0;
577 }
578}
579
[5649]580// Output into the Miscellaneous socket
[5648]581////////////////////////////////////////////////////////////////////////////
582void bncCaster::slotNewRawData(QByteArray staID, QByteArray data) {
583 if (_miscSockets && (_miscMount == "ALL" || _miscMount == staID)) {
584 QMutableListIterator<QTcpSocket*> is(*_miscSockets);
585 while (is.hasNext()) {
586 QTcpSocket* sock = is.next();
587 if (sock->state() == QAbstractSocket::ConnectedState) {
588 sock->write(data);
589 }
590 else if (sock->state() != QAbstractSocket::ConnectingState) {
591 delete sock;
592 is.remove();
593 }
594 }
595 }
596}
[5647]597
[5649]598// New Connection
[5647]599////////////////////////////////////////////////////////////////////////////
600void bncCaster::slotNewMiscConnection() {
601 _miscSockets->push_back( _miscServer->nextPendingConnection() );
602 emit( newMessage(QString("New client connection on Miscellaneous Output Port: # %1")
603 .arg(_miscSockets->size()).toAscii(), true) );
604}
Note: See TracBrowser for help on using the repository browser.