source: ntrip/trunk/BNC/bnccaster.cpp@ 336

Last change on this file since 336 was 336, checked in by mervart, 17 years ago

* empty log message *

File size: 7.8 KB
Line 
1// Part of BNC, a utility for retrieving, decoding and
2// converting GNSS data streams from NTRIP broadcasters,
3// written by Leos Mervart.
4//
5// Copyright (C) 2006
6// German Federal Agency for Cartography and Geodesy (BKG)
7// http://www.bkg.bund.de
8// Czech Technical University Prague, Department of Advanced Geodesy
9// http://www.fsv.cvut.cz
10//
11// Email: euref-ip@bkg.bund.de
12//
13// This program is free software; you can redistribute it and/or
14// modify it under the terms of the GNU General Public License
15// as published by the Free Software Foundation, version 2.
16//
17// This program is distributed in the hope that it will be useful,
18// but WITHOUT ANY WARRANTY; without even the implied warranty of
19// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20// GNU General Public License for more details.
21//
22// You should have received a copy of the GNU General Public License
23// along with this program; if not, write to the Free Software
24// Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
25
26/* -------------------------------------------------------------------------
27 * BKG NTRIP Client
28 * -------------------------------------------------------------------------
29 *
30 * Class: bncCaster
31 *
32 * Purpose: buffers and disseminates the data
33 *
34 * Author: L. Mervart
35 *
36 * Created: 24-Dec-2005
37 *
38 * Changes:
39 *
40 * -----------------------------------------------------------------------*/
41
42#include <math.h>
43
44#include "bnccaster.h"
45#include "bncgetthread.h"
46#include "bncutils.h"
47#include "RTCM/GPSDecoder.h"
48
49// Constructor
50////////////////////////////////////////////////////////////////////////////
51bncCaster::bncCaster(const QString& outFileName, int port) {
52
53 QSettings settings;
54
55 if ( !outFileName.isEmpty() ) {
56 QString lName = outFileName;
57 expandEnvVar(lName);
58 _outFile = new QFile(lName);
59 if ( Qt::CheckState(settings.value("rnxAppend").toInt()) == Qt::Checked) {
60 _outFile->open(QIODevice::WriteOnly | QIODevice::Append);
61 }
62 else {
63 _outFile->open(QIODevice::WriteOnly);
64 }
65 _out = new QTextStream(_outFile);
66 _out->setRealNumberNotation(QTextStream::FixedNotation);
67 _out->setRealNumberPrecision(5);
68 }
69 else {
70 _outFile = 0;
71 _out = 0;
72 }
73
74 _port = port;
75
76 if (_port != 0) {
77 _server = new QTcpServer;
78 _server->listen(QHostAddress::Any, _port);
79 connect(_server, SIGNAL(newConnection()), this, SLOT(slotNewConnection()));
80 _sockets = new QList<QTcpSocket*>;
81 }
82 else {
83 _server = 0;
84 _sockets = 0;
85 }
86
87 _epochs = new QMultiMap<long, Observation*>;
88
89 _lastDumpSec = 0;
90
91 _samplingRate = settings.value("rnxSampl").toInt();
92 _waitTime = settings.value("waitTime").toInt();
93 if (_waitTime < 1) {
94 _waitTime = 1;
95 }
96}
97
98// Destructor
99////////////////////////////////////////////////////////////////////////////
100bncCaster::~bncCaster() {
101 QListIterator<bncGetThread*> it(_threads);
102 while(it.hasNext()){
103 bncGetThread* thread = it.next();
104 thread->terminate();
105 thread->wait();
106 delete thread;
107 }
108 delete _out;
109 delete _outFile;
110 delete _server;
111 delete _sockets;
112 delete _epochs;
113}
114
115// New Observations
116////////////////////////////////////////////////////////////////////////////
117void bncCaster::newObs(const QByteArray& staID, const QUrl& mountPoint,
118 bool firstObs, Observation* obs,
119 const QByteArray& format) {
120
121 QMutexLocker locker(&_mutex);
122
123 long iSec = long(floor(obs->GPSWeeks+0.5));
124 long newTime = obs->GPSWeek * 7*24*3600 + iSec;
125
126 // Rename the Station
127 // ------------------
128 strncpy(obs->StatID, staID.constData(),sizeof(obs->StatID));
129 obs->StatID[sizeof(obs->StatID)-1] = '\0';
130
131 // Prepare RINEX Output
132 // --------------------
133 if (_rinexWriters.find(obs->StatID) == _rinexWriters.end()) {
134 _rinexWriters.insert(obs->StatID, new bncRinex(obs->StatID,
135 mountPoint, format));
136 }
137 bncRinex* rnx = _rinexWriters.find(obs->StatID).value();
138 if (_samplingRate == 0 || iSec % _samplingRate == 0) {
139 rnx->deepCopy(obs);
140 }
141 rnx->dumpEpoch(newTime);
142
143 // First time, set the _lastDumpSec immediately
144 // --------------------------------------------
145 if (_lastDumpSec == 0) {
146 _lastDumpSec = newTime - 1;
147 }
148
149 // An old observation - throw it away
150 // ----------------------------------
151 if (newTime <= _lastDumpSec) {
152 if (firstObs) {
153 QSettings settings;
154 if ( !settings.value("outFile").toString().isEmpty() ||
155 !settings.value("outPort").toString().isEmpty() ) {
156 emit( newMessage(QString("Station %1: old epoch %2 thrown away")
157 .arg(staID.data()).arg(iSec).toAscii()) );
158 }
159 }
160 delete obs;
161 return;
162 }
163
164 // Save the observation
165 // --------------------
166 _epochs->insert(newTime, obs);
167
168 // Dump older epochs
169 // -----------------
170 dumpEpochs(_lastDumpSec + 1, newTime - _waitTime);
171
172 if (_lastDumpSec < newTime - _waitTime) {
173 _lastDumpSec = newTime - _waitTime;
174 }
175}
176
177// New Connection
178////////////////////////////////////////////////////////////////////////////
179void bncCaster::slotNewConnection() {
180 _sockets->push_back( _server->nextPendingConnection() );
181}
182
183// Add New Thread
184////////////////////////////////////////////////////////////////////////////
185void bncCaster::addGetThread(bncGetThread* getThread) {
186 connect(getThread, SIGNAL(error(const QByteArray&)),
187 this, SLOT(slotGetThreadError(const QByteArray&)));
188
189 _staIDs.push_back(getThread->staID());
190 _threads.push_back(getThread);
191}
192
193// Error in get thread
194////////////////////////////////////////////////////////////////////////////
195void bncCaster::slotGetThreadError(const QByteArray& staID) {
196 QMutexLocker locker(&_mutex);
197 _staIDs.removeAll(staID);
198 emit( newMessage(
199 QString("Mountpoint size %1").arg(_staIDs.size()).toAscii()) );
200 if (_staIDs.size() == 0) {
201 emit(newMessage("bncCaster:: last get thread terminated"));
202 emit getThreadErrors();
203 }
204}
205
206// Dump Complete Epochs
207////////////////////////////////////////////////////////////////////////////
208void bncCaster::dumpEpochs(long minTime, long maxTime) {
209
210 const char begEpoch = 'A';
211 const char begObs = 'B';
212 const char endEpoch = 'C';
213
214 for (long sec = minTime; sec <= maxTime; sec++) {
215
216 bool first = true;
217 QList<Observation*> allObs = _epochs->values(sec);
218 QListIterator<Observation*> it(allObs);
219 while (it.hasNext()) {
220 Observation* obs = it.next();
221
222 if (_samplingRate == 0 || sec % _samplingRate == 0) {
223
224 // Output into the file
225 // --------------------
226 if (_out) {
227 if (first) {
228 *_out << begEpoch << endl;;
229 }
230 *_out << obs->StatID << " "
231 << obs->SVPRN << " "
232 << obs->GPSWeek << " "
233 << obs->GPSWeeks << " "
234 << obs->C1 << " "
235 << obs->P1 << " "
236 << obs->P2 << " "
237 << obs->L1 << " "
238 << obs->L2 << " "
239 << obs->SNR1 << " "
240 << obs->SNR2 << endl;
241 if (!it.hasNext()) {
242 *_out << endEpoch << endl;
243 }
244 }
245
246 // Output into the socket
247 // ----------------------
248 if (_sockets) {
249 int numBytes = sizeof(*obs);
250 QListIterator<QTcpSocket*> is(*_sockets);
251 while (is.hasNext()) {
252 QTcpSocket* sock = is.next();
253 if (first) {
254 sock->write(&begEpoch, 1);
255 }
256 sock->write(&begObs, 1);
257 sock->write((char*) obs, numBytes);
258 if (!it.hasNext()) {
259 sock->write(&endEpoch, 1);
260 }
261 }
262 }
263 }
264
265 delete obs;
266 _epochs->remove(sec);
267 first = false;
268 }
269 }
270}
Note: See TracBrowser for help on using the repository browser.