source: ntrip/trunk/BNC/bnccaster.cpp@ 105

Last change on this file since 105 was 93, checked in by mervart, 18 years ago

* empty log message *

File size: 5.7 KB
RevLine 
[35]1
2/* -------------------------------------------------------------------------
[93]3 * BKG NTRIP Client
[35]4 * -------------------------------------------------------------------------
5 *
6 * Class: bncCaster
7 *
8 * Purpose: buffers and disseminates the data
9 *
10 * Author: L. Mervart
11 *
12 * Created: 24-Dec-2005
13 *
14 * Changes:
15 *
16 * -----------------------------------------------------------------------*/
17
18#include "bnccaster.h"
19#include "bncgetthread.h"
20#include "RTCM/RTCM.h"
21
22// Constructor
23////////////////////////////////////////////////////////////////////////////
24bncCaster::bncCaster(const QString& outFileName, int port) {
25
26 if ( !outFileName.isEmpty() ) {
27 _outFile = new QFile(outFileName);
28 _outFile->open(QIODevice::WriteOnly);
29 _out = new QTextStream(_outFile);
30 _out->setRealNumberNotation(QTextStream::FixedNotation);
31 _out->setRealNumberPrecision(5);
32 }
33 else {
34 _outFile = 0;
35 _out = 0;
36 }
37
38 _port = port;
39
40 if (_port != 0) {
41 _server = new QTcpServer;
42 _server->listen(QHostAddress::Any, _port);
43 connect(_server, SIGNAL(newConnection()), this, SLOT(slotNewConnection()));
44 _sockets = new QList<QTcpSocket*>;
45 }
46 else {
47 _server = 0;
48 _sockets = 0;
49 }
50
51 _epochs = new QMultiMap<long, Observation*>;
52
53 _lastDumpSec = 0;
54}
55
56// Destructor
57////////////////////////////////////////////////////////////////////////////
58bncCaster::~bncCaster() {
59 delete _out;
60 delete _outFile;
61}
62
63// Run
64////////////////////////////////////////////////////////////////////////////
65void bncCaster::run() {
66 exec();
67}
68
69// New Observations
70////////////////////////////////////////////////////////////////////////////
[88]71void bncCaster::slotNewObs(const QByteArray& staID, Observation* obs) {
[35]72
73 long newTime = obs->GPSWeek * 7*24*3600 + obs->GPSWeeks;
74
75 // First time, set the _lastDumpSec immediately
76 // --------------------------------------------
77 if (_lastDumpSec == 0) {
78 _lastDumpSec = newTime - 1;
79 }
80
81 // An old observation - throw it away
82 // ----------------------------------
83 if (newTime <= _lastDumpSec) {
84 delete obs;
85 return;
86 }
87
88 // Rename the station and save the observation
89 // -------------------------------------------
[88]90 strncpy(obs->StatID, staID.constData(),sizeof(obs->StatID));
[35]91 _epochs->insert(newTime, obs);
92
93 // Dump older epochs
94 // -----------------
95 const long waitTime = 5;
96 dumpEpochs(_lastDumpSec + 1, newTime - waitTime);
97 _lastDumpSec = newTime - waitTime;
98}
99
100// New Connection
101////////////////////////////////////////////////////////////////////////////
102void bncCaster::slotNewConnection() {
103 _sockets->push_back( _server->nextPendingConnection() );
104}
105
106// Add New Thread
107////////////////////////////////////////////////////////////////////////////
108void bncCaster::addGetThread(bncGetThread* getThread) {
109 connect(getThread, SIGNAL(newObs(const QByteArray&, Observation*)),
110 this, SLOT(slotNewObs(const QByteArray&, Observation*)));
111
112 connect(getThread, SIGNAL(error(const QByteArray&)),
113 this, SLOT(slotGetThreadError(const QByteArray&)));
114
[88]115 _staIDs.push_back(getThread->staID());
[35]116}
117
118// Error in get thread
119////////////////////////////////////////////////////////////////////////////
[88]120void bncCaster::slotGetThreadError(const QByteArray& staID) {
121 _staIDs.removeAll(staID);
[82]122 emit( newMessage(
[88]123 QString("Mountpoint size %1").arg(_staIDs.size()).toAscii()) );
124 if (_staIDs.size() == 0) {
[82]125 emit(newMessage("bncCaster:: last get thread terminated"));
[35]126 emit getThreadErrors();
127 }
128}
129
130// Dump Complete Epochs
131////////////////////////////////////////////////////////////////////////////
132void bncCaster::dumpEpochs(long minTime, long maxTime) {
133
134 const char begEpoch = 'A';
135 const char begObs = 'B';
136 const char endEpoch = 'C';
137
138 for (long sec = minTime; sec <= maxTime; sec++) {
139
140 bool first = true;
141 QList<Observation*> allObs = _epochs->values(sec);
142 QListIterator<Observation*> it(allObs);
143 while (it.hasNext()) {
144 Observation* obs = it.next();
145
146 // Output into the file
147 // --------------------
148 if (_out) {
149 if (first) {
150 *_out << begEpoch << endl;;
151 }
152 *_out << obs->StatID << " "
153 << (int) obs->SVPRN << " "
154 << (int) obs->GPSWeek << " "
155 << obs->GPSWeeks << " "
156 << obs->sec << " "
157 << obs->pCodeIndicator << " "
158 << obs->cumuLossOfCont << " "
159 << obs->C1 << " "
160 << obs->P2 << " "
161 << obs->L1 << " "
162 << obs->L2 << endl;
163 if (!it.hasNext()) {
164 *_out << endEpoch << endl;
165 }
166 }
167
168 // Output into the socket
169 // ----------------------
170 if (_sockets) {
171 int numBytes = sizeof(*obs);
172 QListIterator<QTcpSocket*> is(*_sockets);
173 while (is.hasNext()) {
174 QTcpSocket* sock = is.next();
175 if (first) {
176 sock->write(&begEpoch, 1);
177 }
178 sock->write(&begObs, 1);
179 sock->write((char*) obs, numBytes);
180 if (!it.hasNext()) {
181 sock->write(&endEpoch, 1);
182 }
183 }
184 }
185
[73]186 // Prepare RINEX Output
187 // --------------------
188 if (1) {
189 if (_rinexWriters.find(obs->StatID) == _rinexWriters.end()) {
190 _rinexWriters.insert(obs->StatID, new bncRinex(obs->StatID));
191 }
192 bncRinex* rnx = _rinexWriters.find(obs->StatID).value();
193 rnx->deepCopy(obs);
194 }
195
[35]196 delete obs;
197 _epochs->remove(sec);
198 first = false;
199 }
[73]200
201 // Write RINEX Files
202 // -----------------
203 QMapIterator<QString, bncRinex*> ir(_rinexWriters);
204 while (ir.hasNext()) {
205 bncRinex* rnx = ir.next().value();
206 rnx->dumpEpoch();
207 }
[35]208 }
209}
Note: See TracBrowser for help on using the repository browser.