source: ntrip/trunk/BNC/src/bnccaster.cpp @ 8921

Last change on this file since 8921 was 8921, checked in by stuerze, 5 months ago

minor changes

File size: 17.1 KB
Line 
1// Part of BNC, a utility for retrieving decoding and
2// converting GNSS data streams from NTRIP broadcasters.
3//
4// Copyright (C) 2007
5// German Federal Agency for Cartography and Geodesy (BKG)
6// http://www.bkg.bund.de
7// Czech Technical University Prague, Department of Geodesy
8// http://www.fsv.cvut.cz
9//
10// Email: euref-ip@bkg.bund.de
11//
12// This program is free software; you can redistribute it and/or
13// modify it under the terms of the GNU General Public License
14// as published by the Free Software Foundation, version 2.
15//
16// This program is distributed in the hope that it will be useful,
17// but WITHOUT ANY WARRANTY; without even the implied warranty of
18// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19// GNU General Public License for more details.
20//
21// You should have received a copy of the GNU General Public License
22// along with this program; if not, write to the Free Software
23// Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
24
25/* -------------------------------------------------------------------------
26 * BKG NTRIP Client
27 * -------------------------------------------------------------------------
28 *
29 * Class:      bncCaster
30 *
31 * Purpose:    buffers and disseminates the data
32 *
33 * Author:     L. Mervart
34 *
35 * Created:    24-Dec-2005
36 *
37 * Changes:
38 *
39 * -----------------------------------------------------------------------*/
40
41#include <math.h>
42#include <unistd.h>
43#include <iostream>
44#include <iomanip>
45#include <sstream>
46
47#include "bnccaster.h"
48#include "bncrinex.h"
49#include "bnccore.h"
50#include "bncgetthread.h"
51#include "bncutils.h"
52#include "bncsettings.h"
53
54using namespace std;
55
56// Constructor
57////////////////////////////////////////////////////////////////////////////
58bncCaster::bncCaster() {
59
60  bncSettings settings;
61
62  connect(this, SIGNAL(newMessage(QByteArray,bool)),
63          BNC_CORE, SLOT(slotMessage(const QByteArray,bool)));
64
65  _outFile = 0;
66  _out     = 0;
67  reopenOutFile();
68
69  int port = settings.value("outPort").toInt();
70
71  if (port != 0) {
72    _server = new QTcpServer;
73    _server->setProxy(QNetworkProxy::NoProxy);
74    if ( !_server->listen(QHostAddress::Any, port) ) {
75      QString message = "bncCaster: Cannot listen on sync port "
76                      + QByteArray::number(port) + ": "
77                      + _server->errorString();
78      emit newMessage(message.toLatin1(), true);
79    }
80    connect(_server, SIGNAL(newConnection()), this, SLOT(slotNewConnection()));
81    _sockets = new QList<QTcpSocket*>;
82  }
83  else {
84    _server  = 0;
85    _sockets = 0;
86  }
87
88  int uPort = settings.value("outUPort").toInt();
89  if (uPort != 0) {
90    _uServer = new QTcpServer;
91    _uServer->setProxy(QNetworkProxy::NoProxy);
92    if ( !_uServer->listen(QHostAddress::Any, uPort) ) {
93      QString message = "bncCaster: Cannot listen on usync port "
94                      + QByteArray::number(uPort) + ": "
95                      + _uServer->errorString();
96      emit newMessage(message.toLatin1(), true);
97    }
98    connect(_uServer, SIGNAL(newConnection()), this, SLOT(slotNewUConnection()));
99    _uSockets = new QList<QTcpSocket*>;
100  }
101  else {
102    _uServer  = 0;
103    _uSockets = 0;
104  }
105
106  _outLockTime = settings.value("outLockTime",false).toBool();
107  _samplingRateMult10 = int(settings.value("outSampl").toString().split("sec").first().toDouble() * 10.0);
108  _outWait            = settings.value("outWait").toInt();
109  if (_outWait < 1) {
110    _outWait = 1;
111  }
112  _confInterval = -1;
113
114  // Miscellaneous output port
115  // -------------------------
116  _miscMount = settings.value("miscMount").toString();
117  _miscPort  = settings.value("miscPort").toInt();
118  if (!_miscMount.isEmpty() && _miscPort != 0) {
119    _miscServer = new QTcpServer;
120    _miscServer->setProxy(QNetworkProxy::NoProxy);
121    if ( !_miscServer->listen(QHostAddress::Any, _miscPort) ) {
122      QString message = "bncCaster: Cannot listen on miscellaneous output port "
123                      + QByteArray::number(_miscPort) + ": "
124                      + _miscServer->errorString();
125      emit newMessage(message.toLatin1(), true);
126    }
127    connect(_miscServer, SIGNAL(newConnection()), this, SLOT(slotNewMiscConnection()));
128    _miscSockets = new QList<QTcpSocket*>;
129  }
130  else {
131    _miscServer  = 0;
132    _miscSockets = 0;
133  }
134}
135
136// Destructor
137////////////////////////////////////////////////////////////////////////////
138bncCaster::~bncCaster() {
139
140  QMutexLocker locker(&_mutex);
141
142  QListIterator<bncGetThread*> it(_threads);
143  while(it.hasNext()){
144    bncGetThread* thread = it.next();
145    disconnect(thread, 0, 0, 0);
146    _staIDs.removeAll(thread->staID());
147    _threads.removeAll(thread);
148    thread->terminate();
149  }
150  delete _out;
151  delete _outFile;
152  delete _server;
153  delete _sockets;
154  delete _uServer;
155  delete _uSockets;
156  delete _miscServer;
157  delete _miscSockets;
158}
159
160// New Observations
161////////////////////////////////////////////////////////////////////////////
162void bncCaster::slotNewObs(const QByteArray staID, QList<t_satObs> obsList) {
163
164  QMutexLocker locker(&_mutex);
165
166  reopenOutFile();
167
168  unsigned index = 0;
169  QMutableListIterator<t_satObs> it(obsList);
170  while (it.hasNext()) {
171    ++index;
172    t_satObs& obs = it.next();
173
174    // Rename the Station
175    // ------------------
176    obs._staID = staID.data();
177
178    // Output into the socket
179    // ----------------------
180    if (_uSockets) {
181
182      ostringstream oStr;
183      oStr.setf(ios::showpoint | ios::fixed);
184      oStr << obs._staID                                        << " "
185           << setw(4)  << obs._time.gpsw()                      << " "
186           << setw(14) << setprecision(7) << obs._time.gpssec() << " "
187           << bncRinex::asciiSatLine(obs,_outLockTime) << endl;
188
189      string hlpStr = oStr.str();
190
191      QMutableListIterator<QTcpSocket*> is(*_uSockets);
192      while (is.hasNext()) {
193        QTcpSocket* sock = is.next();
194        if (sock->state() == QAbstractSocket::ConnectedState) {
195          int numBytes = hlpStr.length();
196          if (myWrite(sock, hlpStr.c_str(), numBytes) != numBytes) {
197            delete sock;
198            is.remove();
199          }
200        }
201        else if (sock->state() != QAbstractSocket::ConnectingState) {
202          delete sock;
203          is.remove();
204        }
205      }
206    }
207
208    // First time: set the _lastDumpTime
209    // ---------------------------------
210    if (!_lastDumpTime.valid()) {
211      _lastDumpTime = obs._time - 1.0;
212    }
213
214    // An old observation - throw it away
215    // ----------------------------------
216    if (obs._time <= _lastDumpTime) {
217      if (index == 1) {
218        bncSettings settings;
219        if ( !settings.value("outFile").toString().isEmpty() ||
220             !settings.value("outPort").toString().isEmpty() ) {
221          emit( newMessage(QString("%1: Old or erroneous observation epoch %2 thrown away from %3")
222                           .arg(staID.data())
223                           .arg(string(obs._time).c_str())
224                           .arg(obs._prn.toString().c_str())
225                             .toLatin1(), true) );
226        }
227      }
228      continue;
229    }
230
231    // Save the observation
232    // --------------------
233    _epochs[obs._time].append(obs);
234
235    // Dump Epochs
236    // -----------
237    double outWait = (double)_outWait;
238    if ((obs._time - outWait) > _lastDumpTime) {
239      dumpEpochs(obs._time - outWait);
240      _lastDumpTime = obs._time - outWait;
241    }
242  }
243}
244
245// New Connection
246////////////////////////////////////////////////////////////////////////////
247void bncCaster::slotNewConnection() {
248  _sockets->push_back( _server->nextPendingConnection() );
249  emit( newMessage(QString("New client connection on sync port: # %1")
250                   .arg(_sockets->size()).toLatin1(), true) );
251}
252
253void bncCaster::slotNewUConnection() {
254  _uSockets->push_back( _uServer->nextPendingConnection() );
255  emit( newMessage(QString("New client connection on usync port: # %1")
256                   .arg(_uSockets->size()).toLatin1(), true) );
257}
258
259// Add New Thread
260////////////////////////////////////////////////////////////////////////////
261void bncCaster::addGetThread(bncGetThread* getThread, bool noNewThread) {
262
263  qRegisterMetaType<t_satObs>("t_satObs");
264  qRegisterMetaType< QList<t_satObs> >("QList<t_satObs>");
265
266  connect(getThread, SIGNAL(newObs(QByteArray, QList<t_satObs>)),
267          this,      SLOT(slotNewObs(QByteArray, QList<t_satObs>)));
268
269  connect(getThread, SIGNAL(newObs(QByteArray, QList<t_satObs>)),
270          this,      SIGNAL(newObs(QByteArray, QList<t_satObs>)));
271
272  connect(getThread, SIGNAL(newRawData(QByteArray, QByteArray)),
273          this,      SLOT(slotNewRawData(QByteArray, QByteArray)));
274
275  connect(getThread, SIGNAL(getThreadFinished(QByteArray)),
276          this, SLOT(slotGetThreadFinished(QByteArray)));
277
278  _staIDs.push_back(getThread->staID());
279  _threads.push_back(getThread);
280
281  if (noNewThread) {
282    getThread->run();
283  }
284  else {
285    getThread->start();
286  }
287}
288
289// Get Thread destroyed
290////////////////////////////////////////////////////////////////////////////
291void bncCaster::slotGetThreadFinished(QByteArray staID) {
292  QMutexLocker locker(&_mutex);
293
294  QListIterator<bncGetThread*> it(_threads);
295  while (it.hasNext()) {
296    bncGetThread* thread = it.next();
297    if (thread->staID() == staID) {
298      _threads.removeOne(thread);
299    }
300  }
301
302  _staIDs.removeAll(staID);
303  emit( newMessage(
304           QString("Decoding %1 stream(s)").arg(_staIDs.size()).toLatin1(), true) );
305  if (_staIDs.size() == 0) {
306    emit(newMessage("bncCaster: Last get thread terminated", true));
307    emit getThreadsFinished();
308  }
309}
310
311// Dump Complete Epochs
312////////////////////////////////////////////////////////////////////////////
313void bncCaster::dumpEpochs(const bncTime& maxTime) {
314
315  QMutableMapIterator<bncTime, QList<t_satObs> > itEpo(_epochs);
316  while (itEpo.hasNext()) {
317    itEpo.next();
318    const bncTime& epoTime = itEpo.key();
319    if (epoTime <= maxTime) {
320      const QList<t_satObs>& allObs = itEpo.value();
321      int sec = int(nint(epoTime.gpssec()*10));
322      if ( (_out || _sockets) && (sec % (_samplingRateMult10) == 0) ) {
323        QListIterator<t_satObs> it(allObs);
324        bool firstObs = true;
325        while (it.hasNext()) {
326          const t_satObs& obs = it.next();
327
328          ostringstream oStr;
329          oStr.setf(ios::showpoint | ios::fixed);
330          if (firstObs) {
331            firstObs = false;
332            oStr << "> " << obs._time.gpsw() << ' '
333                 << setprecision(7) << obs._time.gpssec() << endl;
334          }
335          oStr << obs._staID << ' '
336               << bncRinex::asciiSatLine(obs,_outLockTime) << endl;
337          if (!it.hasNext()) {
338            oStr << endl;
339          }
340          string hlpStr = oStr.str();
341
342          // Output into the File
343          // --------------------
344          if (_out) {
345            *_out << hlpStr.c_str();
346            _out->flush();
347          }
348
349          // Output into the socket
350          // ----------------------
351          if (_sockets) {
352            QMutableListIterator<QTcpSocket*> is(*_sockets);
353            while (is.hasNext()) {
354              QTcpSocket* sock = is.next();
355              if (sock->state() == QAbstractSocket::ConnectedState) {
356                int numBytes = hlpStr.length();
357                if (myWrite(sock, hlpStr.c_str(), numBytes) != numBytes) {
358                  delete sock;
359                  is.remove();
360                }
361              }
362              else if (sock->state() != QAbstractSocket::ConnectingState) {
363                delete sock;
364                is.remove();
365              }
366            }
367          }
368        }
369      }
370      _epochs.remove(epoTime);
371    }
372  }
373}
374
375// Reread configuration (private slot)
376////////////////////////////////////////////////////////////////////////////
377void bncCaster::slotReadMountPoints() {
378
379  bncSettings settings;
380  settings.reRead();
381
382  readMountPoints();
383}
384
385// Read Mountpoints
386////////////////////////////////////////////////////////////////////////////
387void bncCaster::readMountPoints() {
388
389  bncSettings settings;
390
391  // Reread several options
392  // ----------------------
393  _samplingRateMult10 = int(settings.value("outSampl").toString().split("sec").first().toDouble() * 10.0);
394  _outWait      = settings.value("outWait").toInt();
395  if (_outWait < 1) {
396    _outWait = 1;
397  }
398
399  // Add new mountpoints
400  // -------------------
401  int iMount = -1;
402  QListIterator<QString> it(settings.value("mountPoints").toStringList());
403  while (it.hasNext()) {
404    ++iMount;
405    QStringList hlp = it.next().split(" ");
406    if (hlp.size() < 7) continue;
407    QUrl url(hlp[0]);
408
409    // Does it already exist?
410    // ----------------------
411    bool existFlg = false;
412    QListIterator<bncGetThread*> iTh(_threads);
413    while (iTh.hasNext()) {
414      bncGetThread* thread = iTh.next();
415      if (thread->mountPoint() == url) {
416        existFlg = true;
417        break;
418      }
419    }
420
421    // New bncGetThread
422    // ----------------
423    if (!existFlg) {
424      QByteArray format    = hlp[1].toLatin1();
425      QByteArray latitude  = hlp[3].toLatin1();
426      QByteArray longitude = hlp[4].toLatin1();
427      QByteArray nmea      = hlp[5].toLatin1();
428      QByteArray ntripVersion = hlp[6].toLatin1();
429
430      bncGetThread* getThread = new bncGetThread(url, format, latitude,
431                                        longitude, nmea, ntripVersion);
432      addGetThread(getThread);
433
434    }
435  }
436
437  // Remove mountpoints
438  // ------------------
439  QListIterator<bncGetThread*> iTh(_threads);
440  while (iTh.hasNext()) {
441    bncGetThread* thread = iTh.next();
442
443    bool existFlg = false;
444    QListIterator<QString> it(settings.value("mountPoints").toStringList());
445    while (it.hasNext()) {
446      QStringList hlp = it.next().split(" ");
447      if (hlp.size() <= 1) continue;
448      QUrl url(hlp[0]);
449
450      if (thread->mountPoint() == url) {
451        existFlg = true;
452        break;
453      }
454    }
455
456    if (!existFlg) {
457      disconnect(thread, 0, 0, 0);
458      _staIDs.removeAll(thread->staID());
459      _threads.removeAll(thread);
460      thread->terminate();
461    }
462  }
463
464  emit mountPointsRead(_threads);
465  emit( newMessage(QString("Configuration read: "
466                           + BNC_CORE->confFileName()
467                           + ", %1 stream(s)")
468                            .arg(_threads.count()).toLatin1(), true) );
469
470  // (Re-) Start the configuration timer
471  // -----------------------------------
472  if      (settings.value("onTheFlyInterval").toString() == "no") {
473    return;
474  }
475
476  int ms = 0;
477  if (_confInterval != -1) {
478    ms = 1000 * _confInterval;
479  }
480  else {
481    QTime currTime = currentDateAndTimeGPS().time();
482    QTime nextShotTime;
483    if      (settings.value("onTheFlyInterval").toString() == "1 min") {
484          _confInterval = 60;
485          nextShotTime = QTime(currTime.hour(), currTime.minute()+1, 0);
486    }
487    else if (settings.value("onTheFlyInterval").toString() == "5 min") {
488      _confInterval = 300;
489      nextShotTime = QTime(currTime.hour(), currTime.minute()+5, 0);
490    }
491    else if (settings.value("onTheFlyInterval").toString() == "1 hour") {
492      _confInterval = 3600;
493      nextShotTime = QTime(currTime.hour()+1, 0, 0);
494    }
495    else {
496      _confInterval = 86400;
497      nextShotTime = QTime(23, 59, 59, 999);
498    }
499
500    ms = currTime.msecsTo(nextShotTime);
501    if (ms < 30000) {
502      ms = 30000;
503    }
504  }
505
506  QTimer::singleShot(ms, this, SLOT(slotReadMountPoints()));
507}
508
509//
510////////////////////////////////////////////////////////////////////////////
511int bncCaster::myWrite(QTcpSocket* sock, const char* buf, int bufLen) {
512  sock->write(buf, bufLen);
513  for (int ii = 1; ii <= 10; ii++) {
514    if (sock->waitForBytesWritten(10)) {  // wait 10 ms
515      return bufLen;
516    }
517  }
518  return -1;
519}
520
521//
522////////////////////////////////////////////////////////////////////////////
523void bncCaster::reopenOutFile() {
524
525  bncSettings settings;
526
527  QString outFileName = settings.value("outFile").toString();
528  if ( !outFileName.isEmpty() ) {
529    expandEnvVar(outFileName);
530    if (!_outFile || _outFile->fileName() != outFileName) {
531      delete _out;
532      delete _outFile;
533      _outFile = new QFile(outFileName);
534      if ( Qt::CheckState(settings.value("rnxAppend").toInt()) == Qt::Checked) {
535        _outFile->open(QIODevice::WriteOnly | QIODevice::Append);
536      }
537      else {
538        _outFile->open(QIODevice::WriteOnly);
539      }
540      _out = new QTextStream(_outFile);
541      _out->setRealNumberNotation(QTextStream::FixedNotation);
542    }
543  }
544  else {
545    delete _out;     _out     = 0;
546    delete _outFile; _outFile = 0;
547  }
548}
549
550// Output into the Miscellaneous socket
551////////////////////////////////////////////////////////////////////////////
552void bncCaster::slotNewRawData(QByteArray staID, QByteArray data) {
553  if (_miscSockets && (_miscMount == "ALL" || _miscMount == staID)) {
554    QMutableListIterator<QTcpSocket*> is(*_miscSockets);
555    while (is.hasNext()) {
556      QTcpSocket* sock = is.next();
557      if (sock->state() == QAbstractSocket::ConnectedState) {
558        sock->write(data);
559      }
560      else if (sock->state() != QAbstractSocket::ConnectingState) {
561        delete sock;
562        is.remove();
563      }
564    }
565  }
566}
567
568// New Connection
569////////////////////////////////////////////////////////////////////////////
570void bncCaster::slotNewMiscConnection() {
571  _miscSockets->push_back( _miscServer->nextPendingConnection() );
572  emit( newMessage(QString("New client connection on Miscellaneous Output Port: # %1")
573                   .arg(_miscSockets->size()).toLatin1(), true) );
574}
Note: See TracBrowser for help on using the repository browser.