/**************************************************************************/
/*                                                                        */
/* Copyright (c) 2001, 2011 NoMachine (http://www.nomachine.com)          */
/* Copyright (c) 2008-2014 Oleksandr Shneyder <o.shneyder@phoca-gmbh.de>  */
/* Copyright (c) 2014-2016 Ulrich Sibiller <uli42@gmx.de>                 */
/* Copyright (c) 2014-2016 Mihai Moldovan <ionic@ionic.de>                */
/* Copyright (c) 2011-2016 Mike Gabriel <mike.gabriel@das-netzwerkteam.de>*/
/* Copyright (c) 2015-2016 Qindel Group (http://www.qindel.com)           */
/*                                                                        */
/* NXCOMP, NX protocol compression and NX extensions to this software     */
/* are copyright of the aforementioned persons and companies.             */
/*                                                                        */
/* Redistribution and use of the present software is allowed according    */
/* to terms specified in the file LICENSE.nxcomp which comes in the       */
/* source distribution.                                                   */
/*                                                                        */
/* All rights reserved.                                                   */
/*                                                                        */
/* NOTE: This software has received contributions from various other      */
/* contributors, only the core maintainers and supporters are listed as   */
/* copyright holders. Please contact us, if you feel you should be listed */
/* as copyright holder, as well.                                          */
/*                                                                        */
/**************************************************************************/

#include "ReadBuffer.h"

#include "Transport.h"

//
// Needed for std::nothrow in allocateBuffer().
//

#include <new>

//
// Set the verbosity level.
//

#define PANIC
#define WARNING
#undef  TEST
#undef  DEBUG

//
// Create an empty read buffer bound to the given
// transport. No memory is allocated until the
// first read or append.
//

ReadBuffer::ReadBuffer(Transport *transport)

  : transport_(transport)
{
  //
  // The read buffer will grow until
  // reaching the maximum buffer size
  // and then will remain stable at
  // that size.
  //

  initialReadSize_   = READ_BUFFER_DEFAULT_SIZE;
  maximumBufferSize_ = READ_BUFFER_DEFAULT_SIZE;

  size_   = 0;
  buffer_ = NULL;

  //
  // The owner_ flag is 1 when buffer_ was allocated
  // by this class and 0 when it points to memory
  // borrowed from the caller or from the transport.
  //

  owner_  = 1;
  length_ = 0;
  start_  = 0;

  remaining_ = 0;
}

//
// Release the buffer, but only if it was
// allocated by this class.
//

ReadBuffer::~ReadBuffer()
{
  if (owner_ == 1)
  {
    delete [] buffer_;
  }
}

//
// Feed the buffer with data provided by the caller.
// If no partial message is stored, the caller's
// memory is borrowed as is (no copy is made, so it
// must stay valid until the data is consumed or
// converted). Otherwise the data is appended to
// our own buffer.
//

