source: ntrip/trunk/BNC/bnccaster.cpp@ 275

Last change on this file since 275 was 275, checked in by mervart, 17 years ago

* empty log message *

File size: 6.6 KB
Line 
1
2/* -------------------------------------------------------------------------
3 * BKG NTRIP Client
4 * -------------------------------------------------------------------------
5 *
6 * Class: bncCaster
7 *
8 * Purpose: buffers and disseminates the data
9 *
10 * Author: L. Mervart
11 *
12 * Created: 24-Dec-2005
13 *
14 * Changes:
15 *
16 * -----------------------------------------------------------------------*/
17
18#include <math.h>
19
20#include "bnccaster.h"
21#include "bncgetthread.h"
22#include "bncutils.h"
23#include "RTCM/GPSDecoder.h"
24
25// Constructor
26////////////////////////////////////////////////////////////////////////////
27bncCaster::bncCaster(const QString& outFileName, int port) {
28
29 QSettings settings;
30
31 if ( !outFileName.isEmpty() ) {
32 QString lName = outFileName;
33 expandEnvVar(lName);
34 _outFile = new QFile(lName);
35 if ( Qt::CheckState(settings.value("rnxAppend").toInt()) == Qt::Checked) {
36 _outFile->open(QIODevice::WriteOnly | QIODevice::Append);
37 }
38 else {
39 _outFile->open(QIODevice::WriteOnly);
40 }
41 _out = new QTextStream(_outFile);
42 _out->setRealNumberNotation(QTextStream::FixedNotation);
43 _out->setRealNumberPrecision(5);
44 }
45 else {
46 _outFile = 0;
47 _out = 0;
48 }
49
50 _port = port;
51
52 if (_port != 0) {
53 _server = new QTcpServer;
54 _server->listen(QHostAddress::Any, _port);
55 connect(_server, SIGNAL(newConnection()), this, SLOT(slotNewConnection()));
56 _sockets = new QList<QTcpSocket*>;
57 }
58 else {
59 _server = 0;
60 _sockets = 0;
61 }
62
63 _epochs = new QMultiMap<long, Observation*>;
64
65 _lastDumpSec = 0;
66
67 _samplingRate = settings.value("rnxSampl").toInt();
68 _waitTime = settings.value("waitTime").toInt();
69 if (_waitTime < 1) {
70 _waitTime = 1;
71 }
72}
73
74// Destructor
75////////////////////////////////////////////////////////////////////////////
76bncCaster::~bncCaster() {
77 QListIterator<bncGetThread*> it(_threads);
78 while(it.hasNext()){
79 bncGetThread* thread = it.next();
80 thread->terminate();
81 thread->wait();
82 delete thread;
83 }
84 delete _out;
85 delete _outFile;
86 delete _server;
87 delete _sockets;
88 delete _epochs;
89}
90
91// New Observations
92////////////////////////////////////////////////////////////////////////////
93void bncCaster::newObs(const QByteArray& staID, const QUrl& mountPoint,
94 bool firstObs, Observation* obs) {
95
96 QMutexLocker locker(&_mutex);
97
98 long iSec = long(floor(obs->GPSWeeks+0.5));
99 long newTime = obs->GPSWeek * 7*24*3600 + iSec;
100
101 // Rename the Station
102 // ------------------
103 strncpy(obs->StatID, staID.constData(),sizeof(obs->StatID));
104
105 // Prepare RINEX Output
106 // --------------------
107 if (_rinexWriters.find(obs->StatID) == _rinexWriters.end()) {
108 _rinexWriters.insert(obs->StatID, new bncRinex(obs->StatID, mountPoint));
109 }
110 bncRinex* rnx = _rinexWriters.find(obs->StatID).value();
111 if (_samplingRate == 0 || iSec % _samplingRate == 0) {
112 rnx->deepCopy(obs);
113 }
114 rnx->dumpEpoch(newTime);
115
116 // First time, set the _lastDumpSec immediately
117 // --------------------------------------------
118 if (_lastDumpSec == 0) {
119 _lastDumpSec = newTime - 1;
120 }
121
122 // An old observation - throw it away
123 // ----------------------------------
124 if (newTime <= _lastDumpSec) {
125 if (firstObs) {
126 QSettings settings;
127 if ( !settings.value("outFile").toString().isEmpty() ||
128 !settings.value("outPort").toString().isEmpty() ) {
129 emit( newMessage(QString("Station %1: old epoch %2 thrown away")
130 .arg(staID.data()).arg(iSec).toAscii()) );
131 }
132 }
133 delete obs;
134 return;
135 }
136
137 // Save the observation
138 // --------------------
139 _epochs->insert(newTime, obs);
140
141 // Dump older epochs
142 // -----------------
143 dumpEpochs(_lastDumpSec + 1, newTime - _waitTime);
144
145 if (_lastDumpSec < newTime - _waitTime) {
146 _lastDumpSec = newTime - _waitTime;
147 }
148}
149
150// New Connection
151////////////////////////////////////////////////////////////////////////////
152void bncCaster::slotNewConnection() {
153 _sockets->push_back( _server->nextPendingConnection() );
154}
155
156// Add New Thread
157////////////////////////////////////////////////////////////////////////////
158void bncCaster::addGetThread(bncGetThread* getThread) {
159 connect(getThread, SIGNAL(error(const QByteArray&)),
160 this, SLOT(slotGetThreadError(const QByteArray&)));
161
162 _staIDs.push_back(getThread->staID());
163 _threads.push_back(getThread);
164}
165
166// Error in get thread
167////////////////////////////////////////////////////////////////////////////
168void bncCaster::slotGetThreadError(const QByteArray& staID) {
169 QMutexLocker locker(&_mutex);
170 _staIDs.removeAll(staID);
171 emit( newMessage(
172 QString("Mountpoint size %1").arg(_staIDs.size()).toAscii()) );
173 if (_staIDs.size() == 0) {
174 emit(newMessage("bncCaster:: last get thread terminated"));
175 emit getThreadErrors();
176 }
177}
178
179// Dump Complete Epochs
180////////////////////////////////////////////////////////////////////////////
181void bncCaster::dumpEpochs(long minTime, long maxTime) {
182
183 const char begEpoch = 'A';
184 const char begObs = 'B';
185 const char endEpoch = 'C';
186
187 for (long sec = minTime; sec <= maxTime; sec++) {
188
189 bool first = true;
190 QList<Observation*> allObs = _epochs->values(sec);
191 QListIterator<Observation*> it(allObs);
192 while (it.hasNext()) {
193 Observation* obs = it.next();
194
195 if (_samplingRate == 0 || sec % _samplingRate == 0) {
196
197 // Output into the file
198 // --------------------
199 if (_out) {
200 if (first) {
201 *_out << begEpoch << endl;;
202 }
203 *_out << obs->StatID << " "
204 << obs->SVPRN << " "
205 << obs->GPSWeek << " "
206 << obs->GPSWeeks << " "
207 << obs->C1 << " "
208 << obs->P1 << " "
209 << obs->P2 << " "
210 << obs->L1 << " "
211 << obs->L2 << " "
212 << obs->SNR1 << " "
213 << obs->SNR2 << endl;
214 if (!it.hasNext()) {
215 *_out << endEpoch << endl;
216 }
217 }
218
219 // Output into the socket
220 // ----------------------
221 if (_sockets) {
222 int numBytes = sizeof(*obs);
223 QListIterator<QTcpSocket*> is(*_sockets);
224 while (is.hasNext()) {
225 QTcpSocket* sock = is.next();
226 if (first) {
227 sock->write(&begEpoch, 1);
228 }
229 sock->write(&begObs, 1);
230 sock->write((char*) obs, numBytes);
231 if (!it.hasNext()) {
232 sock->write(&endEpoch, 1);
233 }
234 }
235 }
236 }
237
238 delete obs;
239 _epochs->remove(sec);
240 first = false;
241 }
242 }
243}
Note: See TracBrowser for help on using the repository browser.