Simplify the code that reads from the socket line by line

Since sockets are buffered `QIODevices` we can use `canReadLine()` to check
if we have a full line, instead of using a custom `SocketLineReader` class
(and the copy-pasted `DeviceLineReader` in the Bluetooth backend). 

We can also loop through all the lines instead of queuing calls to `dataReceived`.

And we don't need transactions.
This commit is contained in:
Albert Vaca Cintora 2023-07-10 15:58:56 +00:00
parent 0a3ca9e90b
commit 5796b561bf
13 changed files with 47 additions and 410 deletions

View file

@ -32,7 +32,6 @@ set(kdeconnectcore_SRCS
backends/linkprovider.cpp backends/linkprovider.cpp
backends/devicelink.cpp backends/devicelink.cpp
backends/pairinghandler.cpp backends/pairinghandler.cpp
backends/devicelinereader.cpp
kdeconnectplugin.cpp kdeconnectplugin.cpp
kdeconnectpluginconfig.cpp kdeconnectpluginconfig.cpp

View file

@ -19,12 +19,11 @@ BluetoothDeviceLink::BluetoothDeviceLink(const DeviceInfo &deviceInfo,
ConnectionMultiplexer *connection, ConnectionMultiplexer *connection,
QSharedPointer<MultiplexChannel> socket) QSharedPointer<MultiplexChannel> socket)
: DeviceLink(deviceInfo.id, parent) : DeviceLink(deviceInfo.id, parent)
, mSocketReader(new DeviceLineReader(socket.data(), this))
, mConnection(connection) , mConnection(connection)
, mChannel(socket) , mChannel(socket)
, mDeviceInfo(deviceInfo) , mDeviceInfo(deviceInfo)
{ {
connect(mSocketReader, &DeviceLineReader::readyRead, this, &BluetoothDeviceLink::dataReceived); connect(socket.data(), &QIODevice::readyRead, this, &BluetoothDeviceLink::dataReceived);
// We take ownership of the connection. // We take ownership of the connection.
// When the link provider destroys us, // When the link provider destroys us,
@ -42,16 +41,14 @@ bool BluetoothDeviceLink::sendPacket(NetworkPacket &np)
uploadJob->start(); uploadJob->start();
} }
// TODO: handle too-big packets // TODO: handle too-big packets
int written = mSocketReader->write(np.serialize()); int written = mChannel->write(np.serialize());
return (written != -1); return (written != -1);
} }
void BluetoothDeviceLink::dataReceived() void BluetoothDeviceLink::dataReceived()
{ {
if (mSocketReader->bytesAvailable() == 0) while (mChannel->canReadLine()) {
return; const QByteArray serializedPacket = mChannel->readLine();
const QByteArray serializedPacket = mSocketReader->readLine();
// qCDebug(KDECONNECT_CORE) << "BluetoothDeviceLink dataReceived" << packet; // qCDebug(KDECONNECT_CORE) << "BluetoothDeviceLink dataReceived" << packet;
@ -65,8 +62,5 @@ void BluetoothDeviceLink::dataReceived()
} }
Q_EMIT receivedPacket(packet); Q_EMIT receivedPacket(packet);
if (mSocketReader->bytesAvailable() > 0) {
QMetaObject::invokeMethod(this, &BluetoothDeviceLink::dataReceived, Qt::QueuedConnection);
} }
} }

View file

@ -12,7 +12,6 @@
#include <QSslCertificate> #include <QSslCertificate>
#include <QString> #include <QString>
#include "../devicelinereader.h"
#include "../devicelink.h" #include "../devicelink.h"
class ConnectionMultiplexer; class ConnectionMultiplexer;
@ -40,7 +39,6 @@ private Q_SLOTS:
void dataReceived(); void dataReceived();
private: private:
DeviceLineReader *mSocketReader;
ConnectionMultiplexer *mConnection; ConnectionMultiplexer *mConnection;
QSharedPointer<MultiplexChannel> mChannel; QSharedPointer<MultiplexChannel> mChannel;
DeviceInfo mDeviceInfo; DeviceInfo mDeviceInfo;

View file