void ReadBuffer::readMessage(const unsigned char *message, unsigned int length)
{
  //
  // To be here we must be the real owner
  // of the buffer and there must not be
  // pending bytes in the transport.
  //

  #ifdef TEST

  if (owner_ == 0)
  {
    *logofs << "ReadBuffer: PANIC! Class for FD#"
            << transport_ -> fd() << " doesn't "
            << "appear to be the owner of the buffer "
            << "while borrowing from the caller.\n"
            << logofs_flush;

    HandleCleanup();
  }

  #endif

  //
  // Be sure that any outstanding data from
  // the transport is appended to our own
  // buffer.
  //

  if (transport_ -> pending() != 0)
  {
    #ifdef WARNING
    *logofs << "ReadBuffer: WARNING! Class for FD#"
            << transport_ -> fd() << " has pending "
            << "data in the transport while "
            << "borrowing from the caller.\n"
            << logofs_flush;
    #endif

    readMessage();

    if (owner_ == 0)
    {
      convertBuffer();
    }
  }

  //
  // Can't borrow the buffer if there is data
  // from a partial message. In this case add
  // the new data to the end of our buffer.
  //

  if (length_ == 0)
  {
    #ifdef TEST
    *logofs << "ReadBuffer: Borrowing " << length
            << " bytes from the caller for FD#"
            << transport_ -> fd() << " with "
            << length_ << " bytes in the buffer.\n"
            << logofs_flush;
    #endif

    //
    // Only free the previous buffer if it is
    // really ours. A borrowed buffer belongs
    // to somebody else.
    //

    if (owner_ == 1)
    {
      delete [] buffer_;
    }

    buffer_ = const_cast<unsigned char *>(message);
    size_   = length;

    length_ = length;

    owner_ = 0;
    start_ = 0;
  }
  else
  {
    #ifdef TEST
    *logofs << "ReadBuffer: Appending " << length
            << " bytes from the caller for FD#"
            << transport_ -> fd() << " with "
            << length_ << " bytes in the buffer.\n"
            << logofs_flush;
    #endif

    appendBuffer(message, length);
  }
}

//
// Read more data from the transport. If the transport
// reports pending data and no partial message is stored,
// the pending buffer is borrowed instead of being copied.
// Otherwise the data is read at the end of our own buffer,
// growing or compacting it as needed.
//
// Returns the number of bytes borrowed or read, 0 if no
// data was read and -1 if the transport reported an error
// with no data pending.
//

