source: ntrip/trunk/BNC/bnccaster.cpp@ 1176

Last change on this file since 1176 was 1176, checked in by mervart, 16 years ago

* empty log message *

File size: 12.9 KB
RevLine 
[280]1// Part of BNC, a utility for retrieving decoding and
[464]2// converting GNSS data streams from NTRIP broadcasters.
[280]3//
[464]4// Copyright (C) 2007
[280]5// German Federal Agency for Cartography and Geodesy (BKG)
6// http://www.bkg.bund.de
[464]7// Czech Technical University Prague, Department of Geodesy
[280]8// http://www.fsv.cvut.cz
9//
10// Email: euref-ip@bkg.bund.de
11//
12// This program is free software; you can redistribute it and/or
13// modify it under the terms of the GNU General Public License
14// as published by the Free Software Foundation, version 2.
15//
16// This program is distributed in the hope that it will be useful,
17// but WITHOUT ANY WARRANTY; without even the implied warranty of
18// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19// GNU General Public License for more details.
20//
21// You should have received a copy of the GNU General Public License
22// along with this program; if not, write to the Free Software
23// Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
[35]24
25/* -------------------------------------------------------------------------
[93]26 * BKG NTRIP Client
[35]27 * -------------------------------------------------------------------------
28 *
29 * Class: bncCaster
30 *
31 * Purpose: buffers and disseminates the data
32 *
33 * Author: L. Mervart
34 *
35 * Created: 24-Dec-2005
36 *
37 * Changes:
38 *
39 * -----------------------------------------------------------------------*/
40
[637]41#include <iostream>
[222]42#include <math.h>
[636]43#include <unistd.h>
[222]44
[35]45#include "bnccaster.h"
[1170]46#include "bncapp.h"
[35]47#include "bncgetthread.h"
[134]48#include "bncutils.h"
[207]49#include "RTCM/GPSDecoder.h"
[35]50
[635]51
[35]52// Constructor
53////////////////////////////////////////////////////////////////////////////
54bncCaster::bncCaster(const QString& outFileName, int port) {
55
[275]56 QSettings settings;
57
[35]58 if ( !outFileName.isEmpty() ) {
[134]59 QString lName = outFileName;
60 expandEnvVar(lName);
61 _outFile = new QFile(lName);
[275]62 if ( Qt::CheckState(settings.value("rnxAppend").toInt()) == Qt::Checked) {
63 _outFile->open(QIODevice::WriteOnly | QIODevice::Append);
64 }
65 else {
66 _outFile->open(QIODevice::WriteOnly);
67 }
[35]68 _out = new QTextStream(_outFile);
69 _out->setRealNumberNotation(QTextStream::FixedNotation);
70 }
71 else {
72 _outFile = 0;
73 _out = 0;
74 }
75
76 _port = port;
77
78 if (_port != 0) {
79 _server = new QTcpServer;
80 _server->listen(QHostAddress::Any, _port);
81 connect(_server, SIGNAL(newConnection()), this, SLOT(slotNewConnection()));
82 _sockets = new QList<QTcpSocket*>;
83 }
84 else {
85 _server = 0;
86 _sockets = 0;
87 }
88
[622]89 _epochs = new QMultiMap<long, p_obs>;
[35]90
91 _lastDumpSec = 0;
[133]92
[1170]93 _confTimer = 0;
[1171]94
95 connect(this, SIGNAL(newMessage(QByteArray)),
96 (bncApp*) qApp, SLOT(slotMessage(const QByteArray)));
[35]97}
98
99// Destructor
100////////////////////////////////////////////////////////////////////////////
101bncCaster::~bncCaster() {
[186]102 QListIterator<bncGetThread*> it(_threads);
103 while(it.hasNext()){
104 bncGetThread* thread = it.next();
105 thread->terminate();
[605]106 thread->wait();
[230]107 delete thread;
[186]108 }
[35]109 delete _out;
110 delete _outFile;
[187]111 delete _server;
[631]112 delete _sockets;
[603]113 if (_epochs) {
[622]114 QListIterator<p_obs> it(_epochs->values());
[603]115 while (it.hasNext()) {
116 delete it.next();
117 }
118 delete _epochs;
119 }
[35]120}
121
122// New Observations
123////////////////////////////////////////////////////////////////////////////
[622]124void bncCaster::newObs(const QByteArray staID, bool firstObs, p_obs obs) {
[35]125
[243]126 QMutexLocker locker(&_mutex);
127
[624]128 obs->_status = t_obs::received;
129
[622]130 long iSec = long(floor(obs->_o.GPSWeeks+0.5));
131 long newTime = obs->_o.GPSWeek * 7*24*3600 + iSec;
[35]132
[160]133 // Rename the Station
134 // ------------------
[622]135 strncpy(obs->_o.StatID, staID.constData(),sizeof(obs->_o.StatID));
136 obs->_o.StatID[sizeof(obs->_o.StatID)-1] = '\0';
[160]137
[35]138 // First time, set the _lastDumpSec immediately
139 // --------------------------------------------
140 if (_lastDumpSec == 0) {
[462]141 _lastDumpSec = newTime - 1;
[35]142 }
143
144 // An old observation - throw it away
145 // ----------------------------------
[462]146 if (newTime <= _lastDumpSec) {
[257]147 if (firstObs) {
148 QSettings settings;
149 if ( !settings.value("outFile").toString().isEmpty() ||
150 !settings.value("outPort").toString().isEmpty() ) {
[258]151 emit( newMessage(QString("Station %1: old epoch %2 thrown away")
152 .arg(staID.data()).arg(iSec).toAscii()) );
[257]153 }
[181]154 }
[35]155 delete obs;
[350]156 return;
[35]157 }
158
[160]159 // Save the observation
160 // --------------------
[462]161 _epochs->insert(newTime, obs);
[393]162
[462]163 // Dump Epochs
164 // -----------
165 if (newTime - _waitTime > _lastDumpSec) {
166 dumpEpochs(_lastDumpSec + 1, newTime - _waitTime);
167 _lastDumpSec = newTime - _waitTime;
[252]168 }
[35]169}
170
171// New Connection
172////////////////////////////////////////////////////////////////////////////
173void bncCaster::slotNewConnection() {
174 _sockets->push_back( _server->nextPendingConnection() );
[630]175 emit( newMessage(QString("New Connection # %1")
176 .arg(_sockets->size()).toAscii()) );
[35]177}
178
179// Add New Thread
180////////////////////////////////////////////////////////////////////////////
181void bncCaster::addGetThread(bncGetThread* getThread) {
[624]182
183 qRegisterMetaType<p_obs>("p_obs");
184
[628]185 connect(getThread, SIGNAL(newObs(QByteArray, bool, p_obs)),
186 this, SLOT(newObs(QByteArray, bool, p_obs)));
[463]187
[628]188 connect(getThread, SIGNAL(error(QByteArray)),
189 this, SLOT(slotGetThreadError(QByteArray)));
[35]190
[88]191 _staIDs.push_back(getThread->staID());
[186]192 _threads.push_back(getThread);
[1170]193
194 getThread->start();
[1173]195
196 emit newGetThread(getThread);
[35]197}
198
199// Error in get thread
200////////////////////////////////////////////////////////////////////////////
[628]201void bncCaster::slotGetThreadError(QByteArray staID) {
[243]202 QMutexLocker locker(&_mutex);
[88]203 _staIDs.removeAll(staID);
[82]204 emit( newMessage(
[88]205 QString("Mountpoint size %1").arg(_staIDs.size()).toAscii()) );
206 if (_staIDs.size() == 0) {
[82]207 emit(newMessage("bncCaster:: last get thread terminated"));
[35]208 emit getThreadErrors();
209 }
210}
211
212// Dump Complete Epochs
213////////////////////////////////////////////////////////////////////////////
214void bncCaster::dumpEpochs(long minTime, long maxTime) {
215
216 const char begEpoch = 'A';
217 const char begObs = 'B';
218 const char endEpoch = 'C';
219
220 for (long sec = minTime; sec <= maxTime; sec++) {
[140]221
[35]222 bool first = true;
[622]223 QList<p_obs> allObs = _epochs->values(sec);
224 QListIterator<p_obs> it(allObs);
[35]225 while (it.hasNext()) {
[622]226 p_obs obs = it.next();
[35]227
[140]228 if (_samplingRate == 0 || sec % _samplingRate == 0) {
229
230 // Output into the file
231 // --------------------
232 if (_out) {
[35]233 if (first) {
[341]234 _out->setFieldWidth(1); *_out << begEpoch << endl;;
[35]235 }
[622]236 _out->setFieldWidth(0); *_out << obs->_o.StatID;
237 _out->setFieldWidth(1); *_out << " " << obs->_o.satSys;
[344]238 _out->setPadChar('0');
[622]239 _out->setFieldWidth(2); *_out << obs->_o.satNum;
[344]240 _out->setPadChar(' ');
[342]241 _out->setFieldWidth(1); *_out << " ";
[622]242 _out->setFieldWidth(4); *_out << obs->_o.GPSWeek;
[342]243 _out->setFieldWidth(1); *_out << " ";
[622]244 _out->setFieldWidth(14); _out->setRealNumberPrecision(7); *_out << obs->_o.GPSWeeks;
[342]245 _out->setFieldWidth(1); *_out << " ";
[622]246 _out->setFieldWidth(14); _out->setRealNumberPrecision(3); *_out << obs->_o.C1;
[342]247 _out->setFieldWidth(1); *_out << " ";
[622]248 _out->setFieldWidth(14); _out->setRealNumberPrecision(3); *_out << obs->_o.C2;
[366]249 _out->setFieldWidth(1); *_out << " ";
[622]250 _out->setFieldWidth(14); _out->setRealNumberPrecision(3); *_out << obs->_o.P1;
[342]251 _out->setFieldWidth(1); *_out << " ";
[622]252 _out->setFieldWidth(14); _out->setRealNumberPrecision(3); *_out << obs->_o.P2;
[342]253 _out->setFieldWidth(1); *_out << " ";
[622]254 _out->setFieldWidth(14); _out->setRealNumberPrecision(3); *_out << obs->_o.L1;
[342]255 _out->setFieldWidth(1); *_out << " ";
[622]256 _out->setFieldWidth(14); _out->setRealNumberPrecision(3); *_out << obs->_o.L2;
[367]257 _out->setFieldWidth(1); *_out << " ";
[622]258 _out->setFieldWidth(14); _out->setRealNumberPrecision(3); *_out << obs->_o.S1;
[367]259 _out->setFieldWidth(1); *_out << " ";
[622]260 _out->setFieldWidth(14); _out->setRealNumberPrecision(3); *_out << obs->_o.S2;
[342]261 _out->setFieldWidth(1);
[622]262 *_out << " " << obs->_o.SNR1 << " " << obs->_o.SNR2 << endl;
[35]263 if (!it.hasNext()) {
[341]264 _out->setFieldWidth(1); *_out << endEpoch << endl;
[35]265 }
[347]266 _out->flush();
[35]267 }
[140]268
269 // Output into the socket
270 // ----------------------
271 if (_sockets) {
[622]272 int numBytes = sizeof(obs->_o);
[637]273 QMutableListIterator<QTcpSocket*> is(*_sockets);
[140]274 while (is.hasNext()) {
275 QTcpSocket* sock = is.next();
[397]276 if (sock->state() == QAbstractSocket::ConnectedState) {
[637]277 bool ok = true;
[635]278 int fd = sock->socketDescriptor();
[397]279 if (first) {
[637]280 if (::write(fd, &begEpoch, 1) != 1) {
281 ok = false;
282 }
[397]283 }
[637]284 if (::write(fd, &begObs, 1) != 1) {
285 ok = false;
286 }
287 if (::write(fd, &obs->_o, numBytes) != numBytes) {
288 ok = false;
289 }
[397]290 if (!it.hasNext()) {
[637]291 if (::write(fd, &endEpoch, 1) != 1) {
292 ok = false;
293 }
[397]294 }
[637]295 if (!ok) {
296 delete sock;
297 is.remove();
298 }
[140]299 }
[637]300 else if (sock->state() != QAbstractSocket::ConnectingState) {
301 delete sock;
302 is.remove();
303 }
[140]304 }
305 }
[73]306 }
307
[35]308 delete obs;
309 _epochs->remove(sec);
310 first = false;
311 }
312 }
313}
[1170]314
315// Reread configuration
316////////////////////////////////////////////////////////////////////////////
317void bncCaster::slotReadMountpoints() {
318
319 QSettings settings;
320
321 // Reread several options
322 // ----------------------
323 _samplingRate = settings.value("binSampl").toInt();
324 _waitTime = settings.value("waitTime").toInt();
325 if (_waitTime < 1) {
326 _waitTime = 1;
327 }
328
329 // Add new mountpoints
330 // -------------------
331 int iMount = -1;
332 QListIterator<QString> it(settings.value("mountPoints").toStringList());
333 while (it.hasNext()) {
334 ++iMount;
335 QStringList hlp = it.next().split(" ");
336 if (hlp.size() <= 1) continue;
337 QUrl url(hlp[0]);
338
339 // Does it already exist?
340 // ----------------------
341 bool existFlg = false;
342 QListIterator<bncGetThread*> iTh(_threads);
343 while (iTh.hasNext()) {
344 bncGetThread* thread = iTh.next();
345 if (thread->mountPoint() == url) {
346 existFlg = true;
347 break;
348 }
349 }
350
351 // New bncGetThread
352 // ----------------
353 if (!existFlg) {
354 QByteArray format = hlp[1].toAscii();
355 QByteArray latitude = hlp[2].toAscii();
356 QByteArray longitude = hlp[3].toAscii();
357 QByteArray nmea = hlp[4].toAscii();
358
359 bncGetThread* getThread = new bncGetThread(url, format, latitude,
360 longitude, nmea, iMount);
361
362 bncApp* app = (bncApp*) qApp;
363 app->connect(getThread, SIGNAL(newMessage(QByteArray)),
364 app, SLOT(slotMessage(const QByteArray)));
365 addGetThread(getThread);
366 }
367 }
368
369 // Remove mountpoints
370 // ------------------
371 QListIterator<bncGetThread*> iTh(_threads);
372 while (iTh.hasNext()) {
373 bncGetThread* thread = iTh.next();
374
375 bool existFlg = false;
376 QListIterator<QString> it(settings.value("mountPoints").toStringList());
377 while (it.hasNext()) {
378 QStringList hlp = it.next().split(" ");
379 if (hlp.size() <= 1) continue;
380 QUrl url(hlp[0]);
381
382 if (thread->mountPoint() == url) {
383 existFlg = true;
384 break;
385 }
386 }
387
388 if (!existFlg) {
389 disconnect(thread, 0, 0, 0);
390 _staIDs.removeAll(thread->staID());
391 _threads.removeAll(thread);
392 thread->terminate();
393 thread->wait();
394 delete thread;
395 }
396 }
397
[1176]398 emit( newMessage(QString("Table of Mountpoints (re-)read: new number = %1")
399 .arg(_threads.count()).toAscii()) );
400
[1170]401 // (Re-) Start the configuration timer
402 // -----------------------------------
403 int ms = 0;
404
405 if (_confTimer) {
406 ms = 1000 * _confInterval;
407 }
408 else {
409 _confTimer = new QTimer();
410 connect(_confTimer, SIGNAL(timeout()), this, SLOT(slotReadMountpoints()));
411
412 QTime currTime = currentDateAndTimeGPS().time();
413 QTime nextShotTime;
414
415 if (settings.value("onTheFlyInterval").toString() == "1 min") {
416 _confInterval = 60;
417 nextShotTime = QTime(currTime.hour(), currTime.minute()+1, 0);
418 }
419 else if (settings.value("onTheFlyInterval").toString() == "1 hour") {
420 _confInterval = 3600;
421 nextShotTime = QTime(currTime.hour()+1, 0, 0);
422 }
423 else {
424 _confInterval = 86400;
425 nextShotTime = QTime(23, 59, 59, 999);
426 }
427
428 ms = currTime.msecsTo(nextShotTime);
[1176]429 if (ms < 30000) {
430 ms = 30000;
431 }
[1170]432 }
433
434 _confTimer->start(ms);
435}
Note: See TracBrowser for help on using the repository browser.