diff options
Diffstat (limited to 'nxcomp/src/Transport.cpp')
-rw-r--r-- | nxcomp/src/Transport.cpp | 3068 |
1 files changed, 3068 insertions, 0 deletions
diff --git a/nxcomp/src/Transport.cpp b/nxcomp/src/Transport.cpp new file mode 100644 index 000000000..e34544994 --- /dev/null +++ b/nxcomp/src/Transport.cpp @@ -0,0 +1,3068 @@ +/**************************************************************************/ +/* */ +/* Copyright (c) 2001, 2011 NoMachine (http://www.nomachine.com) */ +/* Copyright (c) 2008-2014 Oleksandr Shneyder <o.shneyder@phoca-gmbh.de> */ +/* Copyright (c) 2014-2016 Ulrich Sibiller <uli42@gmx.de> */ +/* Copyright (c) 2014-2016 Mihai Moldovan <ionic@ionic.de> */ +/* Copyright (c) 2011-2016 Mike Gabriel <mike.gabriel@das-netzwerkteam.de>*/ +/* Copyright (c) 2015-2016 Qindel Group (http://www.qindel.com) */ +/* */ +/* NXCOMP, NX protocol compression and NX extensions to this software */ +/* are copyright of the aforementioned persons and companies. */ +/* */ +/* Redistribution and use of the present software is allowed according */ +/* to terms specified in the file LICENSE.nxcomp which comes in the */ +/* source distribution. */ +/* */ +/* All rights reserved. */ +/* */ +/* NOTE: This software has received contributions from various other */ +/* contributors, only the core maintainers and supporters are listed as */ +/* copyright holders. Please contact us, if you feel you should be listed */ +/* as copyright holder, as well. */ +/* */ +/**************************************************************************/ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include <stdlib.h> +#include <unistd.h> +#include <string.h> +#include <signal.h> +#include <sys/socket.h> + +#include "Transport.h" + +#include "Statistics.h" + +// +// Set the verbosity level. You also +// need to define DUMP in Misc.cpp +// if DUMP is defined here. +// + +#define PANIC +#define WARNING +#undef TEST +#undef DEBUG +#undef INSPECT +#undef DUMP + +// +// Used to lock and unlock the transport +// buffers before they are accessed by +// different threads. +// + +#undef THREADS + +// +// Define this to get logging all the +// operations performed by the parent +// thread, the one that enqueues and +// dequeues data. +// + +#define PARENT + +// +// Define this to know when a channel +// is created or destroyed. +// + +#undef REFERENCES + +// +// Reference count for allocated buffers. +// + +#ifdef REFERENCES + +int Transport::references_; +int ProxyTransport::references_; +int InternalTransport::references_; + +#endif + +// +// This is the base class providing methods for read +// and write buffering. +// + +Transport::Transport(int fd) : fd_(fd) +{ + #ifdef TEST + *logofs << "Transport: Going to create base transport " + << "for FD#" << fd_ << ".\n" << logofs_flush; + #endif + + type_ = transport_base; + + // + // Set up the write buffer. + // + + w_buffer_.length_ = 0; + w_buffer_.start_ = 0; + + initialSize_ = TRANSPORT_BUFFER_DEFAULT_SIZE; + thresholdSize_ = TRANSPORT_BUFFER_DEFAULT_SIZE << 1; + maximumSize_ = TRANSPORT_BUFFER_DEFAULT_SIZE << 4; + + w_buffer_.data_.resize(initialSize_); + + // + // Set non-blocking IO on socket. + // + + SetNonBlocking(fd_, 1); + + blocked_ = 0; + finish_ = 0; + + #ifdef REFERENCES + *logofs << "Transport: Created new object at " + << this << " out of " << ++references_ + << " allocated references.\n" << logofs_flush; + #endif +} + +Transport::~Transport() +{ + #ifdef TEST + *logofs << "Transport: Going to destroy base class " + << "for FD#" << fd_ << ".\n" << logofs_flush; + #endif + + ::close(fd_); + + #ifdef REFERENCES + *logofs << "Transport: Deleted object at " + << this << " out of " << --references_ + << " allocated references.\n" << logofs_flush; + #endif +} + +// +// Read data from its file descriptor. +// + +int Transport::read(unsigned char *data, unsigned int size) +{ + #ifdef DEBUG + *logofs << "Transport: Going to read " << size << " bytes from " + << "FD#" << fd_ << ".\n" << logofs_flush; + #endif + + // + // Read the available data from the socket. + // + + int result = ::read(fd_, data, size); + + // + // Update the current timestamp as the read + // can have scheduled some other process. + // + + getNewTimestamp(); + + if (result < 0) + { + if (EGET() == EAGAIN) + { + #ifdef TEST + *logofs << "Transport: WARNING! Read of " << size << " bytes from " + << "FD#" << fd_ << " would block.\n" << logofs_flush; + #endif + + return 0; + } + else if (EGET() == EINTR) + { + #ifdef TEST + *logofs << "Transport: Read of " << size << " bytes from " + << "FD#" << fd_ << " was interrupted.\n" + << logofs_flush; + #endif + + return 0; + } + else + { + #ifdef TEST + *logofs << "Transport: Error reading from " + << "FD#" << fd_ << ".\n" << logofs_flush; + #endif + + finish(); + + return -1; + } + } + else if (result == 0) + { + #ifdef TEST + *logofs << "Transport: No data read from " + << "FD#" << fd_ << ".\n" << logofs_flush; + #endif + + finish(); + + return -1; + } + + #ifdef TEST + *logofs << "Transport: Read " << result << " bytes out of " + << size << " from FD#" << fd_ << ".\n" << logofs_flush; + #endif + + #ifdef DUMP + + *logofs << "Transport: Dumping content of read data.\n" + << logofs_flush; + + DumpData(data, result); + + #endif + + return result; +} + +// +// Write as many bytes as possible to socket. +// Append the remaining data bytes to the end +// of the buffer and update length to reflect +// changes. +// + +int Transport::write(T_write type, const unsigned char *data, const unsigned int size) +{ + // + // If an immediate write was requested then + // flush the enqueued data first. + // + // Alternatively may try to write only if + // the socket is not blocked. + // + // if (w_buffer_.length_ > 0 && blocked_ == 0 && + // type == write_immediate) + // { + // ... + // } + // + + if (w_buffer_.length_ > 0 && type == write_immediate) + + { + #ifdef TEST + *logofs << "Transport: Writing " << w_buffer_.length_ + << " bytes of previous data to FD#" << fd_ << ".\n" + << logofs_flush; + #endif + + int result = Transport::flush(); + + if (result < 0) + { + return -1; + } + } + + // + // If nothing is remained, write immediately + // to the socket. + // + + unsigned int written = 0; + + if (w_buffer_.length_ == 0 && blocked_ == 0 && + type == write_immediate) + { + // + // Limit the amount of data sent. + // + + unsigned int toWrite = size; + + #ifdef DUMP + + *logofs << "Transport: Going to write " << toWrite + << " bytes to FD#" << fd_ << " with checksum "; + + DumpChecksum(data, size); + + *logofs << ".\n" << logofs_flush; + + #endif + + T_timestamp writeTs; + + int diffTs; + + while (written < toWrite) + { + // + // Trace system time spent writing data. + // + + writeTs = getTimestamp(); + + int result = ::write(fd_, data + written, toWrite - written); + + diffTs = diffTimestamp(writeTs, getNewTimestamp()); + + statistics -> addWriteTime(diffTs); + + if (result <= 0) + { + if (EGET() == EAGAIN) + { + #ifdef TEST + *logofs << "Transport: Write of " << toWrite - written + << " bytes on FD#" << fd_ << " would block.\n" + << logofs_flush; + #endif + + blocked_ = 1; + + break; + } + else if (EGET() == EINTR) + { + #ifdef TEST + *logofs << "Transport: Write of " << toWrite - written + << " bytes on FD#" << fd_ << " was interrupted.\n" + << logofs_flush; + #endif + + continue; + } + else + { + #ifdef TEST + *logofs << "Transport: Write to " << "FD#" + << fd_ << " failed.\n" << logofs_flush; + #endif + + finish(); + + return -1; + } + } + else + { + #ifdef TEST + *logofs << "Transport: Immediately written " << result + << " bytes on " << "FD#" << fd_ << ".\n" + << logofs_flush; + #endif + + written += result; + } + } + + #ifdef DUMP + + if (written > 0) + { + *logofs << "Transport: Dumping content of immediately written data.\n" + << logofs_flush; + + DumpData(data, written); + } + + #endif + } + + if (written == size) + { + // + // We will not affect the write buffer. + // + + return written; + } + + #ifdef DEBUG + *logofs << "Transport: Going to append " << size - written + << " bytes to write buffer for " << "FD#" << fd_ + << ".\n" << logofs_flush; + #endif + + if (resize(w_buffer_, size - written) < 0) + { + return -1; + } + + memmove(w_buffer_.data_.begin() + w_buffer_.start_ + w_buffer_.length_, + data + written, size - written); + + w_buffer_.length_ += size - written; + + #ifdef TEST + *logofs << "Transport: Write buffer for FD#" << fd_ + << " has data for " << w_buffer_.length_ << " bytes.\n" + << logofs_flush; + + *logofs << "Transport: Start is " << w_buffer_.start_ + << " length is " << w_buffer_.length_ << " size is " + << w_buffer_.data_.size() << " capacity is " + << w_buffer_.data_.capacity() << ".\n" + << logofs_flush; + #endif + + // + // Note that this function always returns the whole + // size of buffer that was provided, either if not + // all the data could be actually written. + // + + return size; +} + +// +// Write pending data to its file descriptor. +// + +int Transport::flush() +{ + if (w_buffer_.length_ == 0) + { + #ifdef TEST + *logofs << "Transport: No data to flush on " + << "FD#" << fd_ << ".\n" << logofs_flush; + #endif + + #ifdef WARNING + if (blocked_ != 0) + { + *logofs << "Transport: Blocked flag is " << blocked_ + << " with no data to flush on FD#" << fd_ + << ".\n" << logofs_flush; + } + #endif + + return 0; + } + + // + // It's time to move data from the + // write buffer to the real link. + // + + int written = 0; + + int toWrite = w_buffer_.length_; + + // + // We will do our best to write any available + // data to the socket, so let's say we start + // from a clean state. + // + + blocked_ = 0; + + #ifdef TEST + *logofs << "Transport: Going to flush " << toWrite + << " bytes on FD#" << fd_ << ".\n" + << logofs_flush; + #endif + + T_timestamp writeTs; + + int diffTs; + + while (written < toWrite) + { + writeTs = getTimestamp(); + + int result = ::write(fd_, w_buffer_.data_.begin() + w_buffer_.start_ + + written, toWrite - written); + + diffTs = diffTimestamp(writeTs, getNewTimestamp()); + + statistics -> addWriteTime(diffTs); + + if (result <= 0) + { + if (EGET() == EAGAIN) + { + #ifdef TEST + *logofs << "Transport: Write of " << toWrite - written + << " bytes on FD#" << fd_ << " would block.\n" + << logofs_flush; + #endif + + blocked_ = 1; + + break; + } + else if (EGET() == EINTR) + { + #ifdef TEST + *logofs << "Transport: Write of " << toWrite - written + << " bytes on FD#" << fd_ << " was interrupted.\n" + << logofs_flush; + #endif + + continue; + } + else + { + #ifdef TEST + *logofs << "Transport: Write to " << "FD#" + << fd_ << " failed.\n" << logofs_flush; + #endif + + finish(); + + return -1; + } + } + else + { + #ifdef TEST + *logofs << "Transport: Flushed " << result << " bytes on " + << "FD#" << fd_ << ".\n" << logofs_flush; + #endif + + written += result; + } + } + + if (written > 0) + { + #ifdef DUMP + + *logofs << "Transport: Dumping content of flushed data.\n" + << logofs_flush; + + DumpData(w_buffer_.data_.begin() + w_buffer_.start_, written); + + #endif + + // + // Update the buffer status. + // + + w_buffer_.length_ -= written; + + if (w_buffer_.length_ == 0) + { + w_buffer_.start_ = 0; + } + else + { + w_buffer_.start_ += written; + } + } + + // + // It can be that we wrote less bytes than + // available because of the write limit. + // + + if (w_buffer_.length_ > 0) + { + #ifdef TEST + *logofs << "Transport: There are still " << w_buffer_.length_ + << " bytes in write buffer for " << "FD#" + << fd_ << ".\n" << logofs_flush; + #endif + + blocked_ = 1; + } + + #ifdef TEST + *logofs << "Transport: Write buffer for FD#" << fd_ + << " has data for " << w_buffer_.length_ << " bytes.\n" + << logofs_flush; + + *logofs << "Transport: Start is " << w_buffer_.start_ + << " length is " << w_buffer_.length_ << " size is " + << w_buffer_.data_.size() << " capacity is " + << w_buffer_.data_.capacity() << ".\n" + << logofs_flush; + #endif + + // + // No new data was produced for the link except + // any outstanding data from previous writes. + // + + return 0; +} + +int Transport::drain(int limit, int timeout) +{ + if (w_buffer_.length_ <= limit) + { + return 1; + } + + // + // Write the data accumulated in the write + // buffer until it is below the limit or + // the timeout is elapsed. + // + + int toWrite = w_buffer_.length_; + + int written = 0; + + #ifdef TEST + *logofs << "Transport: Draining " << toWrite - limit + << " bytes on FD#" << fd_ << " with limit set to " + << limit << ".\n" << logofs_flush; + #endif + + T_timestamp startTs = getNewTimestamp(); + + T_timestamp selectTs; + T_timestamp writeTs; + T_timestamp idleTs; + + T_timestamp nowTs = startTs; + + int diffTs; + + fd_set writeSet; + fd_set readSet; + + FD_ZERO(&writeSet); + FD_ZERO(&readSet); + + int result; + int ready; + + while (w_buffer_.length_ - written > limit) + { + nowTs = getNewTimestamp(); + + // + // Wait for descriptor to become + // readable or writable. + // + + FD_SET(fd_, &writeSet); + FD_SET(fd_, &readSet); + + setTimestamp(selectTs, timeout / 2); + + idleTs = nowTs; + + result = select(fd_ + 1, &readSet, &writeSet, NULL, &selectTs); + + nowTs = getNewTimestamp(); + + diffTs = diffTimestamp(idleTs, nowTs); + + statistics -> addIdleTime(diffTs); + + statistics -> subReadTime(diffTs); + + if (result < 0) + { + if (EGET() == EINTR) + { + #ifdef TEST + *logofs << "Transport: Select on FD#" << fd_ + << " was interrupted.\n" << logofs_flush; + #endif + + continue; + } + else + { + #ifdef TEST + *logofs << "Transport: Select on FD#" << fd_ + << " failed.\n" << logofs_flush; + #endif + + finish(); + + return -1; + } + } + else if (result > 0) + { + ready = result; + + if (FD_ISSET(fd_, &writeSet)) + { + writeTs = getNewTimestamp(); + + result = ::write(fd_, w_buffer_.data_.begin() + w_buffer_.start_ + + written, toWrite - written); + + nowTs = getNewTimestamp(); + + diffTs = diffTimestamp(writeTs, nowTs); + + statistics -> addWriteTime(diffTs); + + if (result > 0) + { + #ifdef TEST + *logofs << "Transport: Forced flush of " << result + << " bytes on " << "FD#" << fd_ << ".\n" + << logofs_flush; + #endif + + written += result; + } + else if (result < 0 && EGET() == EINTR) + { + #ifdef TEST + *logofs << "Transport: Write to FD#" << fd_ + << " was interrupted.\n" << logofs_flush; + #endif + + continue; + } + else + { + #ifdef TEST + *logofs << "Transport: Write to FD#" << fd_ + << " failed.\n" << logofs_flush; + #endif + + finish(); + + return -1; + } + + ready--; + } + + if (ready > 0) + { + if (FD_ISSET(fd_, &readSet)) + { + #ifdef TEST + *logofs << "Transport: Not draining further " + << "due to data readable on FD#" << fd_ + << ".\n" << logofs_flush; + #endif + + break; + } + } + } + #ifdef TEST + else + { + *logofs << "Transport: Timeout encountered " + << "waiting for FD#" << fd_ << ".\n" + << logofs_flush; + } + #endif + + nowTs = getNewTimestamp(); + + diffTs = diffTimestamp(startTs, nowTs); + + if (diffTs >= timeout) + { + #ifdef TEST + *logofs << "Transport: Not draining further " + << "due to the timeout on FD#" << fd_ + << ".\n" << logofs_flush; + #endif + + break; + } + } + + if (written > 0) + { + #ifdef DUMP + + *logofs << "Transport: Dumping content of flushed data.\n" + << logofs_flush; + + DumpData(w_buffer_.data_.begin() + w_buffer_.start_, written); + + #endif + + // + // Update the buffer status. + // + + w_buffer_.length_ -= written; + + if (w_buffer_.length_ == 0) + { + w_buffer_.start_ = 0; + + blocked_ = 0; + } + else + { + w_buffer_.start_ += written; + + #ifdef TEST + *logofs << "Transport: There are still " << w_buffer_.length_ + << " bytes in write buffer for " << "FD#" + << fd_ << ".\n" << logofs_flush; + #endif + + blocked_ = 1; + } + } + #ifdef TEST + else + { + *logofs << "Transport: WARNING! No data written to FD#" << fd_ + << " with " << toWrite << " bytes to drain and limit " + << "set to " << limit << ".\n" << logofs_flush; + } + #endif + + #ifdef TEST + *logofs << "Transport: Write buffer for FD#" << fd_ + << " has data for " << w_buffer_.length_ << " bytes.\n" + << logofs_flush; + + *logofs << "Transport: Start is " << w_buffer_.start_ + << " length is " << w_buffer_.length_ << " size is " + << w_buffer_.data_.size() << " capacity is " + << w_buffer_.data_.capacity() << ".\n" + << logofs_flush; + #endif + + return (w_buffer_.length_ <= limit); +} + +int Transport::wait(int timeout) const +{ + T_timestamp startTs = getNewTimestamp(); + + T_timestamp idleTs; + T_timestamp selectTs; + + T_timestamp nowTs = startTs; + + long available = 0; + int result = 0; + + int diffTs; + + fd_set readSet; + + FD_ZERO(&readSet); + FD_SET(fd_, &readSet); + + for (;;) + { + available = readable(); + + diffTs = diffTimestamp(startTs, nowTs); + + if (available != 0 || timeout == 0 || + (diffTs + (timeout / 10)) >= timeout) + { + #ifdef TEST + *logofs << "Transport: There are " << available + << " bytes on FD#" << fd_ << " after " + << diffTs << " Ms.\n" << logofs_flush; + #endif + + return available; + } + else if (available == 0 && result > 0) + { + #ifdef TEST + *logofs << "Transport: Read on " << "FD#" + << fd_ << " failed.\n" << logofs_flush; + #endif + + return -1; + } + + // + // TODO: Should subtract the time + // already spent in select. + // + + selectTs.tv_sec = 0; + selectTs.tv_usec = timeout * 1000; + + idleTs = nowTs; + + // + // Wait for descriptor to become readable. + // + + result = select(fd_ + 1, &readSet, NULL, NULL, &selectTs); + + nowTs = getNewTimestamp(); + + diffTs = diffTimestamp(idleTs, nowTs); + + statistics -> addIdleTime(diffTs); + + statistics -> subReadTime(diffTs); + + if (result < 0) + { + if (EGET() == EINTR) + { + #ifdef TEST + *logofs << "Transport: Select on FD#" << fd_ + << " was interrupted.\n" << logofs_flush; + #endif + + continue; + } + else + { + #ifdef TEST + *logofs << "Transport: Select on " << "FD#" + << fd_ << " failed.\n" << logofs_flush; + #endif + + return -1; + } + } + #ifdef TEST + else if (result == 0) + { + *logofs << "Transport: No data available on FD#" << fd_ + << " after " << diffTimestamp(startTs, nowTs) + << " Ms.\n" << logofs_flush; + } + else + { + *logofs << "Transport: Data became available on FD#" << fd_ + << " after " << diffTimestamp(startTs, nowTs) + << " Ms.\n" << logofs_flush; + } + #endif + } +} + +void Transport::setSize(unsigned int initialSize, unsigned int thresholdSize, + unsigned int maximumSize) +{ + initialSize_ = initialSize; + thresholdSize_ = thresholdSize; + maximumSize_ = maximumSize; + + #ifdef TEST + *logofs << "Transport: Set buffer sizes for FD#" << fd_ + << " to " << initialSize_ << "/" << thresholdSize_ + << "/" << maximumSize_ << ".\n" << logofs_flush; + #endif +} + +void Transport::fullReset() +{ + blocked_ = 0; + finish_ = 0; + + fullReset(w_buffer_); +} + +int Transport::resize(T_buffer &buffer, const int &size) +{ + if ((int) buffer.data_.size() >= (buffer.length_ + size) && + (buffer.start_ + buffer.length_ + size) > + (int) buffer.data_.size()) + { + if (buffer.length_ > 0) + { + // + // There is enough space in buffer but we need + // to move existing data at the beginning. + // + + #ifdef TEST + *logofs << "Transport: Moving " << buffer.length_ + << " bytes of data for " << "FD#" << fd_ + << " to make room in the buffer.\n" + << logofs_flush; + #endif + + memmove(buffer.data_.begin(), buffer.data_.begin() + + buffer.start_, buffer.length_); + } + + buffer.start_ = 0; + + #ifdef DEBUG + *logofs << "Transport: Made room for " + << buffer.data_.size() - buffer.start_ + << " bytes in buffer for " << "FD#" + << fd_ << ".\n" << logofs_flush; + #endif + } + else if ((buffer.length_ + size) > (int) buffer.data_.size()) + { + // + // Not enough space, so increase + // the size of the buffer. + // + + if (buffer.start_ != 0 && buffer.length_ > 0) + { + #ifdef TEST + *logofs << "Transport: Moving " << buffer.length_ + << " bytes of data for " << "FD#" << fd_ + << " to resize the buffer.\n" + << logofs_flush; + #endif + + memmove(buffer.data_.begin(), buffer.data_.begin() + + buffer.start_, buffer.length_); + } + + buffer.start_ = 0; + + unsigned int newSize = thresholdSize_; + + while (newSize < (unsigned int) buffer.length_ + size) + { + newSize <<= 1; + + if (newSize >= maximumSize_) + { + newSize = buffer.length_ + size + initialSize_; + } + } + + #ifdef DEBUG + *logofs << "Transport: Buffer for " << "FD#" << fd_ + << " will be enlarged from " << buffer.data_.size() + << " to at least " << buffer.length_ + size + << " bytes.\n" << logofs_flush; + #endif + + buffer.data_.resize(newSize); + + #ifdef TEST + if (newSize >= maximumSize_) + { + *logofs << "Transport: WARNING! Buffer for FD#" << fd_ + << " grown to reach size of " << newSize + << " bytes.\n" << logofs_flush; + } + #endif + + #ifdef TEST + *logofs << "Transport: Data buffer for " << "FD#" + << fd_ << " has now size " << buffer.data_.size() + << " and capacity " << buffer.data_.capacity() + << ".\n" << logofs_flush; + #endif + } + + return (buffer.length_ + size); +} + +void Transport::fullReset(T_buffer &buffer) +{ + // + // Force deallocation and allocation + // of the initial size. + // + + #ifdef TEST + *logofs << "Transport: Resetting buffer for " << "FD#" + << fd_ << " with size " << buffer.data_.size() + << " and capacity " << buffer.data_.capacity() + << ".\n" << logofs_flush; + #endif + + buffer.start_ = 0; + buffer.length_ = 0; + + if (buffer.data_.size() > (unsigned int) initialSize_ && + buffer.data_.capacity() > (unsigned int) initialSize_) + { + buffer.data_.clear(); + + buffer.data_.resize(initialSize_); + + #ifdef TEST + *logofs << "Transport: Data buffer for " << "FD#" + << fd_ << " shrunk to size " << buffer.data_.size() + << " and capacity " << buffer.data_.capacity() + << ".\n" << logofs_flush; + #endif + } +} + +ProxyTransport::ProxyTransport(int fd) : Transport(fd) +{ + #ifdef TEST + *logofs << "ProxyTransport: Going to create proxy transport " + << "for FD#" << fd_ << ".\n" << logofs_flush; + #endif + + type_ = transport_proxy; + + // + // Set up the read buffer. + // + + r_buffer_.length_ = 0; + r_buffer_.start_ = 0; + + r_buffer_.data_.resize(initialSize_); + + // + // For now we own the buffer. + // + + owner_ = 1; + + // + // Set up ZLIB compression. + // + + int result; + + r_stream_.zalloc = NULL; + r_stream_.zfree = NULL; + r_stream_.opaque = NULL; + + r_stream_.next_in = NULL; + r_stream_.avail_in = 0; + + if ((result = inflateInit2(&r_stream_, 15)) != Z_OK) + { + #ifdef PANIC + *logofs << "ProxyTransport: PANIC! Failed initialization of ZLIB read stream. " + << "Error is '" << zError(result) << "'.\n" << logofs_flush; + #endif + + cerr << "Error" << ": Failed initialization of ZLIB read stream. " + << "Error is '" << zError(result) << "'.\n"; + + HandleCleanup(); + } + + if (control -> LocalStreamCompression) + { + w_stream_.zalloc = NULL; + w_stream_.zfree = NULL; + w_stream_.opaque = NULL; + + if ((result = deflateInit2(&w_stream_, control -> LocalStreamCompressionLevel, Z_DEFLATED, + 15, 9, Z_DEFAULT_STRATEGY)) != Z_OK) + { + #ifdef PANIC + *logofs << "ProxyTransport: PANIC! Failed initialization of ZLIB write stream. " + << "Error is '" << zError(result) << "'.\n" << logofs_flush; + #endif + + cerr << "Error" << ": Failed initialization of ZLIB write stream. " + << "Error is '" << zError(result) << "'.\n"; + + HandleCleanup(); + } + } + + // + // No ZLIB stream to flush yet. + // + + flush_ = 0; + + #ifdef REFERENCES + *logofs << "ProxyTransport: Created new object at " + << this << " out of " << ++references_ + << " allocated references.\n" << logofs_flush; + #endif +} + +ProxyTransport::~ProxyTransport() +{ + #ifdef TEST + *logofs << "ProxyTransport: Going to destroy derived class " + << "for FD#" << fd_ << ".\n" << logofs_flush; + #endif + + // + // Deallocate the ZLIB stream state. + // + + inflateEnd(&r_stream_); + + if (control -> LocalStreamCompression) + { + deflateEnd(&w_stream_); + } + + #ifdef REFERENCES + *logofs << "ProxyTransport: Deleted object at " + << this << " out of " << --references_ + << " allocated references.\n" << logofs_flush; + #endif +} + +// +// Read data from its file descriptor. +// + +int ProxyTransport::read(unsigned char *data, unsigned int size) +{ + // + // If the remote peer is not compressing + // the stream then just return any byte + // read from the socket. + // + + if (control -> RemoteStreamCompression == 0) + { + int result = Transport::read(data, size); + + if (result <= 0) + { + return result; + } + + statistics -> addBytesIn(result); + + return result; + } + + // + // Return any pending data first. + // + + if (r_buffer_.length_ > 0) + { + // + // If the size of the buffer doesn't + // match the amount of data pending, + // force the caller to retry. + // + + if ((int) size < r_buffer_.length_) + { + #ifdef TEST + *logofs << "ProxyTransport: WARNING! Forcing a retry with " + << r_buffer_.length_ << " bytes pending and " + << size << " in the buffer.\n" + << logofs_flush; + #endif + + ESET(EAGAIN); + + return -1; + } + + int copied = (r_buffer_.length_ > ((int) size) ? + ((int) size) : r_buffer_.length_); + + memcpy(data, r_buffer_.data_.begin() + r_buffer_.start_, copied); + + // + // Update the buffer status. + // + + #ifdef DEBUG + *logofs << "ProxyTransport: Going to immediately return " << copied + << " bytes from proxy FD#" << fd_ << ".\n" + << logofs_flush; + #endif + + r_buffer_.length_ -= copied; + + if (r_buffer_.length_ == 0) + { + r_buffer_.start_ = 0; + } + else + { + r_buffer_.start_ += copied; + + #ifdef TEST + *logofs << "ProxyTransport: There are still " << r_buffer_.length_ + << " bytes in read buffer for proxy " << "FD#" + << fd_ << ".\n" << logofs_flush; + #endif + } + + return copied; + } + + // + // Read data in the user buffer. + // + + int result = Transport::read(data, size); + + if (result <= 0) + { + return result; + } + + statistics -> addBytesIn(result); + + // + // Decompress the data into the read + // buffer. + // + + #ifdef DEBUG + *logofs << "ProxyTransport: Going to decompress data for " + << "proxy FD#" << fd_ << ".\n" << logofs_flush; + #endif + + int saveTotalIn = r_stream_.total_in; + int saveTotalOut = r_stream_.total_out; + + int oldTotalIn = saveTotalIn; + int oldTotalOut = saveTotalOut; + + int diffTotalIn; + int diffTotalOut; + + #ifdef INSPECT + *logofs << "ProxyTransport: oldTotalIn = " << oldTotalIn + << ".\n" << logofs_flush; + #endif + + #ifdef INSPECT + *logofs << "ProxyTransport: oldTotalOut = " << oldTotalOut + << ".\n" << logofs_flush; + #endif + + r_stream_.next_in = (Bytef *) data; + r_stream_.avail_in = result; + + // + // Let ZLIB use all the space already + // available in the buffer. + // + + unsigned int newAvailOut = r_buffer_.data_.size() - r_buffer_.start_ - + r_buffer_.length_; + + #ifdef TEST + *logofs << "ProxyTransport: Initial decompress buffer is " + << newAvailOut << " bytes.\n" << logofs_flush; + #endif + + for (;;) + { + #ifdef INSPECT + *logofs << "\nProxyTransport: Running the decompress loop.\n" + << logofs_flush; + #endif + + #ifdef INSPECT + *logofs << "ProxyTransport: r_buffer_.length_ = " << r_buffer_.length_ + << ".\n" << logofs_flush; + #endif + + #ifdef INSPECT + *logofs << "ProxyTransport: r_buffer_.data_.size() = " << r_buffer_.data_.size() + << ".\n" << logofs_flush; + #endif + + #ifdef INSPECT + *logofs << "ProxyTransport: newAvailOut = " << newAvailOut + << ".\n" << logofs_flush; + #endif + + if (resize(r_buffer_, newAvailOut) < 0) + { + return -1; + } + + #ifdef INSPECT + *logofs << "ProxyTransport: r_buffer_.data_.size() = " + << r_buffer_.data_.size() << ".\n" + << logofs_flush; + #endif + + #ifdef INSPECT + *logofs << "ProxyTransport: r_stream_.next_in = " + << (void *) r_stream_.next_in << ".\n" + << logofs_flush; + #endif + + #ifdef INSPECT + *logofs << "ProxyTransport: r_stream_.avail_in = " + << r_stream_.avail_in << ".\n" + << logofs_flush; + #endif + + r_stream_.next_out = r_buffer_.data_.begin() + r_buffer_.start_ + + r_buffer_.length_; + + r_stream_.avail_out = newAvailOut; + + #ifdef INSPECT + *logofs << "ProxyTransport: r_stream_.next_out = " + << (void *) r_stream_.next_out << ".\n" + << logofs_flush; + #endif + + #ifdef INSPECT + *logofs << "ProxyTransport: r_stream_.avail_out = " + << r_stream_.avail_out << ".\n" + << logofs_flush; + #endif + + int result = inflate(&r_stream_, Z_SYNC_FLUSH); + + #ifdef INSPECT + *logofs << "ProxyTransport: Called inflate() result is " + << result << ".\n" << logofs_flush; + #endif + + #ifdef INSPECT + *logofs << "ProxyTransport: r_stream_.avail_in = " + << r_stream_.avail_in << ".\n" + << logofs_flush; + #endif + + #ifdef INSPECT + *logofs << "ProxyTransport: r_stream_.avail_out = " + << r_stream_.avail_out << ".\n" + << logofs_flush; + #endif + + #ifdef INSPECT + *logofs << "ProxyTransport: r_stream_.total_in = " + << r_stream_.total_in << ".\n" + << logofs_flush; + #endif + + #ifdef INSPECT + *logofs << "ProxyTransport: r_stream_.total_out = " + << r_stream_.total_out << ".\n" + << logofs_flush; + #endif + + diffTotalIn = r_stream_.total_in - oldTotalIn; + diffTotalOut = r_stream_.total_out - oldTotalOut; + + #ifdef INSPECT + *logofs << "ProxyTransport: diffTotalIn = " + << diffTotalIn << ".\n" + << logofs_flush; + #endif + + #ifdef INSPECT + *logofs << "ProxyTransport: diffTotalOut = " + << diffTotalOut << ".\n" + << logofs_flush; + #endif + + #ifdef INSPECT + *logofs << "ProxyTransport: r_buffer_.length_ = " + << r_buffer_.length_ << ".\n" + << logofs_flush; + #endif + + r_buffer_.length_ += diffTotalOut; + + #ifdef INSPECT + *logofs << "ProxyTransport: r_buffer_.length_ = " + << r_buffer_.length_ << ".\n" + << logofs_flush; + #endif + + oldTotalIn = r_stream_.total_in; + oldTotalOut = r_stream_.total_out; + + if (result == Z_OK) + { + if (r_stream_.avail_in == 0 && r_stream_.avail_out > 0) + { + break; + } + } + else if (result == Z_BUF_ERROR && r_stream_.avail_out > 0 && + r_stream_.avail_in == 0) + { + #ifdef TEST + *logofs << "ProxyTransport: WARNING! Raised Z_BUF_ERROR decompressing data.\n" + << logofs_flush; + #endif + + break; + } + else + { + #ifdef PANIC + *logofs << "ProxyTransport: PANIC! Decompression of data failed. " + << "Error is '" << zError(result) << "'.\n" + << logofs_flush; + #endif + + cerr << "Error" << ": Decompression of data failed. Error is '" + << zError(result) << "'.\n"; + + finish(); + + return -1; + } + + // + // Add more bytes to the buffer. + // + + if (newAvailOut < thresholdSize_) + { + newAvailOut = thresholdSize_; + } + + #ifdef TEST + *logofs << "ProxyTransport: Need to add " << newAvailOut + << " bytes to the decompress buffer in read.\n" + << logofs_flush; + #endif + } + + diffTotalIn = r_stream_.total_in - saveTotalIn; + diffTotalOut = r_stream_.total_out - saveTotalOut; + + #ifdef DEBUG + *logofs << "ProxyTransport: Decompressed data from " + << diffTotalIn << " to " << diffTotalOut + << " bytes.\n" << logofs_flush; + #endif + + statistics -> addDecompressedBytes(diffTotalIn, diffTotalOut); + + // + // Check if the size of the buffer + // matches the produced data. + // + + if ((int) size < r_buffer_.length_) + { + #ifdef TEST + *logofs << "ProxyTransport: WARNING! Forcing a retry with " + << r_buffer_.length_ << " bytes pending and " + << size << " in the buffer.\n" + << logofs_flush; + #endif + + ESET(EAGAIN); + + return -1; + } + + // + // Copy the decompressed data to the + // provided buffer. + // + + int copied = (r_buffer_.length_ > ((int) size) ? + ((int) size) : r_buffer_.length_); + + #ifdef DEBUG + *logofs << "ProxyTransport: Going to return " << copied + << " bytes from proxy FD#" << fd_ << ".\n" + << logofs_flush; + #endif + + memcpy(data, r_buffer_.data_.begin() + r_buffer_.start_, copied); + + // + // Update the buffer status. + // + + r_buffer_.length_ -= copied; + + if (r_buffer_.length_ == 0) + { + r_buffer_.start_ = 0; + } + else + { + r_buffer_.start_ += copied; + + #ifdef TEST + *logofs << "ProxyTransport: There are still " << r_buffer_.length_ + << " bytes in read buffer for proxy FD#" << fd_ + << ".\n" << logofs_flush; + #endif + } + + return copied; +} + +// +// If required compress data, else write it to socket. +// + +int ProxyTransport::write(T_write type, const unsigned char *data, const unsigned int size) +{ + #ifdef TEST + if (size == 0) + { + *logofs << "ProxyTransport: WARNING! Write called for FD#" + << fd_ << " without any data to write.\n" + << logofs_flush; + + return 0; + } + #endif + + // + // If there is no compression revert to + // plain socket management. + // + + if (control -> LocalStreamCompression == 0) + { + int result = Transport::write(type, data, size); + + if (result <= 0) + { + return result; + } + + statistics -> addBytesOut(result); + + statistics -> updateBitrate(result); + + FlushCallback(result); + + return result; + } + + #ifdef DEBUG + *logofs << "ProxyTransport: Going to compress " << size + << " bytes to write buffer for proxy FD#" << fd_ + << ".\n" << logofs_flush; + #endif + + // + // Compress data into the write buffer. + // + + int saveTotalIn = w_stream_.total_in; + int saveTotalOut = w_stream_.total_out; + + int oldTotalIn = saveTotalIn; + int oldTotalOut = saveTotalOut; + + int diffTotalIn; + int diffTotalOut; + + #ifdef INSPECT + *logofs << "ProxyTransport: oldTotalIn = " << oldTotalIn + << ".\n" << logofs_flush; + #endif + + #ifdef INSPECT + *logofs << "ProxyTransport: oldTotalOut = " << oldTotalOut + << ".\n" << logofs_flush; + #endif + + w_stream_.next_in = (Bytef *) data; + w_stream_.avail_in = size; + + // + // Let ZLIB use all the space already + // available in the buffer. + // + + unsigned int newAvailOut = w_buffer_.data_.size() - w_buffer_.start_ - + w_buffer_.length_; + + #ifdef TEST + *logofs << "ProxyTransport: Initial compress buffer is " + << newAvailOut << " bytes.\n" << logofs_flush; + #endif + + for (;;) + { + #ifdef INSPECT + *logofs << "\nProxyTransport: Running the compress loop.\n" + << logofs_flush; + #endif + + #ifdef INSPECT + *logofs << "ProxyTransport: w_buffer_.length_ = " + << w_buffer_.length_ << ".\n" + << logofs_flush; + #endif + + #ifdef INSPECT + *logofs << "ProxyTransport: w_buffer_.data_.size() = " + << w_buffer_.data_.size() << ".\n" + << logofs_flush; + #endif + + #ifdef INSPECT + *logofs << "ProxyTransport: newAvailOut = " + << newAvailOut << ".\n" + << logofs_flush; + #endif + + if (resize(w_buffer_, newAvailOut) < 0) + { + return -1; + } + + #ifdef INSPECT + *logofs << "ProxyTransport: w_buffer_.data_.size() = " + << w_buffer_.data_.size() << ".\n" + << logofs_flush; + #endif + + #ifdef INSPECT + *logofs << "ProxyTransport: w_stream_.next_in = " + << (void *) w_stream_.next_in << ".\n" + << logofs_flush; + #endif + + #ifdef INSPECT + *logofs << "ProxyTransport: w_stream_.avail_in = " + << w_stream_.avail_in << ".\n" + << logofs_flush; + #endif + + w_stream_.next_out = w_buffer_.data_.begin() + w_buffer_.start_ + + w_buffer_.length_; + + w_stream_.avail_out = newAvailOut; + + #ifdef INSPECT + *logofs << "ProxyTransport: w_stream_.next_out = " + << (void *) w_stream_.next_out << ".\n" + << logofs_flush; + #endif + + #ifdef INSPECT + *logofs << "ProxyTransport: w_stream_.avail_out = " + << w_stream_.avail_out << ".\n" + << logofs_flush; + #endif + + int result = deflate(&w_stream_, (type == write_delayed ? + Z_NO_FLUSH : Z_SYNC_FLUSH)); + + #ifdef INSPECT + *logofs << "ProxyTransport: Called deflate() result is " + << result << ".\n" << logofs_flush; + #endif + + #ifdef INSPECT + *logofs << "ProxyTransport: w_stream_.avail_in = " + << w_stream_.avail_in << ".\n" + << logofs_flush; + #endif + + #ifdef INSPECT + *logofs << "ProxyTransport: w_stream_.avail_out = " + << w_stream_.avail_out << ".\n" + << logofs_flush; + #endif + + #ifdef INSPECT + *logofs << "ProxyTransport: w_stream_.total_in = " + << w_stream_.total_in << ".\n" + << logofs_flush; + #endif + + #ifdef INSPECT + *logofs << "ProxyTransport: w_stream_.total_out = " + << w_stream_.total_out << ".\n" + << logofs_flush; + #endif + + diffTotalOut = w_stream_.total_out - oldTotalOut; + diffTotalIn = w_stream_.total_in - oldTotalIn; + + #ifdef INSPECT + *logofs << "ProxyTransport: diffTotalIn = " + << diffTotalIn << ".\n" + << logofs_flush; + #endif + + #ifdef INSPECT + *logofs << "ProxyTransport: diffTotalOut = " + << diffTotalOut << ".\n" + << logofs_flush; + #endif + + #ifdef INSPECT + *logofs << "ProxyTransport: w_buffer_.length_ = " + << w_buffer_.length_ << ".\n" + << logofs_flush; + #endif + + w_buffer_.length_ += diffTotalOut; + + #ifdef INSPECT + *logofs << "ProxyTransport: w_buffer_.length_ = " + << w_buffer_.length_ << ".\n" + << logofs_flush; + #endif + + oldTotalOut = w_stream_.total_out; + oldTotalIn = w_stream_.total_in; + + if (result == Z_OK) + { + if (w_stream_.avail_in == 0 && w_stream_.avail_out > 0) + { + break; + } + } + else if (result == Z_BUF_ERROR && w_stream_.avail_out > 0 && + w_stream_.avail_in == 0) + { + #ifdef TEST + *logofs << "ProxyTransport: WARNING! Raised Z_BUF_ERROR compressing data.\n" + << logofs_flush; + #endif + + break; + } + else + { + #ifdef PANIC + *logofs << "ProxyTransport: PANIC! Compression of data failed. " + << "Error is '" << zError(result) << "'.\n" + << logofs_flush; + #endif + + cerr << "Error" << ": Compression of data failed. Error is '" + << zError(result) << "'.\n"; + + finish(); + + return -1; + } + + // + // Add more bytes to the buffer. + // + + if (newAvailOut < thresholdSize_) + { + newAvailOut = thresholdSize_; + } + + #ifdef TEST + *logofs << "ProxyTransport: Need to add " << newAvailOut + << " bytes to the compress buffer in write.\n" + << logofs_flush; + #endif + } + + diffTotalIn = w_stream_.total_in - saveTotalIn; + diffTotalOut = w_stream_.total_out - saveTotalOut; + + #ifdef TEST + + *logofs << "ProxyTransport: Compressed data from " + << diffTotalIn << " to " << diffTotalOut + << " bytes.\n" << logofs_flush; + + if (diffTotalIn != (int) size) + { + #ifdef PANIC + *logofs << "ProxyTransport: PANIC! Bytes provided to ZLIB stream " + << "should be " << size << " but they look to be " + << diffTotalIn << ".\n" << logofs_flush; + #endif + } + + #endif + + // + // Find out what we have to do with the + // produced data. + // + + if (type == write_immediate) + { + // + // If user requested an immediate write we + // flushed the ZLIB buffer. We can now reset + // the counter and write data to socket. + // + + flush_ = 0; + + #ifdef TEST + *logofs << "ProxyTransport: Write buffer for proxy FD#" << fd_ + << " has data for " << w_buffer_.length_ << " bytes.\n" + << logofs_flush; + + *logofs << "ProxyTransport: Start is " << w_buffer_.start_ + << " length is " << w_buffer_.length_ << " flush is " + << flush_ << " size is " << w_buffer_.data_.size() + << " capacity is " << w_buffer_.data_.capacity() + << ".\n" << logofs_flush; + #endif + + // + // Alternatively may try to write only if + // the socket is not blocked. + // + // if (w_buffer_.length_ > 0 && blocked_ == 0) + // { + // ... + // } + // + + if (w_buffer_.length_ > 0) + + { + #ifdef TEST + *logofs << "ProxyTransport: Writing " << w_buffer_.length_ + << " bytes of produced data to FD#" << fd_ << ".\n" + << logofs_flush; + #endif + + int result = Transport::flush(); + + if (result < 0) + { + return -1; + } + } + } + else + { + // + // We haven't flushed the ZLIB compression + // buffer, so user will have to call proxy + // transport's flush explicitly. + // + + flush_ += diffTotalIn; + } + + // + // We either wrote the data or added it to the + // write buffer. It's convenient to update the + // counters at this stage to get the current + // bitrate earlier. + // + + statistics -> addCompressedBytes(diffTotalIn, diffTotalOut); + + statistics -> addBytesOut(diffTotalOut); + + statistics -> updateBitrate(diffTotalOut); + + FlushCallback(diffTotalOut); + + #ifdef TEST + *logofs << "ProxyTransport: Write buffer for proxy FD#" << fd_ + << " has data for " << w_buffer_.length_ << " bytes.\n" + << logofs_flush; + + *logofs << "ProxyTransport: Start is " << w_buffer_.start_ + << " length is " << w_buffer_.length_ << " flush is " + << flush_ << " size is " << w_buffer_.data_.size() + << " capacity is " << w_buffer_.data_.capacity() + << ".\n" << logofs_flush; + #endif + + return size; +} + +// +// Write data to its file descriptor. +// + +int ProxyTransport::flush() +{ + // + // If there is no compression or we already compressed + // outgoing data and just need to write it to socket + // because of previous incomplete writes then revert + // to plain socket management. + // + + if (flush_ == 0 || control -> LocalStreamCompression == 0) + { + int result = Transport::flush(); + + if (result < 0) + { + return -1; + } + + return result; + } + + #ifdef DEBUG + *logofs << "ProxyTransport: Going to flush compression on " + << "proxy FD#" << fd_ << ".\n" << logofs_flush; + #endif + + #ifdef TEST + *logofs << "ProxyTransport: Flush counter for proxy FD#" << fd_ + << " is " << flush_ << " bytes.\n" << logofs_flush; + #endif + + // + // Flush ZLIB stream into the write buffer. + // + + int saveTotalIn = w_stream_.total_in; + int saveTotalOut = w_stream_.total_out; + + int oldTotalIn = saveTotalIn; + int oldTotalOut = saveTotalOut; + + int diffTotalOut; + int diffTotalIn; + + #ifdef INSPECT + *logofs << "ProxyTransport: oldTotalIn = " << oldTotalIn + << ".\n" << logofs_flush; + #endif + + #ifdef INSPECT + *logofs << "ProxyTransport: oldTotalOut = " << oldTotalOut + << ".\n" << logofs_flush; + #endif + + w_stream_.next_in = w_buffer_.data_.begin() + w_buffer_.start_ + w_buffer_.length_; + w_stream_.avail_in = 0; + + // + // Let ZLIB use all the space already + // available in the buffer. + // + + unsigned int newAvailOut = w_buffer_.data_.size() - w_buffer_.start_ - + w_buffer_.length_; + + #ifdef DEBUG + *logofs << "ProxyTransport: Initial flush buffer is " + << newAvailOut << " bytes.\n" << logofs_flush; + #endif + + for (;;) + { + #ifdef INSPECT + *logofs << "\nProxyTransport: Running the flush loop.\n" + << logofs_flush; + #endif + + #ifdef INSPECT + *logofs << "ProxyTransport: w_buffer_.length_ = " + << w_buffer_.length_ << ".\n" + << logofs_flush; + #endif + + #ifdef INSPECT + *logofs << "ProxyTransport: w_buffer_.data_.size() = " + << w_buffer_.data_.size() << ".\n" + << logofs_flush; + #endif + + #ifdef INSPECT + *logofs << "ProxyTransport: newAvailOut = " + << newAvailOut << ".\n" + << logofs_flush; + #endif + + if (resize(w_buffer_, newAvailOut) < 0) + { + return -1; + } + + #ifdef INSPECT + *logofs << "ProxyTransport: w_buffer_.data_.size() = " + << w_buffer_.data_.size() << ".\n" + << logofs_flush; + #endif + + #ifdef INSPECT + *logofs << "ProxyTransport: w_stream_.next_in = " + << (void *) w_stream_.next_in << ".\n" + << logofs_flush; + #endif + + #ifdef INSPECT + *logofs << "ProxyTransport: w_stream_.avail_in = " + << w_stream_.avail_in << ".\n" + << logofs_flush; + #endif + + w_stream_.next_out = w_buffer_.data_.begin() + w_buffer_.start_ + + w_buffer_.length_; + + w_stream_.avail_out = newAvailOut; + + #ifdef INSPECT + *logofs << "ProxyTransport: w_stream_.next_out = " + << (void *) w_stream_.next_out << ".\n" + << logofs_flush; + #endif + + #ifdef INSPECT + *logofs << "ProxyTransport: w_stream_.avail_out = " + << w_stream_.avail_out << ".\n" + << logofs_flush; + #endif + + int result = deflate(&w_stream_, Z_SYNC_FLUSH); + + #ifdef INSPECT + *logofs << "ProxyTransport: Called deflate() result is " + << result << ".\n" << logofs_flush; + #endif + + #ifdef INSPECT + *logofs << "ProxyTransport: w_stream_.avail_in = " + << w_stream_.avail_in << ".\n" + << logofs_flush; + #endif + + #ifdef INSPECT + *logofs << "ProxyTransport: w_stream_.avail_out = " + << w_stream_.avail_out << ".\n" + << logofs_flush; + #endif + + #ifdef INSPECT + *logofs << "ProxyTransport: w_stream_.total_in = " + << w_stream_.total_in << ".\n" + << logofs_flush; + #endif + + #ifdef INSPECT + *logofs << "ProxyTransport: w_stream_.total_out = " + << w_stream_.total_out << ".\n" + << logofs_flush; + #endif + + diffTotalOut = w_stream_.total_out - oldTotalOut; + diffTotalIn = w_stream_.total_in - oldTotalIn; + + #ifdef INSPECT + *logofs << "ProxyTransport: diffTotalIn = " + << diffTotalIn << ".\n" + << logofs_flush; + #endif + + #ifdef INSPECT + *logofs << "ProxyTransport: diffTotalOut = " + << diffTotalOut << ".\n" + << logofs_flush; + #endif + + #ifdef INSPECT + *logofs << "ProxyTransport: w_buffer_.length_ = " + << w_buffer_.length_ << ".\n" + << logofs_flush; + #endif + + w_buffer_.length_ += diffTotalOut; + + #ifdef INSPECT + *logofs << "ProxyTransport: w_buffer_.length_ = " + << w_buffer_.length_ << ".\n" + << logofs_flush; + #endif + + oldTotalOut = w_stream_.total_out; + oldTotalIn = w_stream_.total_in; + + if (result == Z_OK) + { + if (w_stream_.avail_in == 0 && w_stream_.avail_out > 0) + { + break; + } + } + else if (result == Z_BUF_ERROR && w_stream_.avail_out > 0 && + w_stream_.avail_in == 0) + { + #ifdef TEST + *logofs << "ProxyTransport: WARNING! Raised Z_BUF_ERROR flushing data.\n" + << logofs_flush; + #endif + + break; + } + else + { + #ifdef PANIC + *logofs << "ProxyTransport: PANIC! Flush of compressed data failed. " + << "Error is '" << zError(result) << "'.\n" + << logofs_flush; + #endif + + cerr << "Error" << ": Flush of compressed data failed. Error is '" + << zError(result) << "'.\n"; + + finish(); + + return -1; + } + + // + // Add more bytes to the buffer. + // + + if (newAvailOut < thresholdSize_) + { + newAvailOut = thresholdSize_; + } + + #ifdef TEST + *logofs << "ProxyTransport: Need to add " << newAvailOut + << " bytes to the compress buffer in flush.\n" + << logofs_flush; + #endif + } + + diffTotalIn = w_stream_.total_in - saveTotalIn; + diffTotalOut = w_stream_.total_out - saveTotalOut; + + #ifdef TEST + *logofs << "ProxyTransport: Compressed flush data from " + << diffTotalIn << " to " << diffTotalOut + << " bytes.\n" << logofs_flush; + #endif + + // + // Time to move data from the write + // buffer to the real link. + // + + #ifdef DEBUG + *logofs << "ProxyTransport: Reset flush counter for proxy FD#" + << fd_ << ".\n" << logofs_flush; + #endif + + flush_ = 0; + + #ifdef TEST + *logofs << "ProxyTransport: Write buffer for proxy FD#" << fd_ + << " has data for " << w_buffer_.length_ << " bytes.\n" + << logofs_flush; + + *logofs << "ProxyTransport: Start is " << w_buffer_.start_ + << " length is " << w_buffer_.length_ << " flush is " + << flush_ << " size is " << w_buffer_.data_.size() + << " capacity is " << w_buffer_.data_.capacity() + << ".\n" << logofs_flush; + #endif + + int result = Transport::flush(); + + if (result < 0) + { + return -1; + } + + // + // Update all the counters. + // + + statistics -> addCompressedBytes(diffTotalIn, diffTotalOut); + + statistics -> addBytesOut(diffTotalOut); + + statistics -> updateBitrate(diffTotalOut); + + FlushCallback(diffTotalOut); + + return result; +} + +unsigned int ProxyTransport::getPending(unsigned char *&data) +{ + // + // Return a pointer to the data in the + // read buffer. It is up to the caller + // to ensure that the data is consumed + // before the read buffer is reused. + // + + if (r_buffer_.length_ > 0) + { + unsigned int size = r_buffer_.length_; + + data = r_buffer_.data_.begin() + r_buffer_.start_; + + #ifdef DEBUG + *logofs << "ProxyTransport: Returning " << size + << " pending bytes from proxy FD#" << fd_ + << ".\n" << logofs_flush; + #endif + + r_buffer_.length_ = 0; + r_buffer_.start_ = 0; + + // + // Prevent the deletion of the buffer. + // + + owner_ = 0; + + return size; + } + + #ifdef TEST + *logofs << "ProxyTransport: WARNING! No pending data " + << "for proxy FD#" << fd_ << ".\n" + << logofs_flush; + #endif + + data = NULL; + + return 0; +} + +void ProxyTransport::fullReset() +{ + blocked_ = 0; + finish_ = 0; + flush_ = 0; + + if (control -> RemoteStreamCompression) + { + inflateReset(&r_stream_); + } + + if (control -> LocalStreamCompression) + { + deflateReset(&w_stream_); + } + + if (owner_ == 1) + { + Transport::fullReset(r_buffer_); + } + + Transport::fullReset(w_buffer_); +} + +AgentTransport::AgentTransport(int fd) : Transport(fd) +{ + #ifdef TEST + *logofs << "AgentTransport: Going to create agent transport " + << "for FD#" << fd_ << ".\n" << logofs_flush; + #endif + + type_ = transport_agent; + + // + // Set up the read buffer. + // + + r_buffer_.length_ = 0; + r_buffer_.start_ = 0; + + r_buffer_.data_.resize(initialSize_); + + // + // For now we own the buffer. + // + + owner_ = 1; + + // + // Set up the mutexes. + // + + #ifdef THREADS + + pthread_mutexattr_t m_attributes; + + pthread_mutexattr_init(&m_attributes); + + // + // Interfaces in pthread to handle mutex + // type do not work in current version. + // + + m_attributes.__mutexkind = PTHREAD_MUTEX_ERRORCHECK_NP; + + if (pthread_mutex_init(&m_read_, &m_attributes) != 0) + { + #ifdef TEST + *logofs << "AgentTransport: Child: Creation of read mutex failed. " + << "Error is " << EGET() << " '" << ESTR() + << "'.\n" << logofs_flush; + #endif + } + + if (pthread_mutex_init(&m_write_, &m_attributes) != 0) + { + #ifdef TEST + *logofs << "AgentTransport: Child: Creation of write mutex failed. " + << "Error is " << EGET() << " '" << ESTR() + << "'.\n" << logofs_flush; + #endif + } + + #endif + + #ifdef REFERENCES + *logofs << "AgentTransport: Child: Created new object at " + << this << " out of " << ++references_ + << " allocated references.\n" << logofs_flush; + #endif +} + +AgentTransport::~AgentTransport() +{ + #ifdef TEST + *logofs << "AgentTransport: Going to destroy derived class " + << "for FD#" << fd_ << ".\n" << logofs_flush; + #endif + + // + // Unlock and free all mutexes. + // + + #ifdef THREADS + + pthread_mutex_unlock(&m_read_); + pthread_mutex_unlock(&m_write_); + + pthread_mutex_destroy(&m_read_); + pthread_mutex_destroy(&m_write_); + + #endif + + #ifdef REFERENCES + *logofs << "AgentTransport: Child: Deleted object at " + << this << " out of " << --references_ + << " allocated references.\n" << logofs_flush; + #endif +} + +// +// Read data enqueued by the other thread. +// + +int AgentTransport::read(unsigned char *data, unsigned int size) +{ + #ifdef THREADS + + lockRead(); + + #endif + + #ifdef DEBUG + *logofs << "AgentTransport: Child: Going to read " << size + << " bytes from " << "FD#" << fd_ << ".\n" + << logofs_flush; + #endif + + int copied = -1; + + if (r_buffer_.length_ > 0) + { + if ((int) size < r_buffer_.length_) + { + #ifdef TEST + *logofs << "AgentTransport: WARNING! Forcing a retry with " + << r_buffer_.length_ << " bytes pending and " + << size << " in the buffer.\n" + << logofs_flush; + #endif + + ESET(EAGAIN); + } + else + { + copied = (r_buffer_.length_ > ((int) size) ? + ((int) size) : r_buffer_.length_); + + memcpy(data, r_buffer_.data_.begin() + r_buffer_.start_, copied); + + // + // Update the buffer status. + // + + #ifdef TEST + *logofs << "AgentTransport: Child: Going to immediately return " + << copied << " bytes from FD#" << fd_ << ".\n" + << logofs_flush; + #endif + + #ifdef DUMP + + *logofs << "AgentTransport: Child: Dumping content of read data.\n" + << logofs_flush; + + DumpData(data, copied); + + #endif + + r_buffer_.length_ -= copied; + + if (r_buffer_.length_ == 0) + { + r_buffer_.start_ = 0; + } + else + { + r_buffer_.start_ += copied; + + #ifdef TEST + *logofs << "AgentTransport: Child: There are still " + << r_buffer_.length_ << " bytes in read buffer for " + << "FD#" << fd_ << ".\n" << logofs_flush; + #endif + } + } + } + else + { + #ifdef DEBUG + *logofs << "AgentTransport: Child: No data can be got " + << "from read buffer for FD#" << fd_ << ".\n" + << logofs_flush; + #endif + + ESET(EAGAIN); + } + + #ifdef THREADS + + unlockRead(); + + #endif + + return copied; +} + +// +// Write data to buffer so that the other +// thread can get it. +// + +int AgentTransport::write(T_write type, const unsigned char *data, const unsigned int size) +{ + #ifdef THREADS + + lockWrite(); + + #endif + + // + // Just append data to socket's write buffer. + // Note that we don't care if buffer exceeds + // the size limits set for this type of + // transport. + // + + #ifdef TEST + *logofs << "AgentTransport: Child: Going to append " << size + << " bytes to write buffer for " << "FD#" << fd_ + << ".\n" << logofs_flush; + #endif + + int copied = -1; + + if (resize(w_buffer_, size) < 0) + { + finish(); + + ESET(EPIPE); + } + else + { + memmove(w_buffer_.data_.begin() + w_buffer_.start_ + w_buffer_.length_, data, size); + + w_buffer_.length_ += size; + + #ifdef DUMP + + *logofs << "AgentTransport: Child: Dumping content of written data.\n" + << logofs_flush; + + DumpData(data, size); + + #endif + + #ifdef TEST + *logofs << "AgentTransport: Child: Write buffer for FD#" << fd_ + << " has data for " << w_buffer_.length_ << " bytes.\n" + << logofs_flush; + + *logofs << "AgentTransport: Child: Start is " << w_buffer_.start_ + << " length is " << w_buffer_.length_ << " size is " + << w_buffer_.data_.size() << " capacity is " + << w_buffer_.data_.capacity() << ".\n" + << logofs_flush; + #endif + + copied = size; + } + + // + // Let child access again the read buffer. + // + + #ifdef THREADS + + unlockWrite(); + + #endif + + return copied; +} + +int AgentTransport::flush() +{ + // + // In case of memory-to-memory transport + // this function should never be called. + // + + #ifdef PANIC + *logofs << "AgentTransport: Child: PANIC! Called flush() for " + << "memory to memory transport on " << "FD#" + << fd_ << ".\n" << logofs_flush; + #endif + + cerr << "Error" << ": Called flush() for " + << "memory to memory transport on " << "FD#" + << fd_ << ".\n"; + + HandleAbort(); +} + +int AgentTransport::drain(int limit, int timeout) +{ + // + // We can't drain the channel in the case + // of the memory-to-memory transport. Data + // is enqueued for the agent to read but + // the agent could require multiple loops + // to read it all. + // + + // + // In case of memory-to-memory transport + // this function should never be called. + // + + #ifdef PANIC + *logofs << "AgentTransport: Child: PANIC! Called drain() for " + << "memory to memory transport on " << "FD#" + << fd_ << ".\n" << logofs_flush; + #endif + + cerr << "Error" << ": Called drain() for " + << "memory to memory transport on " << "FD#" + << fd_ << ".\n"; + + HandleAbort(); +} + +unsigned int AgentTransport::getPending(unsigned char *&data) +{ + #ifdef THREADS + + lockRead(); + + #endif + + if (r_buffer_.length_ > 0) + { + unsigned int size = r_buffer_.length_; + + data = r_buffer_.data_.begin() + r_buffer_.start_; + + #ifdef DEBUG + *logofs << "AgentTransport: Child: Returning " << size + << " pending bytes from FD#" << fd_ + << ".\n" << logofs_flush; + #endif + + r_buffer_.length_ = 0; + r_buffer_.start_ = 0; + + #ifdef THREADS + + unlockRead(); + + #endif + + // + // Prevent the deletion of the buffer. + // + + owner_ = 0; + + return size; + } + + #ifdef TEST + *logofs << "AgentTransport: WARNING! No pending data " + << "for FD#" << fd_ << ".\n" << logofs_flush; + #endif + + #ifdef THREADS + + unlockRead(); + + #endif + + data = NULL; + + return 0; +} + +void AgentTransport::fullReset() +{ + #ifdef THREADS + + lockRead(); + lockWrite(); + + #endif + + #ifdef TEST + *logofs << "AgentTransport: Child: Resetting transport " + << "for FD#" << fd_ << ".\n" << logofs_flush; + #endif + + blocked_ = 0; + finish_ = 0; + + if (owner_ == 1) + { + Transport::fullReset(r_buffer_); + } + + Transport::fullReset(w_buffer_); +} + +int AgentTransport::enqueue(const char *data, const int size) +{ + #ifdef THREADS + + lockRead(); + + #endif + + if (finish_ == 1) + { + #if defined(PARENT) && defined(TEST) + *logofs << "AgentTransport: Parent: Returning EPIPE in " + << "write for finishing FD#" << fd_ << ".\n" + << logofs_flush; + #endif + + ESET(EPIPE); + + return -1; + } + + // + // Always allow the agent to write + // all its data. + // + + int toPut = size; + + #if defined(PARENT) && defined(TEST) + *logofs << "AgentTransport: Parent: Going to put " << toPut + << " bytes into read buffer for FD#" << fd_ + << ". Buffer length is " << r_buffer_.length_ + << ".\n" << logofs_flush; + #endif + + if (resize(r_buffer_, toPut) < 0) + { + finish(); + + #ifdef THREADS + + unlockRead(); + + #endif + + return -1; + } + + memcpy(r_buffer_.data_.begin() + r_buffer_.start_ + r_buffer_.length_, data, toPut); + + r_buffer_.length_ += toPut; + + #if defined(DUMP) && defined(PARENT) + + *logofs << "AgentTransport: Parent: Dumping content of enqueued data.\n" + << logofs_flush; + + DumpData((const unsigned char *) data, toPut); + + #endif + + #if defined(PARENT) && defined(TEST) + *logofs << "AgentTransport: Parent: Read buffer for FD#" << fd_ + << " has now data for " << r_buffer_.length_ + << " bytes.\n" << logofs_flush; + + *logofs << "AgentTransport: Parent: Start is " << r_buffer_.start_ + << " length is " << r_buffer_.length_ << " size is " + << r_buffer_.data_.size() << " capacity is " + << r_buffer_.data_.capacity() << ".\n" + << logofs_flush; + #endif + + #ifdef THREADS + + unlockRead(); + + #endif + + return toPut; +} + +int AgentTransport::dequeue(char *data, int size) +{ + #ifdef THREADS + + lockWrite(); + + #endif + + if (w_buffer_.length_ == 0) + { + if (finish_ == 1) + { + #if defined(PARENT) && defined(TEST) + *logofs << "AgentTransport: Parent: Returning 0 in read " + << "for finishing FD#" << fd_ << ".\n" + << logofs_flush; + #endif + + return 0; + } + + #if defined(PARENT) && defined(TEST) + *logofs << "AgentTransport: Parent: No data can be read " + << "from write buffer for FD#" << fd_ << ".\n" + << logofs_flush; + #endif + + ESET(EAGAIN); + + #ifdef THREADS + + unlockWrite(); + + #endif + + return -1; + } + + // + // Return as many bytes as possible. + // + + int toGet = ((int) size > w_buffer_.length_ ? w_buffer_.length_ : size); + + #if defined(PARENT) && defined(TEST) + *logofs << "AgentTransport: Parent: Going to get " << toGet + << " bytes from write buffer for FD#" << fd_ << ".\n" + << logofs_flush; + #endif + + memcpy(data, w_buffer_.data_.begin() + w_buffer_.start_, toGet); + + w_buffer_.start_ += toGet; + w_buffer_.length_ -= toGet; + + #if defined(DUMP) && defined(PARENT) + + *logofs << "AgentTransport: Parent: Dumping content of dequeued data.\n" + << logofs_flush; + + DumpData((const unsigned char *) data, toGet); + + #endif + + #if defined(PARENT) && defined(TEST) + *logofs << "AgentTransport: Parent: Write buffer for FD#" << fd_ + << " has now data for " << length() << " bytes.\n" + << logofs_flush; + + *logofs << "AgentTransport: Parent: Start is " << w_buffer_.start_ + << " length is " << w_buffer_.length_ << " size is " + << w_buffer_.data_.size() << " capacity is " + << w_buffer_.data_.capacity() << ".\n" + << logofs_flush; + #endif + + #ifdef THREADS + + unlockWrite(); + + #endif + + return toGet; +} + +int AgentTransport::dequeuable() +{ + if (finish_ == 1) + { + #if defined(PARENT) && defined(TEST) + *logofs << "AgentTransport: Parent: Returning EPIPE in " + << "readable for finishing FD#" << fd_ + << ".\n" << logofs_flush; + #endif + + ESET(EPIPE); + + return -1; + } + + #if defined(PARENT) && defined(TEST) + *logofs << "AgentTransport: Parent: Returning " + << w_buffer_.length_ << " as data readable " + << "from read buffer for FD#" << fd_ << ".\n" + << logofs_flush; + #endif + + return w_buffer_.length_; +} + +#ifdef THREADS + +int AgentTransport::lockRead() +{ + for (;;) + { + int result = pthread_mutex_lock(&m_read_); + + if (result == 0) + { + #ifdef DEBUG + *logofs << "AgentTransport: Read mutex locked by thread id " + << pthread_self() << ".\n" << logofs_flush; + #endif + + return 0; + } + else if (EGET() == EINTR) + { + continue; + } + else + { + #ifdef WARNING + *logofs << "AgentTransport: WARNING! Locking of read mutex by thread id " + << pthread_self() << " returned " << result << ". Error is '" + << ESTR() << "'.\n" << logofs_flush; + #endif + + return result; + } + } +} + +int AgentTransport::lockWrite() +{ + for (;;) + { + int result = pthread_mutex_lock(&m_write_); + + if (result == 0) + { + #ifdef DEBUG + *logofs << "AgentTransport: Write mutex locked by thread id " + << pthread_self() << ".\n" << logofs_flush; + #endif + + return 0; + } + else if (EGET() == EINTR) + { + continue; + } + else + { + #ifdef WARNING + *logofs << "AgentTransport: WARNING! Locking of write mutex by thread id " + << pthread_self() << " returned " << result << ". Error is '" + << ESTR() << "'.\n" << logofs_flush; + #endif + + return result; + } + } +} + +int AgentTransport::unlockRead() +{ + for (;;) + { + int result = pthread_mutex_unlock(&m_read_); + + if (result == 0) + { + #ifdef DEBUG + *logofs << "AgentTransport: Read mutex unlocked by thread id " + << pthread_self() << ".\n" << logofs_flush; + #endif + + return 0; + } + else if (EGET() == EINTR) + { + continue; + } + else + { + #ifdef WARNING + *logofs << "AgentTransport: WARNING! Unlocking of read mutex by thread id " + << pthread_self() << " returned " << result << ". Error is '" + << ESTR() << "'.\n" << logofs_flush; + #endif + + return result; + } + } +} + +int AgentTransport::unlockWrite() +{ + for (;;) + { + int result = pthread_mutex_unlock(&m_write_); + + if (result == 0) + { + #ifdef DEBUG + *logofs << "AgentTransport: Write mutex unlocked by thread id " + << pthread_self() << ".\n" << logofs_flush; + #endif + + return 0; + } + else if (EGET() == EINTR) + { + continue; + } + else + { + #ifdef WARNING + *logofs << "AgentTransport: WARNING! Unlocking of write mutex by thread id " + << pthread_self() << " returned " << result << ". Error is '" + << ESTR() << "'.\n" << logofs_flush; + #endif + + return result; + } + } +} + +#endif |