int ReadBuffer::readMessage()
{
  int pendingLength = transport_ -> pending();

  if (pendingLength > 0)
  {
    //
    // Can't move the data in the borrowed buffer,
    // so use the transport buffer only if we don't
    // have any partial message. This can happen
    // with the proxy where we need to deflate the
    // stream.
    //

    if (length_ == 0)
    {
      //
      // Start from NULL so that the check below is
      // meaningful even if getPending() doesn't set
      // the pointer.
      //

      unsigned char *newBuffer = NULL;

      length_ = transport_ -> getPending(newBuffer);

      if (newBuffer == NULL)
      {
        #ifdef PANIC
        *logofs << "ReadBuffer: PANIC! Failed to borrow "
                << length_ << " bytes of memory for buffer "
                << "in context [A].\n" << logofs_flush;
        #endif

        cerr << "Error" << ": Failed to borrow memory for "
             << "read buffer in context [A].\n";

        HandleCleanup();
      }

      //
      // Don't free a buffer we borrowed before.
      //

      if (owner_ == 1)
      {
        delete [] buffer_;
      }

      buffer_ = newBuffer;
      size_   = length_;

      owner_ = 0;
      start_ = 0;

      #ifdef TEST
      *logofs << "ReadBuffer: Borrowed " << length_
              << " pending bytes for FD#" << transport_ ->
                 fd() << ".\n" << logofs_flush;
      #endif

      return length_;
    }
    #ifdef TEST
    else
    {
      *logofs << "ReadBuffer: WARNING! Cannot borrow "
              << pendingLength << " bytes for FD#"
              << transport_ -> fd() << " with "
              << length_ << " bytes in the buffer.\n"
              << logofs_flush;
    }
    #endif
  }

  unsigned int readLength = suggestedLength(pendingLength);

  #ifdef DEBUG
  *logofs << "ReadBuffer: Requested " << readLength
          << " bytes for FD#" << transport_ -> fd()
          << " with readable " << transport_ -> readable()
          << " remaining " << remaining_ << " pending "
          << transport_ -> pending() << ".\n"
          << logofs_flush;
  #endif

  if (readLength < initialReadSize_)
  {
    readLength = initialReadSize_;
  }

  #ifdef DEBUG
  *logofs << "ReadBuffer: Buffer size is " << size_
          << " length " << length_ << " and start "
          << start_  << ".\n" << logofs_flush;
  #endif

  //
  // We can't use the transport buffer
  // to read our own data in it.
  //

  #ifdef TEST

  if (owner_ == 0)
  {
    *logofs << "ReadBuffer: PANIC! Class for FD#"
            << transport_ -> fd() << " doesn't "
            << "appear to be the owner of the buffer "
            << "while reading.\n" << logofs_flush;

    HandleCleanup();
  }

  #endif

  //
  // Be sure that we have enough space
  // to store all the requested data.
  //

  if (buffer_ == NULL || length_ + readLength > size_)
  {
    unsigned int newSize = length_ + readLength;

    #ifdef TEST
    *logofs << "ReadBuffer: Resizing buffer for FD#"
            << transport_ -> fd() << " in read from "
            << size_ << " to " << newSize << " bytes.\n"
            << logofs_flush;
    #endif

    unsigned char *newBuffer = allocateBuffer(newSize);

    //
    // The buffer is NULL at the first read. Skip
    // the copy when there is nothing to preserve,
    // as memcpy() must not be given a null source.
    //

    if (length_ > 0)
    {
      memcpy(newBuffer, buffer_ + start_, length_);
    }

    //
    // Only release memory that we allocated.
    //

    if (owner_ == 1)
    {
      delete [] buffer_;
    }

    buffer_ = newBuffer;
    size_   = newSize;

    transport_ -> pendingReset();

    owner_ = 1;
  }
  else if (start_ != 0 && length_ != 0)
  {
    //
    // If any bytes are left due to a partial
    // message, shift them to the beginning
    // of the buffer.
    //

    #ifdef TEST
    *logofs << "ReadBuffer: Moving " << length_
            << " bytes of data " << "at beginning of "
            << "the buffer for FD#" << transport_ -> fd()
            << ".\n" << logofs_flush;
    #endif

    memmove(buffer_, buffer_ + start_, length_);
  }

  start_ = 0;

  #ifdef DEBUG
  *logofs << "ReadBuffer: Buffer size is now " << size_
          << " length is " << length_ << " and start is "
          << start_ << ".\n" << logofs_flush;
  #endif

  //
  // New data goes right after the bytes
  // that are already in the buffer.
  //

  unsigned char *readData = buffer_ + length_;

  #ifdef DEBUG
  *logofs << "ReadBuffer: Going to read " << readLength
          << " bytes from FD#" << transport_ -> fd() << ".\n"
          << logofs_flush;
  #endif

  int bytesRead = transport_ -> read(readData, readLength);

  if (bytesRead > 0)
  {
    #ifdef TEST
    *logofs << "ReadBuffer: Read " << bytesRead
            << " bytes from FD#" << transport_ -> fd()
            << ".\n" << logofs_flush;
    #endif

    length_ += bytesRead;
  }
  else if (bytesRead < 0)
  {
    //
    // Check if there is more data pending than the
    // size of the provided buffer. After reading
    // the requested amount, in fact, the transport
    // may have decompressed the data and produced
    // more input. This trick allows us to always
    // borrow the buffer from the transport, even
    // when the partial read would have prevented
    // that.
    //

    if (transport_ -> pending() > 0)
    {
      #ifdef TEST
      *logofs << "ReadBuffer: WARNING! Trying to read some "
              << "more with " << transport_ -> pending()
              << " bytes pending for FD#" << transport_ -> fd()
              << ".\n" << logofs_flush;
      #endif

      return readMessage();
    }

    #ifdef TEST
    *logofs << "ReadBuffer: Error detected reading "
            << "from FD#" << transport_ -> fd()
            << ".\n" << logofs_flush;
    #endif

    return -1;
  }
  #ifdef TEST
  else
  {
    *logofs << "ReadBuffer: No data read from FD#"
            << transport_ -> fd() << " with remaining "
            << remaining_ << ".\n" << logofs_flush;
  }
  #endif

  return bytesRead;
}

