source: ntrip/trunk/BNC/bnccaster.cpp@ 186

Last change on this file since 186 was 186, checked in by mervart, 18 years ago

* empty log message *

File size: 6.4 KB
Line 
1
/* -------------------------------------------------------------------------
 * BKG NTRIP Client
 * -------------------------------------------------------------------------
 *
 * Class:      bncCaster
 *
 * Purpose:    buffers and disseminates the data
 *
 * Author:     L. Mervart
 *
 * Created:    24-Dec-2005
 *
 * Changes:
 *
 * -----------------------------------------------------------------------*/
17
18#include "bnccaster.h"
19#include "bncgetthread.h"
20#include "bncutils.h"
21#include "RTCM/RTCM.h"
22
23// Constructor
24////////////////////////////////////////////////////////////////////////////
25bncCaster::bncCaster(const QString& outFileName, int port) {
26
27 if ( !outFileName.isEmpty() ) {
28 QString lName = outFileName;
29 expandEnvVar(lName);
30 _outFile = new QFile(lName);
31 _outFile->open(QIODevice::WriteOnly);
32 _out = new QTextStream(_outFile);
33 _out->setRealNumberNotation(QTextStream::FixedNotation);
34 _out->setRealNumberPrecision(5);
35 }
36 else {
37 _outFile = 0;
38 _out = 0;
39 }
40
41 _port = port;
42
43 if (_port != 0) {
44 _server = new QTcpServer;
45 _server->listen(QHostAddress::Any, _port);
46 connect(_server, SIGNAL(newConnection()), this, SLOT(slotNewConnection()));
47 _sockets = new QList<QTcpSocket*>;
48 }
49 else {
50 _server = 0;
51 _sockets = 0;
52 }
53
54 _epochs = new QMultiMap<long, Observation*>;
55
56 _lastDumpSec = 0;
57
58 QSettings settings;
59 _samplingRate = settings.value("rnxSampl").toInt();
60 _waitTime = settings.value("waitTime").toInt();
61 if (_waitTime < 1) {
62 _waitTime = 1;
63 }
64}
65
66// Destructor
67////////////////////////////////////////////////////////////////////////////
68bncCaster::~bncCaster() {
69 QListIterator<bncGetThread*> it(_threads);
70 while(it.hasNext()){
71 bncGetThread* thread = it.next();
72 thread->terminate();
73 }
74 delete _out;
75 delete _outFile;
76}
77
78// Run
79////////////////////////////////////////////////////////////////////////////
80void bncCaster::run() {
81 exec();
82}
83
84// New Observations
85////////////////////////////////////////////////////////////////////////////
86void bncCaster::slotNewObs(const QByteArray& staID, Observation* obs) {
87
88 long newTime = obs->GPSWeek * 7*24*3600 + obs->GPSWeeks;
89
90 // Rename the Station
91 // ------------------
92 strncpy(obs->StatID, staID.constData(),sizeof(obs->StatID));
93
94 // Prepare RINEX Output
95 // --------------------
96 if (_rinexWriters.find(obs->StatID) == _rinexWriters.end()) {
97 _rinexWriters.insert(obs->StatID, new bncRinex(obs->StatID));
98 }
99 bncRinex* rnx = _rinexWriters.find(obs->StatID).value();
100 if (_samplingRate == 0 || obs->GPSWeeks % _samplingRate == 0) {
101 rnx->deepCopy(obs);
102 }
103 rnx->dumpEpoch(newTime);
104
105 // First time, set the _lastDumpSec immediately
106 // --------------------------------------------
107 if (_lastDumpSec == 0) {
108 _lastDumpSec = newTime - 1;
109 }
110
111 // An old observation - throw it away
112 // ----------------------------------
113 if (newTime <= _lastDumpSec) {
114 QSettings settings;
115 if ( !settings.value("outFile").toString().isEmpty() ||
116 !settings.value("outPort").toString().isEmpty() ) {
117 emit( newMessage(QString("Station %1: old epoch %2 thrown away")
118 .arg(staID.data()).arg(obs->GPSWeeks).toAscii()) );
119 }
120 delete obs;
121 return;
122 }
123
124 // Save the observation
125 // --------------------
126 _epochs->insert(newTime, obs);
127
128 // Dump older epochs
129 // -----------------
130 dumpEpochs(_lastDumpSec + 1, newTime - _waitTime);
131 _lastDumpSec = newTime - _waitTime;
132}
133
134// New Connection
135////////////////////////////////////////////////////////////////////////////
136void bncCaster::slotNewConnection() {
137 _sockets->push_back( _server->nextPendingConnection() );
138}
139
140// Add New Thread
141////////////////////////////////////////////////////////////////////////////
142void bncCaster::addGetThread(bncGetThread* getThread) {
143 connect(getThread, SIGNAL(newObs(const QByteArray&, Observation*)),
144 this, SLOT(slotNewObs(const QByteArray&, Observation*)));
145
146 connect(getThread, SIGNAL(error(const QByteArray&)),
147 this, SLOT(slotGetThreadError(const QByteArray&)));
148
149 _staIDs.push_back(getThread->staID());
150 _threads.push_back(getThread);
151}
152
153// Error in get thread
154////////////////////////////////////////////////////////////////////////////
155void bncCaster::slotGetThreadError(const QByteArray& staID) {
156 _staIDs.removeAll(staID);
157 emit( newMessage(
158 QString("Mountpoint size %1").arg(_staIDs.size()).toAscii()) );
159 if (_staIDs.size() == 0) {
160 emit(newMessage("bncCaster:: last get thread terminated"));
161 emit getThreadErrors();
162 }
163}
164
165// Dump Complete Epochs
166////////////////////////////////////////////////////////////////////////////
167void bncCaster::dumpEpochs(long minTime, long maxTime) {
168
169 const char begEpoch = 'A';
170 const char begObs = 'B';
171 const char endEpoch = 'C';
172
173 for (long sec = minTime; sec <= maxTime; sec++) {
174
175 bool first = true;
176 QList<Observation*> allObs = _epochs->values(sec);
177 QListIterator<Observation*> it(allObs);
178 while (it.hasNext()) {
179 Observation* obs = it.next();
180
181 if (_samplingRate == 0 || sec % _samplingRate == 0) {
182
183 // Output into the file
184 // --------------------
185 if (_out) {
186 if (first) {
187 *_out << begEpoch << endl;;
188 }
189 *_out << obs->StatID << " "
190 << (int) obs->SVPRN << " "
191 << (int) obs->GPSWeek << " "
192 << obs->GPSWeeks << " "
193 << obs->sec << " "
194 << obs->pCodeIndicator << " "
195 << obs->cumuLossOfCont << " "
196 << obs->C1 << " "
197 << obs->P2 << " "
198 << obs->L1 << " "
199 << obs->L2 << endl;
200 if (!it.hasNext()) {
201 *_out << endEpoch << endl;
202 }
203 }
204
205 // Output into the socket
206 // ----------------------
207 if (_sockets) {
208 int numBytes = sizeof(*obs);
209 QListIterator<QTcpSocket*> is(*_sockets);
210 while (is.hasNext()) {
211 QTcpSocket* sock = is.next();
212 if (first) {
213 sock->write(&begEpoch, 1);
214 }
215 sock->write(&begObs, 1);
216 sock->write((char*) obs, numBytes);
217 if (!it.hasNext()) {
218 sock->write(&endEpoch, 1);
219 }
220 }
221 }
222 }
223
224 delete obs;
225 _epochs->remove(sec);
226 first = false;
227 }
228 }
229}
Note: See TracBrowser for help on using the repository browser.