/**************************************************************************/
/*                                                                        */
/* Copyright (c) 2001, 2011 NoMachine (http://www.nomachine.com)          */
/* Copyright (c) 2008-2014 Oleksandr Shneyder <o.shneyder@phoca-gmbh.de>  */
/* Copyright (c) 2014-2016 Ulrich Sibiller <uli42@gmx.de>                 */
/* Copyright (c) 2014-2016 Mihai Moldovan <ionic@ionic.de>                */
/* Copyright (c) 2011-2016 Mike Gabriel <mike.gabriel@das-netzwerkteam.de>*/
/* Copyright (c) 2015-2016 Qindel Group (http://www.qindel.com)           */
/*                                                                        */
/* NXCOMP, NX protocol compression and NX extensions to this software     */
/* are copyright of the aforementioned persons and companies.             */
/*                                                                        */
/* Redistribution and use of the present software is allowed according    */
/* to terms specified in the file LICENSE.nxcomp which comes in the       */
/* source distribution.                                                   */
/*                                                                        */
/* All rights reserved.                                                   */
/*                                                                        */
/* NOTE: This software has received contributions from various other      */
/* contributors, only the core maintainers and supporters are listed as   */
/* copyright holders. Please contact us, if you feel you should be listed */
/* as copyright holder, as well.                                          */
/*                                                                        */
/**************************************************************************/

#ifdef HAVE_CONFIG_H
#include "config.h"
#endif

#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <signal.h>
#include <sys/socket.h>

#include "Transport.h"

#include "Statistics.h"

//
// Set the verbosity level. You also
// need to define DUMP in Misc.cpp
// if DUMP is defined here.
//

#define PANIC
#define WARNING
#undef  TEST
#undef  DEBUG
#undef  INSPECT
#undef  DUMP

//
// Used to lock and unlock the transport
// buffers before they are accessed by
// different threads.
//

#undef  THREADS

//
// Define this to get logging all the
// operations performed by the parent
// thread, the one that enqueues and
// dequeues data.
//

#define PARENT

//
// Define this to know when a channel
// is created or destroyed.
//

#undef  REFERENCES