//
// Locate the next complete message in the buffer.
//
// On success the control and data lengths are set by
// locateMessage() and a pointer inside the buffer is
// returned. For messages carrying data the pointer is
// moved past the trailer, to the first byte of payload.
// The message is consumed by advancing start_.
//
// Returns NULL if the buffer is empty or doesn't hold
// a complete message. In the latter case a borrowed
// buffer is converted to an owned one, so the partial
// data is preserved.
//

const unsigned char *ReadBuffer::getMessage(unsigned int &controlLength,
                                                unsigned int &dataLength)
{
  #ifdef TEST

  if (transport_ -> pending() > 0)
  {
    *logofs << "ReadBuffer: PANIC! The transport "
            << "appears to have data pending.\n"
            << logofs_flush;

    HandleCleanup();
  }

  #endif

  if (length_ == 0)
  {
    #ifdef DEBUG
    *logofs << "ReadBuffer: No message can be located "
            << "for FD#" << transport_ -> fd() << ".\n"
            << logofs_flush;
    #endif

    //
    // All the borrowed data was consumed. Drop the
    // reference without freeing it and go back to
    // the owning state.
    //

    if (owner_ == 0)
    {
      buffer_ = NULL;
      size_   = 0;

      transport_ -> pendingReset();

      owner_ = 1;
      start_ = 0;
    }

    return NULL;
  }

  unsigned int trailerLength;

  #ifdef DEBUG
  *logofs << "ReadBuffer: Going to locate message with "
          << "start at " << start_ << " and length "
          << length_ << " for FD#" << transport_ -> fd()
          << ".\n" << logofs_flush;
  #endif

  int located = locateMessage(buffer_ + start_, buffer_ + start_ + length_,
                                  controlLength, dataLength, trailerLength);

  if (located == 0)
  {
    //
    // No more complete messages are in
    // the buffer.
    //

    #ifdef DEBUG
    *logofs << "ReadBuffer: No message was located "
            << "for FD#" << transport_ -> fd()
            << ".\n" << logofs_flush;
    #endif

    if (owner_ == 0)
    {
      //
      // Must move the remaining bytes in
      // our own buffer.
      //

      convertBuffer();
    }

    return NULL;
  }
  else
  {
    const unsigned char *result = buffer_ + start_;

    if (dataLength > 0)
    {
      //
      // Message contains data, so go to the
      // first byte of payload.
      //

      result += trailerLength;

      start_  += (dataLength + trailerLength);
      length_ -= (dataLength + trailerLength);
    }
    else
    {
      //
      // It is a control message.
      //

      start_  += (controlLength + trailerLength);
      length_ -= (controlLength + trailerLength);
    }

    #ifdef DEBUG
    *logofs << "ReadBuffer: Located message for FD#"
            << transport_ -> fd() << " with control length "
            << controlLength << " and data length "
            << dataLength << ".\n" << logofs_flush;
    #endif

    remaining_ = 0;

    return result;
  }
}

//
// Set the minimum size used for each read and the size
// above which an empty buffer is shrunk by fullReset().
// Always returns 1.
//

int ReadBuffer::setSize(int initialReadSize, int maximumBufferSize)
{
  initialReadSize_   = initialReadSize;
  maximumBufferSize_ = maximumBufferSize;

  #ifdef TEST
  *logofs << "ReadBuffer: WARNING! Set buffer parameters to "
          << initialReadSize_ << "/" << maximumBufferSize_
          << " for object at "<< this << ".\n"
          << logofs_flush;
  #endif

  return 1;
}

//
// Shrink the buffer back to the maximum size if it
// is empty and has grown beyond that limit. Nothing
// is done in any other case.
//

