source: ntrip/trunk/BNC/src/bnccaster.cpp@ 6000

Last change on this file since 6000 was 5996, checked in by mervart, 10 years ago
File size: 17.6 KB
Line 
1// Part of BNC, a utility for retrieving decoding and
2// converting GNSS data streams from NTRIP broadcasters.
3//
4// Copyright (C) 2007
5// German Federal Agency for Cartography and Geodesy (BKG)
6// http://www.bkg.bund.de
7// Czech Technical University Prague, Department of Geodesy
8// http://www.fsv.cvut.cz
9//
10// Email: euref-ip@bkg.bund.de
11//
12// This program is free software; you can redistribute it and/or
13// modify it under the terms of the GNU General Public License
14// as published by the Free Software Foundation, version 2.
15//
16// This program is distributed in the hope that it will be useful,
17// but WITHOUT ANY WARRANTY; without even the implied warranty of
18// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19// GNU General Public License for more details.
20//
21// You should have received a copy of the GNU General Public License
22// along with this program; if not, write to the Free Software
23// Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
24
25/* -------------------------------------------------------------------------
26 * BKG NTRIP Client
27 * -------------------------------------------------------------------------
28 *
29 * Class: bncCaster
30 *
31 * Purpose: buffers and disseminates the data
32 *
33 * Author: L. Mervart
34 *
35 * Created: 24-Dec-2005
36 *
37 * Changes:
38 *
39 * -----------------------------------------------------------------------*/
40
41#include <math.h>
42#include <unistd.h>
43#include <iostream>
44#include <iomanip>
45#include <sstream>
46
47#include "bnccaster.h"
48#include "bncrinex.h"
49#include "bnccore.h"
50#include "bncgetthread.h"
51#include "bncutils.h"
52#include "bncsettings.h"
53#include "GPSDecoder.h"
54
55using namespace std;
56
57// Constructor
58////////////////////////////////////////////////////////////////////////////
59bncCaster::bncCaster() {
60
61 bncSettings settings;
62
63 connect(this, SIGNAL(newMessage(QByteArray,bool)),
64 BNC_CORE, SLOT(slotMessage(const QByteArray,bool)));
65
66 _outFile = 0;
67 _out = 0;
68 reopenOutFile();
69
70 _port = settings.value("outPort").toInt();
71
72 if (_port != 0) {
73 _server = new QTcpServer;
74 if ( !_server->listen(QHostAddress::Any, _port) ) {
75 emit newMessage("bncCaster: Cannot listen on sync port", true);
76 }
77 connect(_server, SIGNAL(newConnection()), this, SLOT(slotNewConnection()));
78 _sockets = new QList<QTcpSocket*>;
79 }
80 else {
81 _server = 0;
82 _sockets = 0;
83 }
84
85 int uPort = settings.value("outUPort").toInt();
86 if (uPort != 0) {
87 _uServer = new QTcpServer;
88 if ( !_uServer->listen(QHostAddress::Any, uPort) ) {
89 emit newMessage("bncCaster: Cannot listen on usync port", true);
90 }
91 connect(_uServer, SIGNAL(newConnection()), this, SLOT(slotNewUConnection()));
92 _uSockets = new QList<QTcpSocket*>;
93 }
94 else {
95 _uServer = 0;
96 _uSockets = 0;
97 }
98
99 int nmeaPort = settings.value("PPP/nmeaPort").toInt();
100 if (nmeaPort != 0) {
101 _nmeaServer = new QTcpServer;
102 if ( !_nmeaServer->listen(QHostAddress::Any, nmeaPort) ) {
103 emit newMessage("bncCaster: Cannot listen on port", true);
104 }
105 connect(_nmeaServer, SIGNAL(newConnection()), this, SLOT(slotNewNMEAConnection()));
106 connect(BNC_CORE, SIGNAL(newNMEAstr(QByteArray, QByteArray)),
107 this, SLOT(slotNewNMEAstr(QByteArray, QByteArray)));
108 _nmeaSockets = new QList<QTcpSocket*>;
109 }
110 else {
111 _nmeaServer = 0;
112 _nmeaSockets = 0;
113 }
114
115 _epochs = new QMultiMap<long, t_obs>;
116
117 _samplingRate = settings.value("binSampl").toInt();
118 _waitTime = settings.value("waitTime").toInt();
119 _lastDumpSec = 0;
120 _confInterval = -1;
121
122 // Miscellaneous output port
123 // -------------------------
124 _miscMount = settings.value("miscMount").toString();
125 _miscPort = settings.value("miscPort").toInt();
126 if (!_miscMount.isEmpty() && _miscPort != 0) {
127 _miscServer = new QTcpServer;
128 if ( !_miscServer->listen(QHostAddress::Any, _miscPort) ) {
129 emit newMessage("bncCaster: Cannot listen on Miscellaneous Output Port", true);
130 }
131 connect(_miscServer, SIGNAL(newConnection()), this, SLOT(slotNewMiscConnection()));
132 _miscSockets = new QList<QTcpSocket*>;
133 }
134 else {
135 _miscServer = 0;
136 _miscSockets = 0;
137 }
138}
139
140// Destructor
141////////////////////////////////////////////////////////////////////////////
142bncCaster::~bncCaster() {
143
144 QMutexLocker locker(&_mutex);
145
146 QListIterator<bncGetThread*> it(_threads);
147 while(it.hasNext()){
148 bncGetThread* thread = it.next();
149 thread->terminate();
150 }
151 delete _out;
152 delete _outFile;
153 delete _server;
154 delete _sockets;
155 delete _uServer;
156 delete _uSockets;
157 delete _nmeaServer;
158 delete _nmeaSockets;
159 delete _epochs;
160 delete _miscServer;
161 delete _miscSockets;
162}
163
164// New Observations
165////////////////////////////////////////////////////////////////////////////
166void bncCaster::slotNewObs(const QByteArray staID, QList<t_obs> obsList) {
167
168 QMutexLocker locker(&_mutex);
169
170 reopenOutFile();
171
172 unsigned index = 0;
173 QMutableListIterator<t_obs> it(obsList);
174 while (it.hasNext()) {
175 ++index;
176 t_obs& obs = it.next();
177
178 long iSec = long(floor(obs.GPSWeeks+0.5));
179 long newTime = obs.GPSWeek * 7*24*3600 + iSec;
180
181 // Rename the Station
182 // ------------------
183 strncpy(obs.StatID, staID.constData(),sizeof(obs.StatID));
184 obs.StatID[sizeof(obs.StatID)-1] = '\0';
185
186 // Output into the socket
187 // ----------------------
188 if (_uSockets) {
189
190 ostringstream oStr;
191 oStr.setf(ios::showpoint | ios::fixed);
192 oStr << obs.StatID << " "
193 << setw(4) << obs.GPSWeek << " "
194 << setw(14) << setprecision(7) << obs.GPSWeeks << " "
195 << bncRinex::asciiSatLine(obs) << endl;
196
197 string hlpStr = oStr.str();
198
199 QMutableListIterator<QTcpSocket*> is(*_uSockets);
200 while (is.hasNext()) {
201 QTcpSocket* sock = is.next();
202 if (sock->state() == QAbstractSocket::ConnectedState) {
203 int numBytes = hlpStr.length();
204 if (myWrite(sock, hlpStr.c_str(), numBytes) != numBytes) {
205 delete sock;
206 is.remove();
207 }
208 }
209 else if (sock->state() != QAbstractSocket::ConnectingState) {
210 delete sock;
211 is.remove();
212 }
213 }
214 }
215
216 // First time, set the _lastDumpSec immediately
217 // --------------------------------------------
218 if (_lastDumpSec == 0) {
219 _lastDumpSec = newTime - 1;
220 }
221
222 // An old observation - throw it away
223 // ----------------------------------
224 if (newTime <= _lastDumpSec) {
225 if (index == 1) {
226 bncSettings settings;
227 if ( !settings.value("outFile").toString().isEmpty() ||
228 !settings.value("outPort").toString().isEmpty() ) {
229
230 QTime enomtime = QTime(0,0,0).addSecs(iSec);
231
232 emit( newMessage(QString("%1: Old epoch %2 (%3) thrown away")
233 .arg(staID.data()).arg(iSec)
234 .arg(enomtime.toString("HH:mm:ss"))
235 .toAscii(), true) );
236 }
237 }
238 continue;
239 }
240
241 // Save the observation
242 // --------------------
243 _epochs->insert(newTime, obs);
244
245 // Dump Epochs
246 // -----------
247 if (newTime - _waitTime > _lastDumpSec) {
248 dumpEpochs(_lastDumpSec + 1, newTime - _waitTime);
249 _lastDumpSec = newTime - _waitTime;
250 }
251 }
252}
253
254// New Connection
255////////////////////////////////////////////////////////////////////////////
256void bncCaster::slotNewConnection() {
257 _sockets->push_back( _server->nextPendingConnection() );
258 emit( newMessage(QString("New client connection on sync port: # %1")
259 .arg(_sockets->size()).toAscii(), true) );
260}
261
262void bncCaster::slotNewUConnection() {
263 _uSockets->push_back( _uServer->nextPendingConnection() );
264 emit( newMessage(QString("New client connection on usync port: # %1")
265 .arg(_uSockets->size()).toAscii(), true) );
266}
267
268void bncCaster::slotNewNMEAConnection() {
269 _nmeaSockets->push_back( _nmeaServer->nextPendingConnection() );
270 emit( newMessage(QString("New PPP client on port: # %1")
271 .arg(_nmeaSockets->size()).toAscii(), true) );
272}
273
274// Add New Thread
275////////////////////////////////////////////////////////////////////////////
276void bncCaster::addGetThread(bncGetThread* getThread, bool noNewThread) {
277
278 qRegisterMetaType<t_obs>("t_obs");
279 qRegisterMetaType< QList<t_obs> >("QList<t_obs>");
280 qRegisterMetaType<gpsephemeris>("gpsephemeris");
281 qRegisterMetaType<glonassephemeris>("glonassephemeris");
282 qRegisterMetaType<galileoephemeris>("galileoephemeris");
283
284 connect(getThread, SIGNAL(newObs(QByteArray, QList<t_obs>)),
285 this, SLOT(slotNewObs(QByteArray, QList<t_obs>)));
286
287 connect(getThread, SIGNAL(newObs(QByteArray, QList<t_obs>)),
288 this, SIGNAL(newObs(QByteArray, QList<t_obs>)));
289
290 connect(getThread, SIGNAL(newRawData(QByteArray, QByteArray)),
291 this, SLOT(slotNewRawData(QByteArray, QByteArray)));
292
293 connect(getThread, SIGNAL(getThreadFinished(QByteArray)),
294 this, SLOT(slotGetThreadFinished(QByteArray)));
295
296 _staIDs.push_back(getThread->staID());
297 _threads.push_back(getThread);
298
299 if (noNewThread) {
300 getThread->run();
301 }
302 else {
303 getThread->start();
304 }
305}
306
307// Get Thread destroyed
308////////////////////////////////////////////////////////////////////////////
309void bncCaster::slotGetThreadFinished(QByteArray staID) {
310 QMutexLocker locker(&_mutex);
311
312 QListIterator<bncGetThread*> it(_threads);
313 while (it.hasNext()) {
314 bncGetThread* thread = it.next();
315 if (thread->staID() == staID) {
316 _threads.removeOne(thread);
317 }
318 }
319
320 _staIDs.removeAll(staID);
321 emit( newMessage(
322 QString("Decoding %1 stream(s)").arg(_staIDs.size()).toAscii(), true) );
323 if (_staIDs.size() == 0) {
324 emit(newMessage("bncCaster: Last get thread terminated", true));
325 emit getThreadsFinished();
326 }
327}
328
329// Dump Complete Epochs
330////////////////////////////////////////////////////////////////////////////
331void bncCaster::dumpEpochs(long minTime, long maxTime) {
332
333 for (long sec = minTime; sec <= maxTime; sec++) {
334
335 if ( (_out || _sockets) &&
336 (_samplingRate == 0 || sec % _samplingRate == 0) ) {
337
338 QList<t_obs> allObs = _epochs->values(sec);
339
340 QListIterator<t_obs> it(allObs);
341 bool firstObs = true;
342 while (it.hasNext()) {
343 const t_obs& obs = it.next();
344
345 ostringstream oStr;
346 oStr.setf(ios::showpoint | ios::fixed);
347 if (firstObs) {
348 firstObs = false;
349 oStr << "> " << obs.GPSWeek << ' '
350 << setprecision(7) << obs.GPSWeeks << endl;;
351 }
352 oStr << obs.StatID << ' ' << bncRinex::asciiSatLine(obs) << endl;
353 if (!it.hasNext()) {
354 oStr << endl;
355 }
356 string hlpStr = oStr.str();
357
358 // Output into the File
359 // --------------------
360 if (_out) {
361 *_out << hlpStr.c_str();
362 _out->flush();
363 }
364
365 // Output into the socket
366 // ----------------------
367 if (_sockets) {
368 QMutableListIterator<QTcpSocket*> is(*_sockets);
369 while (is.hasNext()) {
370 QTcpSocket* sock = is.next();
371 if (sock->state() == QAbstractSocket::ConnectedState) {
372 int numBytes = hlpStr.length();
373 if (myWrite(sock, hlpStr.c_str(), numBytes) != numBytes) {
374 delete sock;
375 is.remove();
376 }
377 }
378 else if (sock->state() != QAbstractSocket::ConnectingState) {
379 delete sock;
380 is.remove();
381 }
382 }
383 }
384 }
385 }
386 _epochs->remove(sec);
387 }
388}
389
390// Reread configuration (private slot)
391////////////////////////////////////////////////////////////////////////////
392void bncCaster::slotReadMountPoints() {
393
394 bncSettings settings;
395 settings.reRead();
396
397 readMountPoints();
398}
399
400// Read Mountpoints
401////////////////////////////////////////////////////////////////////////////
402void bncCaster::readMountPoints() {
403
404 bncSettings settings;
405
406 // Reread several options
407 // ----------------------
408 _samplingRate = settings.value("binSampl").toInt();
409 _waitTime = settings.value("waitTime").toInt();
410 if (_waitTime < 1) {
411 _waitTime = 1;
412 }
413
414 // Add new mountpoints
415 // -------------------
416 int iMount = -1;
417 QListIterator<QString> it(settings.value("mountPoints").toStringList());
418 while (it.hasNext()) {
419 ++iMount;
420 QStringList hlp = it.next().split(" ");
421 if (hlp.size() <= 1) continue;
422 QUrl url(hlp[0]);
423
424 // Does it already exist?
425 // ----------------------
426 bool existFlg = false;
427 QListIterator<bncGetThread*> iTh(_threads);
428 while (iTh.hasNext()) {
429 bncGetThread* thread = iTh.next();
430 if (thread->mountPoint() == url) {
431 existFlg = true;
432 break;
433 }
434 }
435
436 // New bncGetThread
437 // ----------------
438 if (!existFlg) {
439 QByteArray format = hlp[1].toAscii();
440 QByteArray latitude = hlp[2].toAscii();
441 QByteArray longitude = hlp[3].toAscii();
442 QByteArray nmea = hlp[4].toAscii();
443 QByteArray ntripVersion = hlp[5].toAscii();
444
445 bncGetThread* getThread = new bncGetThread(url, format, latitude,
446 longitude, nmea, ntripVersion);
447 addGetThread(getThread);
448 }
449 }
450
451 // Remove mountpoints
452 // ------------------
453 QListIterator<bncGetThread*> iTh(_threads);
454 while (iTh.hasNext()) {
455 bncGetThread* thread = iTh.next();
456
457 bool existFlg = false;
458 QListIterator<QString> it(settings.value("mountPoints").toStringList());
459 while (it.hasNext()) {
460 QStringList hlp = it.next().split(" ");
461 if (hlp.size() <= 1) continue;
462 QUrl url(hlp[0]);
463
464 if (thread->mountPoint() == url) {
465 existFlg = true;
466 break;
467 }
468 }
469
470 if (!existFlg) {
471 disconnect(thread, 0, 0, 0);
472 _staIDs.removeAll(thread->staID());
473 _threads.removeAll(thread);
474 thread->terminate();
475 }
476 }
477
478 emit mountPointsRead(_threads);
479 emit( newMessage(QString("Configuration read: "
480 + BNC_CORE->confFileName()
481 + ", %1 stream(s)")
482 .arg(_threads.count()).toAscii(), true) );
483
484 // (Re-) Start the configuration timer
485 // -----------------------------------
486 int ms = 0;
487
488 if (_confInterval != -1) {
489 ms = 1000 * _confInterval;
490 }
491 else {
492 QTime currTime = currentDateAndTimeGPS().time();
493 QTime nextShotTime;
494
495 if (settings.value("onTheFlyInterval").toString() == "1 min") {
496 _confInterval = 60;
497 nextShotTime = QTime(currTime.hour(), currTime.minute()+1, 0);
498 }
499 else if (settings.value("onTheFlyInterval").toString() == "5 min") {
500 _confInterval = 300;
501 nextShotTime = QTime(currTime.hour(), currTime.minute()+5, 0);
502 }
503 else if (settings.value("onTheFlyInterval").toString() == "1 hour") {
504 _confInterval = 3600;
505 nextShotTime = QTime(currTime.hour()+1, 0, 0);
506 }
507 else {
508 _confInterval = 86400;
509 nextShotTime = QTime(23, 59, 59, 999);
510 }
511
512 ms = currTime.msecsTo(nextShotTime);
513 if (ms < 30000) {
514 ms = 30000;
515 }
516 }
517
518 QTimer::singleShot(ms, this, SLOT(slotReadMountPoints()));
519}
520
521//
522////////////////////////////////////////////////////////////////////////////
523int bncCaster::myWrite(QTcpSocket* sock, const char* buf, int bufLen) {
524 sock->write(buf, bufLen);
525 for (int ii = 1; ii <= 10; ii++) {
526 if (sock->waitForBytesWritten(10)) { // wait 10 ms
527 return bufLen;
528 }
529 }
530 return -1;
531}
532
533//
534////////////////////////////////////////////////////////////////////////////
535void bncCaster::slotNewNMEAstr(QByteArray /* staID */, QByteArray str) {
536 if (_nmeaSockets) {
537 QMutableListIterator<QTcpSocket*> is(*_nmeaSockets);
538 while (is.hasNext()) {
539 QTcpSocket* sock = is.next();
540 if (sock->state() == QAbstractSocket::ConnectedState) {
541 sock->write(str);
542 }
543 else if (sock->state() != QAbstractSocket::ConnectingState) {
544 delete sock;
545 is.remove();
546 }
547 }
548 }
549}
550
551//
552////////////////////////////////////////////////////////////////////////////
553void bncCaster::reopenOutFile() {
554
555 bncSettings settings;
556
557 QString outFileName = settings.value("outFile").toString();
558 if ( !outFileName.isEmpty() ) {
559 expandEnvVar(outFileName);
560 if (!_outFile || _outFile->fileName() != outFileName) {
561 delete _out;
562 delete _outFile;
563 _outFile = new QFile(outFileName);
564 if ( Qt::CheckState(settings.value("rnxAppend").toInt()) == Qt::Checked) {
565 _outFile->open(QIODevice::WriteOnly | QIODevice::Append);
566 }
567 else {
568 _outFile->open(QIODevice::WriteOnly);
569 }
570 _out = new QTextStream(_outFile);
571 _out->setRealNumberNotation(QTextStream::FixedNotation);
572 }
573 }
574 else {
575 delete _out; _out = 0;
576 delete _outFile; _outFile = 0;
577 }
578}
579
580// Output into the Miscellaneous socket
581////////////////////////////////////////////////////////////////////////////
582void bncCaster::slotNewRawData(QByteArray staID, QByteArray data) {
583 if (_miscSockets && (_miscMount == "ALL" || _miscMount == staID)) {
584 QMutableListIterator<QTcpSocket*> is(*_miscSockets);
585 while (is.hasNext()) {
586 QTcpSocket* sock = is.next();
587 if (sock->state() == QAbstractSocket::ConnectedState) {
588 sock->write(data);
589 }
590 else if (sock->state() != QAbstractSocket::ConnectingState) {
591 delete sock;
592 is.remove();
593 }
594 }
595 }
596}
597
598// New Connection
599////////////////////////////////////////////////////////////////////////////
600void bncCaster::slotNewMiscConnection() {
601 _miscSockets->push_back( _miscServer->nextPendingConnection() );
602 emit( newMessage(QString("New client connection on Miscellaneous Output Port: # %1")
603 .arg(_miscSockets->size()).toAscii(), true) );
604}
Note: See TracBrowser for help on using the repository browser.