source: ntrip/trunk/BNC/src/upload/bncuploadcaster.cpp@ 8651

Last change on this file since 8651 was 8275, checked in by stuerze, 7 years ago

some preparation to allow Ntrip version 2 stream upload in future

File size: 4.4 KB
RevLine 
[3172]1/* -------------------------------------------------------------------------
2 * BKG NTRIP Server
3 * -------------------------------------------------------------------------
4 *
5 * Class: bncUploadCaster
6 *
7 * Purpose: Connection to NTRIP Caster
8 *
9 * Author: L. Mervart
10 *
11 * Created: 29-Mar-2011
12 *
[7661]13 * Changes:
[3172]14 *
15 * -----------------------------------------------------------------------*/
16
17#include <math.h>
[7661]18#include "bncuploadcaster.h"
[3172]19#include "bncversion.h"
[5070]20#include "bnccore.h"
[3235]21#include "bnctableitem.h"
[3172]22
23using namespace std;
24
25// Constructor
26////////////////////////////////////////////////////////////////////////////
27bncUploadCaster::bncUploadCaster(const QString& mountpoint,
28 const QString& outHost, int outPort,
[8275]29 const QString& ntripVersion,
30 const QString& userName, const QString& password,
31 int iRow,
[3273]32 int rate) {
[3224]33 _mountpoint = mountpoint;
34 _outHost = outHost;
35 _outPort = outPort;
[8275]36 _ntripVersion = ntripVersion;
37 _userName = userName;
[3224]38 _password = password;
39 _outSocket = 0;
40 _sOpenTrial = 0;
[3233]41 _iRow = iRow;
[3273]42 _rate = rate;
[4809]43 if (_rate < 0) {
44 _rate = 0;
[3273]45 }
46 else if (_rate > 60) {
47 _rate = 60;
48 }
[3207]49 _isToBeDeleted = false;
[3235]50
[7661]51 connect(this, SIGNAL(newMessage(QByteArray,bool)),
[5068]52 BNC_CORE, SLOT(slotMessage(const QByteArray,bool)));
[3235]53
[5068]54 if (BNC_CORE->_uploadTableItems.find(_iRow) != BNC_CORE->_uploadTableItems.end()){
[7661]55 connect(this, SIGNAL(newBytes(QByteArray,double)),
56 BNC_CORE->_uploadTableItems.value(iRow),
[3236]57 SLOT(slotNewBytes(const QByteArray,double)));
[3235]58 }
[3172]59}
60
[3207]61// Safe Desctructor
62////////////////////////////////////////////////////////////////////////////
63void bncUploadCaster::deleteSafely() {
64 _isToBeDeleted = true;
[3208]65 if (!isRunning()) {
66 delete this;
67 }
[3207]68}
69
[3172]70// Destructor
71////////////////////////////////////////////////////////////////////////////
72bncUploadCaster::~bncUploadCaster() {
[3208]73 if (isRunning()) {
74 wait();
75 }
[7661]76 if (_outSocket) {
77 delete _outSocket;
78 }
[3172]79}
80
[3226]81// Endless Loop
82////////////////////////////////////////////////////////////////////////////
83void bncUploadCaster::run() {
84 while (true) {
85 if (_isToBeDeleted) {
86 QThread::quit();
87 deleteLater();
88 return;
89 }
90 open();
91 if (_outSocket && _outSocket->state() == QAbstractSocket::ConnectedState) {
92 QMutexLocker locker(&_mutex);
[4808]93 if (_outBuffer.size() > 0) {
94 _outSocket->write(_outBuffer);
95 _outSocket->flush();
[8204]96 emit newBytes(_mountpoint.toLatin1(), _outBuffer.size());
[4808]97 }
[3226]98 }
[4809]99 if (_rate == 0) {
[4985]100 {
101 QMutexLocker locker(&_mutex);
102 _outBuffer.clear();
103 }
[4810]104 msleep(100); //sleep 0.1 sec
[4809]105 }
106 else {
107 sleep(_rate);
108 }
[3226]109 }
110}
111
[3172]112// Start the Communication with NTRIP Caster
113////////////////////////////////////////////////////////////////////////////
114void bncUploadCaster::open() {
115
116 if (_mountpoint.isEmpty()) {
117 return;
118 }
119
[7661]120 if (_outSocket != 0 &&
[3172]121 _outSocket->state() == QAbstractSocket::ConnectedState) {
122 return;
123 }
124
125 delete _outSocket; _outSocket = 0;
126
127 double minDt = pow(2.0,_sOpenTrial);
128 if (++_sOpenTrial > 4) {
129 _sOpenTrial = 4;
130 }
131 if (_outSocketOpenTime.isValid() &&
132 _outSocketOpenTime.secsTo(QDateTime::currentDateTime()) < minDt) {
133 return;
134 }
135 else {
136 _outSocketOpenTime = QDateTime::currentDateTime();
137 }
138
139 _outSocket = new QTcpSocket();
140 _outSocket->connectToHost(_outHost, _outPort);
141
142 const int timeOut = 5000; // 5 seconds
143 if (!_outSocket->waitForConnected(timeOut)) {
144 delete _outSocket;
145 _outSocket = 0;
[8204]146 emit(newMessage("Broadcaster: Connect timeout for " + _mountpoint.toLatin1(), true));
[3172]147 return;
148 }
149
[8204]150 QByteArray msg = "SOURCE " + _password.toLatin1() + " /" +
151 _mountpoint.toLatin1() + "\r\n" +
[3172]152 "Source-Agent: NTRIP BNC/" BNCVERSION "\r\n\r\n";
153
154 _outSocket->write(msg);
155 _outSocket->waitForBytesWritten();
156
157 _outSocket->waitForReadyRead();
158 QByteArray ans = _outSocket->readLine();
159
160 if (ans.indexOf("OK") == -1) {
161 delete _outSocket;
162 _outSocket = 0;
[8204]163 emit(newMessage("Broadcaster: Connection broken for " + _mountpoint.toLatin1(), true));
[3172]164 }
165 else {
[8204]166 emit(newMessage("Broadcaster: Connection opened for " + _mountpoint.toLatin1(), true));
[3172]167 _sOpenTrial = 0;
168 }
169}
170
Note: See TracBrowser for help on using the repository browser.