@ -1,39 +0,0 @@
/**
* SPDX-FileCopyrightText: 2013 Albert Vaca <albertvaka@gmail.com>
* SPDX-FileCopyrightText: 2014 Alejandro Fiestas Olivares <afiestas@kde.org>
*
* SPDX-License-Identifier: GPL-2.0-only OR GPL-3.0-only OR LicenseRef-KDE-Accepted-GPL
*/
#include "devicelinereader.h"
DeviceLineReader::DeviceLineReader(QIODevice *device, QObject *parent)
: QObject(parent)
, m_device(device)
{
connect(m_device, &QIODevice::readyRead, this, &DeviceLineReader::dataReceived);
connect(m_device, &QIODevice::aboutToClose, this, &DeviceLineReader::disconnected);
}
void DeviceLineReader::dataReceived()
{
while (m_device->canReadLine()) {
const QByteArray line = m_device->readLine();
if (line.length() > 1) {
m_packets.enqueue(line); // we don't want single \n
}
}
// If we still have things to read from the device, call dataReceived again
// We do this manually because we do not trust readyRead to be emitted again
// So we call this method again just in case.
if (m_device->bytesAvailable() > 0) {
QMetaObject::invokeMethod(this, "dataReceived", Qt::QueuedConnection);
return;
}
// If we have any packets, tell it to the world.
if (!m_packets.isEmpty()) {
Q_EMIT readyRead();
}
}

View file

@ -1,53 +0,0 @@
/**
* SPDX-FileCopyrightText: 2013 Albert Vaca <albertvaka@gmail.com>
*
* SPDX-License-Identifier: GPL-2.0-only OR GPL-3.0-only OR LicenseRef-KDE-Accepted-GPL
*/
#ifndef DEVICELINEREADER_H
#define DEVICELINEREADER_H
#include <QIODevice>
#include <QObject>
#include <QQueue>
#include <QString>
/*
* Encapsulates a QIODevice and implements the same methods of its API that are
* used by LanDeviceLink and BluetoothDeviceLink, but readyRead is emitted only
* when a newline is found.
*/
class DeviceLineReader : public QObject
{
Q_OBJECT
public:
DeviceLineReader(QIODevice *device, QObject *parent = 0);
QByteArray readLine()
{
return m_packets.dequeue();
}
qint64 write(const QByteArray &data)
{
return m_device->write(data);
}
qint64 bytesAvailable() const
{
return m_packets.size();
}
Q_SIGNALS:
void readyRead();
void disconnected();
private Q_SLOTS:
void dataReceived();
private:
QByteArray m_lastChunk;
QIODevice *m_device;
QQueue<QByteArray> m_packets;
};
#endif

View file

@ -7,7 +7,5 @@ set(backends_kdeconnect_SRCS
backends/lan/landevicelink.cpp backends/lan/landevicelink.cpp
backends/lan/compositeuploadjob.cpp backends/lan/compositeuploadjob.cpp
backends/lan/uploadjob.cpp backends/lan/uploadjob.cpp
backends/lan/socketlinereader.cpp
PARENT_SCOPE PARENT_SCOPE
) )

View file

