source: ntrip/trunk/BNC/bnccaster.cpp@ 635

Last change on this file since 635 was 635, checked in by mervart, 16 years ago

* empty log message *

File size: 9.0 KB
Line 
1// Part of BNC, a utility for retrieving decoding and
2// converting GNSS data streams from NTRIP broadcasters.
3//
4// Copyright (C) 2007
5// German Federal Agency for Cartography and Geodesy (BKG)
6// http://www.bkg.bund.de
7// Czech Technical University Prague, Department of Geodesy
8// http://www.fsv.cvut.cz
9//
10// Email: euref-ip@bkg.bund.de
11//
12// This program is free software; you can redistribute it and/or
13// modify it under the terms of the GNU General Public License
14// as published by the Free Software Foundation, version 2.
15//
16// This program is distributed in the hope that it will be useful,
17// but WITHOUT ANY WARRANTY; without even the implied warranty of
18// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19// GNU General Public License for more details.
20//
21// You should have received a copy of the GNU General Public License
22// along with this program; if not, write to the Free Software
23// Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
24
25/* -------------------------------------------------------------------------
26 * BKG NTRIP Client
27 * -------------------------------------------------------------------------
28 *
29 * Class: bncCaster
30 *
31 * Purpose: buffers and disseminates the data
32 *
33 * Author: L. Mervart
34 *
35 * Created: 24-Dec-2005
36 *
37 * Changes:
38 *
39 * -----------------------------------------------------------------------*/
40
41#include <math.h>
42
43#include "bnccaster.h"
44#include "bncgetthread.h"
45#include "bncutils.h"
46#include "RTCM/GPSDecoder.h"
47
48
49// Constructor
50////////////////////////////////////////////////////////////////////////////
51bncCaster::bncCaster(const QString& outFileName, int port) {
52
53 QSettings settings;
54
55 if ( !outFileName.isEmpty() ) {
56 QString lName = outFileName;
57 expandEnvVar(lName);
58 _outFile = new QFile(lName);
59 if ( Qt::CheckState(settings.value("rnxAppend").toInt()) == Qt::Checked) {
60 _outFile->open(QIODevice::WriteOnly | QIODevice::Append);
61 }
62 else {
63 _outFile->open(QIODevice::WriteOnly);
64 }
65 _out = new QTextStream(_outFile);
66 _out->setRealNumberNotation(QTextStream::FixedNotation);
67 }
68 else {
69 _outFile = 0;
70 _out = 0;
71 }
72
73 _port = port;
74
75 if (_port != 0) {
76 _server = new QTcpServer;
77 _server->listen(QHostAddress::Any, _port);
78 connect(_server, SIGNAL(newConnection()), this, SLOT(slotNewConnection()));
79 _sockets = new QList<QTcpSocket*>;
80 }
81 else {
82 _server = 0;
83 _sockets = 0;
84 }
85
86 _epochs = new QMultiMap<long, p_obs>;
87
88 _lastDumpSec = 0;
89
90 _samplingRate = settings.value("rnxSampl").toInt();
91 _waitTime = settings.value("waitTime").toInt();
92 if (_waitTime < 1) {
93 _waitTime = 1;
94 }
95}
96
97// Destructor
98////////////////////////////////////////////////////////////////////////////
99bncCaster::~bncCaster() {
100 QListIterator<bncGetThread*> it(_threads);
101 while(it.hasNext()){
102 bncGetThread* thread = it.next();
103 thread->terminate();
104 thread->wait();
105 delete thread;
106 }
107 delete _out;
108 delete _outFile;
109 delete _server;
110 delete _sockets;
111 if (_epochs) {
112 QListIterator<p_obs> it(_epochs->values());
113 while (it.hasNext()) {
114 delete it.next();
115 }
116 delete _epochs;
117 }
118}
119
120// New Observations
121////////////////////////////////////////////////////////////////////////////
122void bncCaster::newObs(const QByteArray staID, bool firstObs, p_obs obs) {
123
124 QMutexLocker locker(&_mutex);
125
126 obs->_status = t_obs::received;
127
128 long iSec = long(floor(obs->_o.GPSWeeks+0.5));
129 long newTime = obs->_o.GPSWeek * 7*24*3600 + iSec;
130
131 // Rename the Station
132 // ------------------
133 strncpy(obs->_o.StatID, staID.constData(),sizeof(obs->_o.StatID));
134 obs->_o.StatID[sizeof(obs->_o.StatID)-1] = '\0';
135
136 // First time, set the _lastDumpSec immediately
137 // --------------------------------------------
138 if (_lastDumpSec == 0) {
139 _lastDumpSec = newTime - 1;
140 }
141
142 // An old observation - throw it away
143 // ----------------------------------
144 if (newTime <= _lastDumpSec) {
145 if (firstObs) {
146 QSettings settings;
147 if ( !settings.value("outFile").toString().isEmpty() ||
148 !settings.value("outPort").toString().isEmpty() ) {
149 emit( newMessage(QString("Station %1: old epoch %2 thrown away")
150 .arg(staID.data()).arg(iSec).toAscii()) );
151 }
152 }
153 delete obs;
154 return;
155 }
156
157 // Save the observation
158 // --------------------
159 _epochs->insert(newTime, obs);
160
161 // Dump Epochs
162 // -----------
163 if (newTime - _waitTime > _lastDumpSec) {
164 dumpEpochs(_lastDumpSec + 1, newTime - _waitTime);
165 _lastDumpSec = newTime - _waitTime;
166 }
167}
168
169// New Connection
170////////////////////////////////////////////////////////////////////////////
171void bncCaster::slotNewConnection() {
172 _sockets->push_back( _server->nextPendingConnection() );
173 emit( newMessage(QString("New Connection # %1")
174 .arg(_sockets->size()).toAscii()) );
175}
176
177// Add New Thread
178////////////////////////////////////////////////////////////////////////////
179void bncCaster::addGetThread(bncGetThread* getThread) {
180
181 qRegisterMetaType<p_obs>("p_obs");
182
183 connect(getThread, SIGNAL(newObs(QByteArray, bool, p_obs)),
184 this, SLOT(newObs(QByteArray, bool, p_obs)));
185
186 connect(getThread, SIGNAL(error(QByteArray)),
187 this, SLOT(slotGetThreadError(QByteArray)));
188
189 _staIDs.push_back(getThread->staID());
190 _threads.push_back(getThread);
191}
192
193// Error in get thread
194////////////////////////////////////////////////////////////////////////////
195void bncCaster::slotGetThreadError(QByteArray staID) {
196 QMutexLocker locker(&_mutex);
197 _staIDs.removeAll(staID);
198 emit( newMessage(
199 QString("Mountpoint size %1").arg(_staIDs.size()).toAscii()) );
200 if (_staIDs.size() == 0) {
201 emit(newMessage("bncCaster:: last get thread terminated"));
202 emit getThreadErrors();
203 }
204}
205
206// Dump Complete Epochs
207////////////////////////////////////////////////////////////////////////////
208void bncCaster::dumpEpochs(long minTime, long maxTime) {
209
210 const char begEpoch = 'A';
211 const char begObs = 'B';
212 const char endEpoch = 'C';
213
214 for (long sec = minTime; sec <= maxTime; sec++) {
215
216 bool first = true;
217 QList<p_obs> allObs = _epochs->values(sec);
218 QListIterator<p_obs> it(allObs);
219 while (it.hasNext()) {
220 p_obs obs = it.next();
221
222 if (_samplingRate == 0 || sec % _samplingRate == 0) {
223
224 // Output into the file
225 // --------------------
226 if (_out) {
227 if (first) {
228 _out->setFieldWidth(1); *_out << begEpoch << endl;;
229 }
230 _out->setFieldWidth(0); *_out << obs->_o.StatID;
231 _out->setFieldWidth(1); *_out << " " << obs->_o.satSys;
232 _out->setPadChar('0');
233 _out->setFieldWidth(2); *_out << obs->_o.satNum;
234 _out->setPadChar(' ');
235 _out->setFieldWidth(1); *_out << " ";
236 _out->setFieldWidth(4); *_out << obs->_o.GPSWeek;
237 _out->setFieldWidth(1); *_out << " ";
238 _out->setFieldWidth(14); _out->setRealNumberPrecision(7); *_out << obs->_o.GPSWeeks;
239 _out->setFieldWidth(1); *_out << " ";
240 _out->setFieldWidth(14); _out->setRealNumberPrecision(3); *_out << obs->_o.C1;
241 _out->setFieldWidth(1); *_out << " ";
242 _out->setFieldWidth(14); _out->setRealNumberPrecision(3); *_out << obs->_o.C2;
243 _out->setFieldWidth(1); *_out << " ";
244 _out->setFieldWidth(14); _out->setRealNumberPrecision(3); *_out << obs->_o.P1;
245 _out->setFieldWidth(1); *_out << " ";
246 _out->setFieldWidth(14); _out->setRealNumberPrecision(3); *_out << obs->_o.P2;
247 _out->setFieldWidth(1); *_out << " ";
248 _out->setFieldWidth(14); _out->setRealNumberPrecision(3); *_out << obs->_o.L1;
249 _out->setFieldWidth(1); *_out << " ";
250 _out->setFieldWidth(14); _out->setRealNumberPrecision(3); *_out << obs->_o.L2;
251 _out->setFieldWidth(1); *_out << " ";
252 _out->setFieldWidth(14); _out->setRealNumberPrecision(3); *_out << obs->_o.S1;
253 _out->setFieldWidth(1); *_out << " ";
254 _out->setFieldWidth(14); _out->setRealNumberPrecision(3); *_out << obs->_o.S2;
255 _out->setFieldWidth(1);
256 *_out << " " << obs->_o.SNR1 << " " << obs->_o.SNR2 << endl;
257 if (!it.hasNext()) {
258 _out->setFieldWidth(1); *_out << endEpoch << endl;
259 }
260 _out->flush();
261 }
262
263 // Output into the socket
264 // ----------------------
265 if (_sockets) {
266 int numBytes = sizeof(obs->_o);
267 QListIterator<QTcpSocket*> is(*_sockets);
268 while (is.hasNext()) {
269 QTcpSocket* sock = is.next();
270 if (sock->state() == QAbstractSocket::ConnectedState) {
271 int fd = sock->socketDescriptor();
272 if (first) {
273 ::write(fd, &begEpoch, 1);
274 }
275 ::write(fd, &begObs, 1);
276 ::write(fd, &obs->_o, numBytes);
277 if (!it.hasNext()) {
278 ::write(fd, &endEpoch, 1);
279 }
280 }
281 }
282 }
283 }
284
285 delete obs;
286 _epochs->remove(sec);
287 first = false;
288 }
289 }
290}
Note: See TracBrowser for help on using the repository browser.