source: ntrip/trunk/BNC/bnccaster.cpp@ 251

Last change on this file since 251 was 251, checked in by mervart, 18 years ago

* empty log message *

File size: 6.3 KB
Line 
1
2/* -------------------------------------------------------------------------
3 * BKG NTRIP Client
4 * -------------------------------------------------------------------------
5 *
6 * Class: bncCaster
7 *
8 * Purpose: buffers and disseminates the data
9 *
10 * Author: L. Mervart
11 *
12 * Created: 24-Dec-2005
13 *
14 * Changes:
15 *
16 * -----------------------------------------------------------------------*/
17
18#include <math.h>
19
20#include "bnccaster.h"
21#include "bncgetthread.h"
22#include "bncutils.h"
23#include "RTCM/GPSDecoder.h"
24
25// Constructor
26////////////////////////////////////////////////////////////////////////////
27bncCaster::bncCaster(const QString& outFileName, int port) {
28
29 if ( !outFileName.isEmpty() ) {
30 QString lName = outFileName;
31 expandEnvVar(lName);
32 _outFile = new QFile(lName);
33 _outFile->open(QIODevice::WriteOnly);
34 _out = new QTextStream(_outFile);
35 _out->setRealNumberNotation(QTextStream::FixedNotation);
36 _out->setRealNumberPrecision(5);
37 }
38 else {
39 _outFile = 0;
40 _out = 0;
41 }
42
43 _port = port;
44
45 if (_port != 0) {
46 _server = new QTcpServer;
47 _server->listen(QHostAddress::Any, _port);
48 connect(_server, SIGNAL(newConnection()), this, SLOT(slotNewConnection()));
49 _sockets = new QList<QTcpSocket*>;
50 }
51 else {
52 _server = 0;
53 _sockets = 0;
54 }
55
56 _epochs = new QMultiMap<long, Observation*>;
57
58 _lastDumpSec = 0;
59
60 QSettings settings;
61 _samplingRate = settings.value("rnxSampl").toInt();
62 _waitTime = settings.value("waitTime").toInt();
63 if (_waitTime < 1) {
64 _waitTime = 1;
65 }
66}
67
68// Destructor
69////////////////////////////////////////////////////////////////////////////
70bncCaster::~bncCaster() {
71 QListIterator<bncGetThread*> it(_threads);
72 while(it.hasNext()){
73 bncGetThread* thread = it.next();
74 thread->terminate();
75 thread->wait();
76 delete thread;
77 }
78 delete _out;
79 delete _outFile;
80 delete _server;
81 delete _sockets;
82 delete _epochs;
83}
84
85// New Observations
86////////////////////////////////////////////////////////////////////////////
87void bncCaster::newObs(const QByteArray& staID, Observation* obs) {
88
89 QMutexLocker locker(&_mutex);
90
91 long iSec = long(floor(obs->GPSWeeks+0.5));
92 long newTime = obs->GPSWeek * 7*24*3600 + iSec;
93
94 // Rename the Station
95 // ------------------
96 strncpy(obs->StatID, staID.constData(),sizeof(obs->StatID));
97
98 // Prepare RINEX Output
99 // --------------------
100 if (_rinexWriters.find(obs->StatID) == _rinexWriters.end()) {
101 _rinexWriters.insert(obs->StatID, new bncRinex(obs->StatID));
102 }
103 bncRinex* rnx = _rinexWriters.find(obs->StatID).value();
104 if (_samplingRate == 0 || iSec % _samplingRate == 0) {
105 rnx->deepCopy(obs);
106 }
107 rnx->dumpEpoch(newTime);
108
109 // First time, set the _lastDumpSec immediately
110 // --------------------------------------------
111 if (_lastDumpSec == 0) {
112 _lastDumpSec = newTime - 1;
113 }
114
115 // An old observation - throw it away
116 // ----------------------------------
117 if (newTime <= _lastDumpSec) {
118 QSettings settings;
119 if ( !settings.value("outFile").toString().isEmpty() ||
120 !settings.value("outPort").toString().isEmpty() ) {
121 emit( newMessage(QString("Station %1: old epoch %2 thrown away"
122 "(newTime = %3 lastDump = %4)")
123 .arg(staID.data()).arg(iSec).arg(newTime).arg(_lastDumpSec).toAscii()) );
124 }
125 delete obs;
126 return;
127 }
128
129 // Save the observation
130 // --------------------
131 _epochs->insert(newTime, obs);
132
133 // Dump older epochs
134 // -----------------
135 dumpEpochs(_lastDumpSec + 1, newTime - _waitTime);
136 _lastDumpSec = newTime - _waitTime;
137}
138
139// New Connection
140////////////////////////////////////////////////////////////////////////////
141void bncCaster::slotNewConnection() {
142 _sockets->push_back( _server->nextPendingConnection() );
143}
144
145// Add New Thread
146////////////////////////////////////////////////////////////////////////////
147void bncCaster::addGetThread(bncGetThread* getThread) {
148 connect(getThread, SIGNAL(error(const QByteArray&)),
149 this, SLOT(slotGetThreadError(const QByteArray&)));
150
151 _staIDs.push_back(getThread->staID());
152 _threads.push_back(getThread);
153}
154
155// Error in get thread
156////////////////////////////////////////////////////////////////////////////
157void bncCaster::slotGetThreadError(const QByteArray& staID) {
158 QMutexLocker locker(&_mutex);
159 _staIDs.removeAll(staID);
160 emit( newMessage(
161 QString("Mountpoint size %1").arg(_staIDs.size()).toAscii()) );
162 if (_staIDs.size() == 0) {
163 emit(newMessage("bncCaster:: last get thread terminated"));
164 emit getThreadErrors();
165 }
166}
167
168// Dump Complete Epochs
169////////////////////////////////////////////////////////////////////////////
170void bncCaster::dumpEpochs(long minTime, long maxTime) {
171
172 const char begEpoch = 'A';
173 const char begObs = 'B';
174 const char endEpoch = 'C';
175
176 for (long sec = minTime; sec <= maxTime; sec++) {
177
178 bool first = true;
179 QList<Observation*> allObs = _epochs->values(sec);
180 QListIterator<Observation*> it(allObs);
181 while (it.hasNext()) {
182 Observation* obs = it.next();
183
184 if (_samplingRate == 0 || sec % _samplingRate == 0) {
185
186 // Output into the file
187 // --------------------
188 if (_out) {
189 if (first) {
190 *_out << begEpoch << endl;;
191 }
192 *_out << obs->StatID << " "
193 << obs->SVPRN << " "
194 << obs->GPSWeek << " "
195 << obs->GPSWeeks << " "
196 << obs->C1 << " "
197 << obs->P1 << " "
198 << obs->P2 << " "
199 << obs->L1 << " "
200 << obs->L2 << endl;
201 if (!it.hasNext()) {
202 *_out << endEpoch << endl;
203 }
204 }
205
206 // Output into the socket
207 // ----------------------
208 if (_sockets) {
209 int numBytes = sizeof(*obs);
210 QListIterator<QTcpSocket*> is(*_sockets);
211 while (is.hasNext()) {
212 QTcpSocket* sock = is.next();
213 if (first) {
214 sock->write(&begEpoch, 1);
215 }
216 sock->write(&begObs, 1);
217 sock->write((char*) obs, numBytes);
218 if (!it.hasNext()) {
219 sock->write(&endEpoch, 1);
220 }
221 }
222 }
223 }
224
225 delete obs;
226 _epochs->remove(sec);
227 first = false;
228 }
229 }
230}
Note: See TracBrowser for help on using the repository browser.