@ -13,11 +13,10 @@
#include "kdeconnectconfig.h" #include "kdeconnectconfig.h"
#include "lanlinkprovider.h" #include "lanlinkprovider.h"
#include "plugins/share/shareplugin.h" #include "plugins/share/shareplugin.h"
#include "socketlinereader.h"
LanDeviceLink::LanDeviceLink(const DeviceInfo &deviceInfo, LanLinkProvider *parent, QSslSocket *socket) LanDeviceLink::LanDeviceLink(const DeviceInfo &deviceInfo, LanLinkProvider *parent, QSslSocket *socket)
: DeviceLink(deviceInfo.id, parent) : DeviceLink(deviceInfo.id, parent)
, m_socketLineReader(nullptr) , m_socket(nullptr)
, m_deviceInfo(deviceInfo) , m_deviceInfo(deviceInfo)
{ {
reset(socket); reset(socket);
@ -25,29 +24,24 @@ LanDeviceLink::LanDeviceLink(const DeviceInfo &deviceInfo, LanLinkProvider *pare
void LanDeviceLink::reset(QSslSocket *socket) void LanDeviceLink::reset(QSslSocket *socket)
{ {
if (m_socketLineReader) { if (m_socket) {
disconnect(m_socketLineReader->m_socket, &QAbstractSocket::disconnected, this, &QObject::deleteLater); disconnect(m_socket, &QAbstractSocket::disconnected, this, &QObject::deleteLater);
delete m_socketLineReader; delete m_socket;
} }
m_socketLineReader = new SocketLineReader(socket, this); m_socket = socket;
socket->setParent(this);
connect(socket, &QAbstractSocket::disconnected, this, &QObject::deleteLater); connect(socket, &QAbstractSocket::disconnected, this, &QObject::deleteLater);
connect(m_socketLineReader, &SocketLineReader::readyRead, this, &LanDeviceLink::dataReceived); connect(socket, &QAbstractSocket::readyRead, this, &LanDeviceLink::dataReceived);
// We take ownership of the socket.
// When the link provider destroys us,
// the socket (and the reader) will be
// destroyed as well
socket->setParent(m_socketLineReader);
} }
QHostAddress LanDeviceLink::hostAddress() const QHostAddress LanDeviceLink::hostAddress() const
{ {
if (!m_socketLineReader) { if (!m_socket) {
return QHostAddress::Null; return QHostAddress::Null;
} }
QHostAddress addr = m_socketLineReader->m_socket->peerAddress(); QHostAddress addr = m_socket->peerAddress();
if (addr.protocol() == QAbstractSocket::IPv6Protocol) { if (addr.protocol() == QAbstractSocket::IPv6Protocol) {
bool success; bool success;
QHostAddress convertedAddr = QHostAddress(addr.toIPv4Address(&success)); QHostAddress convertedAddr = QHostAddress(addr.toIPv4Address(&success));
@ -80,7 +74,7 @@ bool LanDeviceLink::sendPacket(NetworkPacket &np)
return true; return true;
} else { } else {
int written = m_socketLineReader->write(np.serialize()); int written = m_socket->write(np.serialize());
// Actually we can't detect if a packet is received or not. We keep TCP // Actually we can't detect if a packet is received or not. We keep TCP
//"ESTABLISHED" connections that look legit (return true when we use them), //"ESTABLISHED" connections that look legit (return true when we use them),
@ -91,10 +85,8 @@ bool LanDeviceLink::sendPacket(NetworkPacket &np)
void LanDeviceLink::dataReceived() void LanDeviceLink::dataReceived()
{ {
if (!m_socketLineReader->hasPacketsAvailable()) while (m_socket->canReadLine()) {
return; const QByteArray serializedPacket = m_socket->readLine();
const QByteArray serializedPacket = m_socketLineReader->readLine();
NetworkPacket packet; NetworkPacket packet;
NetworkPacket::unserialize(serializedPacket, &packet); NetworkPacket::unserialize(serializedPacket, &packet);
@ -112,15 +104,12 @@ void LanDeviceLink::dataReceived()
// Needs investigation and upstreaming of the fix. QTBUG-62257 // Needs investigation and upstreaming of the fix. QTBUG-62257
connect(socket.data(), &QAbstractSocket::disconnected, socket.data(), &QAbstractSocket::readChannelFinished); connect(socket.data(), &QAbstractSocket::disconnected, socket.data(), &QAbstractSocket::readChannelFinished);
const QString address = m_socketLineReader->peerAddress().toString(); const QString address = m_socket->peerAddress().toString();
const quint16 port = transferInfo[QStringLiteral("port")].toInt(); const quint16 port = transferInfo[QStringLiteral("port")].toInt();
socket->connectToHostEncrypted(address, port, QIODevice::ReadWrite); socket->connectToHostEncrypted(address, port, QIODevice::ReadWrite);
packet.setPayload(socket, packet.payloadSize()); packet.setPayload(socket, packet.payloadSize());
} }
Q_EMIT receivedPacket(packet); Q_EMIT receivedPacket(packet);
if (m_socketLineReader->hasPacketsAvailable()) {
QMetaObject::invokeMethod(this, "dataReceived", Qt::QueuedConnection);
} }
} }

View file

@ -18,7 +18,6 @@
#include "uploadjob.h" #include "uploadjob.h"
#include <kdeconnectcore_export.h> #include <kdeconnectcore_export.h>
class SocketLineReader;
class LanLinkProvider; class LanLinkProvider;
class KDECONNECTCORE_EXPORT LanDeviceLink : public DeviceLink class KDECONNECTCORE_EXPORT LanDeviceLink : public DeviceLink
@ -26,8 +25,6 @@ class KDECONNECTCORE_EXPORT LanDeviceLink : public DeviceLink
Q_OBJECT Q_OBJECT
public: public:
enum ConnectionStarted : bool { Locally, Remotely };
LanDeviceLink(const DeviceInfo &deviceInfo, LanLinkProvider *parent, QSslSocket *socket); LanDeviceLink(const DeviceInfo &deviceInfo, LanLinkProvider *parent, QSslSocket *socket);
void reset(QSslSocket *socket); void reset(QSslSocket *socket);
@ -44,8 +41,7 @@ private Q_SLOTS:
void dataReceived(); void dataReceived();
private: private:
SocketLineReader *m_socketLineReader; QSslSocket *m_socket;
ConnectionStarted m_connectionSource;
QPointer<CompositeUploadJob> m_compositeUploadJob; QPointer<CompositeUploadJob> m_compositeUploadJob;
DeviceInfo m_deviceInfo; DeviceInfo m_deviceInfo;
}; };

View file

@ -427,7 +427,7 @@ void LanLinkProvider::newConnection()
} }
} }
// I'm the new device and this is the answer to my UDP identity packet (data received) // I'm the new device and this is the TCP response to my UDP identity packet
void LanLinkProvider::dataReceived() void LanLinkProvider::dataReceived()
{ {
QSslSocket *socket = qobject_cast<QSslSocket *>(sender()); QSslSocket *socket = qobject_cast<QSslSocket *>(sender());
@ -440,12 +440,10 @@ void LanLinkProvider::dataReceived()
return; return;
} }
#if QT_VERSION < QT_VERSION_CHECK(5, 7, 0) if (!socket->canReadLine()) {
if (!socket->canReadLine()) // This can happen if the packet is large enough to be split in two chunks
return; return;
#else }
socket->startTransaction();
#endif
const QByteArray data = socket->readLine(); const QByteArray data = socket->readLine();
@ -454,19 +452,10 @@ void LanLinkProvider::dataReceived()
NetworkPacket *np = new NetworkPacket(); NetworkPacket *np = new NetworkPacket();
bool success = NetworkPacket::unserialize(data, np); bool success = NetworkPacket::unserialize(data, np);
#if QT_VERSION < QT_VERSION_CHECK(5, 7, 0)
if (!success) { if (!success) {
delete np; delete np;
return; return;
} }
#else
if (!success) {
delete np;
socket->rollbackTransaction();
return;
}
socket->commitTransaction();
#endif
if (np->type() != PACKET_TYPE_IDENTITY) { if (np->type() != PACKET_TYPE_IDENTITY) {
qCWarning(KDECONNECT_CORE) << "LanLinkProvider/newConnection: Expected identity, received " << np->type(); qCWarning(KDECONNECT_CORE) << "LanLinkProvider/newConnection: Expected identity, received " << np->type();

View file

@ -1,30 +0,0 @@
/**
* SPDX-FileCopyrightText: 2013 Albert Vaca <albertvaka@gmail.com>
* SPDX-FileCopyrightText: 2014 Alejandro Fiestas Olivares <afiestas@kde.org>
*
* SPDX-License-Identifier: GPL-2.0-only OR GPL-3.0-only OR LicenseRef-KDE-Accepted-GPL
*/
#include "socketlinereader.h"
SocketLineReader::SocketLineReader(QSslSocket *socket, QObject *parent)
: QObject(parent)
, m_socket(socket)
{
connect(m_socket, &QIODevice::readyRead, this, &SocketLineReader::dataReceived);
}
void SocketLineReader::dataReceived()
{
while (m_socket->canReadLine()) {
const QByteArray line = m_socket->readLine();
if (line.length() > 1) { // we don't want a single \n
m_packets.enqueue(line);
}
}
// If we have any packets, tell it to the world.
if (!m_packets.isEmpty()) {
Q_EMIT readyRead();
}
}

View file

@ -1,62 +0,0 @@
/**
* SPDX-FileCopyrightText: 2013 Albert Vaca <albertvaka@gmail.com>
*
* SPDX-License-Identifier: GPL-2.0-only OR GPL-3.0-only OR LicenseRef-KDE-Accepted-GPL
*/
#ifndef SOCKETLINEREADER_H
#define SOCKETLINEREADER_H
#include <QHostAddress>
#include <QObject>
#include <QQueue>
#include <QSslSocket>
#include <kdeconnectcore_export.h>
/*
* Encapsulates a QTcpSocket and implements the same methods of its API that are
* used by LanDeviceLink, but readyRead is emitted only when a newline is found.
*/
class KDECONNECTCORE_EXPORT SocketLineReader : public QObject
{
Q_OBJECT
public:
explicit SocketLineReader(QSslSocket *socket, QObject *parent = nullptr);
bool hasPacketsAvailable() const
{
return !m_packets.isEmpty();
}
QByteArray readLine()
{
return m_packets.dequeue();
}
qint64 write(const QByteArray &data)
{
return m_socket->write(data);
}
QHostAddress peerAddress() const
{
return m_socket->peerAddress();
}
QSslCertificate peerCertificate() const
{
return m_socket->peerCertificate();
}
QSslSocket *m_socket;
Q_SIGNALS:
void readyRead();
private Q_SLOTS:
void dataReceived();
private:
QByteArray m_lastChunk;
QQueue<QByteArray> m_packets;
};
#endif

View file

@ -15,4 +15,3 @@ endif()
ecm_add_test(pluginloadtest.cpp LINK_LIBRARIES ${kdeconnect_libraries}) ecm_add_test(pluginloadtest.cpp LINK_LIBRARIES ${kdeconnect_libraries})
ecm_add_test(sendfiletest.cpp LINK_LIBRARIES ${kdeconnect_libraries}) ecm_add_test(sendfiletest.cpp LINK_LIBRARIES ${kdeconnect_libraries})
ecm_add_test(testsocketlinereader.cpp TEST_NAME testsocketlinereader LINK_LIBRARIES ${kdeconnect_libraries})

View file

@ -1,141 +0,0 @@
/*************************************************************************************
* SPDX-FileCopyrightText: 2014 Alejandro Fiestas Olivares <afiestas@kde.org> *
* *
* SPDX-License-Identifier: LGPL-2.1-or-later
*************************************************************************************/
#include "../core/backends/lan/server.h"
#include "../core/backends/lan/socketlinereader.h"
#include "../core/qtcompat_p.h"
#include <QEventLoop>
#include <QProcess>
#include <QSignalSpy>
#include <QSslSocket>
#include <QTest>
#include <QTimer>
class TestSocketLineReader : public QObject
{
Q_OBJECT
public Q_SLOTS:
void init();
void cleanup()
{
delete m_server;
}
void newPacket();
private Q_SLOTS:
void socketLineReader();
void badData();
private:
QTimer m_timer;
QEventLoop m_loop;
QList<QByteArray> m_packets;
Server *m_server;
QSslSocket *m_conn;
SocketLineReader *m_reader;
};
void TestSocketLineReader::init()
{
m_packets.clear();
m_server = new Server(this);
QVERIFY2(m_server->listen(QHostAddress::LocalHost, 8694), "Failed to create local tcp server");
m_timer.setInterval(4000); // For second is more enough to send some data via local socket
m_timer.setSingleShot(true);
connect(&m_timer, &QTimer::timeout, &m_loop, &QEventLoop::quit);
m_conn = new QSslSocket(this);
m_conn->connectToHost(QHostAddress::LocalHost, 8694);
connect(m_conn, &QAbstractSocket::connected, &m_loop, &QEventLoop::quit);
m_timer.start();
m_loop.exec();
QVERIFY2(m_conn->isOpen(), "Could not connect to local tcp server");
}
void TestSocketLineReader::socketLineReader()
{
QList<QByteArray> dataToSend;
dataToSend << "foobar\n"
<< "barfoo\n"
<< "foobar?\n"
<< "\n"
<< "barfoo!\n"
<< "panda\n";
for (const QByteArray &line : qAsConst(dataToSend)) {
m_conn->write(line);
}
m_conn->flush();
int maxAttemps = 5;
while (!m_server->hasPendingConnections() && maxAttemps > 0) {
--maxAttemps;
QTest::qSleep(1000);
}
QSslSocket *sock = m_server->nextPendingConnection();
QVERIFY2(sock != nullptr, "Could not open a connection to the client");
m_reader = new SocketLineReader(sock, this);
connect(m_reader, &SocketLineReader::readyRead, this, &TestSocketLineReader::newPacket);
m_timer.start();
m_loop.exec();
/* remove the empty line before compare */
dataToSend.removeOne("\n");
QCOMPARE(m_packets.count(), 5); // We expect 5 Packets
for (int x = 0; x < 5; ++x) {
QCOMPARE(m_packets[x], dataToSend[x]);
}
}
void TestSocketLineReader::badData()
{
const QList<QByteArray> dataToSend = {"data1\n", "data"}; // does not end in a \n
for (const QByteArray &line : qAsConst(dataToSend)) {
m_conn->write(line);
}
m_conn->flush();
QSignalSpy spy(m_server, &QTcpServer::newConnection);
QVERIFY(m_server->hasPendingConnections() || spy.wait(1000));
QSslSocket *sock = m_server->nextPendingConnection();
QVERIFY2(sock != nullptr, "Could not open a connection to the client");
m_reader = new SocketLineReader(sock, this);
connect(m_reader, &SocketLineReader::readyRead, this, &TestSocketLineReader::newPacket);
m_timer.start();
m_loop.exec();
QCOMPARE(m_packets.count(), 1);
QCOMPARE(m_packets[0], dataToSend[0]);
}
void TestSocketLineReader::newPacket()
{
int maxLoops = 5;
while (m_reader->hasPacketsAvailable() && maxLoops > 0) {
--maxLoops;
const QByteArray packet = m_reader->readLine();
if (!packet.isEmpty()) {
m_packets.append(packet);
}
if (m_packets.count() == 5) {
m_loop.exit();
}
}
}
QTEST_GUILESS_MAIN(TestSocketLineReader)
#include "testsocketlinereader.moc"