//
// Reference count for allocated buffers.
// #ifdef REFERENCES int Transport::references_; int ProxyTransport::references_; int InternalTransport::references_; #endif // // This is the base class providing methods for read // and write buffering. // Transport::Transport(int fd) : fd_(fd) { #ifdef TEST *logofs << "Transport: Going to create base transport " << "for FD#" << fd_ << ".\n" << logofs_flush; #endif type_ = transport_base; // // Set up the write buffer. // w_buffer_.length_ = 0; w_buffer_.start_ = 0; initialSize_ = TRANSPORT_BUFFER_DEFAULT_SIZE; thresholdSize_ = TRANSPORT_BUFFER_DEFAULT_SIZE << 1; maximumSize_ = TRANSPORT_BUFFER_DEFAULT_SIZE << 4; w_buffer_.data_.resize(initialSize_); // // Set non-blocking IO on socket. // SetNonBlocking(fd_, 1); blocked_ = 0; finish_ = 0; #ifdef REFERENCES *logofs << "Transport: Created new object at " << this << " out of " << ++references_ << " allocated references.\n" << logofs_flush; #endif } Transport::~Transport() { #ifdef TEST *logofs << "Transport: Going to destroy base class " << "for FD#" << fd_ << ".\n" << logofs_flush; #endif ::close(fd_); #ifdef REFERENCES *logofs << "Transport: Deleted object at " << this << " out of " << --references_ << " allocated references.\n" << logofs_flush; #endif } // // Read data from its file descriptor. // int Transport::read(unsigned char *data, unsigned int size) { #ifdef DEBUG *logofs << "Transport: Going to read " << size << " bytes from " << "FD#" << fd_ << ".\n" << logofs_flush; #endif // // Read the available data from the socket. // int result = ::read(fd_, data, size); // // Update the current timestamp as the read // can have scheduled some other process. // getNewTimestamp(); if (result < 0) { if (EGET() == EAGAIN) { #ifdef TEST *logofs << "Transport: WARNING! 
Read of " << size << " bytes from " << "FD#" << fd_ << " would block.\n" << logofs_flush; #endif return 0; } else if (EGET() == EINTR) { #ifdef TEST *logofs << "Transport: Read of " << size << " bytes from " << "FD#" << fd_ << " was interrupted.\n" << logofs_flush; #endif return 0; } else { #ifdef TEST *logofs << "Transport: Error reading from " << "FD#" << fd_ << ".\n" << logofs_flush; #endif finish(); return -1; } } else if (result == 0) { #ifdef TEST *logofs << "Transport: No data read from " << "FD#" << fd_ << ".\n" << logofs_flush; #endif finish(); return -1; } #ifdef TEST *logofs << "Transport: Read " << result << " bytes out of " << size << " from FD#" << fd_ << ".\n" << logofs_flush; #endif #ifdef DUMP *logofs << "Transport: Dumping content of read data.\n" << logofs_flush; DumpData(data, result); #endif return result; } // // Write as many bytes as possible to socket. // Append the remaining data bytes to the end // of the buffer and update length to reflect // changes. // int Transport::write(T_write type, const unsigned char *data, const unsigned int size) { // // If an immediate write was requested then // flush the enqueued data first. // // Alternatively may try to write only if // the socket is not blocked. // // if (w_buffer_.length_ > 0 && blocked_ == 0 && // type == write_immediate) // { // ... // } // if (w_buffer_.length_ > 0 && type == write_immediate) { #ifdef TEST *logofs << "Transport: Writing " << w_buffer_.length_ << " bytes of previous data to FD#" << fd_ << ".\n" << logofs_flush; #endif int result = Transport::flush(); if (result < 0) { return -1; } } // // If nothing is remained, write immediately // to the socket. // unsigned int written = 0; if (w_buffer_.length_ == 0 && blocked_ == 0 && type == write_immediate) { // // Limit the amount of data sent. 
// unsigned int toWrite = size; #ifdef DUMP *logofs << "Transport: Going to write " << toWrite << " bytes to FD#" << fd_ << " with checksum "; DumpChecksum(data, size); *logofs << ".\n" << logofs_flush; #endif T_timestamp writeTs; int diffTs; while (written < toWrite) { // // Trace system time spent writing data. // writeTs = getTimestamp(); int result = ::write(fd_, data + written, toWrite - written); diffTs = diffTimestamp(writeTs, getNewTimestamp()); statistics -> addWriteTime(diffTs); if (result <= 0) { if (EGET() == EAGAIN) { #ifdef TEST *logofs << "Transport: Write of " << toWrite - written << " bytes on FD#" << fd_ << " would block.\n" << logofs_flush; #endif blocked_ = 1; break; } else if (EGET() == EINTR) { #ifdef TEST *logofs << "Transport: Write of " << toWrite - written << " bytes on FD#" << fd_ << " was interrupted.\n" << logofs_flush; #endif continue; } else { #ifdef TEST *logofs << "Transport: Write to " << "FD#" << fd_ << " failed.\n" << logofs_flush; #endif finish(); return -1; } } else { #ifdef TEST *logofs << "Transport: Immediately written " << result << " bytes on " << "FD#" << fd_ << ".\n" << logofs_flush; #endif written += result; } } #ifdef DUMP if (written > 0) { *logofs << "Transport: Dumping content of immediately written data.\n" << logofs_flush; DumpData(data, written); } #endif } if (written == size) { // // We will not affect the write buffer. 
// return written; } #ifdef DEBUG *logofs << "Transport: Going to append " << size - written << " bytes to write buffer for " << "FD#" << fd_ << ".\n" << logofs_flush; #endif if (resize(w_buffer_, size - written) < 0) { return -1; } memmove(w_buffer_.data_.begin() + w_buffer_.start_ + w_buffer_.length_, data + written, size - written); w_buffer_.length_ += size - written; #ifdef TEST *logofs << "Transport: Write buffer for FD#" << fd_ << " has data for " << w_buffer_.length_ << " bytes.\n" << logofs_flush; *logofs << "Transport: Start is " << w_buffer_.start_ << " length is " << w_buffer_.length_ << " size is " << w_buffer_.data_.size() << " capacity is " << w_buffer_.data_.capacity() << ".\n" << logofs_flush; #endif // // Note that this function always returns the whole // size of buffer that was provided, either if not // all the data could be actually written. // return size; } // // Write pending data to its file descriptor. // int Transport::flush() { if (w_buffer_.length_ == 0) { #ifdef TEST *logofs << "Transport: No data to flush on " << "FD#" << fd_ << ".\n" << logofs_flush; #endif #ifdef WARNING if (blocked_ != 0) { *logofs << "Transport: Blocked flag is " << blocked_ << " with no data to flush on FD#" << fd_ << ".\n" << logofs_flush; } #endif return 0; } // // It's time to move data from the // write buffer to the real link. // int written = 0; int toWrite = w_buffer_.length_; // // We will do our best to write any available // data to the socket, so let's say we start // from a clean state. 
// blocked_ = 0; #ifdef TEST *logofs << "Transport: Going to flush " << toWrite << " bytes on FD#" << fd_ << ".\n" << logofs_flush; #endif T_timestamp writeTs; int diffTs; while (written < toWrite) { writeTs = getTimestamp(); int result = ::write(fd_, w_buffer_.data_.begin() + w_buffer_.start_ + written, toWrite - written); diffTs = diffTimestamp(writeTs, getNewTimestamp()); statistics -> addWriteTime(diffTs); if (result <= 0) { if (EGET() == EAGAIN) { #ifdef TEST *logofs << "Transport: Write of " << toWrite - written << " bytes on FD#" << fd_ << " would block.\n" << logofs_flush; #endif blocked_ = 1; break; } else if (EGET() == EINTR) { #ifdef TEST *logofs << "Transport: Write of " << toWrite - written << " bytes on FD#" << fd_ << " was interrupted.\n" << logofs_flush; #endif continue; } else { #ifdef TEST *logofs << "Transport: Write to " << "FD#" << fd_ << " failed.\n" << logofs_flush; #endif finish(); return -1; } } else { #ifdef TEST *logofs << "Transport: Flushed " << result << " bytes on " << "FD#" << fd_ << ".\n" << logofs_flush; #endif written += result; } } if (written > 0) { #ifdef DUMP *logofs << "Transport: Dumping content of flushed data.\n" << logofs_flush; DumpData(w_buffer_.data_.begin() + w_buffer_.start_, written); #endif // // Update the buffer status. // w_buffer_.length_ -= written; if (w_buffer_.length_ == 0) { w_buffer_.start_ = 0; } else { w_buffer_.start_ += written; } } // // It can be that we wrote less bytes than // available because of the write limit. 
// if (w_buffer_.length_ > 0) { #ifdef TEST *logofs << "Transport: There are still " << w_buffer_.length_ << " bytes in write buffer for " << "FD#" << fd_ << ".\n" << logofs_flush; #endif blocked_ = 1; } #ifdef TEST *logofs << "Transport: Write buffer for FD#" << fd_ << " has data for " << w_buffer_.length_ << " bytes.\n" << logofs_flush; *logofs << "Transport: Start is " << w_buffer_.start_ << " length is " << w_buffer_.length_ << " size is " << w_buffer_.data_.size() << " capacity is " << w_buffer_.data_.capacity() << ".\n" << logofs_flush; #endif // // No new data was produced for the link except // any outstanding data from previous writes. // return 0; } int Transport::drain(int limit, int timeout) { if (w_buffer_.length_ <= limit) { return 1; } // // Write the data accumulated in the write // buffer until it is below the limit or // the timeout is elapsed. // int toWrite = w_buffer_.length_; int written = 0; #ifdef TEST *logofs << "Transport: Draining " << toWrite - limit << " bytes on FD#" << fd_ << " with limit set to " << limit << ".\n" << logofs_flush; #endif T_timestamp startTs = getNewTimestamp(); T_timestamp selectTs; T_timestamp writeTs; T_timestamp idleTs; T_timestamp nowTs = startTs; int diffTs; fd_set writeSet; fd_set readSet; FD_ZERO(&writeSet); FD_ZERO(&readSet); int result; int ready; while (w_buffer_.length_ - written > limit) { nowTs = getNewTimestamp(); // // Wait for descriptor to become // readable or writable. 
// FD_SET(fd_, &writeSet); FD_SET(fd_, &readSet); setTimestamp(selectTs, timeout / 2); idleTs = nowTs; result = select(fd_ + 1, &readSet, &writeSet, NULL, &selectTs); nowTs = getNewTimestamp(); diffTs = diffTimestamp(idleTs, nowTs); statistics -> addIdleTime(diffTs); statistics -> subReadTime(diffTs); if (result < 0) { if (EGET() == EINTR) { #ifdef TEST *logofs << "Transport: Select on FD#" << fd_ << " was interrupted.\n" << logofs_flush; #endif continue; } else { #ifdef TEST *logofs << "Transport: Select on FD#" << fd_ << " failed.\n" << logofs_flush; #endif finish(); return -1; } } else if (result > 0) { ready = result; if (FD_ISSET(fd_, &writeSet)) { writeTs = getNewTimestamp(); result = ::write(fd_, w_buffer_.data_.begin() + w_buffer_.start_ + written, toWrite - written); nowTs = getNewTimestamp(); diffTs = diffTimestamp(writeTs, nowTs); statistics -> addWriteTime(diffTs); if (result > 0) { #ifdef TEST *logofs << "Transport: Forced flush of " << result << " bytes on " << "FD#" << fd_ << ".\n" << logofs_flush; #endif written += result; } else if (result < 0 && EGET() == EINTR) { #ifdef TEST *logofs << "Transport: Write to FD#" << fd_ << " was interrupted.\n" << logofs_flush; #endif continue; } else { #ifdef TEST *logofs << "Transport: Write to FD#" << fd_ << " failed.\n" << logofs_flush; #endif finish(); return -1; } ready--; } if (ready > 0) { if (FD_ISSET(fd_, &readSet)) { #ifdef TEST *logofs << "Transport: Not draining further " << "due to data readable on FD#" << fd_ << ".\n" << logofs_flush; #endif break; } } } #ifdef TEST else { *logofs << "Transport: Timeout encountered " << "waiting for FD#" << fd_ << ".\n" << logofs_flush; } #endif nowTs = getNewTimestamp(); diffTs = diffTimestamp(startTs, nowTs); if (diffTs >= timeout) { #ifdef TEST *logofs << "Transport: Not draining further " << "due to the timeout on FD#" << fd_ << ".\n" << logofs_flush; #endif break; } } if (written > 0) { #ifdef DUMP *logofs << "Transport: Dumping content of flushed data.\n" << 
logofs_flush; DumpData(w_buffer_.data_.begin() + w_buffer_.start_, written); #endif // // Update the buffer status. // w_buffer_.length_ -= written; if (w_buffer_.length_ == 0) { w_buffer_.start_ = 0; blocked_ = 0; } else { w_buffer_.start_ += written; #ifdef TEST *logofs << "Transport: There are still " << w_buffer_.length_ << " bytes in write buffer for " << "FD#" << fd_ << ".\n" << logofs_flush; #endif blocked_ = 1; } } #ifdef TEST else { *logofs << "Transport: WARNING! No data written to FD#" << fd_ << " with " << toWrite << " bytes to drain and limit " << "set to " << limit << ".\n" << logofs_flush; } #endif #ifdef TEST *logofs << "Transport: Write buffer for FD#" << fd_ << " has data for " << w_buffer_.length_ << " bytes.\n" << logofs_flush; *logofs << "Transport: Start is " << w_buffer_.start_ << " length is " << w_buffer_.length_ << " size is " << w_buffer_.data_.size() << " capacity is " << w_buffer_.data_.capacity() << ".\n" << logofs_flush; #endif return (w_buffer_.length_ <= limit); } int Transport::wait(int timeout) const { T_timestamp startTs = getNewTimestamp(); T_timestamp idleTs; T_timestamp selectTs; T_timestamp nowTs = startTs; long available = 0; int result = 0; int diffTs; fd_set readSet; FD_ZERO(&readSet); FD_SET(fd_, &readSet); for (;;) { available = readable(); diffTs = diffTimestamp(startTs, nowTs); if (available != 0 || timeout == 0 || (diffTs + (timeout / 10)) >= timeout) { #ifdef TEST *logofs << "Transport: There are " << available << " bytes on FD#" << fd_ << " after " << diffTs << " ms.\n" << logofs_flush; #endif return available; } else if (available == 0 && result > 0) { #ifdef TEST *logofs << "Transport: Read on " << "FD#" << fd_ << " failed.\n" << logofs_flush; #endif return -1; } // // TODO: Should subtract the time // already spent in select. // selectTs.tv_sec = 0; selectTs.tv_usec = timeout * 1000; idleTs = nowTs; // // Wait for descriptor to become readable. 
// result = select(fd_ + 1, &readSet, NULL, NULL, &selectTs); nowTs = getNewTimestamp(); diffTs = diffTimestamp(idleTs, nowTs); statistics -> addIdleTime(diffTs); statistics -> subReadTime(diffTs); if (result < 0) { if (EGET() == EINTR) { #ifdef TEST *logofs << "Transport: Select on FD#" << fd_ << " was interrupted.\n" << logofs_flush; #endif continue; } else { #ifdef TEST *logofs << "Transport: Select on " << "FD#" << fd_ << " failed.\n" << logofs_flush; #endif return -1; } } #ifdef TEST else if (result == 0) { *logofs << "Transport: No data available on FD#" << fd_ << " after " << diffTimestamp(startTs, nowTs) << " ms.\n" << logofs_flush; } else { *logofs << "Transport: Data became available on FD#" << fd_ << " after " << diffTimestamp(startTs, nowTs) << " ms.\n" << logofs_flush; } #endif } } void Transport::setSize(unsigned int initialSize, unsigned int thresholdSize, unsigned int maximumSize) { initialSize_ = initialSize; thresholdSize_ = thresholdSize; maximumSize_ = maximumSize; #ifdef TEST *logofs << "Transport: Set buffer sizes for FD#" << fd_ << " to " << initialSize_ << "/" << thresholdSize_ << "/" << maximumSize_ << ".\n" << logofs_flush; #endif } void Transport::fullReset() { blocked_ = 0; finish_ = 0; fullReset(w_buffer_); } int Transport::resize(T_buffer &buffer, const int &size) { if ((int) buffer.data_.size() >= (buffer.length_ + size) && (buffer.start_ + buffer.length_ + size) > (int) buffer.data_.size()) { if (buffer.length_ > 0) { // // There is enough space in buffer but we need // to move existing data at the beginning. 
// #ifdef TEST *logofs << "Transport: Moving " << buffer.length_ << " bytes of data for " << "FD#" << fd_ << " to make room in the buffer.\n" << logofs_flush; #endif memmove(buffer.data_.begin(), buffer.data_.begin() + buffer.start_, buffer.length_); } buffer.start_ = 0; #ifdef DEBUG *logofs << "Transport: Made room for " << buffer.data_.size() - buffer.start_ << " bytes in buffer for " << "FD#" << fd_ << ".\n" << logofs_flush; #endif } else if ((buffer.length_ + size) > (int) buffer.data_.size()) { // // Not enough space, so increase // the size of the buffer. // if (buffer.start_ != 0 && buffer.length_ > 0) { #ifdef TEST *logofs << "Transport: Moving " << buffer.length_ << " bytes of data for " << "FD#" << fd_ << " to resize the buffer.\n" << logofs_flush; #endif memmove(buffer.data_.begin(), buffer.data_.begin() + buffer.start_, buffer.length_); } buffer.start_ = 0; unsigned int newSize = thresholdSize_; while (newSize < (unsigned int) buffer.length_ + size) { newSize <<= 1; if (newSize >= maximumSize_) { newSize = buffer.length_ + size + initialSize_; } } #ifdef DEBUG *logofs << "Transport: Buffer for " << "FD#" << fd_ << " will be enlarged from " << buffer.data_.size() << " to at least " << buffer.length_ + size << " bytes.\n" << logofs_flush; #endif buffer.data_.resize(newSize); #ifdef TEST if (newSize >= maximumSize_) { *logofs << "Transport: WARNING! Buffer for FD#" << fd_ << " grown to reach size of " << newSize << " bytes.\n" << logofs_flush; } #endif #ifdef TEST *logofs << "Transport: Data buffer for " << "FD#" << fd_ << " has now size " << buffer.data_.size() << " and capacity " << buffer.data_.capacity() << ".\n" << logofs_flush; #endif } return (buffer.length_ + size); } void Transport::fullReset(T_buffer &buffer) { // // Force deallocation and allocation // of the initial size. 
// #ifdef TEST *logofs << "Transport: Resetting buffer for " << "FD#" << fd_ << " with size " << buffer.data_.size() << " and capacity " << buffer.data_.capacity() << ".\n" << logofs_flush; #endif buffer.start_ = 0; buffer.length_ = 0; if (buffer.data_.size() > (unsigned int) initialSize_ && buffer.data_.capacity() > (unsigned int) initialSize_) { buffer.data_.clear(); buffer.data_.resize(initialSize_); #ifdef TEST *logofs << "Transport: Data buffer for " << "FD#" << fd_ << " shrunk to size " << buffer.data_.size() << " and capacity " << buffer.data_.capacity() << ".\n" << logofs_flush; #endif } } ProxyTransport::ProxyTransport(int fd) : Transport(fd) { #ifdef TEST *logofs << "ProxyTransport: Going to create proxy transport " << "for FD#" << fd_ << ".\n" << logofs_flush; #endif type_ = transport_proxy; // // Set up the read buffer. // r_buffer_.length_ = 0; r_buffer_.start_ = 0; r_buffer_.data_.resize(initialSize_); // // For now we own the buffer. // owner_ = 1; // // Set up ZLIB compression. // int result; r_stream_.zalloc = NULL; r_stream_.zfree = NULL; r_stream_.opaque = NULL; r_stream_.next_in = NULL; r_stream_.avail_in = 0; if ((result = inflateInit2(&r_stream_, 15)) != Z_OK) { #ifdef PANIC *logofs << "ProxyTransport: PANIC! Failed initialization of ZLIB read stream. " << "Error is '" << zError(result) << "'.\n" << logofs_flush; #endif cerr << "Error" << ": Failed initialization of ZLIB read stream. " << "Error is '" << zError(result) << "'.\n"; HandleCleanup(); } if (control -> LocalStreamCompression) { w_stream_.zalloc = NULL; w_stream_.zfree = NULL; w_stream_.opaque = NULL; if ((result = deflateInit2(&w_stream_, control -> LocalStreamCompressionLevel, Z_DEFLATED, 15, 9, Z_DEFAULT_STRATEGY)) != Z_OK) { #ifdef PANIC *logofs << "ProxyTransport: PANIC! Failed initialization of ZLIB write stream. " << "Error is '" << zError(result) << "'.\n" << logofs_flush; #endif cerr << "Error" << ": Failed initialization of ZLIB write stream. 
" << "Error is '" << zError(result) << "'.\n"; HandleCleanup(); } } // // No ZLIB stream to flush yet. // flush_ = 0; #ifdef REFERENCES *logofs << "ProxyTransport: Created new object at " << this << " out of " << ++references_ << " allocated references.\n" << logofs_flush; #endif } ProxyTransport::~ProxyTransport() { #ifdef TEST *logofs << "ProxyTransport: Going to destroy derived class " << "for FD#" << fd_ << ".\n" << logofs_flush; #endif // // Deallocate the ZLIB stream state. // inflateEnd(&r_stream_); if (control -> LocalStreamCompression) { deflateEnd(&w_stream_); } #ifdef REFERENCES *logofs << "ProxyTransport: Deleted object at " << this << " out of " << --references_ << " allocated references.\n" << logofs_flush; #endif } // // Read data from its file descriptor. // int ProxyTransport::read(unsigned char *data, unsigned int size) { // // If the remote peer is not compressing // the stream then just return any byte // read from the socket. // if (control -> RemoteStreamCompression == 0) { int result = Transport::read(data, size); if (result <= 0) { return result; } statistics -> addBytesIn(result); return result; } // // Return any pending data first. // if (r_buffer_.length_ > 0) { // // If the size of the buffer doesn't // match the amount of data pending, // force the caller to retry. // if ((int) size < r_buffer_.length_) { #ifdef TEST *logofs << "ProxyTransport: WARNING! Forcing a retry with " << r_buffer_.length_ << " bytes pending and " << size << " in the buffer.\n" << logofs_flush; #endif ESET(EAGAIN); return -1; } int copied = (r_buffer_.length_ > ((int) size) ? ((int) size) : r_buffer_.length_); memcpy(data, r_buffer_.data_.begin() + r_buffer_.start_, copied); // // Update the buffer status. 
// #ifdef DEBUG *logofs << "ProxyTransport: Going to immediately return " << copied << " bytes from proxy FD#" << fd_ << ".\n" << logofs_flush; #endif r_buffer_.length_ -= copied; if (r_buffer_.length_ == 0) { r_buffer_.start_ = 0; } else { r_buffer_.start_ += copied; #ifdef TEST *logofs << "ProxyTransport: There are still " << r_buffer_.length_ << " bytes in read buffer for proxy " << "FD#" << fd_ << ".\n" << logofs_flush; #endif } return copied; } // // Read data in the user buffer. // int result = Transport::read(data, size); if (result <= 0) { return result; } statistics -> addBytesIn(result); // // Decompress the data into the read // buffer. // #ifdef DEBUG *logofs << "ProxyTransport: Going to decompress data for " << "proxy FD#" << fd_ << ".\n" << logofs_flush; #endif int saveTotalIn = r_stream_.total_in; int saveTotalOut = r_stream_.total_out; int oldTotalIn = saveTotalIn; int oldTotalOut = saveTotalOut; int diffTotalIn; int diffTotalOut; #ifdef INSPECT *logofs << "ProxyTransport: oldTotalIn = " << oldTotalIn << ".\n" << logofs_flush; #endif #ifdef INSPECT *logofs << "ProxyTransport: oldTotalOut = " << oldTotalOut << ".\n" << logofs_flush; #endif r_stream_.next_in = (Bytef *) data; r_stream_.avail_in = result; // // Let ZLIB use all the space already // available in the buffer. 
// unsigned int newAvailOut = r_buffer_.data_.size() - r_buffer_.start_ - r_buffer_.length_; #ifdef TEST *logofs << "ProxyTransport: Initial decompress buffer is " << newAvailOut << " bytes.\n" << logofs_flush; #endif for (;;) { #ifdef INSPECT *logofs << "\nProxyTransport: Running the decompress loop.\n" << logofs_flush; #endif #ifdef INSPECT *logofs << "ProxyTransport: r_buffer_.length_ = " << r_buffer_.length_ << ".\n" << logofs_flush; #endif #ifdef INSPECT *logofs << "ProxyTransport: r_buffer_.data_.size() = " << r_buffer_.data_.size() << ".\n" << logofs_flush; #endif #ifdef INSPECT *logofs << "ProxyTransport: newAvailOut = " << newAvailOut << ".\n" << logofs_flush; #endif if (resize(r_buffer_, newAvailOut) < 0) { return -1; } #ifdef INSPECT *logofs << "ProxyTransport: r_buffer_.data_.size() = " << r_buffer_.data_.size() << ".\n" << logofs_flush; #endif #ifdef INSPECT *logofs << "ProxyTransport: r_stream_.next_in = " << (void *) r_stream_.next_in << ".\n" << logofs_flush; #endif #ifdef INSPECT *logofs << "ProxyTransport: r_stream_.avail_in = " << r_stream_.avail_in << ".\n" << logofs_flush; #endif r_stream_.next_out = r_buffer_.data_.begin() + r_buffer_.start_ + r_buffer_.length_; r_stream_.avail_out = newAvailOut; #ifdef INSPECT *logofs << "ProxyTransport: r_stream_.next_out = " << (void *) r_stream_.next_out << ".\n" << logofs_flush; #endif #ifdef INSPECT *logofs << "ProxyTransport: r_stream_.avail_out = " << r_stream_.avail_out << ".\n" << logofs_flush; #endif result = inflate(&r_stream_, Z_SYNC_FLUSH); #ifdef INSPECT *logofs << "ProxyTransport: Called inflate() result is " << result << ".\n" << logofs_flush; #endif #ifdef INSPECT *logofs << "ProxyTransport: r_stream_.avail_in = " << r_stream_.avail_in << ".\n" << logofs_flush; #endif #ifdef INSPECT *logofs << "ProxyTransport: r_stream_.avail_out = " << r_stream_.avail_out << ".\n" << logofs_flush; #endif #ifdef INSPECT *logofs << "ProxyTransport: r_stream_.total_in = " << r_stream_.total_in << ".\n" << 
logofs_flush; #endif #ifdef INSPECT *logofs << "ProxyTransport: r_stream_.total_out = " << r_stream_.total_out << ".\n" << logofs_flush; #endif diffTotalIn = r_stream_.total_in - oldTotalIn; diffTotalOut = r_stream_.total_out - oldTotalOut; #ifdef INSPECT *logofs << "ProxyTransport: diffTotalIn = " << diffTotalIn << ".\n" << logofs_flush; #endif #ifdef INSPECT *logofs << "ProxyTransport: diffTotalOut = " << diffTotalOut << ".\n" << logofs_flush; #endif #ifdef INSPECT *logofs << "ProxyTransport: r_buffer_.length_ = " << r_buffer_.length_ << ".\n" << logofs_flush; #endif r_buffer_.length_ += diffTotalOut; #ifdef INSPECT *logofs << "ProxyTransport: r_buffer_.length_ = " << r_buffer_.length_ << ".\n" << logofs_flush; #endif oldTotalIn = r_stream_.total_in; oldTotalOut = r_stream_.total_out; if (result == Z_OK) { if (r_stream_.avail_in == 0 && r_stream_.avail_out > 0) { break; } } else if (result == Z_BUF_ERROR && r_stream_.avail_out > 0 && r_stream_.avail_in == 0) { #ifdef TEST *logofs << "ProxyTransport: WARNING! Raised Z_BUF_ERROR decompressing data.\n" << logofs_flush; #endif break; } else { #ifdef PANIC *logofs << "ProxyTransport: PANIC! Decompression of data failed. " << "Error is '" << zError(result) << "'.\n" << logofs_flush; #endif cerr << "Error" << ": Decompression of data failed. Error is '" << zError(result) << "'.\n"; finish(); return -1; } // // Add more bytes to the buffer. // if (newAvailOut < thresholdSize_) { newAvailOut = thresholdSize_; } #ifdef TEST *logofs << "ProxyTransport: Need to add " << newAvailOut << " bytes to the decompress buffer in read.\n" << logofs_flush; #endif } diffTotalIn = r_stream_.total_in - saveTotalIn; diffTotalOut = r_stream_.total_out - saveTotalOut; #ifdef DEBUG *logofs << "ProxyTransport: Decompressed data from " << diffTotalIn << " to " << diffTotalOut << " bytes.\n" << logofs_flush; #endif statistics -> addDecompressedBytes(diffTotalIn, diffTotalOut); // // Check if the size of the buffer // matches the produced data. 
// if ((int) size < r_buffer_.length_) { #ifdef TEST *logofs << "ProxyTransport: WARNING! Forcing a retry with " << r_buffer_.length_ << " bytes pending and " << size << " in the buffer.\n" << logofs_flush; #endif ESET(EAGAIN); return -1; } // // Copy the decompressed data to the // provided buffer. // int copied = (r_buffer_.length_ > ((int) size) ? ((int) size) : r_buffer_.length_); #ifdef DEBUG *logofs << "ProxyTransport: Going to return " << copied << " bytes from proxy FD#" << fd_ << ".\n" << logofs_flush; #endif memcpy(data, r_buffer_.data_.begin() + r_buffer_.start_, copied); // // Update the buffer status. // r_buffer_.length_ -= copied; if (r_buffer_.length_ == 0) { r_buffer_.start_ = 0; } else { r_buffer_.start_ += copied; #ifdef TEST *logofs << "ProxyTransport: There are still " << r_buffer_.length_ << " bytes in read buffer for proxy FD#" << fd_ << ".\n" << logofs_flush; #endif } return copied; } // // If required compress data, else write it to socket. // int ProxyTransport::write(T_write type, const unsigned char *data, const unsigned int size) { #ifdef TEST if (size == 0) { *logofs << "ProxyTransport: WARNING! Write called for FD#" << fd_ << " without any data to write.\n" << logofs_flush; return 0; } #endif // // If there is no compression revert to // plain socket management. // if (control -> LocalStreamCompression == 0) { int result = Transport::write(type, data, size); if (result <= 0) { return result; } statistics -> addBytesOut(result); statistics -> updateBitrate(result); FlushCallback(result); return result; } #ifdef DEBUG *logofs << "ProxyTransport: Going to compress " << size << " bytes to write buffer for proxy FD#" << fd_ << ".\n" << logofs_flush; #endif // // Compress data into the write buffer. 
// int saveTotalIn = w_stream_.total_in; int saveTotalOut = w_stream_.total_out; int oldTotalIn = saveTotalIn; int oldTotalOut = saveTotalOut; int diffTotalIn; int diffTotalOut; #ifdef INSPECT *logofs << "ProxyTransport: oldTotalIn = " << oldTotalIn << ".\n" << logofs_flush; #endif #ifdef INSPECT *logofs << "ProxyTransport: oldTotalOut = " << oldTotalOut << ".\n" << logofs_flush; #endif w_stream_.next_in = (Bytef *) data; w_stream_.avail_in = size; // // Let ZLIB use all the space already // available in the buffer. // unsigned int newAvailOut = w_buffer_.data_.size() - w_buffer_.start_ - w_buffer_.length_; #ifdef TEST *logofs << "ProxyTransport: Initial compress buffer is " << newAvailOut << " bytes.\n" << logofs_flush; #endif for (;;) { #ifdef INSPECT *logofs << "\nProxyTransport: Running the compress loop.\n" << logofs_flush; #endif #ifdef INSPECT *logofs << "ProxyTransport: w_buffer_.length_ = " << w_buffer_.length_ << ".\n" << logofs_flush; #endif #ifdef INSPECT *logofs << "ProxyTransport: w_buffer_.data_.size() = " << w_buffer_.data_.size() << ".\n" << logofs_flush; #endif #ifdef INSPECT *logofs << "ProxyTransport: newAvailOut = " << newAvailOut << ".\n" << logofs_flush; #endif if (resize(w_buffer_, newAvailOut) < 0) { return -1; } #ifdef INSPECT *logofs << "ProxyTransport: w_buffer_.data_.size() = " << w_buffer_.data_.size() << ".\n" << logofs_flush; #endif #ifdef INSPECT *logofs << "ProxyTransport: w_stream_.next_in = " << (void *) w_stream_.next_in << ".\n" << logofs_flush; #endif #ifdef INSPECT *logofs << "ProxyTransport: w_stream_.avail_in = " << w_stream_.avail_in << ".\n" << logofs_flush; #endif w_stream_.next_out = w_buffer_.data_.begin() + w_buffer_.start_ + w_buffer_.length_; w_stream_.avail_out = newAvailOut; #ifdef INSPECT *logofs << "ProxyTransport: w_stream_.next_out = " << (void *) w_stream_.next_out << ".\n" << logofs_flush; #endif #ifdef INSPECT *logofs << "ProxyTransport: w_stream_.avail_out = " << w_stream_.avail_out << ".\n" << 
logofs_flush; #endif int result = deflate(&w_stream_, (type == write_delayed ? Z_NO_FLUSH : Z_SYNC_FLUSH)); #ifdef INSPECT *logofs << "ProxyTransport: Called deflate() result is " << result << ".\n" << logofs_flush; #endif #ifdef INSPECT *logofs << "ProxyTransport: w_stream_.avail_in = " << w_stream_.avail_in << ".\n" << logofs_flush; #endif #ifdef INSPECT *logofs << "ProxyTransport: w_stream_.avail_out = " << w_stream_.avail_out << ".\n" << logofs_flush; #endif #ifdef INSPECT *logofs << "ProxyTransport: w_stream_.total_in = " << w_stream_.total_in << ".\n" << logofs_flush; #endif #ifdef INSPECT *logofs << "ProxyTransport: w_stream_.total_out = " << w_stream_.total_out << ".\n" << logofs_flush; #endif diffTotalOut = w_stream_.total_out - oldTotalOut; diffTotalIn = w_stream_.total_in - oldTotalIn; #ifdef INSPECT *logofs << "ProxyTransport: diffTotalIn = " << diffTotalIn << ".\n" << logofs_flush; #endif #ifdef INSPECT *logofs << "ProxyTransport: diffTotalOut = " << diffTotalOut << ".\n" << logofs_flush; #endif #ifdef INSPECT *logofs << "ProxyTransport: w_buffer_.length_ = " << w_buffer_.length_ << ".\n" << logofs_flush; #endif w_buffer_.length_ += diffTotalOut; #ifdef INSPECT *logofs << "ProxyTransport: w_buffer_.length_ = " << w_buffer_.length_ << ".\n" << logofs_flush; #endif oldTotalOut = w_stream_.total_out; oldTotalIn = w_stream_.total_in; if (result == Z_OK) { if (w_stream_.avail_in == 0 && w_stream_.avail_out > 0) { break; } } else if (result == Z_BUF_ERROR && w_stream_.avail_out > 0 && w_stream_.avail_in == 0) { #ifdef TEST *logofs << "ProxyTransport: WARNING! Raised Z_BUF_ERROR compressing data.\n" << logofs_flush; #endif break; } else { #ifdef PANIC *logofs << "ProxyTransport: PANIC! Compression of data failed. " << "Error is '" << zError(result) << "'.\n" << logofs_flush; #endif cerr << "Error" << ": Compression of data failed. Error is '" << zError(result) << "'.\n"; finish(); return -1; } // // Add more bytes to the buffer. 
// if (newAvailOut < thresholdSize_) { newAvailOut = thresholdSize_; } #ifdef TEST *logofs << "ProxyTransport: Need to add " << newAvailOut << " bytes to the compress buffer in write.\n" << logofs_flush; #endif } diffTotalIn = w_stream_.total_in - saveTotalIn; diffTotalOut = w_stream_.total_out - saveTotalOut; #ifdef TEST *logofs << "ProxyTransport: Compressed data from " << diffTotalIn << " to " << diffTotalOut << " bytes.\n" << logofs_flush; if (diffTotalIn != (int) size) { #ifdef PANIC *logofs << "ProxyTransport: PANIC! Bytes provided to ZLIB stream " << "should be " << size << " but they look to be " << diffTotalIn << ".\n" << logofs_flush; #endif } #endif // // Find out what we have to do with the // produced data. // if (type == write_immediate) { // // If user requested an immediate write we // flushed the ZLIB buffer. We can now reset // the counter and write data to socket. // flush_ = 0; #ifdef TEST *logofs << "ProxyTransport: Write buffer for proxy FD#" << fd_ << " has data for " << w_buffer_.length_ << " bytes.\n" << logofs_flush; *logofs << "ProxyTransport: Start is " << w_buffer_.start_ << " length is " << w_buffer_.length_ << " flush is " << flush_ << " size is " << w_buffer_.data_.size() << " capacity is " << w_buffer_.data_.capacity() << ".\n" << logofs_flush; #endif // // Alternatively may try to write only if // the socket is not blocked. // // if (w_buffer_.length_ > 0 && blocked_ == 0) // { // ... // } // if (w_buffer_.length_ > 0) { #ifdef TEST *logofs << "ProxyTransport: Writing " << w_buffer_.length_ << " bytes of produced data to FD#" << fd_ << ".\n" << logofs_flush; #endif int result = Transport::flush(); if (result < 0) { return -1; } } } else { // // We haven't flushed the ZLIB compression // buffer, so user will have to call proxy // transport's flush explicitly. // flush_ += diffTotalIn; } // // We either wrote the data or added it to the // write buffer. 
It's convenient to update the // counters at this stage to get the current // bitrate earlier. // statistics -> addCompressedBytes(diffTotalIn, diffTotalOut); statistics -> addBytesOut(diffTotalOut); statistics -> updateBitrate(diffTotalOut); FlushCallback(diffTotalOut); #ifdef TEST *logofs << "ProxyTransport: Write buffer for proxy FD#" << fd_ << " has data for " << w_buffer_.length_ << " bytes.\n" << logofs_flush; *logofs << "ProxyTransport: Start is " << w_buffer_.start_ << " length is " << w_buffer_.length_ << " flush is " << flush_ << " size is " << w_buffer_.data_.size() << " capacity is " << w_buffer_.data_.capacity() << ".\n" << logofs_flush; #endif return size; } // // Write data to its file descriptor. // int ProxyTransport::flush() { // // If there is no compression or we already compressed // outgoing data and just need to write it to socket // because of previous incomplete writes then revert // to plain socket management. // if (flush_ == 0 || control -> LocalStreamCompression == 0) { int result = Transport::flush(); if (result < 0) { return -1; } return result; } #ifdef DEBUG *logofs << "ProxyTransport: Going to flush compression on " << "proxy FD#" << fd_ << ".\n" << logofs_flush; #endif #ifdef TEST *logofs << "ProxyTransport: Flush counter for proxy FD#" << fd_ << " is " << flush_ << " bytes.\n" << logofs_flush; #endif // // Flush ZLIB stream into the write buffer. // int saveTotalIn = w_stream_.total_in; int saveTotalOut = w_stream_.total_out; int oldTotalIn = saveTotalIn; int oldTotalOut = saveTotalOut; int diffTotalOut; int diffTotalIn; #ifdef INSPECT *logofs << "ProxyTransport: oldTotalIn = " << oldTotalIn << ".\n" << logofs_flush; #endif #ifdef INSPECT *logofs << "ProxyTransport: oldTotalOut = " << oldTotalOut << ".\n" << logofs_flush; #endif w_stream_.next_in = w_buffer_.data_.begin() + w_buffer_.start_ + w_buffer_.length_; w_stream_.avail_in = 0; // // Let ZLIB use all the space already // available in the buffer. 
// unsigned int newAvailOut = w_buffer_.data_.size() - w_buffer_.start_ - w_buffer_.length_; #ifdef DEBUG *logofs << "ProxyTransport: Initial flush buffer is " << newAvailOut << " bytes.\n" << logofs_flush; #endif for (;;) { #ifdef INSPECT *logofs << "\nProxyTransport: Running the flush loop.\n" << logofs_flush; #endif #ifdef INSPECT *logofs << "ProxyTransport: w_buffer_.length_ = " << w_buffer_.length_ << ".\n" << logofs_flush; #endif #ifdef INSPECT *logofs << "ProxyTransport: w_buffer_.data_.size() = " << w_buffer_.data_.size() << ".\n" << logofs_flush; #endif #ifdef INSPECT *logofs << "ProxyTransport: newAvailOut = " << newAvailOut << ".\n" << logofs_flush; #endif if (resize(w_buffer_, newAvailOut) < 0) { return -1; } #ifdef INSPECT *logofs << "ProxyTransport: w_buffer_.data_.size() = " << w_buffer_.data_.size() << ".\n" << logofs_flush; #endif #ifdef INSPECT *logofs << "ProxyTransport: w_stream_.next_in = " << (void *) w_stream_.next_in << ".\n" << logofs_flush; #endif #ifdef INSPECT *logofs << "ProxyTransport: w_stream_.avail_in = " << w_stream_.avail_in << ".\n" << logofs_flush; #endif w_stream_.next_out = w_buffer_.data_.begin() + w_buffer_.start_ + w_buffer_.length_; w_stream_.avail_out = newAvailOut; #ifdef INSPECT *logofs << "ProxyTransport: w_stream_.next_out = " << (void *) w_stream_.next_out << ".\n" << logofs_flush; #endif #ifdef INSPECT *logofs << "ProxyTransport: w_stream_.avail_out = " << w_stream_.avail_out << ".\n" << logofs_flush; #endif int result = deflate(&w_stream_, Z_SYNC_FLUSH); #ifdef INSPECT *logofs << "ProxyTransport: Called deflate() result is " << result << ".\n" << logofs_flush; #endif #ifdef INSPECT *logofs << "ProxyTransport: w_stream_.avail_in = " << w_stream_.avail_in << ".\n" << logofs_flush; #endif #ifdef INSPECT *logofs << "ProxyTransport: w_stream_.avail_out = " << w_stream_.avail_out << ".\n" << logofs_flush; #endif #ifdef INSPECT *logofs << "ProxyTransport: w_stream_.total_in = " << w_stream_.total_in << ".\n" << 
logofs_flush; #endif #ifdef INSPECT *logofs << "ProxyTransport: w_stream_.total_out = " << w_stream_.total_out << ".\n" << logofs_flush; #endif diffTotalOut = w_stream_.total_out - oldTotalOut; diffTotalIn = w_stream_.total_in - oldTotalIn; #ifdef INSPECT *logofs << "ProxyTransport: diffTotalIn = " << diffTotalIn << ".\n" << logofs_flush; #endif #ifdef INSPECT *logofs << "ProxyTransport: diffTotalOut = " << diffTotalOut << ".\n" << logofs_flush; #endif #ifdef INSPECT *logofs << "ProxyTransport: w_buffer_.length_ = " << w_buffer_.length_ << ".\n" << logofs_flush; #endif w_buffer_.length_ += diffTotalOut; #ifdef INSPECT *logofs << "ProxyTransport: w_buffer_.length_ = " << w_buffer_.length_ << ".\n" << logofs_flush; #endif oldTotalOut = w_stream_.total_out; oldTotalIn = w_stream_.total_in; if (result == Z_OK) { if (w_stream_.avail_in == 0 && w_stream_.avail_out > 0) { break; } } else if (result == Z_BUF_ERROR && w_stream_.avail_out > 0 && w_stream_.avail_in == 0) { #ifdef TEST *logofs << "ProxyTransport: WARNING! Raised Z_BUF_ERROR flushing data.\n" << logofs_flush; #endif break; } else { #ifdef PANIC *logofs << "ProxyTransport: PANIC! Flush of compressed data failed. " << "Error is '" << zError(result) << "'.\n" << logofs_flush; #endif cerr << "Error" << ": Flush of compressed data failed. Error is '" << zError(result) << "'.\n"; finish(); return -1; } // // Add more bytes to the buffer. // if (newAvailOut < thresholdSize_) { newAvailOut = thresholdSize_; } #ifdef TEST *logofs << "ProxyTransport: Need to add " << newAvailOut << " bytes to the compress buffer in flush.\n" << logofs_flush; #endif } diffTotalIn = w_stream_.total_in - saveTotalIn; diffTotalOut = w_stream_.total_out - saveTotalOut; #ifdef TEST *logofs << "ProxyTransport: Compressed flush data from " << diffTotalIn << " to " << diffTotalOut << " bytes.\n" << logofs_flush; #endif // // Time to move data from the write // buffer to the real link. 
// #ifdef DEBUG *logofs << "ProxyTransport: Reset flush counter for proxy FD#" << fd_ << ".\n" << logofs_flush; #endif flush_ = 0; #ifdef TEST *logofs << "ProxyTransport: Write buffer for proxy FD#" << fd_ << " has data for " << w_buffer_.length_ << " bytes.\n" << logofs_flush; *logofs << "ProxyTransport: Start is " << w_buffer_.start_ << " length is " << w_buffer_.length_ << " flush is " << flush_ << " size is " << w_buffer_.data_.size() << " capacity is " << w_buffer_.data_.capacity() << ".\n" << logofs_flush; #endif int result = Transport::flush(); if (result < 0) { return -1; } // // Update all the counters. // statistics -> addCompressedBytes(diffTotalIn, diffTotalOut); statistics -> addBytesOut(diffTotalOut); statistics -> updateBitrate(diffTotalOut); FlushCallback(diffTotalOut); return result; } unsigned int ProxyTransport::getPending(unsigned char *&data) { // // Return a pointer to the data in the // read buffer. It is up to the caller // to ensure that the data is consumed // before the read buffer is reused. // if (r_buffer_.length_ > 0) { unsigned int size = r_buffer_.length_; data = r_buffer_.data_.begin() + r_buffer_.start_; #ifdef DEBUG *logofs << "ProxyTransport: Returning " << size << " pending bytes from proxy FD#" << fd_ << ".\n" << logofs_flush; #endif r_buffer_.length_ = 0; r_buffer_.start_ = 0; // // Prevent the deletion of the buffer. // owner_ = 0; return size; } #ifdef TEST *logofs << "ProxyTransport: WARNING! 
No pending data " << "for proxy FD#" << fd_ << ".\n" << logofs_flush; #endif data = NULL; return 0; } void ProxyTransport::fullReset() { blocked_ = 0; finish_ = 0; flush_ = 0; if (control -> RemoteStreamCompression) { inflateReset(&r_stream_); } if (control -> LocalStreamCompression) { deflateReset(&w_stream_); } if (owner_ == 1) { Transport::fullReset(r_buffer_); } Transport::fullReset(w_buffer_); } AgentTransport::AgentTransport(int fd) : Transport(fd) { #ifdef TEST *logofs << "AgentTransport: Going to create agent transport " << "for FD#" << fd_ << ".\n" << logofs_flush; #endif type_ = transport_agent; // // Set up the read buffer. // r_buffer_.length_ = 0; r_buffer_.start_ = 0; r_buffer_.data_.resize(initialSize_); // // For now we own the buffer. // owner_ = 1; // // Set up the mutexes. // #ifdef THREADS pthread_mutexattr_t m_attributes; pthread_mutexattr_init(&m_attributes); // // Interfaces in pthread to handle mutex // type do not work in current version. // m_attributes.__mutexkind = PTHREAD_MUTEX_ERRORCHECK_NP; if (pthread_mutex_init(&m_read_, &m_attributes) != 0) { #ifdef TEST *logofs << "AgentTransport: Child: Creation of read mutex failed. " << "Error is " << EGET() << " '" << ESTR() << "'.\n" << logofs_flush; #endif } if (pthread_mutex_init(&m_write_, &m_attributes) != 0) { #ifdef TEST *logofs << "AgentTransport: Child: Creation of write mutex failed. " << "Error is " << EGET() << " '" << ESTR() << "'.\n" << logofs_flush; #endif } #endif #ifdef REFERENCES *logofs << "AgentTransport: Child: Created new object at " << this << " out of " << ++references_ << " allocated references.\n" << logofs_flush; #endif } AgentTransport::~AgentTransport() { #ifdef TEST *logofs << "AgentTransport: Going to destroy derived class " << "for FD#" << fd_ << ".\n" << logofs_flush; #endif // // Unlock and free all mutexes. 
// #ifdef THREADS pthread_mutex_unlock(&m_read_); pthread_mutex_unlock(&m_write_); pthread_mutex_destroy(&m_read_); pthread_mutex_destroy(&m_write_); #endif #ifdef REFERENCES *logofs << "AgentTransport: Child: Deleted object at " << this << " out of " << --references_ << " allocated references.\n" << logofs_flush; #endif } // // Read data enqueued by the other thread. // int AgentTransport::read(unsigned char *data, unsigned int size) { #ifdef THREADS lockRead(); #endif #ifdef DEBUG *logofs << "AgentTransport: Child: Going to read " << size << " bytes from " << "FD#" << fd_ << ".\n" << logofs_flush; #endif int copied = -1; if (r_buffer_.length_ > 0) { if ((int) size < r_buffer_.length_) { #ifdef TEST *logofs << "AgentTransport: WARNING! Forcing a retry with " << r_buffer_.length_ << " bytes pending and " << size << " in the buffer.\n" << logofs_flush; #endif ESET(EAGAIN); } else { copied = (r_buffer_.length_ > ((int) size) ? ((int) size) : r_buffer_.length_); memcpy(data, r_buffer_.data_.begin() + r_buffer_.start_, copied); // // Update the buffer status. // #ifdef TEST *logofs << "AgentTransport: Child: Going to immediately return " << copied << " bytes from FD#" << fd_ << ".\n" << logofs_flush; #endif #ifdef DUMP *logofs << "AgentTransport: Child: Dumping content of read data.\n" << logofs_flush; DumpData(data, copied); #endif r_buffer_.length_ -= copied; if (r_buffer_.length_ == 0) { r_buffer_.start_ = 0; } else { r_buffer_.start_ += copied; #ifdef TEST *logofs << "AgentTransport: Child: There are still " << r_buffer_.length_ << " bytes in read buffer for " << "FD#" << fd_ << ".\n" << logofs_flush; #endif } } } else { #ifdef DEBUG *logofs << "AgentTransport: Child: No data can be got " << "from read buffer for FD#" << fd_ << ".\n" << logofs_flush; #endif ESET(EAGAIN); } #ifdef THREADS unlockRead(); #endif return copied; } // // Write data to buffer so that the other // thread can get it. 
// int AgentTransport::write(T_write type, const unsigned char *data, const unsigned int size) { #ifdef THREADS lockWrite(); #endif // // Just append data to socket's write buffer. // Note that we don't care if buffer exceeds // the size limits set for this type of // transport. // #ifdef TEST *logofs << "AgentTransport: Child: Going to append " << size << " bytes to write buffer for " << "FD#" << fd_ << ".\n" << logofs_flush; #endif int copied = -1; if (resize(w_buffer_, size) < 0) { finish(); ESET(EPIPE); } else { memmove(w_buffer_.data_.begin() + w_buffer_.start_ + w_buffer_.length_, data, size); w_buffer_.length_ += size; #ifdef DUMP *logofs << "AgentTransport: Child: Dumping content of written data.\n" << logofs_flush; DumpData(data, size); #endif #ifdef TEST *logofs << "AgentTransport: Child: Write buffer for FD#" << fd_ << " has data for " << w_buffer_.length_ << " bytes.\n" << logofs_flush; *logofs << "AgentTransport: Child: Start is " << w_buffer_.start_ << " length is " << w_buffer_.length_ << " size is " << w_buffer_.data_.size() << " capacity is " << w_buffer_.data_.capacity() << ".\n" << logofs_flush; #endif copied = size; } // // Let child access again the read buffer. // #ifdef THREADS unlockWrite(); #endif return copied; } int AgentTransport::flush() { // // In case of memory-to-memory transport // this function should never be called. // #ifdef PANIC *logofs << "AgentTransport: Child: PANIC! Called flush() for " << "memory to memory transport on " << "FD#" << fd_ << ".\n" << logofs_flush; #endif cerr << "Error" << ": Called flush() for " << "memory to memory transport on " << "FD#" << fd_ << ".\n"; HandleAbort(); } int AgentTransport::drain(int limit, int timeout) { // // We can't drain the channel in the case // of the memory-to-memory transport. Data // is enqueued for the agent to read but // the agent could require multiple loops // to read it all. // // // In case of memory-to-memory transport // this function should never be called. 
// #ifdef PANIC *logofs << "AgentTransport: Child: PANIC! Called drain() for " << "memory to memory transport on " << "FD#" << fd_ << ".\n" << logofs_flush; #endif cerr << "Error" << ": Called drain() for " << "memory to memory transport on " << "FD#" << fd_ << ".\n"; HandleAbort(); } unsigned int AgentTransport::getPending(unsigned char *&data) { #ifdef THREADS lockRead(); #endif if (r_buffer_.length_ > 0) { unsigned int size = r_buffer_.length_; data = r_buffer_.data_.begin() + r_buffer_.start_; #ifdef DEBUG *logofs << "AgentTransport: Child: Returning " << size << " pending bytes from FD#" << fd_ << ".\n" << logofs_flush; #endif r_buffer_.length_ = 0; r_buffer_.start_ = 0; #ifdef THREADS unlockRead(); #endif // // Prevent the deletion of the buffer. // owner_ = 0; return size; } #ifdef TEST *logofs << "AgentTransport: WARNING! No pending data " << "for FD#" << fd_ << ".\n" << logofs_flush; #endif #ifdef THREADS unlockRead(); #endif data = NULL; return 0; } void AgentTransport::fullReset() { #ifdef THREADS lockRead(); lockWrite(); #endif #ifdef TEST *logofs << "AgentTransport: Child: Resetting transport " << "for FD#" << fd_ << ".\n" << logofs_flush; #endif blocked_ = 0; finish_ = 0; if (owner_ == 1) { Transport::fullReset(r_buffer_); } Transport::fullReset(w_buffer_); } int AgentTransport::enqueue(const char *data, const int size) { #ifdef THREADS lockRead(); #endif if (finish_ == 1) { #if defined(PARENT) && defined(TEST) *logofs << "AgentTransport: Parent: Returning EPIPE in " << "write for finishing FD#" << fd_ << ".\n" << logofs_flush; #endif ESET(EPIPE); return -1; } // // Always allow the agent to write // all its data. // int toPut = size; #if defined(PARENT) && defined(TEST) *logofs << "AgentTransport: Parent: Going to put " << toPut << " bytes into read buffer for FD#" << fd_ << ". 
Buffer length is " << r_buffer_.length_ << ".\n" << logofs_flush; #endif if (resize(r_buffer_, toPut) < 0) { finish(); #ifdef THREADS unlockRead(); #endif return -1; } memcpy(r_buffer_.data_.begin() + r_buffer_.start_ + r_buffer_.length_, data, toPut); r_buffer_.length_ += toPut; #if defined(DUMP) && defined(PARENT) *logofs << "AgentTransport: Parent: Dumping content of enqueued data.\n" << logofs_flush; DumpData((const unsigned char *) data, toPut); #endif #if defined(PARENT) && defined(TEST) *logofs << "AgentTransport: Parent: Read buffer for FD#" << fd_ << " has now data for " << r_buffer_.length_ << " bytes.\n" << logofs_flush; *logofs << "AgentTransport: Parent: Start is " << r_buffer_.start_ << " length is " << r_buffer_.length_ << " size is " << r_buffer_.data_.size() << " capacity is " << r_buffer_.data_.capacity() << ".\n" << logofs_flush; #endif #ifdef THREADS unlockRead(); #endif return toPut; } int AgentTransport::dequeue(char *data, int size) { #ifdef THREADS lockWrite(); #endif if (w_buffer_.length_ == 0) { if (finish_ == 1) { #if defined(PARENT) && defined(TEST) *logofs << "AgentTransport: Parent: Returning 0 in read " << "for finishing FD#" << fd_ << ".\n" << logofs_flush; #endif return 0; } #if defined(PARENT) && defined(TEST) *logofs << "AgentTransport: Parent: No data can be read " << "from write buffer for FD#" << fd_ << ".\n" << logofs_flush; #endif ESET(EAGAIN); #ifdef THREADS unlockWrite(); #endif return -1; } // // Return as many bytes as possible. // int toGet = ((int) size > w_buffer_.length_ ? 
w_buffer_.length_ : size); #if defined(PARENT) && defined(TEST) *logofs << "AgentTransport: Parent: Going to get " << toGet << " bytes from write buffer for FD#" << fd_ << ".\n" << logofs_flush; #endif memcpy(data, w_buffer_.data_.begin() + w_buffer_.start_, toGet); w_buffer_.start_ += toGet; w_buffer_.length_ -= toGet; #if defined(DUMP) && defined(PARENT) *logofs << "AgentTransport: Parent: Dumping content of dequeued data.\n" << logofs_flush; DumpData((const unsigned char *) data, toGet); #endif #if defined(PARENT) && defined(TEST) *logofs << "AgentTransport: Parent: Write buffer for FD#" << fd_ << " has now data for " << length() << " bytes.\n" << logofs_flush; *logofs << "AgentTransport: Parent: Start is " << w_buffer_.start_ << " length is " << w_buffer_.length_ << " size is " << w_buffer_.data_.size() << " capacity is " << w_buffer_.data_.capacity() << ".\n" << logofs_flush; #endif #ifdef THREADS unlockWrite(); #endif return toGet; } int AgentTransport::dequeuable() { if (finish_ == 1) { #if defined(PARENT) && defined(TEST) *logofs << "AgentTransport: Parent: Returning EPIPE in " << "readable for finishing FD#" << fd_ << ".\n" << logofs_flush; #endif ESET(EPIPE); return -1; } #if defined(PARENT) && defined(TEST) *logofs << "AgentTransport: Parent: Returning " << w_buffer_.length_ << " as data readable " << "from read buffer for FD#" << fd_ << ".\n" << logofs_flush; #endif return w_buffer_.length_; } #ifdef THREADS int AgentTransport::lockRead() { for (;;) { int result = pthread_mutex_lock(&m_read_); if (result == 0) { #ifdef DEBUG *logofs << "AgentTransport: Read mutex locked by thread id " << pthread_self() << ".\n" << logofs_flush; #endif return 0; } else if (EGET() == EINTR) { continue; } else { #ifdef WARNING *logofs << "AgentTransport: WARNING! Locking of read mutex by thread id " << pthread_self() << " returned " << result << ". 
Error is '" << ESTR() << "'.\n" << logofs_flush; #endif return result; } } } int AgentTransport::lockWrite() { for (;;) { int result = pthread_mutex_lock(&m_write_); if (result == 0) { #ifdef DEBUG *logofs << "AgentTransport: Write mutex locked by thread id " << pthread_self() << ".\n" << logofs_flush; #endif return 0; } else if (EGET() == EINTR) { continue; } else { #ifdef WARNING *logofs << "AgentTransport: WARNING! Locking of write mutex by thread id " << pthread_self() << " returned " << result << ". Error is '" << ESTR() << "'.\n" << logofs_flush; #endif return result; } } } int AgentTransport::unlockRead() { for (;;) { int result = pthread_mutex_unlock(&m_read_); if (result == 0) { #ifdef DEBUG *logofs << "AgentTransport: Read mutex unlocked by thread id " << pthread_self() << ".\n" << logofs_flush; #endif return 0; } else if (EGET() == EINTR) { continue; } else { #ifdef WARNING *logofs << "AgentTransport: WARNING! Unlocking of read mutex by thread id " << pthread_self() << " returned " << result << ". Error is '" << ESTR() << "'.\n" << logofs_flush; #endif return result; } } } int AgentTransport::unlockWrite() { for (;;) { int result = pthread_mutex_unlock(&m_write_); if (result == 0) { #ifdef DEBUG *logofs << "AgentTransport: Write mutex unlocked by thread id " << pthread_self() << ".\n" << logofs_flush; #endif return 0; } else if (EGET() == EINTR) { continue; } else { #ifdef WARNING *logofs << "AgentTransport: WARNING! Unlocking of write mutex by thread id " << pthread_self() << " returned " << result << ". Error is '" << ESTR() << "'.\n" << logofs_flush; #endif return result; } } } #endif