diff options
Diffstat (limited to 'nxcomp/src/ReadBuffer.cpp')
-rw-r--r-- | nxcomp/src/ReadBuffer.cpp | 639 |
1 files changed, 639 insertions, 0 deletions
diff --git a/nxcomp/src/ReadBuffer.cpp b/nxcomp/src/ReadBuffer.cpp new file mode 100644 index 000000000..154225e75 --- /dev/null +++ b/nxcomp/src/ReadBuffer.cpp @@ -0,0 +1,639 @@ +/**************************************************************************/ +/* */ +/* Copyright (c) 2001, 2011 NoMachine (http://www.nomachine.com) */ +/* Copyright (c) 2008-2014 Oleksandr Shneyder <o.shneyder@phoca-gmbh.de> */ +/* Copyright (c) 2014-2016 Ulrich Sibiller <uli42@gmx.de> */ +/* Copyright (c) 2014-2016 Mihai Moldovan <ionic@ionic.de> */ +/* Copyright (c) 2011-2016 Mike Gabriel <mike.gabriel@das-netzwerkteam.de>*/ +/* Copyright (c) 2015-2016 Qindel Group (http://www.qindel.com) */ +/* */ +/* NXCOMP, NX protocol compression and NX extensions to this software */ +/* are copyright of the aforementioned persons and companies. */ +/* */ +/* Redistribution and use of the present software is allowed according */ +/* to terms specified in the file LICENSE.nxcomp which comes in the */ +/* source distribution. */ +/* */ +/* All rights reserved. */ +/* */ +/* NOTE: This software has received contributions from various other */ +/* contributors, only the core maintainers and supporters are listed as */ +/* copyright holders. Please contact us, if you feel you should be listed */ +/* as copyright holder, as well. */ +/* */ +/**************************************************************************/ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include "ReadBuffer.h" + +#include "Transport.h" + +// +// Set the verbosity level. +// + +#define PANIC +#define WARNING +#undef TEST +#undef DEBUG + +ReadBuffer::ReadBuffer(Transport *transport) + + : transport_(transport) +{ + // + // The read buffer will grow until + // reaching the maximum buffer size + // and then will remain stable at + // that size. + // + + initialReadSize_ = READ_BUFFER_DEFAULT_SIZE; + maximumBufferSize_ = READ_BUFFER_DEFAULT_SIZE; + + size_ = 0; + buffer_ = NULL; + + owner_ = 1; + length_ = 0; + start_ = 0; + + remaining_ = 0; +} + +ReadBuffer::~ReadBuffer() +{ + if (owner_ == 1) + { + delete [] buffer_; + } +} + +void ReadBuffer::readMessage(const unsigned char *message, unsigned int length) +{ + // + // To be here we must be the real owner + // of the buffer and there must not be + // pending bytes in the transport. + // + + #ifdef TEST + + if (owner_ == 0) + { + *logofs << "ReadBuffer: PANIC! Class for FD#" + << transport_ -> fd() << " doesn't " + << "appear to be the owner of the buffer " + << "while borrowing from the caller.\n" + << logofs_flush; + + HandleCleanup(); + } + + #endif + + // + // Be sure that any outstanding data from + // the transport is appended to our own + // byffer. + // + + if (transport_ -> pending() != 0) + { + #ifdef WARNING + *logofs << "ReadBuffer: WARNING! Class for FD#" + << transport_ -> fd() << " has pending " + << "data in the transport while " + << "borrowing from the caller.\n" + << logofs_flush; + #endif + + readMessage(); + + if (owner_ == 0) + { + convertBuffer(); + } + } + + // + // Can't borrow the buffer if there is data + // from a partial message. In this case add + // the new data to the end of our buffer. + // + + if (length_ == 0) + { + #ifdef TEST + *logofs << "ReadBuffer: Borrowing " << length + << " bytes from the caller for FD#" + << transport_ -> fd() << " with " + << length_ << " bytes in the buffer.\n" + << logofs_flush; + #endif + + delete [] buffer_; + + buffer_ = (unsigned char *) message; + size_ = length; + + length_ = length; + + owner_ = 0; + start_ = 0; + } + else + { + #ifdef TEST + *logofs << "ReadBuffer: Appending " << length + << " bytes from the caller for FD#" + << transport_ -> fd() << " with " + << length_ << " bytes in the buffer.\n" + << logofs_flush; + #endif + + appendBuffer(message, length); + } +} + +int ReadBuffer::readMessage() +{ + int pendingLength = transport_ -> pending(); + + if (pendingLength > 0) + { + // + // Can't move the data in the borrowed buffer, + // so use the tansport buffer only if we don't + // have any partial message. This can happen + // with the proxy where we need to deflate the + // stream. + // + + if (length_ == 0) + { + unsigned char *newBuffer; + + length_ = transport_ -> getPending(newBuffer); + + if (newBuffer == NULL) + { + #ifdef PANIC + *logofs << "ReadBuffer: PANIC! Failed to borrow " + << length_ << " bytes of memory for buffer " + << "in context [A].\n" << logofs_flush; + #endif + + cerr << "Error" << ": Failed to borrow memory for " + << "read buffer in context [A].\n"; + + HandleCleanup(); + } + + delete [] buffer_; + + buffer_ = newBuffer; + size_ = length_; + + owner_ = 0; + start_ = 0; + + #ifdef TEST + *logofs << "ReadBuffer: Borrowed " << length_ + << " pending bytes for FD#" << transport_ -> + fd() << ".\n" << logofs_flush; + #endif + + return length_; + } + #ifdef TEST + else + { + *logofs << "ReadBuffer: WARNING! Cannot borrow " + << pendingLength << " bytes for FD#" + << transport_ -> fd() << " with " + << length_ << " bytes in the buffer.\n" + << logofs_flush; + } + #endif + } + + unsigned int readLength = suggestedLength(pendingLength); + + #ifdef DEBUG + *logofs << "ReadBuffer: Requested " << readLength + << " bytes for FD#" << transport_ -> fd() + << " with readable " << transport_ -> readable() + << " remaining " << remaining_ << " pending " + << transport_ -> pending() << ".\n" + << logofs_flush; + #endif + + if (readLength < initialReadSize_) + { + readLength = initialReadSize_; + } + + #ifdef DEBUG + *logofs << "ReadBuffer: Buffer size is " << size_ + << " length " << length_ << " and start " + << start_ << ".\n" << logofs_flush; + #endif + + // + // We can't use the transport buffer + // to read our own data in it. + // + + #ifdef TEST + + if (owner_ == 0) + { + *logofs << "ReadBuffer: PANIC! Class for FD#" + << transport_ -> fd() << " doesn't " + << "appear to be the owner of the buffer " + << "while reading.\n" << logofs_flush; + + HandleCleanup(); + } + + #endif + + // + // Be sure that we have enough space + // to store all the requested data. + // + + if (buffer_ == NULL || length_ + readLength > size_) + { + unsigned int newSize = length_ + readLength; + + #ifdef TEST + *logofs << "ReadBuffer: Resizing buffer for FD#" + << transport_ -> fd() << " in read from " + << size_ << " to " << newSize << " bytes.\n" + << logofs_flush; + #endif + + unsigned char *newBuffer = allocateBuffer(newSize); + + memcpy(newBuffer, buffer_ + start_, length_); + + delete [] buffer_; + + buffer_ = newBuffer; + size_ = newSize; + + transport_ -> pendingReset(); + + owner_ = 1; + } + else if (start_ != 0 && length_ != 0) + { + // + // If any bytes are left due to a partial + // message, shift them to the beginning + // of the buffer. + // + + #ifdef TEST + *logofs << "ReadBuffer: Moving " << length_ + << " bytes of data " << "at beginning of " + << "the buffer for FD#" << transport_ -> fd() + << ".\n" << logofs_flush; + #endif + + memmove(buffer_, buffer_ + start_, length_); + } + + start_ = 0; + + #ifdef DEBUG + *logofs << "ReadBuffer: Buffer size is now " << size_ + << " length is " << length_ << " and start is " + << start_ << ".\n" << logofs_flush; + #endif + + unsigned char *readData = buffer_ + length_; + + #ifdef DEBUG + *logofs << "ReadBuffer: Going to read " << readLength + << " bytes from FD#" << transport_ -> fd() << ".\n" + << logofs_flush; + #endif + + int bytesRead = transport_ -> read(readData, readLength); + + if (bytesRead > 0) + { + #ifdef TEST + *logofs << "ReadBuffer: Read " << bytesRead + << " bytes from FD#" << transport_ -> fd() + << ".\n" << logofs_flush; + #endif + + length_ += bytesRead; + } + else if (bytesRead < 0) + { + // + // Check if there is more data pending than the + // size of the provided buffer. After reading + // the requested amount, in fact, the transport + // may have decompressed the data and produced + // more input. This trick allows us to always + // borrow the buffer from the transport, even + // when the partial read would have prevented + // that. + // + + if (transport_ -> pending() > 0) + { + #ifdef TEST + *logofs << "ReadBuffer: WARNING! Trying to read some " + << "more with " << transport_ -> pending() + << " bytes pending for FD#" << transport_ -> + fd() << ".\n" << logofs_flush; + #endif + + return readMessage(); + } + + #ifdef TEST + *logofs << "ReadBuffer: Error detected reading " + << "from FD#" << transport_ -> fd() + << ".\n" << logofs_flush; + #endif + + return -1; + } + #ifdef TEST + else + { + *logofs << "ReadBuffer: No data read from FD#" + << transport_ -> fd() << " with remaining " + << remaining_ << ".\n" << logofs_flush; + } + #endif + + return bytesRead; +} + +const unsigned char *ReadBuffer::getMessage(unsigned int &controlLength, + unsigned int &dataLength) +{ + #ifdef TEST + + if (transport_ -> pending() > 0) + { + *logofs << "ReadBuffer: PANIC! The transport " + << "appears to have data pending.\n" + << logofs_flush; + + HandleCleanup(); + } + + #endif + + if (length_ == 0) + { + #ifdef DEBUG + *logofs << "ReadBuffer: No message can be located " + << "for FD#" << transport_ -> fd() << ".\n" + << logofs_flush; + #endif + + if (owner_ == 0) + { + buffer_ = NULL; + size_ = 0; + + transport_ -> pendingReset(); + + owner_ = 1; + start_ = 0; + } + + return NULL; + } + + unsigned int trailerLength; + + #ifdef DEBUG + *logofs << "ReadBuffer: Going to locate message with " + << "start at " << start_ << " and length " + << length_ << " for FD#" << transport_ -> fd() + << ".\n" << logofs_flush; + #endif + + int located = locateMessage(buffer_ + start_, buffer_ + start_ + length_, + controlLength, dataLength, trailerLength); + + if (located == 0) + { + // + // No more complete messages are in + // the buffer. + // + + #ifdef DEBUG + *logofs << "ReadBuffer: No message was located " + << "for FD#" << transport_ -> fd() + << ".\n" << logofs_flush; + #endif + + if (owner_ == 0) + { + // + // Must move the remaining bytes in + // our own buffer. + // + + convertBuffer(); + } + + return NULL; + } + else + { + const unsigned char *result = buffer_ + start_; + + if (dataLength > 0) + { + // + // Message contains data, so go to the + // first byte of payload. + // + + result += trailerLength; + + start_ += (dataLength + trailerLength); + length_ -= (dataLength + trailerLength); + } + else + { + // + // It is a control message. + // + + start_ += (controlLength + trailerLength); + length_ -= (controlLength + trailerLength); + } + + #ifdef DEBUG + *logofs << "ReadBuffer: Located message for FD#" + << transport_ -> fd() << " with control length " + << controlLength << " and data length " + << dataLength << ".\n" << logofs_flush; + #endif + + remaining_ = 0; + + return result; + } +} + +int ReadBuffer::setSize(int initialReadSize, int maximumBufferSize) +{ + initialReadSize_ = initialReadSize; + maximumBufferSize_ = maximumBufferSize; + + #ifdef TEST + *logofs << "ReadBuffer: WARNING! Set buffer parameters to " + << initialReadSize_ << "/" << maximumBufferSize_ + << " for object at "<< this << ".\n" + << logofs_flush; + #endif + + return 1; +} + +void ReadBuffer::fullReset() +{ + #ifdef TEST + + if (owner_ == 0) + { + *logofs << "ReadBuffer: PANIC! Class for FD#" + << transport_ -> fd() << " doesn't " + << "appear to be the owner of the buffer " + << "in reset.\n" << logofs_flush; + + HandleCleanup(); + } + + #endif + + if (length_ == 0 && size_ > maximumBufferSize_) + { + #ifdef TEST + *logofs << "ReadBuffer: Resizing buffer for FD#" + << transport_ -> fd() << " in reset from " + << size_ << " to " << maximumBufferSize_ + << " bytes.\n" << logofs_flush; + #endif + + delete [] buffer_; + + int newSize = maximumBufferSize_; + + unsigned char *newBuffer = allocateBuffer(newSize); + + buffer_ = newBuffer; + size_ = newSize; + + transport_ -> pendingReset(); + + owner_ = 1; + start_ = 0; + } +} + +unsigned char *ReadBuffer::allocateBuffer(unsigned int newSize) +{ + unsigned char *newBuffer = new unsigned char[newSize]; + + if (newBuffer == NULL) + { + #ifdef PANIC + *logofs << "ReadBuffer: PANIC! Can't allocate " + << newSize << " bytes of memory for buffer " + << "in context [B].\n" << logofs_flush; + #endif + + cerr << "Error" << ": Can't allocate memory for " + << "read buffer in context [B].\n"; + + HandleCleanup(); + } + + #ifdef VALGRIND + + memset(newBuffer, '\0', newSize); + + #endif + + return newBuffer; +} + +void ReadBuffer::appendBuffer(const unsigned char *message, unsigned int length) +{ + if (start_ + length_ + length > size_) + { + unsigned int newSize = length_ + length + initialReadSize_; + + #ifdef TEST + *logofs << "ReadBuffer: WARNING! Resizing buffer " + << "for FD#" << transport_ -> fd() + << " from " << size_ << " to " << newSize + << " bytes.\n" << logofs_flush; + #endif + + unsigned char *newBuffer = allocateBuffer(newSize); + + memcpy(newBuffer, buffer_ + start_, length_); + + delete [] buffer_; + + buffer_ = newBuffer; + size_ = newSize; + + start_ = 0; + } + + memcpy(buffer_ + start_ + length_, message, length); + + length_ += length; + + transport_ -> pendingReset(); + + owner_ = 1; +} + +void ReadBuffer::convertBuffer() +{ + unsigned int newSize = length_ + initialReadSize_; + + #ifdef TEST + *logofs << "ReadBuffer: WARNING! Converting " + << length_ << " bytes to own buffer " + << "for FD#" << transport_ -> fd() + << " with new size " << newSize + << " bytes.\n" << logofs_flush; + #endif + + unsigned char *newBuffer = allocateBuffer(newSize); + + memcpy(newBuffer, buffer_ + start_, length_); + + buffer_ = newBuffer; + size_ = newSize; + + transport_ -> pendingReset(); + + owner_ = 1; + start_ = 0; +} |