1 | // Part of BNC, a utility for retrieving decoding and
2 | // converting GNSS data streams from NTRIP broadcasters.
3 | //
4 | // Copyright (C) 2007
5 | // German Federal Agency for Cartography and Geodesy (BKG)
6 | // http://www.bkg.bund.de
7 | // Czech Technical University Prague, Department of Geodesy
8 | // http://www.fsv.cvut.cz
9 | //
10 | // Email: euref-ip@bkg.bund.de
11 | //
12 | // This program is free software; you can redistribute it and/or
13 | // modify it under the terms of the GNU General Public License
14 | // as published by the Free Software Foundation, version 2.
15 | //
16 | // This program is distributed in the hope that it will be useful,
17 | // but WITHOUT ANY WARRANTY; without even the implied warranty of
19 | // GNU General Public License for more details.
20 | //
21 | // You should have received a copy of the GNU General Public License
22 | // along with this program; if not, write to the Free Software
23 | // Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
24 |
25 | /* -------------------------------------------------------------------------
26 | * BKG NTRIP Client
27 | * -------------------------------------------------------------------------
28 | *
29 | * Class: bncCaster
30 | *
31 | * Purpose: buffers and disseminates the data
32 | *
33 | * Author: L. Mervart
34 | *
35 | * Created: 24-Dec-2005
36 | *
37 | * Changes:
38 | *
39 | * -----------------------------------------------------------------------*/
40 |
41 | #include <math.h>
42 | #include <unistd.h>
43 | #include <iostream>
44 | #include <iomanip>
45 | #include <sstream>
46 |
47 | #include "bnccaster.h"
48 | #include "bncrinex.h"
49 | #include "bnccore.h"
50 | #include "bncgetthread.h"
51 | #include "bncutils.h"
52 | #include "bncsettings.h"
53 |
54 | using namespace std;
55 |
56 | // Constructor
57 | ////////////////////////////////////////////////////////////////////////////
58 | bncCaster::bncCaster() {
59 |
60 | bncSettings settings;
61 |
62 | connect(this, SIGNAL(newMessage(QByteArray,bool)),
63 | BNC_CORE, SLOT(slotMessage(const QByteArray,bool)));
64 |
65 | _outFile = 0;
66 | _out = 0;
67 | reopenOutFile();
68 |
69 | int port = settings.value("outPort").toInt();
70 |
71 | if (port != 0) {
72 | _server = new QTcpServer;
73 | if ( !_server->listen(QHostAddress::Any, port) ) {
74 | emit newMessage("bncCaster: Cannot listen on sync port", true);
75 | }
76 | connect(_server, SIGNAL(newConnection()), this, SLOT(slotNewConnection()));
77 | _sockets = new QList<QTcpSocket*>;
78 | }
79 | else {
80 | _server = 0;
81 | _sockets = 0;
82 | }
83 |
84 | int uPort = settings.value("outUPort").toInt();
85 | if (uPort != 0) {
86 | _uServer = new QTcpServer;
87 | if ( !_uServer->listen(QHostAddress::Any, uPort) ) {
88 | emit newMessage("bncCaster: Cannot listen on usync port", true);
89 | }
90 | connect(_uServer, SIGNAL(newConnection()), this, SLOT(slotNewUConnection()));
91 | _uSockets = new QList<QTcpSocket*>;
92 | }
93 | else {
94 | _uServer = 0;
95 | _uSockets = 0;
96 | }
97 |
98 | _outLockTime = settings.value("outLockTime",false).toBool();
99 | _samplingRateMult10 = int(settings.value("outSampl").toString().split("sec").first().toDouble() * 10.0);
100 | _outWait = settings.value("outWait").toDouble();
101 | if (_outWait <= 0.0) {
102 | _outWait = 0.01;
103 | }
104 | _confInterval = -1;
105 |
106 | // Miscellaneous output port
107 | // -------------------------
108 | _miscMount = settings.value("miscMount").toString();
109 | _miscPort = settings.value("miscPort").toInt();
110 | if (!_miscMount.isEmpty() && _miscPort != 0) {
111 | _miscServer = new QTcpServer;
112 | if ( !_miscServer->listen(QHostAddress::Any, _miscPort) ) {
113 | emit newMessage("bncCaster: Cannot listen on Miscellaneous Output Port", true);
114 | }
115 | connect(_miscServer, SIGNAL(newConnection()), this, SLOT(slotNewMiscConnection()));
116 | _miscSockets = new QList<QTcpSocket*>;
117 | }
118 | else {
119 | _miscServer = 0;
120 | _miscSockets = 0;
121 | }
122 | }
123 |
124 | // Destructor
125 | ////////////////////////////////////////////////////////////////////////////
126 | bncCaster::~bncCaster() {
127 |
128 | QMutexLocker locker(&_mutex);
129 |
130 | QListIterator<bncGetThread*> it(_threads);
131 | while(it.hasNext()){
132 | bncGetThread* thread = it.next();
133 | disconnect(thread, 0, 0, 0);
134 | _staIDs.removeAll(thread->staID());
135 | _threads.removeAll(thread);
136 | thread->terminate();
137 | }
138 | delete _out;
139 | delete _outFile;
140 | delete _server;
141 | delete _sockets;
142 | delete _uServer;
143 | delete _uSockets;
144 | delete _miscServer;
145 | delete _miscSockets;
146 | }
147 |
148 | // New Observations
149 | ////////////////////////////////////////////////////////////////////////////
150 | void bncCaster::slotNewObs(const QByteArray staID, QList<t_satObs> obsList) {
151 |
152 | QMutexLocker locker(&_mutex);
153 |
154 | reopenOutFile();
155 |
156 | unsigned index = 0;
157 | QMutableListIterator<t_satObs> it(obsList);
158 | while (it.hasNext()) {
159 | ++index;
160 | t_satObs& obs = it.next();
161 |
162 | // Rename the Station
163 | // ------------------
164 | obs._staID = staID.data();
165 |
166 | // Output into the socket
167 | // ----------------------
168 | if (_uSockets) {
169 |
170 | ostringstream oStr;
171 | oStr.setf(ios::showpoint | ios::fixed);
172 | oStr << obs._staID << " "
173 | << setw(4) << obs._time.gpsw() << " "
174 | << setw(14) << setprecision(7) << obs._time.gpssec() << " "
175 | << bncRinex::asciiSatLine(obs,_outLockTime) << endl;
176 |
177 | string hlpStr = oStr.str();
178 |
179 | QMutableListIterator<QTcpSocket*> is(*_uSockets);
180 | while (is.hasNext()) {
181 | QTcpSocket* sock = is.next();
182 | if (sock->state() == QAbstractSocket::ConnectedState) {
183 | int numBytes = hlpStr.length();
184 | if (myWrite(sock, hlpStr.c_str(), numBytes) != numBytes) {
185 | delete sock;
186 | is.remove();
187 | }
188 | }
189 | else if (sock->state() != QAbstractSocket::ConnectingState) {
190 | delete sock;
191 | is.remove();
192 | }
193 | }
194 | }
195 |
196 | // First time: set the _lastDumpTime
197 | // ---------------------------------
198 | if (!_lastDumpTime.valid()) {
199 | _lastDumpTime = obs._time - 1.0;
200 | }
201 |
202 | // An old observation - throw it away
203 | // ----------------------------------
204 | if (obs._time <= _lastDumpTime) {
205 | if (index == 1) {
206 | bncSettings settings;
207 | if ( !settings.value("outFile").toString().isEmpty() ||
208 | !settings.value("outPort").toString().isEmpty() ) {
209 | emit( newMessage(QString("%1: Old epoch %2 thrown away")
210 | .arg(staID.data()).arg(string(obs._time).c_str())
211 | .toAscii(), true) );
212 | }
213 | }
214 | continue;
215 | }
216 |
217 | // Save the observation
218 | // --------------------
219 | _epochs[obs._time].append(obs);
220 |
221 | // Dump Epochs
222 | // -----------
223 | if (obs._time - _outWait > _lastDumpTime) {
224 | dumpEpochs(obs._time - _outWait);
225 | _lastDumpTime = obs._time - _outWait;
226 | }
227 | }
228 | }
229 |
230 | // New Connection
231 | ////////////////////////////////////////////////////////////////////////////
232 | void bncCaster::slotNewConnection() {
233 | _sockets->push_back( _server->nextPendingConnection() );
234 | emit( newMessage(QString("New client connection on sync port: # %1")
235 | .arg(_sockets->size()).toAscii(), true) );
236 | }
237 |
238 | void bncCaster::slotNewUConnection() {
239 | _uSockets->push_back( _uServer->nextPendingConnection() );
240 | emit( newMessage(QString("New client connection on usync port: # %1")
241 | .arg(_uSockets->size()).toAscii(), true) );
242 | }
243 |
244 | // Add New Thread
245 | ////////////////////////////////////////////////////////////////////////////
246 | void bncCaster::addGetThread(bncGetThread* getThread, bool noNewThread) {
247 |
248 | qRegisterMetaType<t_satObs>("t_satObs");
249 | qRegisterMetaType< QList<t_satObs> >("QList<t_satObs>");
250 |
251 | connect(getThread, SIGNAL(newObs(QByteArray, QList<t_satObs>)),
252 | this, SLOT(slotNewObs(QByteArray, QList<t_satObs>)));
253 |
254 | connect(getThread, SIGNAL(newObs(QByteArray, QList<t_satObs>)),
255 | this, SIGNAL(newObs(QByteArray, QList<t_satObs>)));
256 |
257 | connect(getThread, SIGNAL(newRawData(QByteArray, QByteArray)),
258 | this, SLOT(slotNewRawData(QByteArray, QByteArray)));
259 |
260 | connect(getThread, SIGNAL(getThreadFinished(QByteArray)),
261 | this, SLOT(slotGetThreadFinished(QByteArray)));
262 |
263 | _staIDs.push_back(getThread->staID());
264 | _threads.push_back(getThread);
265 |
266 | if (noNewThread) {
267 | getThread->run();
268 | }
269 | else {
270 | getThread->start();
271 | }
272 | }
273 |
274 | // Get Thread destroyed
275 | ////////////////////////////////////////////////////////////////////////////
276 | void bncCaster::slotGetThreadFinished(QByteArray staID) {
277 | QMutexLocker locker(&_mutex);
278 |
279 | QListIterator<bncGetThread*> it(_threads);
280 | while (it.hasNext()) {
281 | bncGetThread* thread = it.next();
282 | if (thread->staID() == staID) {
283 | _threads.removeOne(thread);
284 | }
285 | }
286 |
287 | _staIDs.removeAll(staID);
288 | emit( newMessage(
289 | QString("Decoding %1 stream(s)").arg(_staIDs.size()).toAscii(), true) );
290 | if (_staIDs.size() == 0) {
291 | emit(newMessage("bncCaster: Last get thread terminated", true));
292 | emit getThreadsFinished();
293 | }
294 | }
295 |
296 | // Dump Complete Epochs
297 | ////////////////////////////////////////////////////////////////////////////
298 | void bncCaster::dumpEpochs(const bncTime& maxTime) {
299 |
300 | QMutableMapIterator<bncTime, QList<t_satObs> > itEpo(_epochs);
301 | while (itEpo.hasNext()) {
302 | itEpo.next();
303 | const bncTime& epoTime = itEpo.key();
304 | if (epoTime <= maxTime) {
305 | const QList<t_satObs>& allObs = itEpo.value();
306 | int sec = int(nint(epoTime.gpssec()*10));
307 | if ( (_out || _sockets) && (sec % (_samplingRateMult10) == 0) ) {
308 | QListIterator<t_satObs> it(allObs);
309 | bool firstObs = true;
310 | while (it.hasNext()) {
311 | const t_satObs& obs = it.next();
312 |
313 | ostringstream oStr;
314 | oStr.setf(ios::showpoint | ios::fixed);
315 | if (firstObs) {
316 | firstObs = false;
317 | oStr << "> " << obs._time.gpsw() << ' '
318 | << setprecision(7) << obs._time.gpssec() << endl;
319 | }
320 | oStr << obs._staID << ' '
321 | << bncRinex::asciiSatLine(obs,_outLockTime) << endl;
322 | if (!it.hasNext()) {
323 | oStr << endl;
324 | }
325 | string hlpStr = oStr.str();
326 |
327 | // Output into the File
328 | // --------------------
329 | if (_out) {
330 | *_out << hlpStr.c_str();
331 | _out->flush();
332 | }
333 |
334 | // Output into the socket
335 | // ----------------------
336 | if (_sockets) {
337 | QMutableListIterator<QTcpSocket*> is(*_sockets);
338 | while (is.hasNext()) {
339 | QTcpSocket* sock = is.next();
340 | if (sock->state() == QAbstractSocket::ConnectedState) {
341 | int numBytes = hlpStr.length();
342 | if (myWrite(sock, hlpStr.c_str(), numBytes) != numBytes) {
343 | delete sock;
344 | is.remove();
345 | }
346 | }
347 | else if (sock->state() != QAbstractSocket::ConnectingState) {
348 | delete sock;
349 | is.remove();
350 | }
351 | }
352 | }
353 | }
354 | }
355 | _epochs.remove(epoTime);
356 | }
357 | }
358 | }
359 |
360 | // Reread configuration (private slot)
361 | ////////////////////////////////////////////////////////////////////////////
362 | void bncCaster::slotReadMountPoints() {
363 |
364 | bncSettings settings;
365 | settings.reRead();
366 |
367 | readMountPoints();
368 | }
369 |
370 | // Read Mountpoints
371 | ////////////////////////////////////////////////////////////////////////////
372 | void bncCaster::readMountPoints() {
373 |
374 | bncSettings settings;
375 |
376 | // Reread several options
377 | // ----------------------
378 | _samplingRateMult10 = int(settings.value("outSampl").toString().split("sec").first().toDouble() * 10.0);
379 | _outWait = settings.value("outWait").toInt();
380 | if (_outWait < 1) {
381 | _outWait = 1;
382 | }
383 |
384 | // Add new mountpoints
385 | // -------------------
386 | int iMount = -1;
387 | QListIterator<QString> it(settings.value("mountPoints").toStringList());
388 | while (it.hasNext()) {
389 | ++iMount;
390 | QStringList hlp = it.next().split(" ");
391 | if (hlp.size() < 7) continue;
392 | QUrl url(hlp[0]);
393 |
394 | // Does it already exist?
395 | // ----------------------
396 | bool existFlg = false;
397 | QListIterator<bncGetThread*> iTh(_threads);
398 | while (iTh.hasNext()) {
399 | bncGetThread* thread = iTh.next();
400 | if (thread->mountPoint() == url) {
401 | existFlg = true;
402 | break;
403 | }
404 | }
405 |
406 | // New bncGetThread
407 | // ----------------
408 | if (!existFlg) {
409 | QByteArray format = hlp[1].toAscii();
410 | QByteArray latitude = hlp[3].toAscii();
411 | QByteArray longitude = hlp[4].toAscii();
412 | QByteArray nmea = hlp[5].toAscii();
413 | QByteArray ntripVersion = hlp[6].toAscii();
414 |
415 | bncGetThread* getThread = new bncGetThread(url, format, latitude,
416 | longitude, nmea, ntripVersion);
417 | addGetThread(getThread);
418 |
419 | }
420 | }
421 |
422 | // Remove mountpoints
423 | // ------------------
424 | QListIterator<bncGetThread*> iTh(_threads);
425 | while (iTh.hasNext()) {
426 | bncGetThread* thread = iTh.next();
427 |
428 | bool existFlg = false;
429 | QListIterator<QString> it(settings.value("mountPoints").toStringList());
430 | while (it.hasNext()) {
431 | QStringList hlp = it.next().split(" ");
432 | if (hlp.size() <= 1) continue;
433 | QUrl url(hlp[0]);
434 |
435 | if (thread->mountPoint() == url) {
436 | existFlg = true;
437 | break;
438 | }
439 | }
440 |
441 | if (!existFlg) {
442 | disconnect(thread, 0, 0, 0);
443 | _staIDs.removeAll(thread->staID());
444 | _threads.removeAll(thread);
445 | thread->terminate();
446 | }
447 | }
448 |
449 | emit mountPointsRead(_threads);
450 | emit( newMessage(QString("Configuration read: "
451 | + BNC_CORE->confFileName()
452 | + ", %1 stream(s)")
453 | .arg(_threads.count()).toAscii(), true) );
454 |
455 | // (Re-) Start the configuration timer
456 | // -----------------------------------
457 | if (settings.value("onTheFlyInterval").toString() == "no") {
458 | return;
459 | }
460 |
461 | int ms = 0;
462 |
463 | if (_confInterval != -1) {
464 | ms = 1000 * _confInterval;
465 | }
466 | else {
467 | QTime currTime = currentDateAndTimeGPS().time();
468 | QTime nextShotTime;
469 |
470 | if (settings.value("onTheFlyInterval").toString() == "1 min") {
471 | _confInterval = 60;
472 | nextShotTime = QTime(currTime.hour(), currTime.minute()+1, 0);
473 | }
474 | else if (settings.value("onTheFlyInterval").toString() == "5 min") {
475 | _confInterval = 300;
476 | nextShotTime = QTime(currTime.hour(), currTime.minute()+5, 0);
477 | }
478 | else if (settings.value("onTheFlyInterval").toString() == "1 hour") {
479 | _confInterval = 3600;
480 | nextShotTime = QTime(currTime.hour()+1, 0, 0);
481 | }
482 | else {
483 | _confInterval = 86400;
484 | nextShotTime = QTime(23, 59, 59, 999);
485 | }
486 |
487 | ms = currTime.msecsTo(nextShotTime);
488 | if (ms < 30000) {
489 | ms = 30000;
490 | }
491 | }
492 |
493 | QTimer::singleShot(ms, this, SLOT(slotReadMountPoints()));
494 | }
495 |
496 | //
497 | ////////////////////////////////////////////////////////////////////////////
498 | int bncCaster::myWrite(QTcpSocket* sock, const char* buf, int bufLen) {
499 | sock->write(buf, bufLen);
500 | for (int ii = 1; ii <= 10; ii++) {
501 | if (sock->waitForBytesWritten(10)) { // wait 10 ms
502 | return bufLen;
503 | }
504 | }
505 | return -1;
506 | }
507 |
508 | //
509 | ////////////////////////////////////////////////////////////////////////////
510 | void bncCaster::reopenOutFile() {
511 |
512 | bncSettings settings;
513 |
514 | QString outFileName = settings.value("outFile").toString();
515 | if ( !outFileName.isEmpty() ) {
516 | expandEnvVar(outFileName);
517 | if (!_outFile || _outFile->fileName() != outFileName) {
518 | delete _out;
519 | delete _outFile;
520 | _outFile = new QFile(outFileName);
521 | if ( Qt::CheckState(settings.value("rnxAppend").toInt()) == Qt::Checked) {
522 | _outFile->open(QIODevice::WriteOnly | QIODevice::Append);
523 | }
524 | else {
525 | _outFile->open(QIODevice::WriteOnly);
526 | }
527 | _out = new QTextStream(_outFile);
528 | _out->setRealNumberNotation(QTextStream::FixedNotation);
529 | }
530 | }
531 | else {
532 | delete _out; _out = 0;
533 | delete _outFile; _outFile = 0;
534 | }
535 | }
536 |
537 | // Output into the Miscellaneous socket
538 | ////////////////////////////////////////////////////////////////////////////
539 | void bncCaster::slotNewRawData(QByteArray staID, QByteArray data) {
540 | if (_miscSockets && (_miscMount == "ALL" || _miscMount == staID)) {
541 | QMutableListIterator<QTcpSocket*> is(*_miscSockets);
542 | while (is.hasNext()) {
543 | QTcpSocket* sock = is.next();
544 | if (sock->state() == QAbstractSocket::ConnectedState) {
545 | sock->write(data);
546 | }
547 | else if (sock->state() != QAbstractSocket::ConnectingState) {
548 | delete sock;
549 | is.remove();
550 | }
551 | }
552 | }
553 | }
554 |
555 | // New Connection
556 | ////////////////////////////////////////////////////////////////////////////
557 | void bncCaster::slotNewMiscConnection() {
558 | _miscSockets->push_back( _miscServer->nextPendingConnection() );
559 | emit( newMessage(QString("New client connection on Miscellaneous Output Port: # %1")
560 | .arg(_miscSockets->size()).toAscii(), true) );
561 | }