source: ntrip/trunk/BNC/bnccaster.cpp@ 738

Last change on this file since 738 was 738, checked in by mervart, 16 years ago

* empty log message *

File size: 9.6 KB
Line 
1// Part of BNC, a utility for retrieving decoding and
2// converting GNSS data streams from NTRIP broadcasters.
3//
4// Copyright (C) 2007
5// German Federal Agency for Cartography and Geodesy (BKG)
6// http://www.bkg.bund.de
7// Czech Technical University Prague, Department of Geodesy
8// http://www.fsv.cvut.cz
9//
10// Email: euref-ip@bkg.bund.de
11//
12// This program is free software; you can redistribute it and/or
13// modify it under the terms of the GNU General Public License
14// as published by the Free Software Foundation, version 2.
15//
16// This program is distributed in the hope that it will be useful,
17// but WITHOUT ANY WARRANTY; without even the implied warranty of
18// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19// GNU General Public License for more details.
20//
21// You should have received a copy of the GNU General Public License
22// along with this program; if not, write to the Free Software
23// Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
24
25/* -------------------------------------------------------------------------
26 * BKG NTRIP Client
27 * -------------------------------------------------------------------------
28 *
29 * Class: bncCaster
30 *
31 * Purpose: buffers and disseminates the data
32 *
33 * Author: L. Mervart
34 *
35 * Created: 24-Dec-2005
36 *
37 * Changes:
38 *
39 * -----------------------------------------------------------------------*/
40
41#include <iostream>
42#include <math.h>
43#include <unistd.h>
44
45#include "bnccaster.h"
46#include "bncgetthread.h"
47#include "bncutils.h"
48#include "RTCM/GPSDecoder.h"
49
50
51// Constructor
52////////////////////////////////////////////////////////////////////////////
53bncCaster::bncCaster(const QString& outFileName, int port) {
54
55 QSettings settings;
56
57 if ( !outFileName.isEmpty() ) {
58 QString lName = outFileName;
59 expandEnvVar(lName);
60 _outFile = new QFile(lName);
61 if ( Qt::CheckState(settings.value("rnxAppend").toInt()) == Qt::Checked) {
62 _outFile->open(QIODevice::WriteOnly | QIODevice::Append);
63 }
64 else {
65 _outFile->open(QIODevice::WriteOnly);
66 }
67 _out = new QTextStream(_outFile);
68 _out->setRealNumberNotation(QTextStream::FixedNotation);
69 }
70 else {
71 _outFile = 0;
72 _out = 0;
73 }
74
75 _port = port;
76
77 if (_port != 0) {
78 _server = new QTcpServer;
79 _server->listen(QHostAddress::Any, _port);
80 connect(_server, SIGNAL(newConnection()), this, SLOT(slotNewConnection()));
81 _sockets = new QList<QTcpSocket*>;
82 }
83 else {
84 _server = 0;
85 _sockets = 0;
86 }
87
88 _epochs = new QMultiMap<long, p_obs>;
89
90 _lastDumpSec = 0;
91
92 ///// _samplingRate = settings.value("rnxSampl").toInt();
93 _samplingRate = 0;
94 _waitTime = settings.value("waitTime").toInt();
95 if (_waitTime < 1) {
96 _waitTime = 1;
97 }
98}
99
100// Destructor
101////////////////////////////////////////////////////////////////////////////
102bncCaster::~bncCaster() {
103 QListIterator<bncGetThread*> it(_threads);
104 while(it.hasNext()){
105 bncGetThread* thread = it.next();
106 thread->terminate();
107 thread->wait();
108 delete thread;
109 }
110 delete _out;
111 delete _outFile;
112 delete _server;
113 delete _sockets;
114 if (_epochs) {
115 QListIterator<p_obs> it(_epochs->values());
116 while (it.hasNext()) {
117 delete it.next();
118 }
119 delete _epochs;
120 }
121}
122
123// New Observations
124////////////////////////////////////////////////////////////////////////////
125void bncCaster::newObs(const QByteArray staID, bool firstObs, p_obs obs) {
126
127 QMutexLocker locker(&_mutex);
128
129 obs->_status = t_obs::received;
130
131 long iSec = long(floor(obs->_o.GPSWeeks+0.5));
132 long newTime = obs->_o.GPSWeek * 7*24*3600 + iSec;
133
134 // Rename the Station
135 // ------------------
136 strncpy(obs->_o.StatID, staID.constData(),sizeof(obs->_o.StatID));
137 obs->_o.StatID[sizeof(obs->_o.StatID)-1] = '\0';
138
139 // First time, set the _lastDumpSec immediately
140 // --------------------------------------------
141 if (_lastDumpSec == 0) {
142 _lastDumpSec = newTime - 1;
143 }
144
145 // An old observation - throw it away
146 // ----------------------------------
147 if (newTime <= _lastDumpSec) {
148 if (firstObs) {
149 QSettings settings;
150 if ( !settings.value("outFile").toString().isEmpty() ||
151 !settings.value("outPort").toString().isEmpty() ) {
152 emit( newMessage(QString("Station %1: old epoch %2 thrown away")
153 .arg(staID.data()).arg(iSec).toAscii()) );
154 }
155 }
156 delete obs;
157 return;
158 }
159
160 // Save the observation
161 // --------------------
162 _epochs->insert(newTime, obs);
163
164 // Dump Epochs
165 // -----------
166 if (newTime - _waitTime > _lastDumpSec) {
167 dumpEpochs(_lastDumpSec + 1, newTime - _waitTime);
168 _lastDumpSec = newTime - _waitTime;
169 }
170}
171
172// New Connection
173////////////////////////////////////////////////////////////////////////////
174void bncCaster::slotNewConnection() {
175 _sockets->push_back( _server->nextPendingConnection() );
176 emit( newMessage(QString("New Connection # %1")
177 .arg(_sockets->size()).toAscii()) );
178}
179
180// Add New Thread
181////////////////////////////////////////////////////////////////////////////
182void bncCaster::addGetThread(bncGetThread* getThread) {
183
184 qRegisterMetaType<p_obs>("p_obs");
185
186 connect(getThread, SIGNAL(newObs(QByteArray, bool, p_obs)),
187 this, SLOT(newObs(QByteArray, bool, p_obs)));
188
189 connect(getThread, SIGNAL(error(QByteArray)),
190 this, SLOT(slotGetThreadError(QByteArray)));
191
192 _staIDs.push_back(getThread->staID());
193 _threads.push_back(getThread);
194}
195
196// Error in get thread
197////////////////////////////////////////////////////////////////////////////
198void bncCaster::slotGetThreadError(QByteArray staID) {
199 QMutexLocker locker(&_mutex);
200 _staIDs.removeAll(staID);
201 emit( newMessage(
202 QString("Mountpoint size %1").arg(_staIDs.size()).toAscii()) );
203 if (_staIDs.size() == 0) {
204 emit(newMessage("bncCaster:: last get thread terminated"));
205 emit getThreadErrors();
206 }
207}
208
209// Dump Complete Epochs
210////////////////////////////////////////////////////////////////////////////
211void bncCaster::dumpEpochs(long minTime, long maxTime) {
212
213 const char begEpoch = 'A';
214 const char begObs = 'B';
215 const char endEpoch = 'C';
216
217 for (long sec = minTime; sec <= maxTime; sec++) {
218
219 bool first = true;
220 QList<p_obs> allObs = _epochs->values(sec);
221 QListIterator<p_obs> it(allObs);
222 while (it.hasNext()) {
223 p_obs obs = it.next();
224
225 if (_samplingRate == 0 || sec % _samplingRate == 0) {
226
227 // Output into the file
228 // --------------------
229 if (_out) {
230 if (first) {
231 _out->setFieldWidth(1); *_out << begEpoch << endl;;
232 }
233 _out->setFieldWidth(0); *_out << obs->_o.StatID;
234 _out->setFieldWidth(1); *_out << " " << obs->_o.satSys;
235 _out->setPadChar('0');
236 _out->setFieldWidth(2); *_out << obs->_o.satNum;
237 _out->setPadChar(' ');
238 _out->setFieldWidth(1); *_out << " ";
239 _out->setFieldWidth(4); *_out << obs->_o.GPSWeek;
240 _out->setFieldWidth(1); *_out << " ";
241 _out->setFieldWidth(14); _out->setRealNumberPrecision(7); *_out << obs->_o.GPSWeeks;
242 _out->setFieldWidth(1); *_out << " ";
243 _out->setFieldWidth(14); _out->setRealNumberPrecision(3); *_out << obs->_o.C1;
244 _out->setFieldWidth(1); *_out << " ";
245 _out->setFieldWidth(14); _out->setRealNumberPrecision(3); *_out << obs->_o.C2;
246 _out->setFieldWidth(1); *_out << " ";
247 _out->setFieldWidth(14); _out->setRealNumberPrecision(3); *_out << obs->_o.P1;
248 _out->setFieldWidth(1); *_out << " ";
249 _out->setFieldWidth(14); _out->setRealNumberPrecision(3); *_out << obs->_o.P2;
250 _out->setFieldWidth(1); *_out << " ";
251 _out->setFieldWidth(14); _out->setRealNumberPrecision(3); *_out << obs->_o.L1;
252 _out->setFieldWidth(1); *_out << " ";
253 _out->setFieldWidth(14); _out->setRealNumberPrecision(3); *_out << obs->_o.L2;
254 _out->setFieldWidth(1); *_out << " ";
255 _out->setFieldWidth(14); _out->setRealNumberPrecision(3); *_out << obs->_o.S1;
256 _out->setFieldWidth(1); *_out << " ";
257 _out->setFieldWidth(14); _out->setRealNumberPrecision(3); *_out << obs->_o.S2;
258 _out->setFieldWidth(1);
259 *_out << " " << obs->_o.SNR1 << " " << obs->_o.SNR2 << endl;
260 if (!it.hasNext()) {
261 _out->setFieldWidth(1); *_out << endEpoch << endl;
262 }
263 _out->flush();
264 }
265
266 // Output into the socket
267 // ----------------------
268 if (_sockets) {
269 int numBytes = sizeof(obs->_o);
270 QMutableListIterator<QTcpSocket*> is(*_sockets);
271 while (is.hasNext()) {
272 QTcpSocket* sock = is.next();
273 if (sock->state() == QAbstractSocket::ConnectedState) {
274 bool ok = true;
275 int fd = sock->socketDescriptor();
276 if (first) {
277 if (::write(fd, &begEpoch, 1) != 1) {
278 ok = false;
279 }
280 }
281 if (::write(fd, &begObs, 1) != 1) {
282 ok = false;
283 }
284 if (::write(fd, &obs->_o, numBytes) != numBytes) {
285 ok = false;
286 }
287 if (!it.hasNext()) {
288 if (::write(fd, &endEpoch, 1) != 1) {
289 ok = false;
290 }
291 }
292 if (!ok) {
293 delete sock;
294 is.remove();
295 }
296 }
297 else if (sock->state() != QAbstractSocket::ConnectingState) {
298 delete sock;
299 is.remove();
300 }
301 }
302 }
303 }
304
305 delete obs;
306 _epochs->remove(sec);
307 first = false;
308 }
309 }
310}
Note: See TracBrowser for help on using the repository browser.