source: ntrip/trunk/BNC/bnccaster.cpp@ 203

Last change on this file since 203 was 191, checked in by mervart, 18 years ago

* empty log message *

File size: 6.5 KB
Line 
1
2/* -------------------------------------------------------------------------
3 * BKG NTRIP Client
4 * -------------------------------------------------------------------------
5 *
6 * Class: bncCaster
7 *
8 * Purpose: buffers and disseminates the data
9 *
10 * Author: L. Mervart
11 *
12 * Created: 24-Dec-2005
13 *
14 * Changes:
15 *
16 * -----------------------------------------------------------------------*/
17
18#include "bnccaster.h"
19#include "bncgetthread.h"
20#include "bncutils.h"
21#include "RTCM/RTCM.h"
22
23// Constructor
24////////////////////////////////////////////////////////////////////////////
25bncCaster::bncCaster(const QString& outFileName, int port) {
26
27 if ( !outFileName.isEmpty() ) {
28 QString lName = outFileName;
29 expandEnvVar(lName);
30 _outFile = new QFile(lName);
31 _outFile->open(QIODevice::WriteOnly);
32 _out = new QTextStream(_outFile);
33 _out->setRealNumberNotation(QTextStream::FixedNotation);
34 _out->setRealNumberPrecision(5);
35 }
36 else {
37 _outFile = 0;
38 _out = 0;
39 }
40
41 _port = port;
42
43 if (_port != 0) {
44 _server = new QTcpServer;
45 _server->listen(QHostAddress::Any, _port);
46 connect(_server, SIGNAL(newConnection()), this, SLOT(slotNewConnection()));
47 _sockets = new QList<QTcpSocket*>;
48 }
49 else {
50 _server = 0;
51 _sockets = 0;
52 }
53
54 _epochs = new QMultiMap<long, Observation*>;
55
56 _lastDumpSec = 0;
57
58 QSettings settings;
59 _samplingRate = settings.value("rnxSampl").toInt();
60 _waitTime = settings.value("waitTime").toInt();
61 if (_waitTime < 1) {
62 _waitTime = 1;
63 }
64}
65
66// Destructor
67////////////////////////////////////////////////////////////////////////////
68bncCaster::~bncCaster() {
69 QListIterator<bncGetThread*> it(_threads);
70 while(it.hasNext()){
71 bncGetThread* thread = it.next();
72 thread->terminate();
73 thread->wait();
74 //// delete thread;
75 }
76 delete _out;
77 delete _outFile;
78 delete _server;
79 delete _sockets;
80 delete _epochs;
81}
82
83// Run
84////////////////////////////////////////////////////////////////////////////
85void bncCaster::run() {
86 exec();
87}
88
89// New Observations
90////////////////////////////////////////////////////////////////////////////
91void bncCaster::slotNewObs(const QByteArray& staID, Observation* obs) {
92
93 long newTime = obs->GPSWeek * 7*24*3600 + obs->GPSWeeks;
94
95 // Rename the Station
96 // ------------------
97 strncpy(obs->StatID, staID.constData(),sizeof(obs->StatID));
98
99 // Prepare RINEX Output
100 // --------------------
101 if (_rinexWriters.find(obs->StatID) == _rinexWriters.end()) {
102 _rinexWriters.insert(obs->StatID, new bncRinex(obs->StatID));
103 }
104 bncRinex* rnx = _rinexWriters.find(obs->StatID).value();
105 if (_samplingRate == 0 || obs->GPSWeeks % _samplingRate == 0) {
106 rnx->deepCopy(obs);
107 }
108 rnx->dumpEpoch(newTime);
109
110 // First time, set the _lastDumpSec immediately
111 // --------------------------------------------
112 if (_lastDumpSec == 0) {
113 _lastDumpSec = newTime - 1;
114 }
115
116 // An old observation - throw it away
117 // ----------------------------------
118 if (newTime <= _lastDumpSec) {
119 QSettings settings;
120 if ( !settings.value("outFile").toString().isEmpty() ||
121 !settings.value("outPort").toString().isEmpty() ) {
122 emit( newMessage(QString("Station %1: old epoch %2 thrown away")
123 .arg(staID.data()).arg(obs->GPSWeeks).toAscii()) );
124 }
125 delete obs;
126 return;
127 }
128
129 // Save the observation
130 // --------------------
131 _epochs->insert(newTime, obs);
132
133 // Dump older epochs
134 // -----------------
135 dumpEpochs(_lastDumpSec + 1, newTime - _waitTime);
136 _lastDumpSec = newTime - _waitTime;
137}
138
139// New Connection
140////////////////////////////////////////////////////////////////////////////
141void bncCaster::slotNewConnection() {
142 _sockets->push_back( _server->nextPendingConnection() );
143}
144
145// Add New Thread
146////////////////////////////////////////////////////////////////////////////
147void bncCaster::addGetThread(bncGetThread* getThread) {
148 connect(getThread, SIGNAL(newObs(const QByteArray&, Observation*)),
149 this, SLOT(slotNewObs(const QByteArray&, Observation*)));
150
151 connect(getThread, SIGNAL(error(const QByteArray&)),
152 this, SLOT(slotGetThreadError(const QByteArray&)));
153
154 _staIDs.push_back(getThread->staID());
155 _threads.push_back(getThread);
156}
157
158// Error in get thread
159////////////////////////////////////////////////////////////////////////////
160void bncCaster::slotGetThreadError(const QByteArray& staID) {
161 _staIDs.removeAll(staID);
162 emit( newMessage(
163 QString("Mountpoint size %1").arg(_staIDs.size()).toAscii()) );
164 if (_staIDs.size() == 0) {
165 emit(newMessage("bncCaster:: last get thread terminated"));
166 emit getThreadErrors();
167 }
168}
169
170// Dump Complete Epochs
171////////////////////////////////////////////////////////////////////////////
172void bncCaster::dumpEpochs(long minTime, long maxTime) {
173
174 const char begEpoch = 'A';
175 const char begObs = 'B';
176 const char endEpoch = 'C';
177
178 for (long sec = minTime; sec <= maxTime; sec++) {
179
180 bool first = true;
181 QList<Observation*> allObs = _epochs->values(sec);
182 QListIterator<Observation*> it(allObs);
183 while (it.hasNext()) {
184 Observation* obs = it.next();
185
186 if (_samplingRate == 0 || sec % _samplingRate == 0) {
187
188 // Output into the file
189 // --------------------
190 if (_out) {
191 if (first) {
192 *_out << begEpoch << endl;;
193 }
194 *_out << obs->StatID << " "
195 << (int) obs->SVPRN << " "
196 << (int) obs->GPSWeek << " "
197 << obs->GPSWeeks << " "
198 << obs->sec << " "
199 << obs->pCodeIndicator << " "
200 << obs->cumuLossOfCont << " "
201 << obs->C1 << " "
202 << obs->P2 << " "
203 << obs->L1 << " "
204 << obs->L2 << endl;
205 if (!it.hasNext()) {
206 *_out << endEpoch << endl;
207 }
208 }
209
210 // Output into the socket
211 // ----------------------
212 if (_sockets) {
213 int numBytes = sizeof(*obs);
214 QListIterator<QTcpSocket*> is(*_sockets);
215 while (is.hasNext()) {
216 QTcpSocket* sock = is.next();
217 if (first) {
218 sock->write(&begEpoch, 1);
219 }
220 sock->write(&begObs, 1);
221 sock->write((char*) obs, numBytes);
222 if (!it.hasNext()) {
223 sock->write(&endEpoch, 1);
224 }
225 }
226 }
227 }
228
229 delete obs;
230 _epochs->remove(sec);
231 first = false;
232 }
233 }
234}
Note: See TracBrowser for help on using the repository browser.