source: ntrip/trunk/BNC/src/bnccaster.cpp@ 7177

Last change on this file since 7177 was 7176, checked in by stuerze, 10 years ago

country information from source table is added to have it available for RINEX v3 filenames

File size: 17.4 KB
Line 
1// Part of BNC, a utility for retrieving decoding and
2// converting GNSS data streams from NTRIP broadcasters.
3//
4// Copyright (C) 2007
5// German Federal Agency for Cartography and Geodesy (BKG)
6// http://www.bkg.bund.de
7// Czech Technical University Prague, Department of Geodesy
8// http://www.fsv.cvut.cz
9//
10// Email: euref-ip@bkg.bund.de
11//
12// This program is free software; you can redistribute it and/or
13// modify it under the terms of the GNU General Public License
14// as published by the Free Software Foundation, version 2.
15//
16// This program is distributed in the hope that it will be useful,
17// but WITHOUT ANY WARRANTY; without even the implied warranty of
18// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19// GNU General Public License for more details.
20//
21// You should have received a copy of the GNU General Public License
22// along with this program; if not, write to the Free Software
23// Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
24
25/* -------------------------------------------------------------------------
26 * BKG NTRIP Client
27 * -------------------------------------------------------------------------
28 *
29 * Class: bncCaster
30 *
31 * Purpose: buffers and disseminates the data
32 *
33 * Author: L. Mervart
34 *
35 * Created: 24-Dec-2005
36 *
37 * Changes:
38 *
39 * -----------------------------------------------------------------------*/
40
41#include <math.h>
42#include <unistd.h>
43#include <iostream>
44#include <iomanip>
45#include <sstream>
46
47#include "bnccaster.h"
48#include "bncrinex.h"
49#include "bnccore.h"
50#include "bncgetthread.h"
51#include "bncutils.h"
52#include "bncsettings.h"
53
54using namespace std;
55
56// Constructor
57////////////////////////////////////////////////////////////////////////////
58bncCaster::bncCaster() {
59
60 bncSettings settings;
61
62 connect(this, SIGNAL(newMessage(QByteArray,bool)),
63 BNC_CORE, SLOT(slotMessage(const QByteArray,bool)));
64
65 _outFile = 0;
66 _out = 0;
67 reopenOutFile();
68
69 int port = settings.value("outPort").toInt();
70
71 if (port != 0) {
72 _server = new QTcpServer;
73 if ( !_server->listen(QHostAddress::Any, port) ) {
74 emit newMessage("bncCaster: Cannot listen on sync port", true);
75 }
76 connect(_server, SIGNAL(newConnection()), this, SLOT(slotNewConnection()));
77 _sockets = new QList<QTcpSocket*>;
78 }
79 else {
80 _server = 0;
81 _sockets = 0;
82 }
83
84 int uPort = settings.value("outUPort").toInt();
85 if (uPort != 0) {
86 _uServer = new QTcpServer;
87 if ( !_uServer->listen(QHostAddress::Any, uPort) ) {
88 emit newMessage("bncCaster: Cannot listen on usync port", true);
89 }
90 connect(_uServer, SIGNAL(newConnection()), this, SLOT(slotNewUConnection()));
91 _uSockets = new QList<QTcpSocket*>;
92 }
93 else {
94 _uServer = 0;
95 _uSockets = 0;
96 }
97
98 int nmeaPort = settings.value("PPP/nmeaPort").toInt();
99 if (nmeaPort != 0) {
100 _nmeaServer = new QTcpServer;
101 if ( !_nmeaServer->listen(QHostAddress::Any, nmeaPort) ) {
102 emit newMessage("bncCaster: Cannot listen on port", true);
103 }
104 connect(_nmeaServer, SIGNAL(newConnection()), this, SLOT(slotNewNMEAConnection()));
105 connect(BNC_CORE, SIGNAL(newNMEAstr(QByteArray, QByteArray)),
106 this, SLOT(slotNewNMEAstr(QByteArray, QByteArray)));
107 _nmeaSockets = new QList<QTcpSocket*>;
108 }
109 else {
110 _nmeaServer = 0;
111 _nmeaSockets = 0;
112 }
113
114 _samplingRate = settings.value("binSampl").toInt();
115 _waitTime = settings.value("waitTime").toDouble();
116 if (_waitTime <= 0.0) {
117 _waitTime = 0.01;
118 }
119 _confInterval = -1;
120
121 // Miscellaneous output port
122 // -------------------------
123 _miscMount = settings.value("miscMount").toString();
124 _miscPort = settings.value("miscPort").toInt();
125 if (!_miscMount.isEmpty() && _miscPort != 0) {
126 _miscServer = new QTcpServer;
127 if ( !_miscServer->listen(QHostAddress::Any, _miscPort) ) {
128 emit newMessage("bncCaster: Cannot listen on Miscellaneous Output Port", true);
129 }
130 connect(_miscServer, SIGNAL(newConnection()), this, SLOT(slotNewMiscConnection()));
131 _miscSockets = new QList<QTcpSocket*>;
132 }
133 else {
134 _miscServer = 0;
135 _miscSockets = 0;
136 }
137}
138
139// Destructor
140////////////////////////////////////////////////////////////////////////////
141bncCaster::~bncCaster() {
142
143 QMutexLocker locker(&_mutex);
144
145 QListIterator<bncGetThread*> it(_threads);
146 while(it.hasNext()){
147 bncGetThread* thread = it.next();
148 thread->terminate();
149 }
150 delete _out;
151 delete _outFile;
152 delete _server;
153 delete _sockets;
154 delete _uServer;
155 delete _uSockets;
156 delete _nmeaServer;
157 delete _nmeaSockets;
158 delete _miscServer;
159 delete _miscSockets;
160}
161
162// New Observations
163////////////////////////////////////////////////////////////////////////////
164void bncCaster::slotNewObs(const QByteArray staID, QList<t_satObs> obsList) {
165
166 QMutexLocker locker(&_mutex);
167
168 reopenOutFile();
169
170 unsigned index = 0;
171 QMutableListIterator<t_satObs> it(obsList);
172 while (it.hasNext()) {
173 ++index;
174 t_satObs& obs = it.next();
175
176 // Rename the Station
177 // ------------------
178 obs._staID = staID.data();
179
180 // Output into the socket
181 // ----------------------
182 if (_uSockets) {
183
184 ostringstream oStr;
185 oStr.setf(ios::showpoint | ios::fixed);
186 oStr << obs._staID << " "
187 << setw(4) << obs._time.gpsw() << " "
188 << setw(14) << setprecision(7) << obs._time.gpssec() << " "
189 << bncRinex::asciiSatLine(obs) << endl;
190
191 string hlpStr = oStr.str();
192
193 QMutableListIterator<QTcpSocket*> is(*_uSockets);
194 while (is.hasNext()) {
195 QTcpSocket* sock = is.next();
196 if (sock->state() == QAbstractSocket::ConnectedState) {
197 int numBytes = hlpStr.length();
198 if (myWrite(sock, hlpStr.c_str(), numBytes) != numBytes) {
199 delete sock;
200 is.remove();
201 }
202 }
203 else if (sock->state() != QAbstractSocket::ConnectingState) {
204 delete sock;
205 is.remove();
206 }
207 }
208 }
209
210 // First time: set the _lastDumpTime
211 // ---------------------------------
212 if (!_lastDumpTime.valid()) {
213 _lastDumpTime = obs._time - 1.0;
214 }
215
216 // An old observation - throw it away
217 // ----------------------------------
218 if (obs._time <= _lastDumpTime) {
219 if (index == 1) {
220 bncSettings settings;
221 if ( !settings.value("outFile").toString().isEmpty() ||
222 !settings.value("outPort").toString().isEmpty() ) {
223 emit( newMessage(QString("%1: Old epoch %2 thrown away")
224 .arg(staID.data()).arg(string(obs._time).c_str())
225 .toAscii(), true) );
226 }
227 }
228 continue;
229 }
230
231 // Save the observation
232 // --------------------
233 _epochs[obs._time].push_back(obs);
234
235 // Dump Epochs
236 // -----------
237 if (obs._time - _waitTime > _lastDumpTime) {
238 dumpEpochs(obs._time - _waitTime);
239 _lastDumpTime = obs._time - _waitTime;
240 }
241 }
242}
243
244// New Connection
245////////////////////////////////////////////////////////////////////////////
246void bncCaster::slotNewConnection() {
247 _sockets->push_back( _server->nextPendingConnection() );
248 emit( newMessage(QString("New client connection on sync port: # %1")
249 .arg(_sockets->size()).toAscii(), true) );
250}
251
252void bncCaster::slotNewUConnection() {
253 _uSockets->push_back( _uServer->nextPendingConnection() );
254 emit( newMessage(QString("New client connection on usync port: # %1")
255 .arg(_uSockets->size()).toAscii(), true) );
256}
257
258void bncCaster::slotNewNMEAConnection() {
259 _nmeaSockets->push_back( _nmeaServer->nextPendingConnection() );
260 emit( newMessage(QString("New PPP client on port: # %1")
261 .arg(_nmeaSockets->size()).toAscii(), true) );
262}
263
264// Add New Thread
265////////////////////////////////////////////////////////////////////////////
266void bncCaster::addGetThread(bncGetThread* getThread, bool noNewThread) {
267
268 qRegisterMetaType<t_satObs>("t_satObs");
269 qRegisterMetaType< QList<t_satObs> >("QList<t_satObs>");
270
271 connect(getThread, SIGNAL(newObs(QByteArray, QList<t_satObs>)),
272 this, SLOT(slotNewObs(QByteArray, QList<t_satObs>)));
273
274 connect(getThread, SIGNAL(newObs(QByteArray, QList<t_satObs>)),
275 this, SIGNAL(newObs(QByteArray, QList<t_satObs>)));
276
277 connect(getThread, SIGNAL(newRawData(QByteArray, QByteArray)),
278 this, SLOT(slotNewRawData(QByteArray, QByteArray)));
279
280 connect(getThread, SIGNAL(getThreadFinished(QByteArray)),
281 this, SLOT(slotGetThreadFinished(QByteArray)));
282
283 _staIDs.push_back(getThread->staID());
284 _threads.push_back(getThread);
285
286 if (noNewThread) {
287 getThread->run();
288 }
289 else {
290 getThread->start();
291 }
292}
293
294// Get Thread destroyed
295////////////////////////////////////////////////////////////////////////////
296void bncCaster::slotGetThreadFinished(QByteArray staID) {
297 QMutexLocker locker(&_mutex);
298
299 QListIterator<bncGetThread*> it(_threads);
300 while (it.hasNext()) {
301 bncGetThread* thread = it.next();
302 if (thread->staID() == staID) {
303 _threads.removeOne(thread);
304 }
305 }
306
307 _staIDs.removeAll(staID);
308 emit( newMessage(
309 QString("Decoding %1 stream(s)").arg(_staIDs.size()).toAscii(), true) );
310 if (_staIDs.size() == 0) {
311 emit(newMessage("bncCaster: Last get thread terminated", true));
312 emit getThreadsFinished();
313 }
314}
315
316// Dump Complete Epochs
317////////////////////////////////////////////////////////////////////////////
318void bncCaster::dumpEpochs(const bncTime& maxTime) {
319
320 QMutableMapIterator<bncTime, QList<t_satObs> > itEpo(_epochs);
321 while (itEpo.hasNext()) {
322 itEpo.next();
323 const bncTime& epoTime = itEpo.key();
324 if (epoTime <= maxTime) {
325 const QList<t_satObs>& allObs = itEpo.value();
326 int sec = int(nint(epoTime.gpssec()));
327 if ( (_out || _sockets) && (_samplingRate == 0 || sec % _samplingRate == 0) ) {
328
329 QListIterator<t_satObs> it(allObs);
330 bool firstObs = true;
331 while (it.hasNext()) {
332 const t_satObs& obs = it.next();
333
334 ostringstream oStr;
335 oStr.setf(ios::showpoint | ios::fixed);
336 if (firstObs) {
337 firstObs = false;
338 oStr << "> " << obs._time.gpsw() << ' '
339 << setprecision(7) << obs._time.gpssec() << endl;;
340 }
341 oStr << obs._staID << ' ' << bncRinex::asciiSatLine(obs) << endl;
342 if (!it.hasNext()) {
343 oStr << endl;
344 }
345 string hlpStr = oStr.str();
346
347 // Output into the File
348 // --------------------
349 if (_out) {
350 *_out << hlpStr.c_str();
351 _out->flush();
352 }
353
354 // Output into the socket
355 // ----------------------
356 if (_sockets) {
357 QMutableListIterator<QTcpSocket*> is(*_sockets);
358 while (is.hasNext()) {
359 QTcpSocket* sock = is.next();
360 if (sock->state() == QAbstractSocket::ConnectedState) {
361 int numBytes = hlpStr.length();
362 if (myWrite(sock, hlpStr.c_str(), numBytes) != numBytes) {
363 delete sock;
364 is.remove();
365 }
366 }
367 else if (sock->state() != QAbstractSocket::ConnectingState) {
368 delete sock;
369 is.remove();
370 }
371 }
372 }
373 }
374 }
375 _epochs.remove(epoTime);
376 }
377 }
378}
379
380// Reread configuration (private slot)
381////////////////////////////////////////////////////////////////////////////
382void bncCaster::slotReadMountPoints() {
383
384 bncSettings settings;
385 settings.reRead();
386
387 readMountPoints();
388}
389
390// Read Mountpoints
391////////////////////////////////////////////////////////////////////////////
392void bncCaster::readMountPoints() {
393
394 bncSettings settings;
395
396 // Reread several options
397 // ----------------------
398 _samplingRate = settings.value("binSampl").toInt();
399 _waitTime = settings.value("waitTime").toInt();
400 if (_waitTime < 1) {
401 _waitTime = 1;
402 }
403
404 // Add new mountpoints
405 // -------------------
406 int iMount = -1;
407 QListIterator<QString> it(settings.value("mountPoints").toStringList());
408 while (it.hasNext()) {
409 ++iMount;
410 QStringList hlp = it.next().split(" ");
411 if (hlp.size() <= 1) continue;
412 QUrl url(hlp[0]);
413
414 // Does it already exist?
415 // ----------------------
416 bool existFlg = false;
417 QListIterator<bncGetThread*> iTh(_threads);
418 while (iTh.hasNext()) {
419 bncGetThread* thread = iTh.next();
420 if (thread->mountPoint() == url) {
421 existFlg = true;
422 break;
423 }
424 }
425
426 // New bncGetThread
427 // ----------------
428 if (!existFlg) {
429 QByteArray format = hlp[1].toAscii();
430 QByteArray latitude = hlp[3].toAscii();
431 QByteArray longitude = hlp[4].toAscii();
432 QByteArray nmea = hlp[5].toAscii();
433 QByteArray ntripVersion = hlp[6].toAscii();
434
435 bncGetThread* getThread = new bncGetThread(url, format, latitude,
436 longitude, nmea, ntripVersion);
437 addGetThread(getThread);
438 }
439 }
440
441 // Remove mountpoints
442 // ------------------
443 QListIterator<bncGetThread*> iTh(_threads);
444 while (iTh.hasNext()) {
445 bncGetThread* thread = iTh.next();
446
447 bool existFlg = false;
448 QListIterator<QString> it(settings.value("mountPoints").toStringList());
449 while (it.hasNext()) {
450 QStringList hlp = it.next().split(" ");
451 if (hlp.size() <= 1) continue;
452 QUrl url(hlp[0]);
453
454 if (thread->mountPoint() == url) {
455 existFlg = true;
456 break;
457 }
458 }
459
460 if (!existFlg) {
461 disconnect(thread, 0, 0, 0);
462 _staIDs.removeAll(thread->staID());
463 _threads.removeAll(thread);
464 thread->terminate();
465 }
466 }
467
468 emit mountPointsRead(_threads);
469 emit( newMessage(QString("Configuration read: "
470 + BNC_CORE->confFileName()
471 + ", %1 stream(s)")
472 .arg(_threads.count()).toAscii(), true) );
473
474 // (Re-) Start the configuration timer
475 // -----------------------------------
476 int ms = 0;
477
478 if (_confInterval != -1) {
479 ms = 1000 * _confInterval;
480 }
481 else {
482 QTime currTime = currentDateAndTimeGPS().time();
483 QTime nextShotTime;
484
485 if (settings.value("onTheFlyInterval").toString() == "1 min") {
486 _confInterval = 60;
487 nextShotTime = QTime(currTime.hour(), currTime.minute()+1, 0);
488 }
489 else if (settings.value("onTheFlyInterval").toString() == "5 min") {
490 _confInterval = 300;
491 nextShotTime = QTime(currTime.hour(), currTime.minute()+5, 0);
492 }
493 else if (settings.value("onTheFlyInterval").toString() == "1 hour") {
494 _confInterval = 3600;
495 nextShotTime = QTime(currTime.hour()+1, 0, 0);
496 }
497 else {
498 _confInterval = 86400;
499 nextShotTime = QTime(23, 59, 59, 999);
500 }
501
502 ms = currTime.msecsTo(nextShotTime);
503 if (ms < 30000) {
504 ms = 30000;
505 }
506 }
507
508 QTimer::singleShot(ms, this, SLOT(slotReadMountPoints()));
509}
510
511//
512////////////////////////////////////////////////////////////////////////////
513int bncCaster::myWrite(QTcpSocket* sock, const char* buf, int bufLen) {
514 sock->write(buf, bufLen);
515 for (int ii = 1; ii <= 10; ii++) {
516 if (sock->waitForBytesWritten(10)) { // wait 10 ms
517 return bufLen;
518 }
519 }
520 return -1;
521}
522
523//
524////////////////////////////////////////////////////////////////////////////
525void bncCaster::slotNewNMEAstr(QByteArray /* staID */, QByteArray str) {
526 if (_nmeaSockets) {
527 QMutableListIterator<QTcpSocket*> is(*_nmeaSockets);
528 while (is.hasNext()) {
529 QTcpSocket* sock = is.next();
530 if (sock->state() == QAbstractSocket::ConnectedState) {
531 sock->write(str);
532 }
533 else if (sock->state() != QAbstractSocket::ConnectingState) {
534 delete sock;
535 is.remove();
536 }
537 }
538 }
539}
540
541//
542////////////////////////////////////////////////////////////////////////////
543void bncCaster::reopenOutFile() {
544
545 bncSettings settings;
546
547 QString outFileName = settings.value("outFile").toString();
548 if ( !outFileName.isEmpty() ) {
549 expandEnvVar(outFileName);
550 if (!_outFile || _outFile->fileName() != outFileName) {
551 delete _out;
552 delete _outFile;
553 _outFile = new QFile(outFileName);
554 if ( Qt::CheckState(settings.value("rnxAppend").toInt()) == Qt::Checked) {
555 _outFile->open(QIODevice::WriteOnly | QIODevice::Append);
556 }
557 else {
558 _outFile->open(QIODevice::WriteOnly);
559 }
560 _out = new QTextStream(_outFile);
561 _out->setRealNumberNotation(QTextStream::FixedNotation);
562 }
563 }
564 else {
565 delete _out; _out = 0;
566 delete _outFile; _outFile = 0;
567 }
568}
569
570// Output into the Miscellaneous socket
571////////////////////////////////////////////////////////////////////////////
572void bncCaster::slotNewRawData(QByteArray staID, QByteArray data) {
573 if (_miscSockets && (_miscMount == "ALL" || _miscMount == staID)) {
574 QMutableListIterator<QTcpSocket*> is(*_miscSockets);
575 while (is.hasNext()) {
576 QTcpSocket* sock = is.next();
577 if (sock->state() == QAbstractSocket::ConnectedState) {
578 sock->write(data);
579 }
580 else if (sock->state() != QAbstractSocket::ConnectingState) {
581 delete sock;
582 is.remove();
583 }
584 }
585 }
586}
587
588// New Connection
589////////////////////////////////////////////////////////////////////////////
590void bncCaster::slotNewMiscConnection() {
591 _miscSockets->push_back( _miscServer->nextPendingConnection() );
592 emit( newMessage(QString("New client connection on Miscellaneous Output Port: # %1")
593 .arg(_miscSockets->size()).toAscii(), true) );
594}
Note: See TracBrowser for help on using the repository browser.