source: ntrip/trunk/BNC/bnccaster.cpp@ 404

Last change on this file since 404 was 397, checked in by mervart, 19 years ago

* empty log message *

File size: 10.2 KB
RevLine 
[280]1// Part of BNC, a utility for retrieving decoding and
2// converting GNSS data streams from NTRIP broadcasters,
3// written by Leos Mervart.
4//
5// Copyright (C) 2006
6// German Federal Agency for Cartography and Geodesy (BKG)
7// http://www.bkg.bund.de
8// Czech Technical University Prague, Department of Advanced Geodesy
9// http://www.fsv.cvut.cz
10//
11// Email: euref-ip@bkg.bund.de
12//
13// This program is free software; you can redistribute it and/or
14// modify it under the terms of the GNU General Public License
15// as published by the Free Software Foundation, version 2.
16//
17// This program is distributed in the hope that it will be useful,
18// but WITHOUT ANY WARRANTY; without even the implied warranty of
19// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20// GNU General Public License for more details.
21//
22// You should have received a copy of the GNU General Public License
23// along with this program; if not, write to the Free Software
24// Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
[35]25
26/* -------------------------------------------------------------------------
[93]27 * BKG NTRIP Client
[35]28 * -------------------------------------------------------------------------
29 *
30 * Class: bncCaster
31 *
32 * Purpose: buffers and disseminates the data
33 *
34 * Author: L. Mervart
35 *
36 * Created: 24-Dec-2005
37 *
38 * Changes:
39 *
40 * -----------------------------------------------------------------------*/
41
[222]42#include <math.h>
43
[35]44#include "bnccaster.h"
45#include "bncgetthread.h"
[134]46#include "bncutils.h"
[207]47#include "RTCM/GPSDecoder.h"
[35]48
49// Constructor
50////////////////////////////////////////////////////////////////////////////
51bncCaster::bncCaster(const QString& outFileName, int port) {
52
[275]53 QSettings settings;
54
[35]55 if ( !outFileName.isEmpty() ) {
[134]56 QString lName = outFileName;
57 expandEnvVar(lName);
58 _outFile = new QFile(lName);
[275]59 if ( Qt::CheckState(settings.value("rnxAppend").toInt()) == Qt::Checked) {
60 _outFile->open(QIODevice::WriteOnly | QIODevice::Append);
61 }
62 else {
63 _outFile->open(QIODevice::WriteOnly);
64 }
[35]65 _out = new QTextStream(_outFile);
66 _out->setRealNumberNotation(QTextStream::FixedNotation);
67 }
68 else {
69 _outFile = 0;
70 _out = 0;
71 }
72
73 _port = port;
74
75 if (_port != 0) {
76 _server = new QTcpServer;
77 _server->listen(QHostAddress::Any, _port);
78 connect(_server, SIGNAL(newConnection()), this, SLOT(slotNewConnection()));
79 _sockets = new QList<QTcpSocket*>;
80 }
81 else {
82 _server = 0;
83 _sockets = 0;
84 }
85
86 _epochs = new QMultiMap<long, Observation*>;
87
88 _lastDumpSec = 0;
[133]89
90 _samplingRate = settings.value("rnxSampl").toInt();
[136]91 _waitTime = settings.value("waitTime").toInt();
[139]92 if (_waitTime < 1) {
93 _waitTime = 1;
[136]94 }
[382]95
[392]96 if ( settings.value("rnxPath").toString().isEmpty() ) {
97 _rinexWriters = 0;
98 }
99 else {
100 _rinexWriters = new QMap<QString, bncRinex*>;
101 }
102
[382]103 // Start dump epoch loop
104 // ---------------------
[393]105 _newObsRunning = false;
[382]106 _newTime = 0;
107 dumpEpochSlot();
[35]108}
109
110// Destructor
111////////////////////////////////////////////////////////////////////////////
112bncCaster::~bncCaster() {
[186]113 QListIterator<bncGetThread*> it(_threads);
114 while(it.hasNext()){
115 bncGetThread* thread = it.next();
116 thread->terminate();
[188]117 thread->wait();
[230]118 delete thread;
[186]119 }
[35]120 delete _out;
121 delete _outFile;
[187]122 delete _server;
123 delete _sockets;
124 delete _epochs;
[392]125 delete _rinexWriters;
[35]126}
127
[369]128// Reconnecting
129////////////////////////////////////////////////////////////////////////////
130void bncCaster::reconnecting(const QByteArray& staID) {
[387]131 QMutexLocker locker(&_mutex);
132
[392]133 if (_rinexWriters && _rinexWriters->find(staID) != _rinexWriters->end()) {
134 bncRinex* rnx = _rinexWriters->find(staID).value();
[369]135 rnx->setReconnectFlag(true);
136 }
137}
138
[35]139// New Observations
140////////////////////////////////////////////////////////////////////////////
[350]141void bncCaster::newObs(const QByteArray& staID, const QUrl& mountPoint,
142 bool firstObs, Observation* obs,
[366]143 const QByteArray& format,
144 const QByteArray& latitude,
145 const QByteArray& longitude,
146 const QByteArray& nmea) {
[35]147
[243]148 QMutexLocker locker(&_mutex);
[393]149 _newObsRunning = true;
[243]150
[222]151 long iSec = long(floor(obs->GPSWeeks+0.5));
[382]152 _newTime = obs->GPSWeek * 7*24*3600 + iSec;
[35]153
[160]154 // Rename the Station
155 // ------------------
156 strncpy(obs->StatID, staID.constData(),sizeof(obs->StatID));
[333]157 obs->StatID[sizeof(obs->StatID)-1] = '\0';
[160]158
[350]159 // Prepare RINEX Output
160 // --------------------
[392]161 if (_rinexWriters) {
162 if (_rinexWriters->find(obs->StatID) == _rinexWriters->end()) {
163 _rinexWriters->insert(obs->StatID, new bncRinex(obs->StatID, mountPoint,
164 format, latitude, longitude, nmea));
165 }
166 bncRinex* rnx = _rinexWriters->find(obs->StatID).value();
167 if (_samplingRate == 0 || iSec % _samplingRate == 0) {
168 rnx->deepCopy(obs);
169 }
170 rnx->dumpEpoch(_newTime);
[350]171 }
172
[35]173 // First time, set the _lastDumpSec immediately
174 // --------------------------------------------
175 if (_lastDumpSec == 0) {
[382]176 _lastDumpSec = _newTime - 1;
[35]177 }
178
179 // An old observation - throw it away
180 // ----------------------------------
[382]181 if (_newTime <= _lastDumpSec) {
[257]182 if (firstObs) {
183 QSettings settings;
184 if ( !settings.value("outFile").toString().isEmpty() ||
185 !settings.value("outPort").toString().isEmpty() ) {
[258]186 emit( newMessage(QString("Station %1: old epoch %2 thrown away")
187 .arg(staID.data()).arg(iSec).toAscii()) );
[257]188 }
[181]189 }
[35]190 delete obs;
[350]191 return;
[35]192 }
193
[160]194 // Save the observation
195 // --------------------
[382]196 _epochs->insert(_newTime, obs);
[393]197
198 _newObsRunning = false;
[382]199}
[35]200
[382]201// Dump Loop Event
202////////////////////////////////////////////////////////////////////////////
203void bncCaster::dumpEpochSlot() {
[393]204 if (!_newObsRunning) {
205 if (_newTime != 0 && _epochs->size() > 0) {
206 dumpEpochs(_lastDumpSec + 1, _newTime - _waitTime);
207
208 if (_lastDumpSec < _newTime - _waitTime) {
209 _lastDumpSec = _newTime - _waitTime;
210 }
[382]211 }
[252]212 }
[382]213 QTimer::singleShot(100, this, SLOT(dumpEpochSlot()));
[35]214}
215
216// New Connection
217////////////////////////////////////////////////////////////////////////////
218void bncCaster::slotNewConnection() {
219 _sockets->push_back( _server->nextPendingConnection() );
220}
221
222// Add New Thread
223////////////////////////////////////////////////////////////////////////////
224void bncCaster::addGetThread(bncGetThread* getThread) {
225 connect(getThread, SIGNAL(error(const QByteArray&)),
226 this, SLOT(slotGetThreadError(const QByteArray&)));
227
[88]228 _staIDs.push_back(getThread->staID());
[186]229 _threads.push_back(getThread);
[35]230}
231
232// Error in get thread
233////////////////////////////////////////////////////////////////////////////
[88]234void bncCaster::slotGetThreadError(const QByteArray& staID) {
[243]235 QMutexLocker locker(&_mutex);
[88]236 _staIDs.removeAll(staID);
[82]237 emit( newMessage(
[88]238 QString("Mountpoint size %1").arg(_staIDs.size()).toAscii()) );
239 if (_staIDs.size() == 0) {
[82]240 emit(newMessage("bncCaster:: last get thread terminated"));
[35]241 emit getThreadErrors();
242 }
243}
244
245// Dump Complete Epochs
246////////////////////////////////////////////////////////////////////////////
247void bncCaster::dumpEpochs(long minTime, long maxTime) {
248
249 const char begEpoch = 'A';
250 const char begObs = 'B';
251 const char endEpoch = 'C';
252
253 for (long sec = minTime; sec <= maxTime; sec++) {
[140]254
[35]255 bool first = true;
256 QList<Observation*> allObs = _epochs->values(sec);
257 QListIterator<Observation*> it(allObs);
258 while (it.hasNext()) {
259 Observation* obs = it.next();
260
[140]261 if (_samplingRate == 0 || sec % _samplingRate == 0) {
262
263 // Output into the file
264 // --------------------
265 if (_out) {
[35]266 if (first) {
[341]267 _out->setFieldWidth(1); *_out << begEpoch << endl;;
[35]268 }
[343]269 _out->setFieldWidth(0); *_out << obs->StatID;
[342]270 _out->setFieldWidth(1); *_out << " " << obs->satSys;
[344]271 _out->setPadChar('0');
[342]272 _out->setFieldWidth(2); *_out << obs->satNum;
[344]273 _out->setPadChar(' ');
[342]274 _out->setFieldWidth(1); *_out << " ";
275 _out->setFieldWidth(4); *_out << obs->GPSWeek;
276 _out->setFieldWidth(1); *_out << " ";
277 _out->setFieldWidth(14); _out->setRealNumberPrecision(7); *_out << obs->GPSWeeks;
278 _out->setFieldWidth(1); *_out << " ";
279 _out->setFieldWidth(14); _out->setRealNumberPrecision(3); *_out << obs->C1;
280 _out->setFieldWidth(1); *_out << " ";
[366]281 _out->setFieldWidth(14); _out->setRealNumberPrecision(3); *_out << obs->C2;
282 _out->setFieldWidth(1); *_out << " ";
[342]283 _out->setFieldWidth(14); _out->setRealNumberPrecision(3); *_out << obs->P1;
284 _out->setFieldWidth(1); *_out << " ";
285 _out->setFieldWidth(14); _out->setRealNumberPrecision(3); *_out << obs->P2;
286 _out->setFieldWidth(1); *_out << " ";
287 _out->setFieldWidth(14); _out->setRealNumberPrecision(3); *_out << obs->L1;
288 _out->setFieldWidth(1); *_out << " ";
289 _out->setFieldWidth(14); _out->setRealNumberPrecision(3); *_out << obs->L2;
[367]290 _out->setFieldWidth(1); *_out << " ";
291 _out->setFieldWidth(14); _out->setRealNumberPrecision(3); *_out << obs->S1;
292 _out->setFieldWidth(1); *_out << " ";
293 _out->setFieldWidth(14); _out->setRealNumberPrecision(3); *_out << obs->S2;
[342]294 _out->setFieldWidth(1);
295 *_out << " " << obs->SNR1 << " " << obs->SNR2 << endl;
[35]296 if (!it.hasNext()) {
[341]297 _out->setFieldWidth(1); *_out << endEpoch << endl;
[35]298 }
[347]299 _out->flush();
[35]300 }
[140]301
302 // Output into the socket
303 // ----------------------
304 if (_sockets) {
305 int numBytes = sizeof(*obs);
306 QListIterator<QTcpSocket*> is(*_sockets);
307 while (is.hasNext()) {
308 QTcpSocket* sock = is.next();
[397]309 if (sock->state() == QAbstractSocket::ConnectedState) {
310 if (first) {
311 sock->write(&begEpoch, 1);
312 }
313 sock->write(&begObs, 1);
314 sock->write((char*) obs, numBytes);
315 if (!it.hasNext()) {
316 sock->write(&endEpoch, 1);
317 }
[140]318 }
319 }
320 }
[73]321 }
322
[35]323 delete obs;
324 _epochs->remove(sec);
325 first = false;
326 }
327 }
328}
Note: See TracBrowser for help on using the repository browser.