source: ntrip/trunk/BNC/bnccaster.cpp@ 56

Last change on this file since 56 was 35, checked in by mervart, 18 years ago

Imported sources

File size: 5.2 KB
RevLine 
[35]1
2/* -------------------------------------------------------------------------
3 * Bernese NTRIP Client
4 * -------------------------------------------------------------------------
5 *
6 * Class: bncCaster
7 *
8 * Purpose: buffers and disseminates the data
9 *
10 * Author: L. Mervart
11 *
12 * Created: 24-Dec-2005
13 *
14 * Changes:
15 *
16 * -----------------------------------------------------------------------*/
17
18#include "bnccaster.h"
19#include "bncgetthread.h"
20#include "RTCM/RTCM.h"
21
22// Constructor
23////////////////////////////////////////////////////////////////////////////
24bncCaster::bncCaster(const QString& outFileName, int port) {
25
26 if ( !outFileName.isEmpty() ) {
27 _outFile = new QFile(outFileName);
28 _outFile->open(QIODevice::WriteOnly);
29 _out = new QTextStream(_outFile);
30 _out->setRealNumberNotation(QTextStream::FixedNotation);
31 _out->setRealNumberPrecision(5);
32 }
33 else {
34 _outFile = 0;
35 _out = 0;
36 }
37
38 _port = port;
39
40 if (_port != 0) {
41 _server = new QTcpServer;
42 _server->listen(QHostAddress::Any, _port);
43 connect(_server, SIGNAL(newConnection()), this, SLOT(slotNewConnection()));
44 _sockets = new QList<QTcpSocket*>;
45 }
46 else {
47 _server = 0;
48 _sockets = 0;
49 }
50
51 _epochs = new QMultiMap<long, Observation*>;
52
53 _lastDumpSec = 0;
54}
55
56// Destructor
57////////////////////////////////////////////////////////////////////////////
58bncCaster::~bncCaster() {
59 delete _out;
60 delete _outFile;
61}
62
63// Run
64////////////////////////////////////////////////////////////////////////////
65void bncCaster::run() {
66 exec();
67}
68
69// New Observations
70////////////////////////////////////////////////////////////////////////////
71void bncCaster::slotNewObs(const QByteArray& mountPoint, Observation* obs) {
72
73 long newTime = obs->GPSWeek * 7*24*3600 + obs->GPSWeeks;
74
75 // First time, set the _lastDumpSec immediately
76 // --------------------------------------------
77 if (_lastDumpSec == 0) {
78 _lastDumpSec = newTime - 1;
79 }
80
81 // An old observation - throw it away
82 // ----------------------------------
83 if (newTime <= _lastDumpSec) {
84 delete obs;
85 return;
86 }
87
88 // Rename the station and save the observation
89 // -------------------------------------------
90 strncpy(obs->StatID, mountPoint.constData(),sizeof(obs->StatID));
91 _epochs->insert(newTime, obs);
92
93 // Dump older epochs
94 // -----------------
95 const long waitTime = 5;
96 dumpEpochs(_lastDumpSec + 1, newTime - waitTime);
97 _lastDumpSec = newTime - waitTime;
98}
99
100// New Connection
101////////////////////////////////////////////////////////////////////////////
102void bncCaster::slotNewConnection() {
103 _sockets->push_back( _server->nextPendingConnection() );
104}
105
106// Add New Thread
107////////////////////////////////////////////////////////////////////////////
108void bncCaster::addGetThread(bncGetThread* getThread) {
109 connect(getThread, SIGNAL(newObs(const QByteArray&, Observation*)),
110 this, SLOT(slotNewObs(const QByteArray&, Observation*)));
111
112 connect(getThread, SIGNAL(error(const QByteArray&)),
113 this, SLOT(slotGetThreadError(const QByteArray&)));
114
115 _mountPoints.push_back(getThread->mountPoint());
116}
117
118// Error in get thread
119////////////////////////////////////////////////////////////////////////////
120void bncCaster::slotGetThreadError(const QByteArray& mountPoint) {
121 _mountPoints.removeAll(mountPoint);
122 qWarning("Mountpoint size %d", _mountPoints.size());
123 if (_mountPoints.size() == 0) {
124 qWarning("bncCaster:: last get thread terminated\n");
125 emit getThreadErrors();
126 }
127}
128
129// Dump Complete Epochs
130////////////////////////////////////////////////////////////////////////////
131void bncCaster::dumpEpochs(long minTime, long maxTime) {
132
133 const char begEpoch = 'A';
134 const char begObs = 'B';
135 const char endEpoch = 'C';
136
137 for (long sec = minTime; sec <= maxTime; sec++) {
138
139 bool first = true;
140 QList<Observation*> allObs = _epochs->values(sec);
141 QListIterator<Observation*> it(allObs);
142 while (it.hasNext()) {
143 Observation* obs = it.next();
144
145 // Output into the file
146 // --------------------
147 if (_out) {
148 if (first) {
149 *_out << begEpoch << endl;;
150 }
151 *_out << obs->StatID << " "
152 << (int) obs->SVPRN << " "
153 << (int) obs->GPSWeek << " "
154 << obs->GPSWeeks << " "
155 << obs->sec << " "
156 << obs->pCodeIndicator << " "
157 << obs->cumuLossOfCont << " "
158 << obs->C1 << " "
159 << obs->P2 << " "
160 << obs->L1 << " "
161 << obs->L2 << endl;
162 if (!it.hasNext()) {
163 *_out << endEpoch << endl;
164 }
165 }
166
167 // Output into the socket
168 // ----------------------
169 if (_sockets) {
170 int numBytes = sizeof(*obs);
171 QListIterator<QTcpSocket*> is(*_sockets);
172 while (is.hasNext()) {
173 QTcpSocket* sock = is.next();
174 if (first) {
175 sock->write(&begEpoch, 1);
176 }
177 sock->write(&begObs, 1);
178 sock->write((char*) obs, numBytes);
179 if (!it.hasNext()) {
180 sock->write(&endEpoch, 1);
181 }
182 }
183 }
184
185 delete obs;
186 _epochs->remove(sec);
187 first = false;
188 }
189 }
190}
Note: See TracBrowser for help on using the repository browser.