source: ntrip/trunk/BNC/bnccaster.cpp@ 135

Last change on this file since 135 was 135, checked in by mervart, 18 years ago

* empty log message *

File size: 6.1 KB
Line 
1
2/* -------------------------------------------------------------------------
3 * BKG NTRIP Client
4 * -------------------------------------------------------------------------
5 *
6 * Class: bncCaster
7 *
8 * Purpose: buffers and disseminates the data
9 *
10 * Author: L. Mervart
11 *
12 * Created: 24-Dec-2005
13 *
14 * Changes:
15 *
16 * -----------------------------------------------------------------------*/
17
18#include "bnccaster.h"
19#include "bncgetthread.h"
20#include "bncutils.h"
21#include "RTCM/RTCM.h"
22
23// Constructor
24////////////////////////////////////////////////////////////////////////////
25bncCaster::bncCaster(const QString& outFileName, int port) {
26
27 if ( !outFileName.isEmpty() ) {
28 QString lName = outFileName;
29 expandEnvVar(lName);
30 _outFile = new QFile(lName);
31 _outFile->open(QIODevice::WriteOnly);
32 _out = new QTextStream(_outFile);
33 _out->setRealNumberNotation(QTextStream::FixedNotation);
34 _out->setRealNumberPrecision(5);
35 }
36 else {
37 _outFile = 0;
38 _out = 0;
39 }
40
41 _port = port;
42
43 if (_port != 0) {
44 _server = new QTcpServer;
45 _server->listen(QHostAddress::Any, _port);
46 connect(_server, SIGNAL(newConnection()), this, SLOT(slotNewConnection()));
47 _sockets = new QList<QTcpSocket*>;
48 }
49 else {
50 _server = 0;
51 _sockets = 0;
52 }
53
54 _epochs = new QMultiMap<long, Observation*>;
55
56 _lastDumpSec = 0;
57
58 QSettings settings;
59 _samplingRate = settings.value("rnxSampl").toInt();
60}
61
62// Destructor
63////////////////////////////////////////////////////////////////////////////
64bncCaster::~bncCaster() {
65 delete _out;
66 delete _outFile;
67}
68
69// Run
70////////////////////////////////////////////////////////////////////////////
71void bncCaster::run() {
72 exec();
73}
74
75// New Observations
76////////////////////////////////////////////////////////////////////////////
77void bncCaster::slotNewObs(const QByteArray& staID, Observation* obs) {
78
79 long newTime = obs->GPSWeek * 7*24*3600 + obs->GPSWeeks;
80
81 // First time, set the _lastDumpSec immediately
82 // --------------------------------------------
83 if (_lastDumpSec == 0) {
84 _lastDumpSec = newTime - 1;
85 }
86
87 // An old observation - throw it away
88 // ----------------------------------
89 if (newTime <= _lastDumpSec) {
90 emit( newMessage("Old Epochs thrown away: " + staID) );
91 delete obs;
92 return;
93 }
94
95 // Check the sampling rate
96 // -----------------------
97 if (_samplingRate != 0) {
98 if ( newTime % _samplingRate ) {
99 delete obs;
100 return;
101 }
102 }
103
104 // Rename the station and save the observation
105 // -------------------------------------------
106 strncpy(obs->StatID, staID.constData(),sizeof(obs->StatID));
107 _epochs->insert(newTime, obs);
108
109 // Dump older epochs
110 // -----------------
111 const long waitTime = 2;
112 dumpEpochs(_lastDumpSec + 1, newTime - waitTime);
113 _lastDumpSec = newTime - waitTime;
114}
115
116// New Connection
117////////////////////////////////////////////////////////////////////////////
118void bncCaster::slotNewConnection() {
119 _sockets->push_back( _server->nextPendingConnection() );
120}
121
122// Add New Thread
123////////////////////////////////////////////////////////////////////////////
124void bncCaster::addGetThread(bncGetThread* getThread) {
125 connect(getThread, SIGNAL(newObs(const QByteArray&, Observation*)),
126 this, SLOT(slotNewObs(const QByteArray&, Observation*)));
127
128 connect(getThread, SIGNAL(error(const QByteArray&)),
129 this, SLOT(slotGetThreadError(const QByteArray&)));
130
131 _staIDs.push_back(getThread->staID());
132}
133
134// Error in get thread
135////////////////////////////////////////////////////////////////////////////
136void bncCaster::slotGetThreadError(const QByteArray& staID) {
137 _staIDs.removeAll(staID);
138 emit( newMessage(
139 QString("Mountpoint size %1").arg(_staIDs.size()).toAscii()) );
140 if (_staIDs.size() == 0) {
141 emit(newMessage("bncCaster:: last get thread terminated"));
142 emit getThreadErrors();
143 }
144}
145
146// Dump Complete Epochs
147////////////////////////////////////////////////////////////////////////////
148void bncCaster::dumpEpochs(long minTime, long maxTime) {
149
150 const char begEpoch = 'A';
151 const char begObs = 'B';
152 const char endEpoch = 'C';
153
154 for (long sec = minTime; sec <= maxTime; sec++) {
155
156 bool first = true;
157 QList<Observation*> allObs = _epochs->values(sec);
158 QListIterator<Observation*> it(allObs);
159 while (it.hasNext()) {
160 Observation* obs = it.next();
161
162 // Output into the file
163 // --------------------
164 if (_out) {
165 if (first) {
166 *_out << begEpoch << endl;;
167 }
168 *_out << obs->StatID << " "
169 << (int) obs->SVPRN << " "
170 << (int) obs->GPSWeek << " "
171 << obs->GPSWeeks << " "
172 << obs->sec << " "
173 << obs->pCodeIndicator << " "
174 << obs->cumuLossOfCont << " "
175 << obs->C1 << " "
176 << obs->P2 << " "
177 << obs->L1 << " "
178 << obs->L2 << endl;
179 if (!it.hasNext()) {
180 *_out << endEpoch << endl;
181 }
182 }
183
184 // Output into the socket
185 // ----------------------
186 if (_sockets) {
187 int numBytes = sizeof(*obs);
188 QListIterator<QTcpSocket*> is(*_sockets);
189 while (is.hasNext()) {
190 QTcpSocket* sock = is.next();
191 if (first) {
192 sock->write(&begEpoch, 1);
193 }
194 sock->write(&begObs, 1);
195 sock->write((char*) obs, numBytes);
196 if (!it.hasNext()) {
197 sock->write(&endEpoch, 1);
198 }
199 }
200 }
201
202 // Prepare RINEX Output
203 // --------------------
204 if (1) {
205 if (_rinexWriters.find(obs->StatID) == _rinexWriters.end()) {
206 _rinexWriters.insert(obs->StatID, new bncRinex(obs->StatID));
207 }
208 bncRinex* rnx = _rinexWriters.find(obs->StatID).value();
209 rnx->deepCopy(obs);
210 }
211
212 delete obs;
213 _epochs->remove(sec);
214 first = false;
215 }
216
217 // Write RINEX Files
218 // -----------------
219 QMapIterator<QString, bncRinex*> ir(_rinexWriters);
220 while (ir.hasNext()) {
221 bncRinex* rnx = ir.next().value();
222 rnx->dumpEpoch();
223 }
224 }
225}
Note: See TracBrowser for help on using the repository browser.