source: ntrip/branches/BNC_2.12/src/bnccaster.cpp @ 9190

Last change on this file since 9190 was 9190, checked in by stuerze, 3 months ago

small bug fixed

File size: 17.6 KB
Line 
1// Part of BNC, a utility for retrieving decoding and
2// converting GNSS data streams from NTRIP broadcasters.
3//
4// Copyright (C) 2007
5// German Federal Agency for Cartography and Geodesy (BKG)
6// http://www.bkg.bund.de
7// Czech Technical University Prague, Department of Geodesy
8// http://www.fsv.cvut.cz
9//
10// Email: euref-ip@bkg.bund.de
11//
12// This program is free software; you can redistribute it and/or
13// modify it under the terms of the GNU General Public License
14// as published by the Free Software Foundation, version 2.
15//
16// This program is distributed in the hope that it will be useful,
17// but WITHOUT ANY WARRANTY; without even the implied warranty of
18// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19// GNU General Public License for more details.
20//
21// You should have received a copy of the GNU General Public License
22// along with this program; if not, write to the Free Software
23// Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
24
25/* -------------------------------------------------------------------------
26 * BKG NTRIP Client
27 * -------------------------------------------------------------------------
28 *
29 * Class:      bncCaster
30 *
31 * Purpose:    buffers and disseminates the data
32 *
33 * Author:     L. Mervart
34 *
35 * Created:    24-Dec-2005
36 *
37 * Changes:
38 *
39 * -----------------------------------------------------------------------*/
40
41#include <math.h>
42#include <unistd.h>
43#include <iostream>
44#include <iomanip>
45#include <sstream>
46
47#include "bnccaster.h"
48#include "bncrinex.h"
49#include "bnccore.h"
50#include "bncgetthread.h"
51#include "bncutils.h"
52#include "bncsettings.h"
53
54using namespace std;
55
56// Constructor
57////////////////////////////////////////////////////////////////////////////
58bncCaster::bncCaster() {
59
60  bncSettings settings;
61
62  connect(this, SIGNAL(newMessage(QByteArray,bool)),
63          BNC_CORE, SLOT(slotMessage(const QByteArray,bool)));
64
65  _outFile = 0;
66  _out     = 0;
67  reopenOutFile();
68
69  int port = settings.value("outPort").toInt();
70
71  if (port != 0) {
72    _server = new QTcpServer;
73    if ( !_server->listen(QHostAddress::Any, port) ) {
74      emit newMessage("bncCaster: Cannot listen on sync port", true);
75    }
76    connect(_server, SIGNAL(newConnection()), this, SLOT(slotNewConnection()));
77    _sockets = new QList<QTcpSocket*>;
78  }
79  else {
80    _server  = 0;
81    _sockets = 0;
82  }
83
84  int uPort = settings.value("outUPort").toInt();
85  if (uPort != 0) {
86    _uServer = new QTcpServer;
87    if ( !_uServer->listen(QHostAddress::Any, uPort) ) {
88      emit newMessage("bncCaster: Cannot listen on usync port", true);
89    }
90    connect(_uServer, SIGNAL(newConnection()), this, SLOT(slotNewUConnection()));
91    _uSockets = new QList<QTcpSocket*>;
92  }
93  else {
94    _uServer  = 0;
95    _uSockets = 0;
96  }
97
98  _outLockTime = settings.value("outLockTime",false).toBool();
99  _samplingRateMult10 = int(settings.value("outSampl").toString().split("sec").first().toDouble() * 10.0);
100  _outWait      = settings.value("outWait").toDouble();
101  if (_outWait <= 0.0) {
102    _outWait = 0.01;
103  }
104  _confInterval = -1;
105
106  // Miscellaneous output port
107  // -------------------------
108  _miscMount = settings.value("miscMount").toString();
109  _miscPort  = settings.value("miscPort").toInt();
110  if (!_miscMount.isEmpty() && _miscPort != 0) {
111    _miscServer = new QTcpServer;
112    if ( !_miscServer->listen(QHostAddress::Any, _miscPort) ) {
113      emit newMessage("bncCaster: Cannot listen on Miscellaneous Output Port", true);
114    }
115    connect(_miscServer, SIGNAL(newConnection()), this, SLOT(slotNewMiscConnection()));
116    _miscSockets = new QList<QTcpSocket*>;
117  }
118  else {
119    _miscServer  = 0;
120    _miscSockets = 0;
121  }
122}
123
124// Destructor
125////////////////////////////////////////////////////////////////////////////
126bncCaster::~bncCaster() {
127
128  QMutexLocker locker(&_mutex);
129
130  QListIterator<bncGetThread*> it(_threads);
131  while(it.hasNext()){
132    bncGetThread* thread = it.next();
133    disconnect(thread, 0, 0, 0);
134    _staIDs.removeAll(thread->staID());
135    _threads.removeAll(thread);
136    thread->terminate();
137  }
138  delete _out;
139  delete _outFile;
140  delete _server;
141  delete _sockets;
142  delete _uServer;
143  delete _uSockets;
144  delete _miscServer;
145  delete _miscSockets;
146}
147
148// New Observations
149////////////////////////////////////////////////////////////////////////////
150void bncCaster::slotNewObs(const QByteArray staID, QList<t_satObs> obsList) {
151
152  QMutexLocker locker(&_mutex);
153
154  reopenOutFile();
155
156  unsigned index = 0;
157  QMutableListIterator<t_satObs> it(obsList);
158  while (it.hasNext()) {
159    ++index;
160    t_satObs& obs = it.next();
161
162    // Rename the Station
163    // ------------------
164    obs._staID = staID.data();
165
166    // Update/Set Slip Counters
167    // ------------------------
168    setSlipCounters(obs);
169
170    // Output into the socket
171    // ----------------------
172    if (_uSockets) {
173
174      ostringstream oStr;
175      oStr.setf(ios::showpoint | ios::fixed);
176      oStr << obs._staID                                        << " "
177           << setw(4)  << obs._time.gpsw()                      << " "
178           << setw(14) << setprecision(7) << obs._time.gpssec() << " "
179           << bncRinex::asciiSatLine(obs,_outLockTime) << endl;
180
181      string hlpStr = oStr.str();
182
183      QMutableListIterator<QTcpSocket*> is(*_uSockets);
184      while (is.hasNext()) {
185        QTcpSocket* sock = is.next();
186        if (sock->state() == QAbstractSocket::ConnectedState) {
187          int numBytes = hlpStr.length();
188          if (myWrite(sock, hlpStr.c_str(), numBytes) != numBytes) {
189            delete sock;
190            is.remove();
191          }
192        }
193        else if (sock->state() != QAbstractSocket::ConnectingState) {
194          delete sock;
195          is.remove();
196        }
197      }
198    }
199
200    // First time: set the _lastDumpTime
201    // ---------------------------------
202    if (!_lastDumpTime.valid()) {
203      _lastDumpTime = obs._time - 1.0;
204    }
205
206    // An old observation - throw it away
207    // ----------------------------------
208    if (obs._time <= _lastDumpTime) {
209      if (index == 1) {
210        bncSettings settings;
211        if ( !settings.value("outFile").toString().isEmpty() ||
212             !settings.value("outPort").toString().isEmpty() ) {
213          emit( newMessage(QString("%1: Old or erroneous observation epoch %2 thrown away from %3")
214                           .arg(staID.data())
215                           .arg(string(obs._time).c_str())
216                           .arg(obs._prn.toString().c_str())
217                           .toAscii(), true) );
218        }
219      }
220      continue;
221    }
222
223    // Save the observation
224    // --------------------
225    _epochs[obs._time].append(obs);
226
227    // Dump Epochs
228    // -----------
229    if (obs._time - _outWait > _lastDumpTime) {
230      dumpEpochs(obs._time - _outWait);
231      _lastDumpTime = obs._time - _outWait;
232    }
233  }
234}
235
236// New Connection
237////////////////////////////////////////////////////////////////////////////
238void bncCaster::slotNewConnection() {
239  _sockets->push_back( _server->nextPendingConnection() );
240  emit( newMessage(QString("New client connection on sync port: # %1")
241                   .arg(_sockets->size()).toAscii(), true) );
242}
243
244void bncCaster::slotNewUConnection() {
245  _uSockets->push_back( _uServer->nextPendingConnection() );
246  emit( newMessage(QString("New client connection on usync port: # %1")
247                   .arg(_uSockets->size()).toAscii(), true) );
248}
249
250// Add New Thread
251////////////////////////////////////////////////////////////////////////////
252void bncCaster::addGetThread(bncGetThread* getThread, bool noNewThread) {
253
254  qRegisterMetaType<t_satObs>("t_satObs");
255  qRegisterMetaType< QList<t_satObs> >("QList<t_satObs>");
256
257  connect(getThread, SIGNAL(newObs(QByteArray, QList<t_satObs>)),
258          this,      SLOT(slotNewObs(QByteArray, QList<t_satObs>)));
259
260  connect(getThread, SIGNAL(newObs(QByteArray, QList<t_satObs>)),
261          this,      SIGNAL(newObs(QByteArray, QList<t_satObs>)));
262
263  connect(getThread, SIGNAL(newRawData(QByteArray, QByteArray)),
264          this,      SLOT(slotNewRawData(QByteArray, QByteArray)));
265
266  connect(getThread, SIGNAL(getThreadFinished(QByteArray)),
267          this, SLOT(slotGetThreadFinished(QByteArray)));
268
269  _staIDs.push_back(getThread->staID());
270  _threads.push_back(getThread);
271
272  if (noNewThread) {
273    getThread->run();
274  }
275  else {
276    getThread->start();
277  }
278}
279
280// Get Thread destroyed
281////////////////////////////////////////////////////////////////////////////
282void bncCaster::slotGetThreadFinished(QByteArray staID) {
283  QMutexLocker locker(&_mutex);
284
285  QListIterator<bncGetThread*> it(_threads);
286  while (it.hasNext()) {
287    bncGetThread* thread = it.next();
288    if (thread->staID() == staID) {
289      _threads.removeOne(thread);
290    }
291  }
292
293  _staIDs.removeAll(staID);
294  emit( newMessage(
295           QString("Decoding %1 stream(s)").arg(_staIDs.size()).toAscii(), true) );
296  if (_staIDs.size() == 0) {
297    emit(newMessage("bncCaster: Last get thread terminated", true));
298    emit getThreadsFinished();
299  }
300}
301
302// Dump Complete Epochs
303////////////////////////////////////////////////////////////////////////////
304void bncCaster::dumpEpochs(const bncTime& maxTime) {
305
306  QMutableMapIterator<bncTime, QList<t_satObs> > itEpo(_epochs);
307  while (itEpo.hasNext()) {
308    itEpo.next();
309    const bncTime& epoTime = itEpo.key();
310    if (epoTime <= maxTime) {
311      const QList<t_satObs>& allObs = itEpo.value();
312      int sec = int(nint(epoTime.gpssec()*10));
313      if ( (_out || _sockets) && (sec % (_samplingRateMult10) == 0) ) {
314        QListIterator<t_satObs> it(allObs);
315        bool firstObs = true;
316        while (it.hasNext()) {
317          const t_satObs& obs = it.next();
318
319          ostringstream oStr;
320          oStr.setf(ios::showpoint | ios::fixed);
321          if (firstObs) {
322            firstObs = false;
323            oStr << "> " << obs._time.gpsw() << ' '
324                 << setprecision(7) << obs._time.gpssec() << endl;
325          }
326          oStr << obs._staID << ' '
327               << bncRinex::asciiSatLine(obs,_outLockTime) << endl;
328          if (!it.hasNext()) {
329            oStr << endl;
330          }
331          string hlpStr = oStr.str();
332
333          // Output into the File
334          // --------------------
335          if (_out) {
336            *_out << hlpStr.c_str();
337            _out->flush();
338          }
339
340          // Output into the socket
341          // ----------------------
342          if (_sockets) {
343            QMutableListIterator<QTcpSocket*> is(*_sockets);
344            while (is.hasNext()) {
345              QTcpSocket* sock = is.next();
346              if (sock->state() == QAbstractSocket::ConnectedState) {
347                int numBytes = hlpStr.length();
348                if (myWrite(sock, hlpStr.c_str(), numBytes) != numBytes) {
349                  delete sock;
350                  is.remove();
351                }
352              }
353              else if (sock->state() != QAbstractSocket::ConnectingState) {
354                delete sock;
355                is.remove();
356              }
357            }
358          }
359        }
360      }
361       //_epochs.remove(epoTime);
362      itEpo.remove();
363    }
364  }
365}
366
367// Reread configuration (private slot)
368////////////////////////////////////////////////////////////////////////////
369void bncCaster::slotReadMountPoints() {
370
371  bncSettings settings;
372  settings.reRead();
373
374  readMountPoints();
375}
376
377// Read Mountpoints
378////////////////////////////////////////////////////////////////////////////
379void bncCaster::readMountPoints() {
380
381  bncSettings settings;
382
383  // Reread several options
384  // ----------------------
385  _samplingRateMult10 = int(settings.value("outSampl").toString().split("sec").first().toDouble() * 10.0);
386  _outWait      = settings.value("outWait").toInt();
387  if (_outWait < 1) {
388    _outWait = 1;
389  }
390
391  // Add new mountpoints
392  // -------------------
393  int iMount = -1;
394  QListIterator<QString> it(settings.value("mountPoints").toStringList());
395  while (it.hasNext()) {
396    ++iMount;
397    QStringList hlp = it.next().split(" ");
398    if (hlp.size() < 7) continue;
399    QUrl url(hlp[0]);
400
401    // Does it already exist?
402    // ----------------------
403    bool existFlg = false;
404    QListIterator<bncGetThread*> iTh(_threads);
405    while (iTh.hasNext()) {
406      bncGetThread* thread = iTh.next();
407      if (thread->mountPoint() == url) {
408        existFlg = true;
409        break;
410      }
411    }
412
413    // New bncGetThread
414    // ----------------
415    if (!existFlg) {
416      QByteArray format    = hlp[1].toAscii();
417      QByteArray latitude  = hlp[3].toAscii();
418      QByteArray longitude = hlp[4].toAscii();
419      QByteArray nmea      = hlp[5].toAscii();
420      QByteArray ntripVersion = hlp[6].toAscii();
421
422      bncGetThread* getThread = new bncGetThread(url, format, latitude,
423                                        longitude, nmea, ntripVersion);
424      addGetThread(getThread);
425
426    }
427  }
428
429  // Remove mountpoints
430  // ------------------
431  QListIterator<bncGetThread*> iTh(_threads);
432  while (iTh.hasNext()) {
433    bncGetThread* thread = iTh.next();
434
435    bool existFlg = false;
436    QListIterator<QString> it(settings.value("mountPoints").toStringList());
437    while (it.hasNext()) {
438      QStringList hlp = it.next().split(" ");
439      if (hlp.size() <= 1) continue;
440      QUrl url(hlp[0]);
441
442      if (thread->mountPoint() == url) {
443        existFlg = true;
444        break;
445      }
446    }
447
448    if (!existFlg) {
449      disconnect(thread, 0, 0, 0);
450      _staIDs.removeAll(thread->staID());
451      _threads.removeAll(thread);
452      thread->terminate();
453    }
454  }
455
456  emit mountPointsRead(_threads);
457  emit( newMessage(QString("Configuration read: "
458                           + BNC_CORE->confFileName()
459                           + ", %1 stream(s)")
460                            .arg(_threads.count()).toAscii(), true) );
461
462  // (Re-) Start the configuration timer
463  // -----------------------------------
464    if      (settings.value("onTheFlyInterval").toString() == "no") {
465    return;
466  }
467
468  int ms = 0;
469  if (_confInterval != -1) {
470    ms = 1000 * _confInterval;
471  }
472  else {
473    QTime currTime = currentDateAndTimeGPS().time();
474    QTime nextShotTime;
475    if      (settings.value("onTheFlyInterval").toString() == "1 min") {
476      _confInterval = 60;
477      nextShotTime = QTime(currTime.hour(), currTime.minute()+1, 0);
478    }
479    else if (settings.value("onTheFlyInterval").toString() == "5 min") {
480      _confInterval = 300;
481      nextShotTime = QTime(currTime.hour(), currTime.minute()+5, 0);
482    }
483    else if (settings.value("onTheFlyInterval").toString() == "1 hour") {
484      _confInterval = 3600;
485      nextShotTime = QTime(currTime.hour()+1, 0, 0);
486    }
487    else {
488      _confInterval = 86400;
489      nextShotTime = QTime(23, 59, 59, 999);
490    }
491
492    ms = currTime.msecsTo(nextShotTime);
493    if (ms < 30000) {
494      ms = 30000;
495    }
496  }
497
498  QTimer::singleShot(ms, this, SLOT(slotReadMountPoints()));
499}
500
501//
502////////////////////////////////////////////////////////////////////////////
503int bncCaster::myWrite(QTcpSocket* sock, const char* buf, int bufLen) {
504  sock->write(buf, bufLen);
505  for (int ii = 1; ii <= 10; ii++) {
506    if (sock->waitForBytesWritten(10)) {  // wait 10 ms
507      return bufLen;
508    }
509  }
510  return -1;
511}
512
513//
514////////////////////////////////////////////////////////////////////////////
515void bncCaster::reopenOutFile() {
516
517  bncSettings settings;
518
519  QString outFileName = settings.value("outFile").toString();
520  if ( !outFileName.isEmpty() ) {
521    expandEnvVar(outFileName);
522    if (!_outFile || _outFile->fileName() != outFileName) {
523      delete _out;
524      delete _outFile;
525      _outFile = new QFile(outFileName);
526      if ( Qt::CheckState(settings.value("rnxAppend").toInt()) == Qt::Checked) {
527        _outFile->open(QIODevice::WriteOnly | QIODevice::Append);
528      }
529      else {
530        _outFile->open(QIODevice::WriteOnly);
531      }
532      _out = new QTextStream(_outFile);
533      _out->setRealNumberNotation(QTextStream::FixedNotation);
534    }
535  }
536  else {
537    delete _out;     _out     = 0;
538    delete _outFile; _outFile = 0;
539  }
540}
541
542// Output into the Miscellaneous socket
543////////////////////////////////////////////////////////////////////////////
544void bncCaster::slotNewRawData(QByteArray staID, QByteArray data) {
545  if (_miscSockets && (_miscMount == "ALL" || _miscMount == staID)) {
546    QMutableListIterator<QTcpSocket*> is(*_miscSockets);
547    while (is.hasNext()) {
548      QTcpSocket* sock = is.next();
549      if (sock->state() == QAbstractSocket::ConnectedState) {
550        sock->write(data);
551      }
552      else if (sock->state() != QAbstractSocket::ConnectingState) {
553        delete sock;
554        is.remove();
555      }
556    }
557  }
558}
559
560// New Connection
561////////////////////////////////////////////////////////////////////////////
562void bncCaster::slotNewMiscConnection() {
563  _miscSockets->push_back( _miscServer->nextPendingConnection() );
564  emit( newMessage(QString("New client connection on Miscellaneous Output Port: # %1")
565                   .arg(_miscSockets->size()).toAscii(), true) );
566}
567
568
569// Set/Update Slip Counters
570////////////////////////////////////////////////////////////////////////////
571void bncCaster::setSlipCounters(t_satObs& obs) {
572
573  double minLockTime = -1.0;
574  for (unsigned ii = 0; ii < obs._obs.size(); ii++) {
575    const t_frqObs* frqObs = obs._obs[ii];
576    if (frqObs->_lockTimeValid) {
577      if (minLockTime == -1.0 || minLockTime < frqObs->_lockTime) {
578        minLockTime = frqObs->_lockTime;
579      }
580    }
581  }
582
583  if (minLockTime == -1.0) {
584    return;
585  }
586
587  QMap<t_prn, double>& ltMap = _lockTimeMap[obs._staID];
588  QMap<t_prn, int>&    jcMap = _jumpCounterMap[obs._staID];
589
590  QMap<t_prn, double>::const_iterator it = ltMap.find(obs._prn);
591  if (it == ltMap.end()) {
592    ltMap[obs._prn] = minLockTime;
593    jcMap[obs._prn] = 0;
594  }
595  else {
596    if (minLockTime < ltMap[obs._prn]) {
597      jcMap[obs._prn] += 1;
598      if (jcMap[obs._prn] > 9999) {
599        jcMap[obs._prn] = 0;
600      }
601    }
602    ltMap[obs._prn] = minLockTime;
603  }
604  for (unsigned ii = 0; ii < obs._obs.size(); ii++) {
605    t_frqObs* frqObs = obs._obs[ii];
606    frqObs->_slipCounter = jcMap[obs._prn];
607  }
608}
Note: See TracBrowser for help on using the repository browser.