source: ntrip/trunk/BNC/bnccaster.cpp@ 265

Last change on this file since 265 was 258, checked in by mervart, 18 years ago

* empty log message *

File size: 6.4 KB
RevLine 
[35]1
2/* -------------------------------------------------------------------------
[93]3 * BKG NTRIP Client
[35]4 * -------------------------------------------------------------------------
5 *
6 * Class: bncCaster
7 *
8 * Purpose: buffers and disseminates the data
9 *
10 * Author: L. Mervart
11 *
12 * Created: 24-Dec-2005
13 *
14 * Changes:
15 *
16 * -----------------------------------------------------------------------*/
17
[222]18#include <math.h>
19
[35]20#include "bnccaster.h"
21#include "bncgetthread.h"
[134]22#include "bncutils.h"
[207]23#include "RTCM/GPSDecoder.h"
[35]24
25// Constructor
26////////////////////////////////////////////////////////////////////////////
27bncCaster::bncCaster(const QString& outFileName, int port) {
28
29 if ( !outFileName.isEmpty() ) {
[134]30 QString lName = outFileName;
31 expandEnvVar(lName);
32 _outFile = new QFile(lName);
[35]33 _outFile->open(QIODevice::WriteOnly);
34 _out = new QTextStream(_outFile);
35 _out->setRealNumberNotation(QTextStream::FixedNotation);
36 _out->setRealNumberPrecision(5);
37 }
38 else {
39 _outFile = 0;
40 _out = 0;
41 }
42
43 _port = port;
44
45 if (_port != 0) {
46 _server = new QTcpServer;
47 _server->listen(QHostAddress::Any, _port);
48 connect(_server, SIGNAL(newConnection()), this, SLOT(slotNewConnection()));
49 _sockets = new QList<QTcpSocket*>;
50 }
51 else {
52 _server = 0;
53 _sockets = 0;
54 }
55
56 _epochs = new QMultiMap<long, Observation*>;
57
58 _lastDumpSec = 0;
[133]59
60 QSettings settings;
61 _samplingRate = settings.value("rnxSampl").toInt();
[136]62 _waitTime = settings.value("waitTime").toInt();
[139]63 if (_waitTime < 1) {
64 _waitTime = 1;
[136]65 }
[35]66}
67
68// Destructor
69////////////////////////////////////////////////////////////////////////////
70bncCaster::~bncCaster() {
[186]71 QListIterator<bncGetThread*> it(_threads);
72 while(it.hasNext()){
73 bncGetThread* thread = it.next();
74 thread->terminate();
[188]75 thread->wait();
[230]76 delete thread;
[186]77 }
[35]78 delete _out;
79 delete _outFile;
[187]80 delete _server;
81 delete _sockets;
82 delete _epochs;
[35]83}
84
85// New Observations
86////////////////////////////////////////////////////////////////////////////
[256]87void bncCaster::newObs(const QByteArray& staID, const QUrl& mountPoint,
[257]88 bool firstObs, Observation* obs) {
[35]89
[243]90 QMutexLocker locker(&_mutex);
91
[222]92 long iSec = long(floor(obs->GPSWeeks+0.5));
93 long newTime = obs->GPSWeek * 7*24*3600 + iSec;
[35]94
[160]95 // Rename the Station
96 // ------------------
97 strncpy(obs->StatID, staID.constData(),sizeof(obs->StatID));
98
99 // Prepare RINEX Output
100 // --------------------
101 if (_rinexWriters.find(obs->StatID) == _rinexWriters.end()) {
[256]102 _rinexWriters.insert(obs->StatID, new bncRinex(obs->StatID, mountPoint));
[160]103 }
104 bncRinex* rnx = _rinexWriters.find(obs->StatID).value();
[222]105 if (_samplingRate == 0 || iSec % _samplingRate == 0) {
[165]106 rnx->deepCopy(obs);
107 }
[160]108 rnx->dumpEpoch(newTime);
109
[35]110 // First time, set the _lastDumpSec immediately
111 // --------------------------------------------
112 if (_lastDumpSec == 0) {
113 _lastDumpSec = newTime - 1;
114 }
115
116 // An old observation - throw it away
117 // ----------------------------------
118 if (newTime <= _lastDumpSec) {
[257]119 if (firstObs) {
120 QSettings settings;
121 if ( !settings.value("outFile").toString().isEmpty() ||
122 !settings.value("outPort").toString().isEmpty() ) {
[258]123 emit( newMessage(QString("Station %1: old epoch %2 thrown away")
124 .arg(staID.data()).arg(iSec).toAscii()) );
[257]125 }
[181]126 }
[35]127 delete obs;
128 return;
129 }
130
[160]131 // Save the observation
132 // --------------------
[35]133 _epochs->insert(newTime, obs);
134
135 // Dump older epochs
136 // -----------------
[136]137 dumpEpochs(_lastDumpSec + 1, newTime - _waitTime);
[252]138
139 if (_lastDumpSec < newTime - _waitTime) {
140 _lastDumpSec = newTime - _waitTime;
141 }
[35]142}
143
144// New Connection
145////////////////////////////////////////////////////////////////////////////
146void bncCaster::slotNewConnection() {
147 _sockets->push_back( _server->nextPendingConnection() );
148}
149
150// Add New Thread
151////////////////////////////////////////////////////////////////////////////
152void bncCaster::addGetThread(bncGetThread* getThread) {
153 connect(getThread, SIGNAL(error(const QByteArray&)),
154 this, SLOT(slotGetThreadError(const QByteArray&)));
155
[88]156 _staIDs.push_back(getThread->staID());
[186]157 _threads.push_back(getThread);
[35]158}
159
160// Error in get thread
161////////////////////////////////////////////////////////////////////////////
[88]162void bncCaster::slotGetThreadError(const QByteArray& staID) {
[243]163 QMutexLocker locker(&_mutex);
[88]164 _staIDs.removeAll(staID);
[82]165 emit( newMessage(
[88]166 QString("Mountpoint size %1").arg(_staIDs.size()).toAscii()) );
167 if (_staIDs.size() == 0) {
[82]168 emit(newMessage("bncCaster:: last get thread terminated"));
[35]169 emit getThreadErrors();
170 }
171}
172
173// Dump Complete Epochs
174////////////////////////////////////////////////////////////////////////////
175void bncCaster::dumpEpochs(long minTime, long maxTime) {
176
177 const char begEpoch = 'A';
178 const char begObs = 'B';
179 const char endEpoch = 'C';
180
181 for (long sec = minTime; sec <= maxTime; sec++) {
[140]182
[35]183 bool first = true;
184 QList<Observation*> allObs = _epochs->values(sec);
185 QListIterator<Observation*> it(allObs);
186 while (it.hasNext()) {
187 Observation* obs = it.next();
188
[140]189 if (_samplingRate == 0 || sec % _samplingRate == 0) {
190
191 // Output into the file
192 // --------------------
193 if (_out) {
[35]194 if (first) {
[140]195 *_out << begEpoch << endl;;
[35]196 }
[222]197 *_out << obs->StatID << " "
198 << obs->SVPRN << " "
199 << obs->GPSWeek << " "
200 << obs->GPSWeeks << " "
201 << obs->C1 << " "
202 << obs->P1 << " "
203 << obs->P2 << " "
204 << obs->L1 << " "
205 << obs->L2 << endl;
[35]206 if (!it.hasNext()) {
[140]207 *_out << endEpoch << endl;
[35]208 }
209 }
[140]210
211 // Output into the socket
212 // ----------------------
213 if (_sockets) {
214 int numBytes = sizeof(*obs);
215 QListIterator<QTcpSocket*> is(*_sockets);
216 while (is.hasNext()) {
217 QTcpSocket* sock = is.next();
218 if (first) {
219 sock->write(&begEpoch, 1);
220 }
221 sock->write(&begObs, 1);
222 sock->write((char*) obs, numBytes);
223 if (!it.hasNext()) {
224 sock->write(&endEpoch, 1);
225 }
226 }
227 }
[73]228 }
229
[35]230 delete obs;
231 _epochs->remove(sec);
232 first = false;
233 }
234 }
235}
Note: See TracBrowser for help on using the repository browser.