source: ntrip/trunk/BNC/bnccaster.cpp@ 1178

Last change on this file since 1178 was 1178, checked in by mervart, 16 years ago

* empty log message *

File size: 12.9 KB
Line 
1// Part of BNC, a utility for retrieving decoding and
2// converting GNSS data streams from NTRIP broadcasters.
3//
4// Copyright (C) 2007
5// German Federal Agency for Cartography and Geodesy (BKG)
6// http://www.bkg.bund.de
7// Czech Technical University Prague, Department of Geodesy
8// http://www.fsv.cvut.cz
9//
10// Email: euref-ip@bkg.bund.de
11//
12// This program is free software; you can redistribute it and/or
13// modify it under the terms of the GNU General Public License
14// as published by the Free Software Foundation, version 2.
15//
16// This program is distributed in the hope that it will be useful,
17// but WITHOUT ANY WARRANTY; without even the implied warranty of
18// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19// GNU General Public License for more details.
20//
21// You should have received a copy of the GNU General Public License
22// along with this program; if not, write to the Free Software
23// Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
24
25/* -------------------------------------------------------------------------
26 * BKG NTRIP Client
27 * -------------------------------------------------------------------------
28 *
29 * Class: bncCaster
30 *
31 * Purpose: buffers and disseminates the data
32 *
33 * Author: L. Mervart
34 *
35 * Created: 24-Dec-2005
36 *
37 * Changes:
38 *
39 * -----------------------------------------------------------------------*/
40
41#include <iostream>
42#include <math.h>
43#include <unistd.h>
44
45#include "bnccaster.h"
46#include "bncapp.h"
47#include "bncgetthread.h"
48#include "bncutils.h"
49#include "RTCM/GPSDecoder.h"
50
51
52// Constructor
53////////////////////////////////////////////////////////////////////////////
54bncCaster::bncCaster(const QString& outFileName, int port) {
55
56 QSettings settings;
57
58 if ( !outFileName.isEmpty() ) {
59 QString lName = outFileName;
60 expandEnvVar(lName);
61 _outFile = new QFile(lName);
62 if ( Qt::CheckState(settings.value("rnxAppend").toInt()) == Qt::Checked) {
63 _outFile->open(QIODevice::WriteOnly | QIODevice::Append);
64 }
65 else {
66 _outFile->open(QIODevice::WriteOnly);
67 }
68 _out = new QTextStream(_outFile);
69 _out->setRealNumberNotation(QTextStream::FixedNotation);
70 }
71 else {
72 _outFile = 0;
73 _out = 0;
74 }
75
76 _port = port;
77
78 if (_port != 0) {
79 _server = new QTcpServer;
80 _server->listen(QHostAddress::Any, _port);
81 connect(_server, SIGNAL(newConnection()), this, SLOT(slotNewConnection()));
82 _sockets = new QList<QTcpSocket*>;
83 }
84 else {
85 _server = 0;
86 _sockets = 0;
87 }
88
89 _epochs = new QMultiMap<long, p_obs>;
90
91 _lastDumpSec = 0;
92
93 _confTimer = 0;
94
95 connect(this, SIGNAL(newMessage(QByteArray)),
96 (bncApp*) qApp, SLOT(slotMessage(const QByteArray)));
97}
98
99// Destructor
100////////////////////////////////////////////////////////////////////////////
101bncCaster::~bncCaster() {
102 QListIterator<bncGetThread*> it(_threads);
103 while(it.hasNext()){
104 bncGetThread* thread = it.next();
105 thread->terminate();
106 thread->wait();
107 delete thread;
108 }
109 delete _out;
110 delete _outFile;
111 delete _server;
112 delete _sockets;
113 if (_epochs) {
114 QListIterator<p_obs> it(_epochs->values());
115 while (it.hasNext()) {
116 delete it.next();
117 }
118 delete _epochs;
119 }
120}
121
122// New Observations
123////////////////////////////////////////////////////////////////////////////
124void bncCaster::newObs(const QByteArray staID, bool firstObs, p_obs obs) {
125
126 QMutexLocker locker(&_mutex);
127
128 obs->_status = t_obs::received;
129
130 long iSec = long(floor(obs->_o.GPSWeeks+0.5));
131 long newTime = obs->_o.GPSWeek * 7*24*3600 + iSec;
132
133 // Rename the Station
134 // ------------------
135 strncpy(obs->_o.StatID, staID.constData(),sizeof(obs->_o.StatID));
136 obs->_o.StatID[sizeof(obs->_o.StatID)-1] = '\0';
137
138 // First time, set the _lastDumpSec immediately
139 // --------------------------------------------
140 if (_lastDumpSec == 0) {
141 _lastDumpSec = newTime - 1;
142 }
143
144 // An old observation - throw it away
145 // ----------------------------------
146 if (newTime <= _lastDumpSec) {
147 if (firstObs) {
148 QSettings settings;
149 if ( !settings.value("outFile").toString().isEmpty() ||
150 !settings.value("outPort").toString().isEmpty() ) {
151 emit( newMessage(QString("Station %1: old epoch %2 thrown away")
152 .arg(staID.data()).arg(iSec).toAscii()) );
153 }
154 }
155 delete obs;
156 return;
157 }
158
159 // Save the observation
160 // --------------------
161 _epochs->insert(newTime, obs);
162
163 // Dump Epochs
164 // -----------
165 if (newTime - _waitTime > _lastDumpSec) {
166 dumpEpochs(_lastDumpSec + 1, newTime - _waitTime);
167 _lastDumpSec = newTime - _waitTime;
168 }
169}
170
171// New Connection
172////////////////////////////////////////////////////////////////////////////
173void bncCaster::slotNewConnection() {
174 _sockets->push_back( _server->nextPendingConnection() );
175 emit( newMessage(QString("New Connection # %1")
176 .arg(_sockets->size()).toAscii()) );
177}
178
179// Add New Thread
180////////////////////////////////////////////////////////////////////////////
181void bncCaster::addGetThread(bncGetThread* getThread) {
182
183 qRegisterMetaType<p_obs>("p_obs");
184
185 connect(getThread, SIGNAL(newObs(QByteArray, bool, p_obs)),
186 this, SLOT(newObs(QByteArray, bool, p_obs)));
187
188 connect(getThread, SIGNAL(error(QByteArray)),
189 this, SLOT(slotGetThreadError(QByteArray)));
190
191 _staIDs.push_back(getThread->staID());
192 _threads.push_back(getThread);
193
194 getThread->start();
195}
196
197// Error in get thread
198////////////////////////////////////////////////////////////////////////////
199void bncCaster::slotGetThreadError(QByteArray staID) {
200 QMutexLocker locker(&_mutex);
201 _staIDs.removeAll(staID);
202 emit( newMessage(
203 QString("Mountpoint size %1").arg(_staIDs.size()).toAscii()) );
204 if (_staIDs.size() == 0) {
205 emit(newMessage("bncCaster:: last get thread terminated"));
206 emit getThreadErrors();
207 }
208}
209
210// Dump Complete Epochs
211////////////////////////////////////////////////////////////////////////////
212void bncCaster::dumpEpochs(long minTime, long maxTime) {
213
214 const char begEpoch = 'A';
215 const char begObs = 'B';
216 const char endEpoch = 'C';
217
218 for (long sec = minTime; sec <= maxTime; sec++) {
219
220 bool first = true;
221 QList<p_obs> allObs = _epochs->values(sec);
222 QListIterator<p_obs> it(allObs);
223 while (it.hasNext()) {
224 p_obs obs = it.next();
225
226 if (_samplingRate == 0 || sec % _samplingRate == 0) {
227
228 // Output into the file
229 // --------------------
230 if (_out) {
231 if (first) {
232 _out->setFieldWidth(1); *_out << begEpoch << endl;;
233 }
234 _out->setFieldWidth(0); *_out << obs->_o.StatID;
235 _out->setFieldWidth(1); *_out << " " << obs->_o.satSys;
236 _out->setPadChar('0');
237 _out->setFieldWidth(2); *_out << obs->_o.satNum;
238 _out->setPadChar(' ');
239 _out->setFieldWidth(1); *_out << " ";
240 _out->setFieldWidth(4); *_out << obs->_o.GPSWeek;
241 _out->setFieldWidth(1); *_out << " ";
242 _out->setFieldWidth(14); _out->setRealNumberPrecision(7); *_out << obs->_o.GPSWeeks;
243 _out->setFieldWidth(1); *_out << " ";
244 _out->setFieldWidth(14); _out->setRealNumberPrecision(3); *_out << obs->_o.C1;
245 _out->setFieldWidth(1); *_out << " ";
246 _out->setFieldWidth(14); _out->setRealNumberPrecision(3); *_out << obs->_o.C2;
247 _out->setFieldWidth(1); *_out << " ";
248 _out->setFieldWidth(14); _out->setRealNumberPrecision(3); *_out << obs->_o.P1;
249 _out->setFieldWidth(1); *_out << " ";
250 _out->setFieldWidth(14); _out->setRealNumberPrecision(3); *_out << obs->_o.P2;
251 _out->setFieldWidth(1); *_out << " ";
252 _out->setFieldWidth(14); _out->setRealNumberPrecision(3); *_out << obs->_o.L1;
253 _out->setFieldWidth(1); *_out << " ";
254 _out->setFieldWidth(14); _out->setRealNumberPrecision(3); *_out << obs->_o.L2;
255 _out->setFieldWidth(1); *_out << " ";
256 _out->setFieldWidth(14); _out->setRealNumberPrecision(3); *_out << obs->_o.S1;
257 _out->setFieldWidth(1); *_out << " ";
258 _out->setFieldWidth(14); _out->setRealNumberPrecision(3); *_out << obs->_o.S2;
259 _out->setFieldWidth(1);
260 *_out << " " << obs->_o.SNR1 << " " << obs->_o.SNR2 << endl;
261 if (!it.hasNext()) {
262 _out->setFieldWidth(1); *_out << endEpoch << endl;
263 }
264 _out->flush();
265 }
266
267 // Output into the socket
268 // ----------------------
269 if (_sockets) {
270 int numBytes = sizeof(obs->_o);
271 QMutableListIterator<QTcpSocket*> is(*_sockets);
272 while (is.hasNext()) {
273 QTcpSocket* sock = is.next();
274 if (sock->state() == QAbstractSocket::ConnectedState) {
275 bool ok = true;
276 int fd = sock->socketDescriptor();
277 if (first) {
278 if (::write(fd, &begEpoch, 1) != 1) {
279 ok = false;
280 }
281 }
282 if (::write(fd, &begObs, 1) != 1) {
283 ok = false;
284 }
285 if (::write(fd, &obs->_o, numBytes) != numBytes) {
286 ok = false;
287 }
288 if (!it.hasNext()) {
289 if (::write(fd, &endEpoch, 1) != 1) {
290 ok = false;
291 }
292 }
293 if (!ok) {
294 delete sock;
295 is.remove();
296 }
297 }
298 else if (sock->state() != QAbstractSocket::ConnectingState) {
299 delete sock;
300 is.remove();
301 }
302 }
303 }
304 }
305
306 delete obs;
307 _epochs->remove(sec);
308 first = false;
309 }
310 }
311}
312
313// Reread configuration
314////////////////////////////////////////////////////////////////////////////
315void bncCaster::slotReadMountpoints() {
316
317 QSettings settings;
318
319 // Reread several options
320 // ----------------------
321 _samplingRate = settings.value("binSampl").toInt();
322 _waitTime = settings.value("waitTime").toInt();
323 if (_waitTime < 1) {
324 _waitTime = 1;
325 }
326
327 // Add new mountpoints
328 // -------------------
329 int iMount = -1;
330 QListIterator<QString> it(settings.value("mountPoints").toStringList());
331 while (it.hasNext()) {
332 ++iMount;
333 QStringList hlp = it.next().split(" ");
334 if (hlp.size() <= 1) continue;
335 QUrl url(hlp[0]);
336
337 // Does it already exist?
338 // ----------------------
339 bool existFlg = false;
340 QListIterator<bncGetThread*> iTh(_threads);
341 while (iTh.hasNext()) {
342 bncGetThread* thread = iTh.next();
343 if (thread->mountPoint() == url) {
344 existFlg = true;
345 break;
346 }
347 }
348
349 // New bncGetThread
350 // ----------------
351 if (!existFlg) {
352 QByteArray format = hlp[1].toAscii();
353 QByteArray latitude = hlp[2].toAscii();
354 QByteArray longitude = hlp[3].toAscii();
355 QByteArray nmea = hlp[4].toAscii();
356
357 bncGetThread* getThread = new bncGetThread(url, format, latitude,
358 longitude, nmea, iMount);
359
360 bncApp* app = (bncApp*) qApp;
361 app->connect(getThread, SIGNAL(newMessage(QByteArray)),
362 app, SLOT(slotMessage(const QByteArray)));
363 addGetThread(getThread);
364 }
365 }
366
367 // Remove mountpoints
368 // ------------------
369 QListIterator<bncGetThread*> iTh(_threads);
370 while (iTh.hasNext()) {
371 bncGetThread* thread = iTh.next();
372
373 bool existFlg = false;
374 QListIterator<QString> it(settings.value("mountPoints").toStringList());
375 while (it.hasNext()) {
376 QStringList hlp = it.next().split(" ");
377 if (hlp.size() <= 1) continue;
378 QUrl url(hlp[0]);
379
380 if (thread->mountPoint() == url) {
381 existFlg = true;
382 break;
383 }
384 }
385
386 if (!existFlg) {
387 disconnect(thread, 0, 0, 0);
388 _staIDs.removeAll(thread->staID());
389 _threads.removeAll(thread);
390 thread->terminate();
391 thread->wait();
392 delete thread;
393 }
394 }
395
396 emit mountPointsRead();
397 emit( newMessage(QString("Table of Mountpoints (re-)read: new number = %1")
398 .arg(_threads.count()).toAscii()) );
399
400 // (Re-) Start the configuration timer
401 // -----------------------------------
402 int ms = 0;
403
404 if (_confTimer) {
405 ms = 1000 * _confInterval;
406 }
407 else {
408 _confTimer = new QTimer();
409 connect(_confTimer, SIGNAL(timeout()), this, SLOT(slotReadMountpoints()));
410
411 QTime currTime = currentDateAndTimeGPS().time();
412 QTime nextShotTime;
413
414 if (settings.value("onTheFlyInterval").toString() == "1 min") {
415 _confInterval = 60;
416 nextShotTime = QTime(currTime.hour(), currTime.minute()+1, 0);
417 }
418 else if (settings.value("onTheFlyInterval").toString() == "1 hour") {
419 _confInterval = 3600;
420 nextShotTime = QTime(currTime.hour()+1, 0, 0);
421 }
422 else {
423 _confInterval = 86400;
424 nextShotTime = QTime(23, 59, 59, 999);
425 }
426
427 ms = currTime.msecsTo(nextShotTime);
428 if (ms < 30000) {
429 ms = 30000;
430 }
431 }
432
433 _confTimer->start(ms);
434}
Note: See TracBrowser for help on using the repository browser.