void ReadBuffer::fullReset()
{
  #ifdef TEST

  if (owner_ == 0)
  {
    *logofs << "ReadBuffer: PANIC! Class for FD#"
            << transport_ -> fd() << " doesn't "
            << "appear to be the owner of the buffer "
            << "in reset.\n" << logofs_flush;

    HandleCleanup();
  }

  #endif

  if (length_ == 0 && size_ > maximumBufferSize_)
  {
    #ifdef TEST
    *logofs << "ReadBuffer: Resizing buffer for FD#"
            << transport_ -> fd() << " in reset from "
            << size_ << " to " << maximumBufferSize_
            << " bytes.\n" << logofs_flush;
    #endif

    //
    // Only release memory that we allocated.
    //

    if (owner_ == 1)
    {
      delete [] buffer_;
    }

    unsigned int newSize = maximumBufferSize_;

    unsigned char *newBuffer = allocateBuffer(newSize);

    buffer_ = newBuffer;
    size_   = newSize;

    transport_ -> pendingReset();

    owner_ = 1;
    start_ = 0;
  }
}

//
// Allocate a new buffer of the given size. On failure
// the error is logged and HandleCleanup() is called.
//

unsigned char *ReadBuffer::allocateBuffer(unsigned int newSize)
{
  //
  // Use the non-throwing form of new. The plain form
  // would throw std::bad_alloc and never return NULL,
  // so the check below could never be triggered.
  //

  unsigned char *newBuffer = new (std::nothrow) unsigned char[newSize];

  if (newBuffer == NULL)
  {
    #ifdef PANIC
    *logofs << "ReadBuffer: PANIC! Can't allocate "
            << newSize << " bytes of memory for buffer "
            << "in context [B].\n" << logofs_flush;
    #endif

    cerr << "Error" << ": Can't allocate memory for "
         << "read buffer in context [B].\n";

    HandleCleanup();
  }

  #ifdef VALGRIND

  memset(newBuffer, '\0', newSize);

  #endif

  return newBuffer;
}

//
// Copy the caller's data at the end of our own buffer,
// after any bytes that are already stored. The buffer
// is reallocated if it is too small or if it is not
// owned by this class.
//

void ReadBuffer::appendBuffer(const unsigned char *message, unsigned int length)
{
  //
  // A borrowed buffer can't be written to and must
  // never be freed by us, so always move to our own
  // memory in that case.
  //

  if (owner_ == 0 || start_ + length_ + length > size_)
  {
    unsigned int newSize = length_ + length + initialReadSize_;

    #ifdef TEST
    *logofs << "ReadBuffer: WARNING! Resizing buffer "
            << "for FD#" << transport_ -> fd()
            << " from " << size_ << " to " << newSize
            << " bytes.\n" << logofs_flush;
    #endif

    unsigned char *newBuffer = allocateBuffer(newSize);

    //
    // Skip the copy when there is nothing to preserve,
    // as the old buffer may be NULL.
    //

    if (length_ > 0)
    {
      memcpy(newBuffer, buffer_ + start_, length_);
    }

    if (owner_ == 1)
    {
      delete [] buffer_;
    }

    buffer_ = newBuffer;
    size_   = newSize;

    start_ = 0;
  }

  if (length > 0)
  {
    memcpy(buffer_ + start_ + length_, message, length);
  }

  length_ += length;

  transport_ -> pendingReset();

  owner_ = 1;
}

//
// Copy the unconsumed bytes of a borrowed buffer to
// memory allocated by this class. The old buffer is
// not freed, as this is only meant to be called when
// the buffer is borrowed.
//

void ReadBuffer::convertBuffer()
{
  unsigned int newSize = length_ + initialReadSize_;

  #ifdef TEST
  *logofs << "ReadBuffer: WARNING! Converting "
          << length_ << " bytes to own buffer "
          << "for FD#" << transport_ -> fd()
          << " with new size " << newSize
          << " bytes.\n" << logofs_flush;
  #endif

  unsigned char *newBuffer = allocateBuffer(newSize);

  memcpy(newBuffer, buffer_ + start_, length_);

  buffer_ = newBuffer;
  size_   = newSize;

  transport_ -> pendingReset();

  owner_ = 1;
  start_ = 0;
}