source: ntrip/trunk/BNC/bnccaster.cpp@ 630

Last change on this file since 630 was 630, checked in by mervart, 16 years ago

* empty log message *

File size: 9.1 KB
RevLine 
[280]1// Part of BNC, a utility for retrieving, decoding and
[464]2// converting GNSS data streams from NTRIP broadcasters.
[280]3//
[464]4// Copyright (C) 2007
[280]5// German Federal Agency for Cartography and Geodesy (BKG)
6// http://www.bkg.bund.de
[464]7// Czech Technical University Prague, Department of Geodesy
[280]8// http://www.fsv.cvut.cz
9//
10// Email: euref-ip@bkg.bund.de
11//
12// This program is free software; you can redistribute it and/or
13// modify it under the terms of the GNU General Public License
14// as published by the Free Software Foundation, version 2.
15//
16// This program is distributed in the hope that it will be useful,
17// but WITHOUT ANY WARRANTY; without even the implied warranty of
18// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19// GNU General Public License for more details.
20//
21// You should have received a copy of the GNU General Public License
22// along with this program; if not, write to the Free Software
23// Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
[35]24
25/* -------------------------------------------------------------------------
[93]26 * BKG NTRIP Client
[35]27 * -------------------------------------------------------------------------
28 *
29 * Class: bncCaster
30 *
31 * Purpose: buffers and disseminates the data
32 *
33 * Author: L. Mervart
34 *
35 * Created: 24-Dec-2005
36 *
37 * Changes:
38 *
39 * -----------------------------------------------------------------------*/
40
[222]41#include <math.h>
42
[35]43#include "bnccaster.h"
44#include "bncgetthread.h"
[134]45#include "bncutils.h"
[207]46#include "RTCM/GPSDecoder.h"
[35]47
48// Constructor
49////////////////////////////////////////////////////////////////////////////
50bncCaster::bncCaster(const QString& outFileName, int port) {
51
[275]52 QSettings settings;
53
[35]54 if ( !outFileName.isEmpty() ) {
[134]55 QString lName = outFileName;
56 expandEnvVar(lName);
57 _outFile = new QFile(lName);
[275]58 if ( Qt::CheckState(settings.value("rnxAppend").toInt()) == Qt::Checked) {
59 _outFile->open(QIODevice::WriteOnly | QIODevice::Append);
60 }
61 else {
62 _outFile->open(QIODevice::WriteOnly);
63 }
[35]64 _out = new QTextStream(_outFile);
65 _out->setRealNumberNotation(QTextStream::FixedNotation);
66 }
67 else {
68 _outFile = 0;
69 _out = 0;
70 }
71
72 _port = port;
73
74 if (_port != 0) {
75 _server = new QTcpServer;
76 _server->listen(QHostAddress::Any, _port);
77 connect(_server, SIGNAL(newConnection()), this, SLOT(slotNewConnection()));
78 _sockets = new QList<QTcpSocket*>;
79 }
80 else {
81 _server = 0;
82 _sockets = 0;
83 }
84
[622]85 _epochs = new QMultiMap<long, p_obs>;
[35]86
87 _lastDumpSec = 0;
[133]88
89 _samplingRate = settings.value("rnxSampl").toInt();
[136]90 _waitTime = settings.value("waitTime").toInt();
[139]91 if (_waitTime < 1) {
92 _waitTime = 1;
[136]93 }
[35]94}
95
96// Destructor
97////////////////////////////////////////////////////////////////////////////
98bncCaster::~bncCaster() {
[186]99 QListIterator<bncGetThread*> it(_threads);
100 while(it.hasNext()){
101 bncGetThread* thread = it.next();
102 thread->terminate();
[605]103 thread->wait();
[230]104 delete thread;
[186]105 }
[35]106 delete _out;
107 delete _outFile;
[187]108 delete _server;
[630]109 if (_sockets) {
110 QListIterator<QTcpSocket*> is(*_sockets);
111 while (is.hasNext()) {
112 delete is.next();
113 }
114 delete _sockets;
115 }
[603]116 if (_epochs) {
[622]117 QListIterator<p_obs> it(_epochs->values());
[603]118 while (it.hasNext()) {
119 delete it.next();
120 }
121 delete _epochs;
122 }
[35]123}
124
125// New Observations
126////////////////////////////////////////////////////////////////////////////
[622]127void bncCaster::newObs(const QByteArray staID, bool firstObs, p_obs obs) {
[35]128
[243]129 QMutexLocker locker(&_mutex);
130
[624]131 obs->_status = t_obs::received;
132
[622]133 long iSec = long(floor(obs->_o.GPSWeeks+0.5));
134 long newTime = obs->_o.GPSWeek * 7*24*3600 + iSec;
[35]135
[160]136 // Rename the Station
137 // ------------------
[622]138 strncpy(obs->_o.StatID, staID.constData(),sizeof(obs->_o.StatID));
139 obs->_o.StatID[sizeof(obs->_o.StatID)-1] = '\0';
[160]140
[35]141 // First time, set the _lastDumpSec immediately
142 // --------------------------------------------
143 if (_lastDumpSec == 0) {
[462]144 _lastDumpSec = newTime - 1;
[35]145 }
146
147 // An old observation - throw it away
148 // ----------------------------------
[462]149 if (newTime <= _lastDumpSec) {
[257]150 if (firstObs) {
151 QSettings settings;
152 if ( !settings.value("outFile").toString().isEmpty() ||
153 !settings.value("outPort").toString().isEmpty() ) {
[258]154 emit( newMessage(QString("Station %1: old epoch %2 thrown away")
155 .arg(staID.data()).arg(iSec).toAscii()) );
[257]156 }
[181]157 }
[35]158 delete obs;
[350]159 return;
[35]160 }
161
[160]162 // Save the observation
163 // --------------------
[462]164 _epochs->insert(newTime, obs);
[393]165
[462]166 // Dump Epochs
167 // -----------
168 if (newTime - _waitTime > _lastDumpSec) {
169 dumpEpochs(_lastDumpSec + 1, newTime - _waitTime);
170 _lastDumpSec = newTime - _waitTime;
[252]171 }
[35]172}
173
174// New Connection
175////////////////////////////////////////////////////////////////////////////
176void bncCaster::slotNewConnection() {
177 _sockets->push_back( _server->nextPendingConnection() );
[630]178 emit( newMessage(QString("New Connection # %1")
179 .arg(_sockets->size()).toAscii()) );
[35]180}
181
182// Add New Thread
183////////////////////////////////////////////////////////////////////////////
184void bncCaster::addGetThread(bncGetThread* getThread) {
[624]185
186 qRegisterMetaType<p_obs>("p_obs");
187
[628]188 connect(getThread, SIGNAL(newObs(QByteArray, bool, p_obs)),
189 this, SLOT(newObs(QByteArray, bool, p_obs)));
[463]190
[628]191 connect(getThread, SIGNAL(error(QByteArray)),
192 this, SLOT(slotGetThreadError(QByteArray)));
[35]193
[88]194 _staIDs.push_back(getThread->staID());
[186]195 _threads.push_back(getThread);
[35]196}
197
198// Error in get thread
199////////////////////////////////////////////////////////////////////////////
[628]200void bncCaster::slotGetThreadError(QByteArray staID) {
[243]201 QMutexLocker locker(&_mutex);
[88]202 _staIDs.removeAll(staID);
[82]203 emit( newMessage(
[88]204 QString("Mountpoint size %1").arg(_staIDs.size()).toAscii()) );
205 if (_staIDs.size() == 0) {
[82]206 emit(newMessage("bncCaster:: last get thread terminated"));
[35]207 emit getThreadErrors();
208 }
209}
210
211// Dump Complete Epochs
212////////////////////////////////////////////////////////////////////////////
213void bncCaster::dumpEpochs(long minTime, long maxTime) {
214
215 const char begEpoch = 'A';
216 const char begObs = 'B';
217 const char endEpoch = 'C';
218
219 for (long sec = minTime; sec <= maxTime; sec++) {
[140]220
[35]221 bool first = true;
[622]222 QList<p_obs> allObs = _epochs->values(sec);
223 QListIterator<p_obs> it(allObs);
[35]224 while (it.hasNext()) {
[622]225 p_obs obs = it.next();
[35]226
[140]227 if (_samplingRate == 0 || sec % _samplingRate == 0) {
228
229 // Output into the file
230 // --------------------
231 if (_out) {
[35]232 if (first) {
[341]233 _out->setFieldWidth(1); *_out << begEpoch << endl;;
[35]234 }
[622]235 _out->setFieldWidth(0); *_out << obs->_o.StatID;
236 _out->setFieldWidth(1); *_out << " " << obs->_o.satSys;
[344]237 _out->setPadChar('0');
[622]238 _out->setFieldWidth(2); *_out << obs->_o.satNum;
[344]239 _out->setPadChar(' ');
[342]240 _out->setFieldWidth(1); *_out << " ";
[622]241 _out->setFieldWidth(4); *_out << obs->_o.GPSWeek;
[342]242 _out->setFieldWidth(1); *_out << " ";
[622]243 _out->setFieldWidth(14); _out->setRealNumberPrecision(7); *_out << obs->_o.GPSWeeks;
[342]244 _out->setFieldWidth(1); *_out << " ";
[622]245 _out->setFieldWidth(14); _out->setRealNumberPrecision(3); *_out << obs->_o.C1;
[342]246 _out->setFieldWidth(1); *_out << " ";
[622]247 _out->setFieldWidth(14); _out->setRealNumberPrecision(3); *_out << obs->_o.C2;
[366]248 _out->setFieldWidth(1); *_out << " ";
[622]249 _out->setFieldWidth(14); _out->setRealNumberPrecision(3); *_out << obs->_o.P1;
[342]250 _out->setFieldWidth(1); *_out << " ";
[622]251 _out->setFieldWidth(14); _out->setRealNumberPrecision(3); *_out << obs->_o.P2;
[342]252 _out->setFieldWidth(1); *_out << " ";
[622]253 _out->setFieldWidth(14); _out->setRealNumberPrecision(3); *_out << obs->_o.L1;
[342]254 _out->setFieldWidth(1); *_out << " ";
[622]255 _out->setFieldWidth(14); _out->setRealNumberPrecision(3); *_out << obs->_o.L2;
[367]256 _out->setFieldWidth(1); *_out << " ";
[622]257 _out->setFieldWidth(14); _out->setRealNumberPrecision(3); *_out << obs->_o.S1;
[367]258 _out->setFieldWidth(1); *_out << " ";
[622]259 _out->setFieldWidth(14); _out->setRealNumberPrecision(3); *_out << obs->_o.S2;
[342]260 _out->setFieldWidth(1);
[622]261 *_out << " " << obs->_o.SNR1 << " " << obs->_o.SNR2 << endl;
[35]262 if (!it.hasNext()) {
[341]263 _out->setFieldWidth(1); *_out << endEpoch << endl;
[35]264 }
[347]265 _out->flush();
[35]266 }
[140]267
268 // Output into the socket
269 // ----------------------
270 if (_sockets) {
[622]271 int numBytes = sizeof(obs->_o);
[140]272 QListIterator<QTcpSocket*> is(*_sockets);
273 while (is.hasNext()) {
274 QTcpSocket* sock = is.next();
[397]275 if (sock->state() == QAbstractSocket::ConnectedState) {
276 if (first) {
277 sock->write(&begEpoch, 1);
278 }
279 sock->write(&begObs, 1);
[622]280 sock->write((char*) &obs->_o, numBytes);
[397]281 if (!it.hasNext()) {
282 sock->write(&endEpoch, 1);
283 }
[140]284 }
285 }
286 }
[73]287 }
288
[35]289 delete obs;
290 _epochs->remove(sec);
291 first = false;
292 }
293 }
294}
Note: See TracBrowser for help on using the repository browser.