source: ntrip/trunk/BNC/bnccaster.cpp@ 369

Last change on this file since 369 was 369, checked in by mervart, 17 years ago

* empty log message *

File size: 9.5 KB
Line 
1// Part of BNC, a utility for retrieving, decoding and
2// converting GNSS data streams from NTRIP broadcasters,
3// written by Leos Mervart.
4//
5// Copyright (C) 2006
6// German Federal Agency for Cartography and Geodesy (BKG)
7// http://www.bkg.bund.de
8// Czech Technical University Prague, Department of Advanced Geodesy
9// http://www.fsv.cvut.cz
10//
11// Email: euref-ip@bkg.bund.de
12//
13// This program is free software; you can redistribute it and/or
14// modify it under the terms of the GNU General Public License
15// as published by the Free Software Foundation, version 2.
16//
17// This program is distributed in the hope that it will be useful,
18// but WITHOUT ANY WARRANTY; without even the implied warranty of
19// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20// GNU General Public License for more details.
21//
22// You should have received a copy of the GNU General Public License
23// along with this program; if not, write to the Free Software
24// Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
25
26/* -------------------------------------------------------------------------
27 * BKG NTRIP Client
28 * -------------------------------------------------------------------------
29 *
30 * Class: bncCaster
31 *
32 * Purpose: buffers and disseminates the data
33 *
34 * Author: L. Mervart
35 *
36 * Created: 24-Dec-2005
37 *
38 * Changes:
39 *
40 * -----------------------------------------------------------------------*/
41
42#include <math.h>
43
44#include "bnccaster.h"
45#include "bncgetthread.h"
46#include "bncutils.h"
47#include "RTCM/GPSDecoder.h"
48
49// Constructor
50////////////////////////////////////////////////////////////////////////////
51bncCaster::bncCaster(const QString& outFileName, int port) {
52
53 QSettings settings;
54
55 if ( !outFileName.isEmpty() ) {
56 QString lName = outFileName;
57 expandEnvVar(lName);
58 _outFile = new QFile(lName);
59 if ( Qt::CheckState(settings.value("rnxAppend").toInt()) == Qt::Checked) {
60 _outFile->open(QIODevice::WriteOnly | QIODevice::Append);
61 }
62 else {
63 _outFile->open(QIODevice::WriteOnly);
64 }
65 _out = new QTextStream(_outFile);
66 _out->setRealNumberNotation(QTextStream::FixedNotation);
67 }
68 else {
69 _outFile = 0;
70 _out = 0;
71 }
72
73 _port = port;
74
75 if (_port != 0) {
76 _server = new QTcpServer;
77 _server->listen(QHostAddress::Any, _port);
78 connect(_server, SIGNAL(newConnection()), this, SLOT(slotNewConnection()));
79 _sockets = new QList<QTcpSocket*>;
80 }
81 else {
82 _server = 0;
83 _sockets = 0;
84 }
85
86 _epochs = new QMultiMap<long, Observation*>;
87
88 _lastDumpSec = 0;
89
90 _samplingRate = settings.value("rnxSampl").toInt();
91 _waitTime = settings.value("waitTime").toInt();
92 if (_waitTime < 1) {
93 _waitTime = 1;
94 }
95}
96
97// Destructor
98////////////////////////////////////////////////////////////////////////////
99bncCaster::~bncCaster() {
100 QListIterator<bncGetThread*> it(_threads);
101 while(it.hasNext()){
102 bncGetThread* thread = it.next();
103 thread->terminate();
104 thread->wait();
105 delete thread;
106 }
107 delete _out;
108 delete _outFile;
109 delete _server;
110 delete _sockets;
111 delete _epochs;
112}
113
114// Reconnecting
115////////////////////////////////////////////////////////////////////////////
116void bncCaster::reconnecting(const QByteArray& staID) {
117 if (_rinexWriters.find(staID) != _rinexWriters.end()) {
118 bncRinex* rnx = _rinexWriters.find(staID).value();
119 rnx->setReconnectFlag(true);
120 }
121}
122
123// New Observations
124////////////////////////////////////////////////////////////////////////////
125void bncCaster::newObs(const QByteArray& staID, const QUrl& mountPoint,
126 bool firstObs, Observation* obs,
127 const QByteArray& format,
128 const QByteArray& latitude,
129 const QByteArray& longitude,
130 const QByteArray& nmea) {
131
132 QMutexLocker locker(&_mutex);
133
134 long iSec = long(floor(obs->GPSWeeks+0.5));
135 long newTime = obs->GPSWeek * 7*24*3600 + iSec;
136
137 // Rename the Station
138 // ------------------
139 strncpy(obs->StatID, staID.constData(),sizeof(obs->StatID));
140 obs->StatID[sizeof(obs->StatID)-1] = '\0';
141
142 // Prepare RINEX Output
143 // --------------------
144 if (_rinexWriters.find(obs->StatID) == _rinexWriters.end()) {
145 _rinexWriters.insert(obs->StatID, new bncRinex(obs->StatID,
146 mountPoint, format, latitude, longitude, nmea));
147 }
148 bncRinex* rnx = _rinexWriters.find(obs->StatID).value();
149 if (_samplingRate == 0 || iSec % _samplingRate == 0) {
150 rnx->deepCopy(obs);
151 }
152 rnx->dumpEpoch(newTime);
153
154 // First time, set the _lastDumpSec immediately
155 // --------------------------------------------
156 if (_lastDumpSec == 0) {
157 _lastDumpSec = newTime - 1;
158 }
159
160 // An old observation - throw it away
161 // ----------------------------------
162 if (newTime <= _lastDumpSec) {
163 if (firstObs) {
164 QSettings settings;
165 if ( !settings.value("outFile").toString().isEmpty() ||
166 !settings.value("outPort").toString().isEmpty() ) {
167 emit( newMessage(QString("Station %1: old epoch %2 thrown away")
168 .arg(staID.data()).arg(iSec).toAscii()) );
169 }
170 }
171 delete obs;
172 return;
173 }
174
175 // Save the observation
176 // --------------------
177 _epochs->insert(newTime, obs);
178
179 // Dump older epochs
180 // -----------------
181 dumpEpochs(_lastDumpSec + 1, newTime - _waitTime);
182
183 if (_lastDumpSec < newTime - _waitTime) {
184 _lastDumpSec = newTime - _waitTime;
185 }
186}
187
188// New Connection
189////////////////////////////////////////////////////////////////////////////
190void bncCaster::slotNewConnection() {
191 _sockets->push_back( _server->nextPendingConnection() );
192}
193
194// Add New Thread
195////////////////////////////////////////////////////////////////////////////
196void bncCaster::addGetThread(bncGetThread* getThread) {
197 connect(getThread, SIGNAL(error(const QByteArray&)),
198 this, SLOT(slotGetThreadError(const QByteArray&)));
199
200 _staIDs.push_back(getThread->staID());
201 _threads.push_back(getThread);
202}
203
204// Error in get thread
205////////////////////////////////////////////////////////////////////////////
206void bncCaster::slotGetThreadError(const QByteArray& staID) {
207 QMutexLocker locker(&_mutex);
208 _staIDs.removeAll(staID);
209 emit( newMessage(
210 QString("Mountpoint size %1").arg(_staIDs.size()).toAscii()) );
211 if (_staIDs.size() == 0) {
212 emit(newMessage("bncCaster:: last get thread terminated"));
213 emit getThreadErrors();
214 }
215}
216
217// Dump Complete Epochs
218////////////////////////////////////////////////////////////////////////////
219void bncCaster::dumpEpochs(long minTime, long maxTime) {
220
221 const char begEpoch = 'A';
222 const char begObs = 'B';
223 const char endEpoch = 'C';
224
225 for (long sec = minTime; sec <= maxTime; sec++) {
226
227 bool first = true;
228 QList<Observation*> allObs = _epochs->values(sec);
229 QListIterator<Observation*> it(allObs);
230 while (it.hasNext()) {
231 Observation* obs = it.next();
232
233 if (_samplingRate == 0 || sec % _samplingRate == 0) {
234
235 // Output into the file
236 // --------------------
237 if (_out) {
238 if (first) {
239 _out->setFieldWidth(1); *_out << begEpoch << endl;;
240 }
241 _out->setFieldWidth(0); *_out << obs->StatID;
242 _out->setFieldWidth(1); *_out << " " << obs->satSys;
243 _out->setPadChar('0');
244 _out->setFieldWidth(2); *_out << obs->satNum;
245 _out->setPadChar(' ');
246 _out->setFieldWidth(1); *_out << " ";
247 _out->setFieldWidth(4); *_out << obs->GPSWeek;
248 _out->setFieldWidth(1); *_out << " ";
249 _out->setFieldWidth(14); _out->setRealNumberPrecision(7); *_out << obs->GPSWeeks;
250 _out->setFieldWidth(1); *_out << " ";
251 _out->setFieldWidth(14); _out->setRealNumberPrecision(3); *_out << obs->C1;
252 _out->setFieldWidth(1); *_out << " ";
253 _out->setFieldWidth(14); _out->setRealNumberPrecision(3); *_out << obs->C2;
254 _out->setFieldWidth(1); *_out << " ";
255 _out->setFieldWidth(14); _out->setRealNumberPrecision(3); *_out << obs->P1;
256 _out->setFieldWidth(1); *_out << " ";
257 _out->setFieldWidth(14); _out->setRealNumberPrecision(3); *_out << obs->P2;
258 _out->setFieldWidth(1); *_out << " ";
259 _out->setFieldWidth(14); _out->setRealNumberPrecision(3); *_out << obs->L1;
260 _out->setFieldWidth(1); *_out << " ";
261 _out->setFieldWidth(14); _out->setRealNumberPrecision(3); *_out << obs->L2;
262 _out->setFieldWidth(1); *_out << " ";
263 _out->setFieldWidth(14); _out->setRealNumberPrecision(3); *_out << obs->S1;
264 _out->setFieldWidth(1); *_out << " ";
265 _out->setFieldWidth(14); _out->setRealNumberPrecision(3); *_out << obs->S2;
266 _out->setFieldWidth(1);
267 *_out << " " << obs->SNR1 << " " << obs->SNR2 << endl;
268 if (!it.hasNext()) {
269 _out->setFieldWidth(1); *_out << endEpoch << endl;
270 }
271 _out->flush();
272 }
273
274 // Output into the socket
275 // ----------------------
276 if (_sockets) {
277 int numBytes = sizeof(*obs);
278 QListIterator<QTcpSocket*> is(*_sockets);
279 while (is.hasNext()) {
280 QTcpSocket* sock = is.next();
281 if (first) {
282 sock->write(&begEpoch, 1);
283 }
284 sock->write(&begObs, 1);
285 sock->write((char*) obs, numBytes);
286 if (!it.hasNext()) {
287 sock->write(&endEpoch, 1);
288 }
289 }
290 }
291 }
292
293 delete obs;
294 _epochs->remove(sec);
295 first = false;
296 }
297 }
298}
Note: See TracBrowser for help on using the repository browser.