source: ntrip/trunk/BNC/bnccaster.cpp@ 133

Last change on this file since 133 was 133, checked in by mervart, 18 years ago

* empty log message *

File size: 6.0 KB
Line 
1
2/* -------------------------------------------------------------------------
3 * BKG NTRIP Client
4 * -------------------------------------------------------------------------
5 *
6 * Class: bncCaster
7 *
8 * Purpose: buffers and disseminates the data
9 *
10 * Author: L. Mervart
11 *
12 * Created: 24-Dec-2005
13 *
14 * Changes:
15 *
16 * -----------------------------------------------------------------------*/
17
18#include "bnccaster.h"
19#include "bncgetthread.h"
20#include "RTCM/RTCM.h"
21
22// Constructor
23////////////////////////////////////////////////////////////////////////////
24bncCaster::bncCaster(const QString& outFileName, int port) {
25
26 if ( !outFileName.isEmpty() ) {
27 _outFile = new QFile(outFileName);
28 _outFile->open(QIODevice::WriteOnly);
29 _out = new QTextStream(_outFile);
30 _out->setRealNumberNotation(QTextStream::FixedNotation);
31 _out->setRealNumberPrecision(5);
32 }
33 else {
34 _outFile = 0;
35 _out = 0;
36 }
37
38 _port = port;
39
40 if (_port != 0) {
41 _server = new QTcpServer;
42 _server->listen(QHostAddress::Any, _port);
43 connect(_server, SIGNAL(newConnection()), this, SLOT(slotNewConnection()));
44 _sockets = new QList<QTcpSocket*>;
45 }
46 else {
47 _server = 0;
48 _sockets = 0;
49 }
50
51 _epochs = new QMultiMap<long, Observation*>;
52
53 _lastDumpSec = 0;
54
55 QSettings settings;
56 _samplingRate = settings.value("rnxSampl").toInt();
57}
58
59// Destructor
60////////////////////////////////////////////////////////////////////////////
61bncCaster::~bncCaster() {
62 delete _out;
63 delete _outFile;
64}
65
66// Run
67////////////////////////////////////////////////////////////////////////////
68void bncCaster::run() {
69 exec();
70}
71
72// New Observations
73////////////////////////////////////////////////////////////////////////////
74void bncCaster::slotNewObs(const QByteArray& staID, Observation* obs) {
75
76 long newTime = obs->GPSWeek * 7*24*3600 + obs->GPSWeeks;
77
78 // First time, set the _lastDumpSec immediately
79 // --------------------------------------------
80 if (_lastDumpSec == 0) {
81 _lastDumpSec = newTime - 1;
82 }
83
84 // An old observation - throw it away
85 // ----------------------------------
86 if (newTime <= _lastDumpSec) {
87 delete obs;
88 return;
89 }
90
91 // Check the sampling rate
92 // -----------------------
93 if (_samplingRate != 0) {
94 if ( newTime % _samplingRate ) {
95 delete obs;
96 return;
97 }
98 }
99
100 // Rename the station and save the observation
101 // -------------------------------------------
102 strncpy(obs->StatID, staID.constData(),sizeof(obs->StatID));
103 _epochs->insert(newTime, obs);
104
105 // Dump older epochs
106 // -----------------
107 const long waitTime = 5;
108 dumpEpochs(_lastDumpSec + 1, newTime - waitTime);
109 _lastDumpSec = newTime - waitTime;
110}
111
112// New Connection
113////////////////////////////////////////////////////////////////////////////
114void bncCaster::slotNewConnection() {
115 _sockets->push_back( _server->nextPendingConnection() );
116}
117
118// Add New Thread
119////////////////////////////////////////////////////////////////////////////
120void bncCaster::addGetThread(bncGetThread* getThread) {
121 connect(getThread, SIGNAL(newObs(const QByteArray&, Observation*)),
122 this, SLOT(slotNewObs(const QByteArray&, Observation*)));
123
124 connect(getThread, SIGNAL(error(const QByteArray&)),
125 this, SLOT(slotGetThreadError(const QByteArray&)));
126
127 _staIDs.push_back(getThread->staID());
128}
129
130// Error in get thread
131////////////////////////////////////////////////////////////////////////////
132void bncCaster::slotGetThreadError(const QByteArray& staID) {
133 _staIDs.removeAll(staID);
134 emit( newMessage(
135 QString("Mountpoint size %1").arg(_staIDs.size()).toAscii()) );
136 if (_staIDs.size() == 0) {
137 emit(newMessage("bncCaster:: last get thread terminated"));
138 emit getThreadErrors();
139 }
140}
141
142// Dump Complete Epochs
143////////////////////////////////////////////////////////////////////////////
144void bncCaster::dumpEpochs(long minTime, long maxTime) {
145
146 const char begEpoch = 'A';
147 const char begObs = 'B';
148 const char endEpoch = 'C';
149
150 for (long sec = minTime; sec <= maxTime; sec++) {
151
152 bool first = true;
153 QList<Observation*> allObs = _epochs->values(sec);
154 QListIterator<Observation*> it(allObs);
155 while (it.hasNext()) {
156 Observation* obs = it.next();
157
158 // Output into the file
159 // --------------------
160 if (_out) {
161 if (first) {
162 *_out << begEpoch << endl;;
163 }
164 *_out << obs->StatID << " "
165 << (int) obs->SVPRN << " "
166 << (int) obs->GPSWeek << " "
167 << obs->GPSWeeks << " "
168 << obs->sec << " "
169 << obs->pCodeIndicator << " "
170 << obs->cumuLossOfCont << " "
171 << obs->C1 << " "
172 << obs->P2 << " "
173 << obs->L1 << " "
174 << obs->L2 << endl;
175 if (!it.hasNext()) {
176 *_out << endEpoch << endl;
177 }
178 }
179
180 // Output into the socket
181 // ----------------------
182 if (_sockets) {
183 int numBytes = sizeof(*obs);
184 QListIterator<QTcpSocket*> is(*_sockets);
185 while (is.hasNext()) {
186 QTcpSocket* sock = is.next();
187 if (first) {
188 sock->write(&begEpoch, 1);
189 }
190 sock->write(&begObs, 1);
191 sock->write((char*) obs, numBytes);
192 if (!it.hasNext()) {
193 sock->write(&endEpoch, 1);
194 }
195 }
196 }
197
198 // Prepare RINEX Output
199 // --------------------
200 if (1) {
201 if (_rinexWriters.find(obs->StatID) == _rinexWriters.end()) {
202 _rinexWriters.insert(obs->StatID, new bncRinex(obs->StatID));
203 }
204 bncRinex* rnx = _rinexWriters.find(obs->StatID).value();
205 rnx->deepCopy(obs);
206 }
207
208 delete obs;
209 _epochs->remove(sec);
210 first = false;
211 }
212
213 // Write RINEX Files
214 // -----------------
215 QMapIterator<QString, bncRinex*> ir(_rinexWriters);
216 while (ir.hasNext()) {
217 bncRinex* rnx = ir.next().value();
218 rnx->dumpEpoch();
219 }
220 }
221}
Note: See TracBrowser for help on using the repository browser.