source: ntrip/trunk/BNC/bnccaster.cpp@ 145

Last change on this file since 145 was 142, checked in by mervart, 20 years ago

* empty log message *

File size: 6.2 KB
RevLine 
[35]1
2/* -------------------------------------------------------------------------
[93]3 * BKG NTRIP Client
[35]4 * -------------------------------------------------------------------------
5 *
6 * Class: bncCaster
7 *
8 * Purpose: buffers and disseminates the data
9 *
10 * Author: L. Mervart
11 *
12 * Created: 24-Dec-2005
13 *
14 * Changes:
15 *
16 * -----------------------------------------------------------------------*/
17
18#include "bnccaster.h"
19#include "bncgetthread.h"
[134]20#include "bncutils.h"
[35]21#include "RTCM/RTCM.h"
22
23// Constructor
24////////////////////////////////////////////////////////////////////////////
25bncCaster::bncCaster(const QString& outFileName, int port) {
26
27 if ( !outFileName.isEmpty() ) {
[134]28 QString lName = outFileName;
29 expandEnvVar(lName);
30 _outFile = new QFile(lName);
[35]31 _outFile->open(QIODevice::WriteOnly);
32 _out = new QTextStream(_outFile);
33 _out->setRealNumberNotation(QTextStream::FixedNotation);
34 _out->setRealNumberPrecision(5);
35 }
36 else {
37 _outFile = 0;
38 _out = 0;
39 }
40
41 _port = port;
42
43 if (_port != 0) {
44 _server = new QTcpServer;
45 _server->listen(QHostAddress::Any, _port);
46 connect(_server, SIGNAL(newConnection()), this, SLOT(slotNewConnection()));
47 _sockets = new QList<QTcpSocket*>;
48 }
49 else {
50 _server = 0;
51 _sockets = 0;
52 }
53
54 _epochs = new QMultiMap<long, Observation*>;
55
56 _lastDumpSec = 0;
[133]57
58 QSettings settings;
59 _samplingRate = settings.value("rnxSampl").toInt();
[136]60 _waitTime = settings.value("waitTime").toInt();
[139]61 if (_waitTime < 1) {
62 _waitTime = 1;
[136]63 }
[35]64}
65
66// Destructor
67////////////////////////////////////////////////////////////////////////////
68bncCaster::~bncCaster() {
69 delete _out;
70 delete _outFile;
71}
72
73// Run
74////////////////////////////////////////////////////////////////////////////
75void bncCaster::run() {
76 exec();
77}
78
79// New Observations
80////////////////////////////////////////////////////////////////////////////
[88]81void bncCaster::slotNewObs(const QByteArray& staID, Observation* obs) {
[35]82
83 long newTime = obs->GPSWeek * 7*24*3600 + obs->GPSWeeks;
84
85 // First time, set the _lastDumpSec immediately
86 // --------------------------------------------
87 if (_lastDumpSec == 0) {
88 _lastDumpSec = newTime - 1;
89 }
90
91 // An old observation - throw it away
92 // ----------------------------------
93 if (newTime <= _lastDumpSec) {
[137]94 emit( newMessage(QString("Station %1: old epoch %2 thrown away")
95 .arg(staID.data()).arg(obs->GPSWeeks).toAscii()) );
[35]96 delete obs;
97 return;
98 }
99
100 // Rename the station and save the observation
101 // -------------------------------------------
[88]102 strncpy(obs->StatID, staID.constData(),sizeof(obs->StatID));
[35]103 _epochs->insert(newTime, obs);
104
105 // Dump older epochs
106 // -----------------
[136]107 dumpEpochs(_lastDumpSec + 1, newTime - _waitTime);
108 _lastDumpSec = newTime - _waitTime;
[35]109}
110
111// New Connection
112////////////////////////////////////////////////////////////////////////////
113void bncCaster::slotNewConnection() {
114 _sockets->push_back( _server->nextPendingConnection() );
115}
116
117// Add New Thread
118////////////////////////////////////////////////////////////////////////////
119void bncCaster::addGetThread(bncGetThread* getThread) {
120 connect(getThread, SIGNAL(newObs(const QByteArray&, Observation*)),
121 this, SLOT(slotNewObs(const QByteArray&, Observation*)));
122
123 connect(getThread, SIGNAL(error(const QByteArray&)),
124 this, SLOT(slotGetThreadError(const QByteArray&)));
125
[88]126 _staIDs.push_back(getThread->staID());
[35]127}
128
129// Error in get thread
130////////////////////////////////////////////////////////////////////////////
[88]131void bncCaster::slotGetThreadError(const QByteArray& staID) {
132 _staIDs.removeAll(staID);
[82]133 emit( newMessage(
[88]134 QString("Mountpoint size %1").arg(_staIDs.size()).toAscii()) );
135 if (_staIDs.size() == 0) {
[82]136 emit(newMessage("bncCaster:: last get thread terminated"));
[35]137 emit getThreadErrors();
138 }
139}
140
141// Dump Complete Epochs
142////////////////////////////////////////////////////////////////////////////
143void bncCaster::dumpEpochs(long minTime, long maxTime) {
144
145 const char begEpoch = 'A';
146 const char begObs = 'B';
147 const char endEpoch = 'C';
148
149 for (long sec = minTime; sec <= maxTime; sec++) {
[140]150
[35]151 bool first = true;
152 QList<Observation*> allObs = _epochs->values(sec);
153 QListIterator<Observation*> it(allObs);
154 while (it.hasNext()) {
155 Observation* obs = it.next();
156
[140]157 if (_samplingRate == 0 || sec % _samplingRate == 0) {
158
159 // Output into the file
160 // --------------------
161 if (_out) {
[35]162 if (first) {
[140]163 *_out << begEpoch << endl;;
[35]164 }
[140]165 *_out << obs->StatID << " "
166 << (int) obs->SVPRN << " "
167 << (int) obs->GPSWeek << " "
168 << obs->GPSWeeks << " "
169 << obs->sec << " "
170 << obs->pCodeIndicator << " "
171 << obs->cumuLossOfCont << " "
172 << obs->C1 << " "
173 << obs->P2 << " "
174 << obs->L1 << " "
175 << obs->L2 << endl;
[35]176 if (!it.hasNext()) {
[140]177 *_out << endEpoch << endl;
[35]178 }
179 }
[140]180
181 // Output into the socket
182 // ----------------------
183 if (_sockets) {
184 int numBytes = sizeof(*obs);
185 QListIterator<QTcpSocket*> is(*_sockets);
186 while (is.hasNext()) {
187 QTcpSocket* sock = is.next();
188 if (first) {
189 sock->write(&begEpoch, 1);
190 }
191 sock->write(&begObs, 1);
192 sock->write((char*) obs, numBytes);
193 if (!it.hasNext()) {
194 sock->write(&endEpoch, 1);
195 }
196 }
197 }
198
199 // Prepare RINEX Output
200 // --------------------
[73]201 if (_rinexWriters.find(obs->StatID) == _rinexWriters.end()) {
202 _rinexWriters.insert(obs->StatID, new bncRinex(obs->StatID));
203 }
204 bncRinex* rnx = _rinexWriters.find(obs->StatID).value();
205 rnx->deepCopy(obs);
206 }
207
[35]208 delete obs;
209 _epochs->remove(sec);
210 first = false;
211 }
[73]212
213 // Write RINEX Files
214 // -----------------
215 QMapIterator<QString, bncRinex*> ir(_rinexWriters);
216 while (ir.hasNext()) {
217 bncRinex* rnx = ir.next().value();
218 rnx->dumpEpoch();
219 }
[35]220 }
221}
Note: See TracBrowser for help on using the repository browser.