source: ntrip/trunk/BNC/src/bnccaster.cpp@ 7181

Last change on this file since 7181 was 7180, checked in by stuerze, 10 years ago

NMEA TCP port functionlity removed into bncgetthread class in order to have it available for each PPP station

File size: 16.1 KB
Line 
1// Part of BNC, a utility for retrieving decoding and
2// converting GNSS data streams from NTRIP broadcasters.
3//
4// Copyright (C) 2007
5// German Federal Agency for Cartography and Geodesy (BKG)
6// http://www.bkg.bund.de
7// Czech Technical University Prague, Department of Geodesy
8// http://www.fsv.cvut.cz
9//
10// Email: euref-ip@bkg.bund.de
11//
12// This program is free software; you can redistribute it and/or
13// modify it under the terms of the GNU General Public License
14// as published by the Free Software Foundation, version 2.
15//
16// This program is distributed in the hope that it will be useful,
17// but WITHOUT ANY WARRANTY; without even the implied warranty of
18// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19// GNU General Public License for more details.
20//
21// You should have received a copy of the GNU General Public License
22// along with this program; if not, write to the Free Software
23// Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
24
25/* -------------------------------------------------------------------------
26 * BKG NTRIP Client
27 * -------------------------------------------------------------------------
28 *
29 * Class: bncCaster
30 *
31 * Purpose: buffers and disseminates the data
32 *
33 * Author: L. Mervart
34 *
35 * Created: 24-Dec-2005
36 *
37 * Changes:
38 *
39 * -----------------------------------------------------------------------*/
40
41#include <math.h>
42#include <unistd.h>
43#include <iostream>
44#include <iomanip>
45#include <sstream>
46
47#include "bnccaster.h"
48#include "bncrinex.h"
49#include "bnccore.h"
50#include "bncgetthread.h"
51#include "bncutils.h"
52#include "bncsettings.h"
53
54using namespace std;
55
56// Constructor
57////////////////////////////////////////////////////////////////////////////
58bncCaster::bncCaster() {
59
60 bncSettings settings;
61
62 connect(this, SIGNAL(newMessage(QByteArray,bool)),
63 BNC_CORE, SLOT(slotMessage(const QByteArray,bool)));
64
65 _outFile = 0;
66 _out = 0;
67 reopenOutFile();
68
69 int port = settings.value("outPort").toInt();
70
71 if (port != 0) {
72 _server = new QTcpServer;
73 if ( !_server->listen(QHostAddress::Any, port) ) {
74 emit newMessage("bncCaster: Cannot listen on sync port", true);
75 }
76 connect(_server, SIGNAL(newConnection()), this, SLOT(slotNewConnection()));
77 _sockets = new QList<QTcpSocket*>;
78 }
79 else {
80 _server = 0;
81 _sockets = 0;
82 }
83
84 int uPort = settings.value("outUPort").toInt();
85 if (uPort != 0) {
86 _uServer = new QTcpServer;
87 if ( !_uServer->listen(QHostAddress::Any, uPort) ) {
88 emit newMessage("bncCaster: Cannot listen on usync port", true);
89 }
90 connect(_uServer, SIGNAL(newConnection()), this, SLOT(slotNewUConnection()));
91 _uSockets = new QList<QTcpSocket*>;
92 }
93 else {
94 _uServer = 0;
95 _uSockets = 0;
96 }
97
98 _samplingRate = settings.value("binSampl").toInt();
99 _waitTime = settings.value("waitTime").toDouble();
100 if (_waitTime <= 0.0) {
101 _waitTime = 0.01;
102 }
103 _confInterval = -1;
104
105 // Miscellaneous output port
106 // -------------------------
107 _miscMount = settings.value("miscMount").toString();
108 _miscPort = settings.value("miscPort").toInt();
109 if (!_miscMount.isEmpty() && _miscPort != 0) {
110 _miscServer = new QTcpServer;
111 if ( !_miscServer->listen(QHostAddress::Any, _miscPort) ) {
112 emit newMessage("bncCaster: Cannot listen on Miscellaneous Output Port", true);
113 }
114 connect(_miscServer, SIGNAL(newConnection()), this, SLOT(slotNewMiscConnection()));
115 _miscSockets = new QList<QTcpSocket*>;
116 }
117 else {
118 _miscServer = 0;
119 _miscSockets = 0;
120 }
121}
122
123// Destructor
124////////////////////////////////////////////////////////////////////////////
125bncCaster::~bncCaster() {
126
127 QMutexLocker locker(&_mutex);
128
129 QListIterator<bncGetThread*> it(_threads);
130 while(it.hasNext()){
131 bncGetThread* thread = it.next();
132 thread->terminate();
133 }
134 delete _out;
135 delete _outFile;
136 delete _server;
137 delete _sockets;
138 delete _uServer;
139 delete _uSockets;
140 delete _miscServer;
141 delete _miscSockets;
142}
143
144// New Observations
145////////////////////////////////////////////////////////////////////////////
146void bncCaster::slotNewObs(const QByteArray staID, QList<t_satObs> obsList) {
147
148 QMutexLocker locker(&_mutex);
149
150 reopenOutFile();
151
152 unsigned index = 0;
153 QMutableListIterator<t_satObs> it(obsList);
154 while (it.hasNext()) {
155 ++index;
156 t_satObs& obs = it.next();
157
158 // Rename the Station
159 // ------------------
160 obs._staID = staID.data();
161
162 // Output into the socket
163 // ----------------------
164 if (_uSockets) {
165
166 ostringstream oStr;
167 oStr.setf(ios::showpoint | ios::fixed);
168 oStr << obs._staID << " "
169 << setw(4) << obs._time.gpsw() << " "
170 << setw(14) << setprecision(7) << obs._time.gpssec() << " "
171 << bncRinex::asciiSatLine(obs) << endl;
172
173 string hlpStr = oStr.str();
174
175 QMutableListIterator<QTcpSocket*> is(*_uSockets);
176 while (is.hasNext()) {
177 QTcpSocket* sock = is.next();
178 if (sock->state() == QAbstractSocket::ConnectedState) {
179 int numBytes = hlpStr.length();
180 if (myWrite(sock, hlpStr.c_str(), numBytes) != numBytes) {
181 delete sock;
182 is.remove();
183 }
184 }
185 else if (sock->state() != QAbstractSocket::ConnectingState) {
186 delete sock;
187 is.remove();
188 }
189 }
190 }
191
192 // First time: set the _lastDumpTime
193 // ---------------------------------
194 if (!_lastDumpTime.valid()) {
195 _lastDumpTime = obs._time - 1.0;
196 }
197
198 // An old observation - throw it away
199 // ----------------------------------
200 if (obs._time <= _lastDumpTime) {
201 if (index == 1) {
202 bncSettings settings;
203 if ( !settings.value("outFile").toString().isEmpty() ||
204 !settings.value("outPort").toString().isEmpty() ) {
205 emit( newMessage(QString("%1: Old epoch %2 thrown away")
206 .arg(staID.data()).arg(string(obs._time).c_str())
207 .toAscii(), true) );
208 }
209 }
210 continue;
211 }
212
213 // Save the observation
214 // --------------------
215 _epochs[obs._time].push_back(obs);
216
217 // Dump Epochs
218 // -----------
219 if (obs._time - _waitTime > _lastDumpTime) {
220 dumpEpochs(obs._time - _waitTime);
221 _lastDumpTime = obs._time - _waitTime;
222 }
223 }
224}
225
226// New Connection
227////////////////////////////////////////////////////////////////////////////
228void bncCaster::slotNewConnection() {
229 _sockets->push_back( _server->nextPendingConnection() );
230 emit( newMessage(QString("New client connection on sync port: # %1")
231 .arg(_sockets->size()).toAscii(), true) );
232}
233
234void bncCaster::slotNewUConnection() {
235 _uSockets->push_back( _uServer->nextPendingConnection() );
236 emit( newMessage(QString("New client connection on usync port: # %1")
237 .arg(_uSockets->size()).toAscii(), true) );
238}
239
240// Add New Thread
241////////////////////////////////////////////////////////////////////////////
242void bncCaster::addGetThread(bncGetThread* getThread, bool noNewThread) {
243
244 qRegisterMetaType<t_satObs>("t_satObs");
245 qRegisterMetaType< QList<t_satObs> >("QList<t_satObs>");
246
247 connect(getThread, SIGNAL(newObs(QByteArray, QList<t_satObs>)),
248 this, SLOT(slotNewObs(QByteArray, QList<t_satObs>)));
249
250 connect(getThread, SIGNAL(newObs(QByteArray, QList<t_satObs>)),
251 this, SIGNAL(newObs(QByteArray, QList<t_satObs>)));
252
253 connect(getThread, SIGNAL(newRawData(QByteArray, QByteArray)),
254 this, SLOT(slotNewRawData(QByteArray, QByteArray)));
255
256 connect(getThread, SIGNAL(getThreadFinished(QByteArray)),
257 this, SLOT(slotGetThreadFinished(QByteArray)));
258
259 _staIDs.push_back(getThread->staID());
260 _threads.push_back(getThread);
261
262 if (noNewThread) {
263 getThread->run();
264 }
265 else {
266 getThread->start();
267 }
268}
269
270// Get Thread destroyed
271////////////////////////////////////////////////////////////////////////////
272void bncCaster::slotGetThreadFinished(QByteArray staID) {
273 QMutexLocker locker(&_mutex);
274
275 QListIterator<bncGetThread*> it(_threads);
276 while (it.hasNext()) {
277 bncGetThread* thread = it.next();
278 if (thread->staID() == staID) {
279 _threads.removeOne(thread);
280 }
281 }
282
283 _staIDs.removeAll(staID);
284 emit( newMessage(
285 QString("Decoding %1 stream(s)").arg(_staIDs.size()).toAscii(), true) );
286 if (_staIDs.size() == 0) {
287 emit(newMessage("bncCaster: Last get thread terminated", true));
288 emit getThreadsFinished();
289 }
290}
291
292// Dump Complete Epochs
293////////////////////////////////////////////////////////////////////////////
294void bncCaster::dumpEpochs(const bncTime& maxTime) {
295
296 QMutableMapIterator<bncTime, QList<t_satObs> > itEpo(_epochs);
297 while (itEpo.hasNext()) {
298 itEpo.next();
299 const bncTime& epoTime = itEpo.key();
300 if (epoTime <= maxTime) {
301 const QList<t_satObs>& allObs = itEpo.value();
302 int sec = int(nint(epoTime.gpssec()));
303 if ( (_out || _sockets) && (_samplingRate == 0 || sec % _samplingRate == 0) ) {
304
305 QListIterator<t_satObs> it(allObs);
306 bool firstObs = true;
307 while (it.hasNext()) {
308 const t_satObs& obs = it.next();
309
310 ostringstream oStr;
311 oStr.setf(ios::showpoint | ios::fixed);
312 if (firstObs) {
313 firstObs = false;
314 oStr << "> " << obs._time.gpsw() << ' '
315 << setprecision(7) << obs._time.gpssec() << endl;
316 }
317 oStr << obs._staID << ' ' << bncRinex::asciiSatLine(obs) << endl;
318 if (!it.hasNext()) {
319 oStr << endl;
320 }
321 string hlpStr = oStr.str();
322
323 // Output into the File
324 // --------------------
325 if (_out) {
326 *_out << hlpStr.c_str();
327 _out->flush();
328 }
329
330 // Output into the socket
331 // ----------------------
332 if (_sockets) {
333 QMutableListIterator<QTcpSocket*> is(*_sockets);
334 while (is.hasNext()) {
335 QTcpSocket* sock = is.next();
336 if (sock->state() == QAbstractSocket::ConnectedState) {
337 int numBytes = hlpStr.length();
338 if (myWrite(sock, hlpStr.c_str(), numBytes) != numBytes) {
339 delete sock;
340 is.remove();
341 }
342 }
343 else if (sock->state() != QAbstractSocket::ConnectingState) {
344 delete sock;
345 is.remove();
346 }
347 }
348 }
349 }
350 }
351 _epochs.remove(epoTime);
352 }
353 }
354}
355
356// Reread configuration (private slot)
357////////////////////////////////////////////////////////////////////////////
358void bncCaster::slotReadMountPoints() {
359
360 bncSettings settings;
361 settings.reRead();
362
363 readMountPoints();
364}
365
366// Read Mountpoints
367////////////////////////////////////////////////////////////////////////////
368void bncCaster::readMountPoints() {
369
370 bncSettings settings;
371
372 // Reread several options
373 // ----------------------
374 _samplingRate = settings.value("binSampl").toInt();
375 _waitTime = settings.value("waitTime").toInt();
376 if (_waitTime < 1) {
377 _waitTime = 1;
378 }
379
380 // Add new mountpoints
381 // -------------------
382 int iMount = -1;
383 QListIterator<QString> it(settings.value("mountPoints").toStringList());
384 while (it.hasNext()) {
385 ++iMount;
386 QStringList hlp = it.next().split(" ");
387 if (hlp.size() <= 1) continue;
388 QUrl url(hlp[0]);
389
390 // Does it already exist?
391 // ----------------------
392 bool existFlg = false;
393 QListIterator<bncGetThread*> iTh(_threads);
394 while (iTh.hasNext()) {
395 bncGetThread* thread = iTh.next();
396 if (thread->mountPoint() == url) {
397 existFlg = true;
398 break;
399 }
400 }
401
402 // New bncGetThread
403 // ----------------
404 if (!existFlg) {
405 QByteArray format = hlp[1].toAscii();
406 QByteArray latitude = hlp[3].toAscii();
407 QByteArray longitude = hlp[4].toAscii();
408 QByteArray nmea = hlp[5].toAscii();
409 QByteArray ntripVersion = hlp[6].toAscii();
410
411 bncGetThread* getThread = new bncGetThread(url, format, latitude,
412 longitude, nmea, ntripVersion);
413 addGetThread(getThread);
414
415 }
416 }
417
418 // Remove mountpoints
419 // ------------------
420 QListIterator<bncGetThread*> iTh(_threads);
421 while (iTh.hasNext()) {
422 bncGetThread* thread = iTh.next();
423
424 bool existFlg = false;
425 QListIterator<QString> it(settings.value("mountPoints").toStringList());
426 while (it.hasNext()) {
427 QStringList hlp = it.next().split(" ");
428 if (hlp.size() <= 1) continue;
429 QUrl url(hlp[0]);
430
431 if (thread->mountPoint() == url) {
432 existFlg = true;
433 break;
434 }
435 }
436
437 if (!existFlg) {
438 disconnect(thread, 0, 0, 0);
439 _staIDs.removeAll(thread->staID());
440 _threads.removeAll(thread);
441 thread->terminate();
442 }
443 }
444
445 emit mountPointsRead(_threads);
446 emit( newMessage(QString("Configuration read: "
447 + BNC_CORE->confFileName()
448 + ", %1 stream(s)")
449 .arg(_threads.count()).toAscii(), true) );
450
451 // (Re-) Start the configuration timer
452 // -----------------------------------
453 int ms = 0;
454
455 if (_confInterval != -1) {
456 ms = 1000 * _confInterval;
457 }
458 else {
459 QTime currTime = currentDateAndTimeGPS().time();
460 QTime nextShotTime;
461
462 if (settings.value("onTheFlyInterval").toString() == "1 min") {
463 _confInterval = 60;
464 nextShotTime = QTime(currTime.hour(), currTime.minute()+1, 0);
465 }
466 else if (settings.value("onTheFlyInterval").toString() == "5 min") {
467 _confInterval = 300;
468 nextShotTime = QTime(currTime.hour(), currTime.minute()+5, 0);
469 }
470 else if (settings.value("onTheFlyInterval").toString() == "1 hour") {
471 _confInterval = 3600;
472 nextShotTime = QTime(currTime.hour()+1, 0, 0);
473 }
474 else {
475 _confInterval = 86400;
476 nextShotTime = QTime(23, 59, 59, 999);
477 }
478
479 ms = currTime.msecsTo(nextShotTime);
480 if (ms < 30000) {
481 ms = 30000;
482 }
483 }
484
485 QTimer::singleShot(ms, this, SLOT(slotReadMountPoints()));
486}
487
488//
489////////////////////////////////////////////////////////////////////////////
490int bncCaster::myWrite(QTcpSocket* sock, const char* buf, int bufLen) {
491 sock->write(buf, bufLen);
492 for (int ii = 1; ii <= 10; ii++) {
493 if (sock->waitForBytesWritten(10)) { // wait 10 ms
494 return bufLen;
495 }
496 }
497 return -1;
498}
499
500//
501////////////////////////////////////////////////////////////////////////////
502void bncCaster::reopenOutFile() {
503
504 bncSettings settings;
505
506 QString outFileName = settings.value("outFile").toString();
507 if ( !outFileName.isEmpty() ) {
508 expandEnvVar(outFileName);
509 if (!_outFile || _outFile->fileName() != outFileName) {
510 delete _out;
511 delete _outFile;
512 _outFile = new QFile(outFileName);
513 if ( Qt::CheckState(settings.value("rnxAppend").toInt()) == Qt::Checked) {
514 _outFile->open(QIODevice::WriteOnly | QIODevice::Append);
515 }
516 else {
517 _outFile->open(QIODevice::WriteOnly);
518 }
519 _out = new QTextStream(_outFile);
520 _out->setRealNumberNotation(QTextStream::FixedNotation);
521 }
522 }
523 else {
524 delete _out; _out = 0;
525 delete _outFile; _outFile = 0;
526 }
527}
528
529// Output into the Miscellaneous socket
530////////////////////////////////////////////////////////////////////////////
531void bncCaster::slotNewRawData(QByteArray staID, QByteArray data) {
532 if (_miscSockets && (_miscMount == "ALL" || _miscMount == staID)) {
533 QMutableListIterator<QTcpSocket*> is(*_miscSockets);
534 while (is.hasNext()) {
535 QTcpSocket* sock = is.next();
536 if (sock->state() == QAbstractSocket::ConnectedState) {
537 sock->write(data);
538 }
539 else if (sock->state() != QAbstractSocket::ConnectingState) {
540 delete sock;
541 is.remove();
542 }
543 }
544 }
545}
546
547// New Connection
548////////////////////////////////////////////////////////////////////////////
549void bncCaster::slotNewMiscConnection() {
550 _miscSockets->push_back( _miscServer->nextPendingConnection() );
551 emit( newMessage(QString("New client connection on Miscellaneous Output Port: # %1")
552 .arg(_miscSockets->size()).toAscii(), true) );
553}
Note: See TracBrowser for help on using the repository browser.