/**************************************************************************/
/*                                                                        */
/* Copyright (c) 2001, 2011 NoMachine (http://www.nomachine.com)          */
/* Copyright (c) 2008-2014 Oleksandr Shneyder <o.shneyder@phoca-gmbh.de>  */
/* Copyright (c) 2014-2016 Ulrich Sibiller <uli42@gmx.de>                 */
/* Copyright (c) 2014-2016 Mihai Moldovan <ionic@ionic.de>                */
/* Copyright (c) 2011-2016 Mike Gabriel <mike.gabriel@das-netzwerkteam.de>*/
/* Copyright (c) 2015-2016 Qindel Group (http://www.qindel.com)           */
/*                                                                        */
/* NXCOMP, NX protocol compression and NX extensions to this software     */
/* are copyright of the aforementioned persons and companies.             */
/*                                                                        */
/* Redistribution and use of the present software is allowed according    */
/* to terms specified in the file LICENSE.nxcomp which comes in the       */
/* source distribution.                                                   */
/*                                                                        */
/* All rights reserved.                                                   */
/*                                                                        */
/* NOTE: This software has received contributions from various other      */
/* contributors, only the core maintainers and supporters are listed as   */
/* copyright holders. Please contact us, if you feel you should be listed */
/* as copyright holder, as well.                                          */
/*                                                                        */
/**************************************************************************/

#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <signal.h>
#include <sys/socket.h>

#include "Transport.h"

#include "Statistics.h"

//
// Set the verbosity level. You also
// need to define DUMP in Misc.cpp
// if DUMP is defined here.
//

// PANIC and WARNING are enabled: the log blocks guarded by
// "#ifdef PANIC" and "#ifdef WARNING" in this file are compiled in.
#define PANIC
#define WARNING
// TEST, DEBUG, INSPECT and DUMP are disabled: the tracing blocks
// guarded by these symbols in this file are compiled out.
#undef  TEST
#undef  DEBUG
#undef  INSPECT
#undef  DUMP

//
// Used to lock and unlock the transport
// buffers before they are accessed by
// different threads.
//

#undef  THREADS

//
// Define this to log all the operations
// performed by the parent thread, the one
// that enqueues and dequeues data.
//

#define PARENT

//
// Define this to know when a channel
// is created or destroyed.
//

#undef  REFERENCES

//
// Reference count for allocated buffers.
//

#ifdef REFERENCES

// Definitions of the per-class static counters. In this file the
// Transport and ProxyTransport constructors increment their counter
// and the destructors decrement it, logging the resulting value.
// The use made by InternalTransport is not visible in this section.
int Transport::references_;
int ProxyTransport::references_;
int InternalTransport::references_;

#endif

//
// This is the base class providing methods for read
// and write buffering.
//

Transport::Transport(int fd) : fd_(fd)
{
  #ifdef TEST
  *logofs << "Transport: Going to create base transport "
          << "for FD#" << fd_ << ".\n" << logofs_flush;
  #endif

  type_ = transport_base;

  //
  // Nothing is pending and no error
  // was detected on the link yet.
  //

  blocked_ = 0;
  finish_  = 0;

  //
  // Pick the default limits first, as the
  // write buffer is allocated according
  // to the initial size.
  //

  initialSize_   = TRANSPORT_BUFFER_DEFAULT_SIZE;
  thresholdSize_ = TRANSPORT_BUFFER_DEFAULT_SIZE << 1;
  maximumSize_   = TRANSPORT_BUFFER_DEFAULT_SIZE << 4;

  //
  // The write buffer starts empty.
  //

  w_buffer_.start_  = 0;
  w_buffer_.length_ = 0;

  w_buffer_.data_.resize(initialSize_);

  //
  // Put the descriptor in non-blocking mode.
  //

  SetNonBlocking(fd_, 1);

  #ifdef REFERENCES
  *logofs << "Transport: Created new object at " 
          << this << " out of " << ++references_ 
          << " allocated references.\n" << logofs_flush;
  #endif
}

Transport::~Transport()
{
  #ifdef TEST
  *logofs << "Transport: Going to destroy base class "
          << "for FD#" << fd_ << ".\n" << logofs_flush;
  #endif

  //
  // The descriptor is released together
  // with the transport object.
  //

  ::close(fd_);

  #ifdef REFERENCES
  *logofs << "Transport: Deleted object at " 
          << this << " out of " << --references_ 
          << " allocated references.\n" << logofs_flush;
  #endif
}

//
// Read data from its file descriptor.
//

int Transport::read(unsigned char *data, unsigned int size)
{
  #ifdef DEBUG
  *logofs << "Transport: Going to read " << size << " bytes from " 
          << "FD#" << fd_ << ".\n" << logofs_flush;
  #endif

  //
  // Read the available data from the socket.
  //

  int result = ::read(fd_, data, size);

  //
  // Update the current timestamp as the read
  // can have scheduled some other process.
  //

  getNewTimestamp();

  if (result < 0)
  {
    if (EGET() == EAGAIN)
    {
      #ifdef TEST
      *logofs << "Transport: WARNING! Read of " << size << " bytes from "
              << "FD#" << fd_ << " would block.\n" << logofs_flush;
      #endif

      return 0;
    }
    else if (EGET() == EINTR)
    {
      #ifdef TEST
      *logofs << "Transport: Read of " << size << " bytes from "
              << "FD#" << fd_ << " was interrupted.\n"
              << logofs_flush;
      #endif

      return 0;
    }
    else
    {
      #ifdef TEST
      *logofs << "Transport: Error reading from " 
              << "FD#" << fd_ << ".\n" << logofs_flush;
      #endif

      finish();

      return -1;
    }
  }
  else if (result == 0)
  {
    #ifdef TEST
    *logofs << "Transport: No data read from " 
            << "FD#" << fd_ << ".\n" << logofs_flush;
    #endif

    finish();

    return -1;
  }

  #ifdef TEST
  *logofs << "Transport: Read " << result << " bytes out of "
          << size << " from FD#" << fd_ << ".\n" << logofs_flush;
  #endif

  #ifdef DUMP

  *logofs << "Transport: Dumping content of read data.\n"
          << logofs_flush;

  DumpData(data, result);

  #endif

  return result;
}

//
// Write as many bytes as possible to socket. 
// Append the remaining data bytes to the end 
// of the buffer and update length to reflect
// changes.
//

//
// Parameters:
//
//   type  When write_immediate, the data already enqueued is
//         flushed and the new data is written directly to the
//         descriptor, if possible. With any other value the
//         data is only appended to the write buffer.
//   data  Bytes to send.
//   size  Number of bytes in data.
//
// Returns the number of bytes taken in charge (written or
// enqueued), that is always size on success, or -1 if the
// flush or the write failed.
//

int Transport::write(T_write type, const unsigned char *data, const unsigned int size)
{
  //
  // If an immediate write was requested then
  // flush the enqueued data first.
  //
  // Alternatively may try to write only if
  // the socket is not blocked.
  //
  // if (w_buffer_.length_ > 0 && blocked_ == 0 &&
  //         type == write_immediate)
  // {
  //   ...
  // }
  //

  if (w_buffer_.length_ > 0 && type == write_immediate)

  {
    #ifdef TEST
    *logofs << "Transport: Writing " << w_buffer_.length_
            << " bytes of previous data to FD#" << fd_ << ".\n"
            << logofs_flush;
    #endif

    //
    // The base class flush is called explicitly,
    // bypassing any override in derived classes.
    //

    int result = Transport::flush();

    if (result < 0)
    {
      return -1;
    }
  }

  //
  // If nothing remains in the buffer, write
  // immediately to the socket. The flush above
  // may have left data enqueued or set the
  // blocked flag, in which case the new data
  // is just appended to the buffer below.
  //

  unsigned int written = 0;

  if (w_buffer_.length_ == 0 && blocked_ == 0 &&
          type == write_immediate)
  {
    //
    // Limit the amount of data sent. Presently
    // the limit is the whole size of the data.
    //

    unsigned int toWrite = size;

    #ifdef DUMP

    *logofs << "Transport: Going to write " << toWrite
            << " bytes to FD#" << fd_ << " with checksum ";

    DumpChecksum(data, size);

    *logofs << ".\n" << logofs_flush;

    #endif

    T_timestamp writeTs;

    int diffTs;

    while (written < toWrite)
    {
      //
      // Trace system time spent writing data.
      //

      writeTs = getTimestamp();

      int result = ::write(fd_, data + written, toWrite - written);

      diffTs = diffTimestamp(writeTs, getNewTimestamp());

      statistics -> addWriteTime(diffTs);

      if (result <= 0)
      {
        //
        // The descriptor can't accept more data now.
        // Mark the transport as blocked and enqueue
        // the remaining bytes.
        //

        if (EGET() == EAGAIN)
        {
          #ifdef TEST
          *logofs << "Transport: Write of " << toWrite - written
                  << " bytes on FD#" << fd_ << " would block.\n"
                  << logofs_flush;
          #endif

          blocked_ = 1;

          break;
        }
        else if (EGET() == EINTR)
        {
          #ifdef TEST
          *logofs << "Transport: Write of " << toWrite - written
                  << " bytes on FD#" << fd_ << " was interrupted.\n"
                  << logofs_flush;
          #endif

          //
          // Interrupted by a signal. Just retry.
          //

          continue;
        }
        else
        {
          #ifdef TEST
          *logofs << "Transport: Write to " << "FD#" 
                  << fd_ << " failed.\n" << logofs_flush;
          #endif

          finish();

          return -1;
        }
      }
      else
      {
        #ifdef TEST
        *logofs << "Transport: Immediately written " << result
                << " bytes on " << "FD#" << fd_ << ".\n"
                << logofs_flush;
        #endif

        written += result;
      }
    }

    #ifdef DUMP

    if (written > 0)
    {
      *logofs << "Transport: Dumping content of immediately written data.\n"
              << logofs_flush;

      DumpData(data, written);
    }

    #endif
  }

  if (written == size)
  {
    //
    // We will not affect the write buffer.
    //

    return written;
  }

  #ifdef DEBUG
  *logofs << "Transport: Going to append " << size - written
          << " bytes to write buffer for " << "FD#" << fd_
          << ".\n" << logofs_flush;
  #endif

  //
  // Make room in the write buffer for the bytes
  // that could not be written and append them
  // after the data already enqueued.
  //

  if (resize(w_buffer_, size - written) < 0)
  {
    return -1;
  }

  memmove(w_buffer_.data_.begin() + w_buffer_.start_ + w_buffer_.length_,
              data + written, size - written);

  w_buffer_.length_ += size - written;

  #ifdef TEST
  *logofs << "Transport: Write buffer for FD#" << fd_
          << " has data for " << w_buffer_.length_ << " bytes.\n"
          << logofs_flush;

  *logofs << "Transport: Start is " << w_buffer_.start_ 
          << " length is " << w_buffer_.length_ << " size is "
          << w_buffer_.data_.size() << " capacity is "
          << w_buffer_.data_.capacity() << ".\n"
          << logofs_flush;
  #endif

  //
  // Note that this function always returns the whole
  // size of the buffer that was provided, even if not
  // all the data could be actually written.
  //

  return size;
}

//
// Write pending data to its file descriptor.
//

int Transport::flush()
{
  if (w_buffer_.length_ == 0)
  {
    #ifdef TEST
    *logofs << "Transport: No data to flush on "
            << "FD#" << fd_ << ".\n" << logofs_flush;
    #endif

    #ifdef WARNING
    if (blocked_ != 0)
    {
      *logofs << "Transport: Blocked flag is " << blocked_
              << " with no data to flush on FD#" << fd_
              << ".\n" << logofs_flush;
    }
    #endif

    return 0;
  }

  //
  // It's time to move data from the 
  // write buffer to the real link. 
  //

  int written = 0;

  int toWrite = w_buffer_.length_;

  //
  // We will do our best to write any available
  // data to the socket, so let's say we start
  // from a clean state.
  //

  blocked_ = 0;

  #ifdef TEST
  *logofs << "Transport: Going to flush " << toWrite
          << " bytes on FD#" << fd_ << ".\n"
          << logofs_flush;
  #endif

  T_timestamp writeTs;

  int diffTs;

  while (written < toWrite)
  {
    writeTs = getTimestamp();

    int result = ::write(fd_, w_buffer_.data_.begin() + w_buffer_.start_ +
                             written, toWrite - written);

    diffTs = diffTimestamp(writeTs, getNewTimestamp());

    statistics -> addWriteTime(diffTs);

    if (result <= 0)
    {
      if (EGET() == EAGAIN)
      {
        #ifdef TEST
        *logofs << "Transport: Write of " << toWrite - written
                << " bytes on FD#" << fd_ << " would block.\n"
                << logofs_flush;
        #endif

        blocked_ = 1;

        break;
      }
      else if (EGET() == EINTR)
      {
        #ifdef TEST
        *logofs << "Transport: Write of " << toWrite - written
                << " bytes on FD#" << fd_ << " was interrupted.\n"
                << logofs_flush;
        #endif

        continue;
      }
      else
      {
        #ifdef TEST
        *logofs << "Transport: Write to " << "FD#" 
                << fd_ << " failed.\n" << logofs_flush;
        #endif

        finish();

        return -1;
      }
    }
    else
    {
      #ifdef TEST
      *logofs << "Transport: Flushed " << result << " bytes on " 
              << "FD#" << fd_ << ".\n" << logofs_flush;
      #endif

      written += result;
    }
  }

  if (written > 0)
  {
    #ifdef DUMP

    *logofs << "Transport: Dumping content of flushed data.\n"
            << logofs_flush;

    DumpData(w_buffer_.data_.begin() + w_buffer_.start_, written);

    #endif

    //
    // Update the buffer status.
    //

    w_buffer_.length_ -= written;

    if (w_buffer_.length_ == 0)
    {
      w_buffer_.start_ = 0;
    }
    else
    {
      w_buffer_.start_ += written;
    }
  }

  //
  // It can be that we wrote less bytes than
  // available because of the write limit.
  //

  if (w_buffer_.length_ > 0)
  {
    #ifdef TEST
    *logofs << "Transport: There are still " << w_buffer_.length_ 
            << " bytes in write buffer for " << "FD#" 
            << fd_ << ".\n" << logofs_flush;
    #endif

    blocked_ = 1;
  }

  #ifdef TEST
  *logofs << "Transport: Write buffer for FD#" << fd_
          << " has data for " << w_buffer_.length_ << " bytes.\n"
          << logofs_flush;

  *logofs << "Transport: Start is " << w_buffer_.start_ 
          << " length is " << w_buffer_.length_ << " size is "
          << w_buffer_.data_.size() << " capacity is "
          << w_buffer_.data_.capacity() << ".\n"
          << logofs_flush;
  #endif

  //
  // No new data was produced for the link except
  // any outstanding data from previous writes.
  //

  return 0;
}

//
// Write the enqueued data to the descriptor until the write
// buffer holds no more than limit bytes, waiting in select()
// for the descriptor to become writable.
//
// The loop also stops when the descriptor becomes readable
// or when the time elapsed since the call reaches timeout.
// The timeout is compared with the value of diffTimestamp(),
// presumably in milliseconds. To be confirmed against the
// definition of the timestamp helpers.
//
// Returns 1 if the buffer is at or below the limit, 0 if it
// is still above it and -1 if the select or the write failed,
// in which case finish() is called.
//

int Transport::drain(int limit, int timeout)
{
  if (w_buffer_.length_ <= limit)
  {
    return 1;
  }

  //
  // Write the data accumulated in the write
  // buffer until it is below the limit or
  // the timeout is elapsed.
  //

  int toWrite = w_buffer_.length_;

  int written = 0;

  #ifdef TEST
  *logofs << "Transport: Draining " << toWrite - limit
          << " bytes on FD#" << fd_ << " with limit set to "
          << limit << ".\n" << logofs_flush;
  #endif

  T_timestamp startTs = getNewTimestamp();

  T_timestamp selectTs;
  T_timestamp writeTs;
  T_timestamp idleTs;

  T_timestamp nowTs = startTs;

  int diffTs;

  fd_set writeSet;
  fd_set readSet;

  FD_ZERO(&writeSet);
  FD_ZERO(&readSet);

  int result;
  int ready;

  //
  // The buffer is not updated inside the loop. The
  // bytes written are accumulated and removed from
  // the buffer once the loop is completed.
  //

  while (w_buffer_.length_ - written > limit)
  {
    nowTs = getNewTimestamp();

    //
    // Wait for descriptor to become
    // readable or writable. The sets
    // are armed again at each loop as
    // select() modifies them.
    //

    FD_SET(fd_, &writeSet);
    FD_SET(fd_, &readSet);

    //
    // Each select waits at most for
    // half of the overall timeout.
    //

    setTimestamp(selectTs, timeout / 2);

    idleTs = nowTs;

    result = select(fd_ + 1, &readSet, &writeSet, NULL, &selectTs);

    nowTs = getNewTimestamp();

    diffTs = diffTimestamp(idleTs, nowTs);

    //
    // Account the time spent in select as idle
    // time and subtract it from the read time.
    //

    statistics -> addIdleTime(diffTs);

    statistics -> subReadTime(diffTs);

    if (result < 0)
    {
      if (EGET() == EINTR)
      {
        #ifdef TEST
        *logofs << "Transport: Select on FD#" << fd_
                << " was interrupted.\n" << logofs_flush;
        #endif

        continue;
      }
      else
      {
        #ifdef TEST
        *logofs << "Transport: Select on FD#" << fd_
                << " failed.\n" << logofs_flush;
        #endif

        finish();

        return -1;
      }
    }
    else if (result > 0)
    {
      //
      // Number of conditions reported by select.
      // It is decremented when the write is done,
      // so a value still positive means that the
      // descriptor may also be readable.
      //

      ready = result;

      if (FD_ISSET(fd_, &writeSet))
      {
        writeTs = getNewTimestamp();

        //
        // Try to write all the pending data,
        // not just the part exceeding the limit.
        //

        result = ::write(fd_, w_buffer_.data_.begin() + w_buffer_.start_ +
                             written, toWrite - written);

        nowTs = getNewTimestamp();

        diffTs = diffTimestamp(writeTs, nowTs);

        statistics -> addWriteTime(diffTs);

        if (result > 0)
        {
          #ifdef TEST
          *logofs << "Transport: Forced flush of " << result
                  << " bytes on " << "FD#" << fd_ << ".\n"
                  << logofs_flush;
          #endif

          written += result;
        }
        else if (result < 0 && EGET() == EINTR)
        {
          #ifdef TEST
          *logofs << "Transport: Write to FD#" << fd_
                  << " was interrupted.\n" << logofs_flush;
          #endif

          continue;
        }
        else
        {
          //
          // NOTE(review): any other result, including
          // a write failing with EAGAIN after select
          // reported the descriptor as writable, ends
          // here and closes the link. Confirm that
          // this is the intended behaviour.
          //

          #ifdef TEST
          *logofs << "Transport: Write to FD#" << fd_
                  << " failed.\n" << logofs_flush;
          #endif

          finish();

          return -1;
        }

        ready--;
      }

      if (ready > 0)
      {
        //
        // Stop draining if there is data to
        // read, so the caller can handle it.
        //

        if (FD_ISSET(fd_, &readSet))
        {
          #ifdef TEST
          *logofs << "Transport: Not draining further "
                  << "due to data readable on FD#" << fd_
                  << ".\n" << logofs_flush;
          #endif

          break;
        }
      }
    }
    #ifdef TEST
    else
    {
      *logofs << "Transport: Timeout encountered "
              << "waiting for FD#" << fd_ << ".\n"
              << logofs_flush;
    }
    #endif

    //
    // Give up if the time elapsed since the
    // beginning reached the overall timeout.
    //

    nowTs = getNewTimestamp();

    diffTs = diffTimestamp(startTs, nowTs);

    if (diffTs >= timeout)
    {
      #ifdef TEST
      *logofs << "Transport: Not draining further "
              << "due to the timeout on FD#" << fd_
              << ".\n" << logofs_flush;
      #endif

      break;
    }
  }

  if (written > 0)
  {
    #ifdef DUMP

    *logofs << "Transport: Dumping content of flushed data.\n"
            << logofs_flush;

    DumpData(w_buffer_.data_.begin() + w_buffer_.start_, written);

    #endif

    //
    // Update the buffer status. The blocked
    // flag is set according to the presence
    // of data left in the buffer.
    //

    w_buffer_.length_ -= written;

    if (w_buffer_.length_ == 0)
    {
      w_buffer_.start_ = 0;

      blocked_ = 0;
    }
    else
    {
      w_buffer_.start_ += written;

      #ifdef TEST
      *logofs << "Transport: There are still " << w_buffer_.length_ 
              << " bytes in write buffer for " << "FD#" 
              << fd_ << ".\n" << logofs_flush;
      #endif

      blocked_ = 1;
    }
  }
  #ifdef TEST
  else
  {
    *logofs << "Transport: WARNING! No data written to FD#" << fd_
            << " with " << toWrite  << " bytes to drain and limit "
            << "set to " << limit << ".\n" << logofs_flush;
  }
  #endif

  #ifdef TEST
  *logofs << "Transport: Write buffer for FD#" << fd_
          << " has data for " << w_buffer_.length_ << " bytes.\n"
          << logofs_flush;

  *logofs << "Transport: Start is " << w_buffer_.start_ 
          << " length is " << w_buffer_.length_ << " size is "
          << w_buffer_.data_.size() << " capacity is "
          << w_buffer_.data_.capacity() << ".\n"
          << logofs_flush;
  #endif

  return (w_buffer_.length_ <= limit);
}

//
// Wait for data to become readable on the descriptor.
//
// The timeout is given in milliseconds. The function returns
// as soon as readable() reports a non-zero value, when the
// timeout is zero or when less than a tenth of the timeout
// is left.
//
// Returns the value reported by readable(), that is zero if
// no data became available in time, or -1 if the select
// failed or if the descriptor was reported as readable but
// no data could be found.
//

int Transport::wait(int timeout) const
{
  T_timestamp startTs = getNewTimestamp();

  T_timestamp idleTs;
  T_timestamp selectTs;

  T_timestamp nowTs = startTs;

  long available = 0;
  int  result    = 0;

  int diffTs;

  fd_set readSet;

  for (;;)
  {
    available = readable();

    diffTs = diffTimestamp(startTs, nowTs);

    if (available != 0 || timeout == 0 ||
            (diffTs + (timeout / 10)) >= timeout)
    {
      #ifdef TEST
      *logofs << "Transport: There are " << available
              << " bytes on FD#" << fd_ << " after "
              << diffTs << " Ms.\n" << logofs_flush;
      #endif

      return available;
    }
    else if (available == 0 && result > 0)
    {
      //
      // The last select reported the descriptor
      // as readable but there is nothing to read.
      //

      #ifdef TEST
      *logofs << "Transport: Read on " << "FD#" 
              << fd_ << " failed.\n" << logofs_flush;
      #endif

      return -1;
    }

    //
    // Only wait for the time that is left, so that
    // an interrupted select doesn't restart the
    // whole timeout. The check above grants that
    // the remaining time is greater than zero.
    //
    // The value must be split in seconds and
    // microseconds, as a tv_usec of one million
    // or more is not a valid timeout for select.
    //

    const int remaining = timeout - diffTs;

    selectTs.tv_sec  = remaining / 1000;
    selectTs.tv_usec = (remaining % 1000) * 1000;

    idleTs = nowTs;

    //
    // Wait for descriptor to become readable.
    // The set must be armed at each loop, as
    // select() overwrites it on return.
    //

    FD_ZERO(&readSet);
    FD_SET(fd_, &readSet);

    result = select(fd_ + 1, &readSet, NULL, NULL, &selectTs);

    nowTs = getNewTimestamp();

    diffTs = diffTimestamp(idleTs, nowTs);

    //
    // Account the time spent in select as idle
    // time and subtract it from the read time.
    //

    statistics -> addIdleTime(diffTs);

    statistics -> subReadTime(diffTs);

    if (result < 0)
    {
      if (EGET() == EINTR)
      {
        #ifdef TEST
        *logofs << "Transport: Select on FD#" << fd_
                << " was interrupted.\n" << logofs_flush;
        #endif

        continue;
      }
      else
      {
        #ifdef TEST
        *logofs << "Transport: Select on " << "FD#" 
                << fd_ << " failed.\n" << logofs_flush;
        #endif

        return -1;
      }
    }
    #ifdef TEST
    else if (result == 0)
    {
      *logofs << "Transport: No data available on FD#" << fd_
              << " after " << diffTimestamp(startTs, nowTs)
              << " Ms.\n" << logofs_flush;
    }
    else
    {
      *logofs << "Transport: Data became available on FD#" << fd_
              << " after " << diffTimestamp(startTs, nowTs)
              << " Ms.\n" << logofs_flush;
    }
    #endif
  }
}

void Transport::setSize(unsigned int initialSize, unsigned int thresholdSize,
                            unsigned int maximumSize)
{
  //
  // Only the limits are recorded here.
  // The buffers are not touched.
  //

  maximumSize_   = maximumSize;
  thresholdSize_ = thresholdSize;
  initialSize_   = initialSize;

  #ifdef TEST
  *logofs << "Transport: Set buffer sizes for FD#" << fd_
          << " to " << initialSize_ << "/" << thresholdSize_
          << "/" << maximumSize_ << ".\n" << logofs_flush;
  #endif
}

void Transport::fullReset()
{
  //
  // Clear the state flags, then
  // empty the write buffer.
  //

  finish_  = 0;
  blocked_ = 0;

  fullReset(w_buffer_);
}

int Transport::resize(T_buffer &buffer, const int &size)
{
  //
  // Make sure that size bytes can be appended after
  // the data currently in the buffer. Return the
  // length the data will have after the append.
  //

  const int capacity = (int) buffer.data_.size();
  const int needed   = buffer.length_ + size;

  if (needed > capacity)
  {
    //
    // The buffer is too small. Bring the existing
    // data at the beginning, then enlarge it.
    //

    if (buffer.start_ != 0 && buffer.length_ > 0)
    {
      #ifdef TEST
      *logofs << "Transport: Moving " << buffer.length_
              << " bytes of data for " << "FD#" << fd_
              << " to resize the buffer.\n"
              << logofs_flush;
      #endif

      memmove(buffer.data_.begin(), buffer.data_.begin() +
                  buffer.start_, buffer.length_);
    }

    buffer.start_ = 0;

    //
    // Start from the threshold and double the size
    // until the data fits. When the maximum is hit,
    // just allocate what is needed plus the initial
    // size.
    //

    unsigned int grownSize = thresholdSize_;

    while (grownSize < (unsigned int) buffer.length_ + size)
    {
      grownSize <<= 1;

      if (grownSize >= maximumSize_)
      {
        grownSize = buffer.length_ + size + initialSize_;
      }
    }

    #ifdef DEBUG
    *logofs << "Transport: Buffer for " << "FD#" << fd_
            << " will be enlarged from " << buffer.data_.size()
            << " to at least " << buffer.length_ + size
            << " bytes.\n" << logofs_flush;
    #endif

    buffer.data_.resize(grownSize);

    #ifdef TEST
    if (grownSize >= maximumSize_)
    {
      *logofs << "Transport: WARNING! Buffer for FD#" << fd_
              << " grown to reach size of " << grownSize
              << " bytes.\n" << logofs_flush;
    }
    #endif

    #ifdef TEST
    *logofs << "Transport: Data buffer for " << "FD#" 
            << fd_ << " has now size " << buffer.data_.size() 
            << " and capacity " << buffer.data_.capacity()
            << ".\n" << logofs_flush;
    #endif
  }
  else if (buffer.start_ + needed > capacity)
  {
    //
    // The buffer is big enough, but the free space
    // at the end is not. Bring the existing data
    // at the beginning to reclaim the room.
    //

    if (buffer.length_ > 0)
    {
      #ifdef TEST
      *logofs << "Transport: Moving " << buffer.length_
              << " bytes of data for " << "FD#" << fd_
              << " to make room in the buffer.\n"
              << logofs_flush;
      #endif

      memmove(buffer.data_.begin(), buffer.data_.begin() +
                  buffer.start_, buffer.length_);
    }

    buffer.start_ = 0;

    #ifdef DEBUG
    *logofs << "Transport: Made room for " 
            << buffer.data_.size() - buffer.start_
            << " bytes in buffer for " << "FD#" 
            << fd_ << ".\n" << logofs_flush;
    #endif
  }

  return needed;
}

void Transport::fullReset(T_buffer &buffer)
{
  #ifdef TEST
  *logofs << "Transport: Resetting buffer for " << "FD#"
          << fd_ << " with size " << buffer.data_.size()
          << " and capacity " << buffer.data_.capacity()
          << ".\n" << logofs_flush;
  #endif

  //
  // Discard any data in the buffer.
  //

  buffer.length_ = 0;
  buffer.start_  = 0;

  //
  // Leave the storage alone unless both the
  // size and the capacity are bigger than
  // the initial size.
  //

  const unsigned int initial = (unsigned int) initialSize_;

  if (buffer.data_.size() <= initial ||
          buffer.data_.capacity() <= initial)
  {
    return;
  }

  //
  // Get rid of the content and allocate
  // the buffer again at the initial size.
  //

  buffer.data_.clear();

  buffer.data_.resize(initialSize_);

  #ifdef TEST
  *logofs << "Transport: Data buffer for " << "FD#"
          << fd_ << " shrunk to size " << buffer.data_.size()
          << " and capacity " << buffer.data_.capacity()
          << ".\n" << logofs_flush;
  #endif
}

ProxyTransport::ProxyTransport(int fd) : Transport(fd)
{
  #ifdef TEST
  *logofs << "ProxyTransport: Going to create proxy transport "
          << "for FD#" << fd_ << ".\n" << logofs_flush;
  #endif

  type_ = transport_proxy;

  //
  // The read buffer starts empty, with
  // the same initial size set by the
  // base class.
  //

  r_buffer_.start_  = 0;
  r_buffer_.length_ = 0;

  r_buffer_.data_.resize(initialSize_);

  //
  // For now we own the buffer.
  //

  owner_ = 1;

  //
  // Prepare the ZLIB stream used to
  // decompress the incoming data.
  //

  r_stream_.next_in  = NULL;
  r_stream_.avail_in = 0;

  r_stream_.opaque = NULL;
  r_stream_.zfree  = NULL;
  r_stream_.zalloc = NULL;

  const int inflateStatus = inflateInit2(&r_stream_, 15);

  if (inflateStatus != Z_OK)
  {
    #ifdef PANIC
    *logofs << "ProxyTransport: PANIC! Failed initialization of ZLIB read stream. "
            << "Error is '" << zError(inflateStatus) << "'.\n" << logofs_flush;
    #endif

    cerr << "Error" << ": Failed initialization of ZLIB read stream. "
         << "Error is '" << zError(inflateStatus) << "'.\n";

    HandleCleanup();
  }

  //
  // The stream used to compress the outgoing
  // data is only set up if the compression
  // is enabled on the local side.
  //

  if (control -> LocalStreamCompression)
  {
    w_stream_.opaque = NULL;
    w_stream_.zfree  = NULL;
    w_stream_.zalloc = NULL;

    const int deflateStatus = deflateInit2(&w_stream_, control -> LocalStreamCompressionLevel,
                                               Z_DEFLATED, 15, 9, Z_DEFAULT_STRATEGY);

    if (deflateStatus != Z_OK)
    {
      #ifdef PANIC
      *logofs << "ProxyTransport: PANIC! Failed initialization of ZLIB write stream. "
              << "Error is '" << zError(deflateStatus) << "'.\n" << logofs_flush;
      #endif

      cerr << "Error" << ": Failed initialization of ZLIB write stream. "
           << "Error is '" << zError(deflateStatus) << "'.\n";

      HandleCleanup();
    }
  }

  //
  // There is no compressed data
  // waiting to be flushed yet.
  //

  flush_ = 0;

  #ifdef REFERENCES
  *logofs << "ProxyTransport: Created new object at " 
          << this << " out of " << ++references_ 
          << " allocated references.\n" << logofs_flush;
  #endif
}

ProxyTransport::~ProxyTransport()
{
  #ifdef TEST
  *logofs << "ProxyTransport: Going to destroy derived class "
          << "for FD#" << fd_ << ".\n" << logofs_flush;
  #endif

  //
  // Release the state of the ZLIB streams. The
  // write stream is released under the same
  // condition checked in the constructor to
  // initialize it.
  //

  inflateEnd(&r_stream_);

  if (control -> LocalStreamCompression)
  {
    deflateEnd(&w_stream_);
  }

  #ifdef REFERENCES
  *logofs << "ProxyTransport: Deleted object at " 
          << this << " out of " << --references_ 
          << " allocated references.\n" << logofs_flush;
  #endif
}

//
// Read data from its file descriptor.
//

//
// If the remote peer doesn't compress the stream, the data
// is read straight from the base transport. Otherwise the
// data read from the descriptor is decompressed into the
// read buffer and then copied to the caller.
//
// Parameters:
//
//   data  Buffer provided by the caller. In the compressed
//         case it first receives the compressed bytes, then
//         it is overwritten with the decompressed data.
//   size  Size of the buffer, in bytes.
//
// Returns the number of bytes copied to data, zero or a
// negative value as returned by Transport::read(), or -1
// if the decompression failed. It also returns -1, with
// the error set to EAGAIN, when the decompressed data
// pending in the read buffer doesn't fit in size bytes.
// In this case the data is kept in the read buffer.
//

int ProxyTransport::read(unsigned char *data, unsigned int size)
{
  //
  // If the remote peer is not compressing
  // the stream then just return any byte
  // read from the socket.
  //

  if (control -> RemoteStreamCompression == 0)
  {
    int result = Transport::read(data, size);

    if (result <= 0)
    {
      return result;
    }

    statistics -> addBytesIn(result);

    return result;
  }

  //
  // Return any pending data first.
  //

  if (r_buffer_.length_ > 0)
  {
    //
    // If the size of the buffer doesn't
    // match the amount of data pending,
    // force the caller to retry.
    //

    if ((int) size < r_buffer_.length_)
    {
      #ifdef TEST
      *logofs << "ProxyTransport: WARNING! Forcing a retry with "
              << r_buffer_.length_ << " bytes pending and "
              << size << " in the buffer.\n"
              << logofs_flush;
      #endif

      ESET(EAGAIN);

      return -1;
    }

    //
    // Given the check above, this is always
    // the whole amount of data pending.
    //

    int copied = (r_buffer_.length_ > ((int) size) ?
                      ((int) size) : r_buffer_.length_);

    memcpy(data, r_buffer_.data_.begin() + r_buffer_.start_, copied);

    //
    // Update the buffer status.
    //

    #ifdef DEBUG
    *logofs << "ProxyTransport: Going to immediately return " << copied
            << " bytes from proxy FD#" << fd_ << ".\n"
            << logofs_flush;
    #endif

    r_buffer_.length_ -= copied;

    if (r_buffer_.length_ == 0)
    {
      r_buffer_.start_ = 0;
    }
    else
    {
      r_buffer_.start_ += copied;

      #ifdef TEST
      *logofs << "ProxyTransport: There are still " << r_buffer_.length_ 
              << " bytes in read buffer for proxy " << "FD#" 
              << fd_ << ".\n" << logofs_flush;
        #endif
    }

    return copied;
  }

  //
  // Read data in the user buffer. The buffer
  // is used as the input of the decompression
  // and is later overwritten with the output.
  //

  int result = Transport::read(data, size);

  if (result <= 0)
  {
    return result;
  }

  statistics -> addBytesIn(result);

  //
  // Decompress the data into the read
  // buffer.
  //

  #ifdef DEBUG
  *logofs << "ProxyTransport: Going to decompress data for " 
          << "proxy FD#" << fd_ << ".\n" << logofs_flush;
  #endif

  //
  // The save counters are used to compute the
  // totals for the whole call, the old counters
  // to compute the output of each inflate().
  //

  int saveTotalIn  = r_stream_.total_in;
  int saveTotalOut = r_stream_.total_out;

  int oldTotalIn  = saveTotalIn;
  int oldTotalOut = saveTotalOut;

  int diffTotalIn;
  int diffTotalOut;

  #ifdef INSPECT
  *logofs << "ProxyTransport: oldTotalIn = " << oldTotalIn
          << ".\n" << logofs_flush;
  #endif

  #ifdef INSPECT
  *logofs << "ProxyTransport: oldTotalOut = " << oldTotalOut
          << ".\n" << logofs_flush;
  #endif

  r_stream_.next_in  = (Bytef *) data;
  r_stream_.avail_in = result;

  //
  // Let ZLIB use all the space already
  // available in the buffer.
  //

  unsigned int newAvailOut = r_buffer_.data_.size() - r_buffer_.start_ -
                                 r_buffer_.length_;

  #ifdef TEST
  *logofs << "ProxyTransport: Initial decompress buffer is "
          << newAvailOut << " bytes.\n" << logofs_flush;
  #endif

  //
  // Run inflate() until all the input is consumed
  // and there is space left in the output, making
  // more room in the read buffer at each loop.
  //

  for (;;)
  {
    #ifdef INSPECT
    *logofs << "\nProxyTransport: Running the decompress loop.\n"
            << logofs_flush;
    #endif

    #ifdef INSPECT
    *logofs << "ProxyTransport: r_buffer_.length_ = " << r_buffer_.length_
            << ".\n" << logofs_flush;
    #endif

    #ifdef INSPECT
    *logofs << "ProxyTransport: r_buffer_.data_.size() = " << r_buffer_.data_.size()
            << ".\n" << logofs_flush;
    #endif

    #ifdef INSPECT
    *logofs << "ProxyTransport: newAvailOut = " << newAvailOut
            << ".\n" << logofs_flush;
    #endif

    //
    // Ensure that newAvailOut bytes are available
    // after the data already in the read buffer.
    //

    if (resize(r_buffer_, newAvailOut) < 0)
    {
      return -1;
    }

    #ifdef INSPECT
    *logofs << "ProxyTransport: r_buffer_.data_.size() = "
            << r_buffer_.data_.size() << ".\n"
            << logofs_flush;
    #endif

    #ifdef INSPECT
    *logofs << "ProxyTransport: r_stream_.next_in = "
            << (void *) r_stream_.next_in << ".\n"
            << logofs_flush;
    #endif

    #ifdef INSPECT
    *logofs << "ProxyTransport: r_stream_.avail_in = "
            << r_stream_.avail_in << ".\n"
            << logofs_flush;
    #endif

    //
    // The output pointer is set again at each
    // loop, as the resize may have moved the
    // data in the buffer.
    //

    r_stream_.next_out  = r_buffer_.data_.begin() + r_buffer_.start_ +
                              r_buffer_.length_;

    r_stream_.avail_out = newAvailOut;

    #ifdef INSPECT
    *logofs << "ProxyTransport: r_stream_.next_out = "
            << (void *) r_stream_.next_out << ".\n"
            << logofs_flush;
    #endif

    #ifdef INSPECT
    *logofs << "ProxyTransport: r_stream_.avail_out = "
            << r_stream_.avail_out << ".\n"
            << logofs_flush;
    #endif

    //
    // Note that this result shadows the result
    // of the read in the enclosing scope.
    //

    int result = inflate(&r_stream_, Z_SYNC_FLUSH);

    #ifdef INSPECT
    *logofs << "ProxyTransport: Called inflate() result is "
            << result << ".\n" << logofs_flush;
    #endif

    #ifdef INSPECT
    *logofs << "ProxyTransport: r_stream_.avail_in = "
            << r_stream_.avail_in << ".\n"
            << logofs_flush;
    #endif

    #ifdef INSPECT
    *logofs << "ProxyTransport: r_stream_.avail_out = "
            << r_stream_.avail_out << ".\n"
            << logofs_flush;
    #endif

    #ifdef INSPECT
    *logofs << "ProxyTransport: r_stream_.total_in = "
            << r_stream_.total_in << ".\n"
            << logofs_flush;
    #endif

    #ifdef INSPECT
    *logofs << "ProxyTransport: r_stream_.total_out = "
            << r_stream_.total_out << ".\n"
            << logofs_flush;
    #endif

    diffTotalIn  = r_stream_.total_in  - oldTotalIn;
    diffTotalOut = r_stream_.total_out - oldTotalOut;

    #ifdef INSPECT
    *logofs << "ProxyTransport: diffTotalIn = "
            << diffTotalIn << ".\n"
            << logofs_flush;
    #endif

    #ifdef INSPECT
    *logofs << "ProxyTransport: diffTotalOut = "
            << diffTotalOut << ".\n"
            << logofs_flush;
    #endif

    #ifdef INSPECT
    *logofs << "ProxyTransport: r_buffer_.length_ = "
            << r_buffer_.length_ << ".\n"
            << logofs_flush;
    #endif

    //
    // Account the bytes produced by this
    // run of inflate() in the read buffer.
    //

    r_buffer_.length_ += diffTotalOut;

    #ifdef INSPECT
    *logofs << "ProxyTransport: r_buffer_.length_ = "
            << r_buffer_.length_ << ".\n"
            << logofs_flush;
    #endif

    oldTotalIn  = r_stream_.total_in;
    oldTotalOut = r_stream_.total_out;

    if (result == Z_OK)
    {
      //
      // We are done if the input was consumed and
      // the output buffer was not filled. Otherwise
      // loop again with a bigger buffer.
      //

      if (r_stream_.avail_in == 0 && r_stream_.avail_out > 0)
      {
        break;
      }
    }
    else if (result == Z_BUF_ERROR && r_stream_.avail_out > 0 &&
                 r_stream_.avail_in == 0)
    {
      //
      // Z_BUF_ERROR with no input left and space
      // in the output is not handled as an error.
      //

      #ifdef TEST
      *logofs << "ProxyTransport: WARNING! Raised Z_BUF_ERROR decompressing data.\n"
              << logofs_flush;
      #endif

      break;
    }
    else
    {
      #ifdef PANIC
      *logofs << "ProxyTransport: PANIC! Decompression of data failed. "
              << "Error is '" << zError(result) << "'.\n"
              << logofs_flush;
      #endif

      cerr << "Error" << ": Decompression of data failed. Error is '"
           << zError(result) << "'.\n";

      finish();

      return -1;
    }

    //
    // Add more bytes to the buffer. At least
    // thresholdSize_ bytes will be requested
    // at the next loop.
    //

    if (newAvailOut < thresholdSize_)
    {
      newAvailOut = thresholdSize_;
    }

    #ifdef TEST
    *logofs << "ProxyTransport: Need to add " << newAvailOut
            << " bytes to the decompress buffer in read.\n"
            << logofs_flush;
    #endif
  }

  //
  // Totals for the whole call.
  //

  diffTotalIn  = r_stream_.total_in  - saveTotalIn;
  diffTotalOut = r_stream_.total_out - saveTotalOut;

  #ifdef DEBUG
  *logofs << "ProxyTransport: Decompressed data from "
          << diffTotalIn << " to " << diffTotalOut
          << " bytes.\n" << logofs_flush;
  #endif

  statistics -> addDecompressedBytes(diffTotalIn, diffTotalOut);

  //
  // Check if the size of the buffer
  // matches the produced data. If it
  // doesn't, the data is left in the
  // read buffer and the caller has
  // to retry.
  //

  if ((int) size < r_buffer_.length_)
  {
    #ifdef TEST
    *logofs << "ProxyTransport: WARNING! Forcing a retry with "
            << r_buffer_.length_ << " bytes pending and "
            << size << " in the buffer.\n"
            << logofs_flush;
    #endif

    ESET(EAGAIN);

    return -1;
  }

  //
  // Copy the decompressed data to the
  // provided buffer. Given the check
  // above, this is always the whole
  // content of the read buffer.
  //

  int copied = (r_buffer_.length_ > ((int) size) ?
                    ((int) size) : r_buffer_.length_);

  #ifdef DEBUG
  *logofs << "ProxyTransport: Going to return " << copied
          << " bytes from proxy FD#" << fd_ << ".\n"
          << logofs_flush;
  #endif

  memcpy(data, r_buffer_.data_.begin() + r_buffer_.start_, copied);

  //
  // Update the buffer status.
  //

  r_buffer_.length_ -= copied;

  if (r_buffer_.length_ == 0)
  {
    r_buffer_.start_ = 0;
  }
  else
  {
    r_buffer_.start_ += copied;

    #ifdef TEST
    *logofs << "ProxyTransport: There are still " << r_buffer_.length_ 
            << " bytes in read buffer for proxy FD#" << fd_
            << ".\n" << logofs_flush;
    #endif
  }

  return copied;
}

//
// If required compress data, else write it to socket.
//
// When control -> LocalStreamCompression is 0 the call is
// forwarded to Transport::write() and, for a positive result,
// the statistics and the flush callback are updated with the
// returned byte count.
//
// Otherwise the input is run through deflate() on w_stream_
// and the output is appended to w_buffer_. The deflate() call
// uses Z_NO_FLUSH for write_delayed and Z_SYNC_FLUSH for any
// other type. For write_immediate the flush counter is cleared
// and Transport::flush() is called if the buffer holds data;
// for other types the consumed input is added to flush_.
//
// Returns size on success in the compressed path, the result
// of Transport::write() in the plain path, or -1 on failure.
//

int ProxyTransport::write(T_write type, const unsigned char *data, const unsigned int size)
{
  //
  // NOTE(review): this early return only exists in builds
  // with TEST defined. Without TEST a zero size falls
  // through to the code below.
  //

  #ifdef TEST
  if (size == 0)
  {
    *logofs << "ProxyTransport: WARNING! Write called for FD#" 
            << fd_ << " without any data to write.\n"
            << logofs_flush;

    return 0;
  }
  #endif

  //
  // If there is no compression revert to
  // plain socket management.
  //

  if (control -> LocalStreamCompression == 0)
  {
    int result = Transport::write(type, data, size);

    if (result <= 0)
    {
      return result;
    }

    statistics -> addBytesOut(result);

    statistics -> updateBitrate(result);

    FlushCallback(result);

    return result;
  }

  #ifdef DEBUG
  *logofs << "ProxyTransport: Going to compress " << size 
          << " bytes to write buffer for proxy FD#" << fd_
          << ".\n" << logofs_flush;
  #endif

  //
  // Compress data into the write buffer.
  //
  // The save* values are kept for the whole call to
  // compute the totals at the end. The old* values
  // are refreshed after each deflate() to obtain the
  // bytes consumed and produced by a single pass.
  //

  int saveTotalIn  = w_stream_.total_in;
  int saveTotalOut = w_stream_.total_out;

  int oldTotalIn  = saveTotalIn;
  int oldTotalOut = saveTotalOut;

  int diffTotalIn;
  int diffTotalOut;

  #ifdef INSPECT
  *logofs << "ProxyTransport: oldTotalIn = " << oldTotalIn
          << ".\n" << logofs_flush;
  #endif

  #ifdef INSPECT
  *logofs << "ProxyTransport: oldTotalOut = " << oldTotalOut
          << ".\n" << logofs_flush;
  #endif

  w_stream_.next_in  = (Bytef *) data;
  w_stream_.avail_in = size;

  //
  // Let ZLIB use all the space already
  // available in the buffer.
  //

  unsigned int newAvailOut = w_buffer_.data_.size() - w_buffer_.start_ -
                                 w_buffer_.length_;

  #ifdef TEST
  *logofs << "ProxyTransport: Initial compress buffer is "
          << newAvailOut << " bytes.\n" << logofs_flush;
  #endif

  for (;;)
  {
    #ifdef INSPECT
    *logofs << "\nProxyTransport: Running the compress loop.\n"
            << logofs_flush;
    #endif

    #ifdef INSPECT
    *logofs << "ProxyTransport: w_buffer_.length_ = "
            << w_buffer_.length_ << ".\n"
            << logofs_flush;
    #endif

    #ifdef INSPECT
    *logofs << "ProxyTransport: w_buffer_.data_.size() = "
            << w_buffer_.data_.size() << ".\n"
            << logofs_flush;
    #endif

    #ifdef INSPECT
    *logofs << "ProxyTransport: newAvailOut = "
            << newAvailOut << ".\n"
            << logofs_flush;
    #endif

    //
    // Presumably resize() guarantees that at least
    // newAvailOut bytes can be appended past the
    // current content. Confirm against its definition.
    //

    if (resize(w_buffer_, newAvailOut) < 0)
    {
      return -1;
    }

    #ifdef INSPECT
    *logofs << "ProxyTransport: w_buffer_.data_.size() = "
            << w_buffer_.data_.size() << ".\n"
            << logofs_flush;
    #endif

    #ifdef INSPECT
    *logofs << "ProxyTransport: w_stream_.next_in = "
            << (void *) w_stream_.next_in << ".\n"
            << logofs_flush;
    #endif

    #ifdef INSPECT
    *logofs << "ProxyTransport: w_stream_.avail_in = "
            << w_stream_.avail_in << ".\n"
            << logofs_flush;
    #endif

    //
    // The output pointer is recomputed at each pass,
    // after the resize() above, so that it always
    // refers to the end of the data already stored.
    //

    w_stream_.next_out  = w_buffer_.data_.begin() + w_buffer_.start_ +
                              w_buffer_.length_;

    w_stream_.avail_out = newAvailOut;

    #ifdef INSPECT
    *logofs << "ProxyTransport: w_stream_.next_out = "
            << (void *) w_stream_.next_out << ".\n"
            << logofs_flush;
    #endif

    #ifdef INSPECT
    *logofs << "ProxyTransport: w_stream_.avail_out = "
            << w_stream_.avail_out << ".\n"
            << logofs_flush;
    #endif

    int result = deflate(&w_stream_, (type == write_delayed ?
                             Z_NO_FLUSH : Z_SYNC_FLUSH));

    #ifdef INSPECT
    *logofs << "ProxyTransport: Called deflate() result is "
            << result << ".\n" << logofs_flush;
    #endif

    #ifdef INSPECT
    *logofs << "ProxyTransport: w_stream_.avail_in = "
            << w_stream_.avail_in << ".\n"
            << logofs_flush;
    #endif

    #ifdef INSPECT
    *logofs << "ProxyTransport: w_stream_.avail_out = "
            << w_stream_.avail_out << ".\n"
            << logofs_flush;
    #endif

    #ifdef INSPECT
    *logofs << "ProxyTransport: w_stream_.total_in = "
            << w_stream_.total_in << ".\n"
            << logofs_flush;
    #endif

    #ifdef INSPECT
    *logofs << "ProxyTransport: w_stream_.total_out = "
            << w_stream_.total_out << ".\n"
            << logofs_flush;
    #endif

    //
    // Bytes consumed and produced by this pass only.
    //

    diffTotalOut = w_stream_.total_out - oldTotalOut;
    diffTotalIn  = w_stream_.total_in  - oldTotalIn;

    #ifdef INSPECT
    *logofs << "ProxyTransport: diffTotalIn = "
            << diffTotalIn << ".\n"
            << logofs_flush;
    #endif

    #ifdef INSPECT
    *logofs << "ProxyTransport: diffTotalOut = "
            << diffTotalOut << ".\n"
            << logofs_flush;
    #endif

    #ifdef INSPECT
    *logofs << "ProxyTransport: w_buffer_.length_ = "
            << w_buffer_.length_ << ".\n"
            << logofs_flush;
    #endif

    w_buffer_.length_ += diffTotalOut;

    #ifdef INSPECT
    *logofs << "ProxyTransport: w_buffer_.length_ = "
            << w_buffer_.length_ << ".\n"
            << logofs_flush;
    #endif

    oldTotalOut = w_stream_.total_out;
    oldTotalIn  = w_stream_.total_in;

    //
    // The loop ends when all the input was consumed
    // and ZLIB left some room in the output. With
    // Z_OK and any other condition another pass is
    // run with a bigger output area.
    //

    if (result == Z_OK)
    {
      if (w_stream_.avail_in == 0 && w_stream_.avail_out > 0)
      {
        break;
      }
    }
    else if (result == Z_BUF_ERROR && w_stream_.avail_out > 0 &&
                 w_stream_.avail_in == 0)
    {
      #ifdef TEST
      *logofs << "ProxyTransport: WARNING! Raised Z_BUF_ERROR compressing data.\n"
              << logofs_flush;
      #endif

      break;
    }
    else
    {
      #ifdef PANIC
      *logofs << "ProxyTransport: PANIC! Compression of data failed. "
              << "Error is '" << zError(result) << "'.\n"
              << logofs_flush;
      #endif

      cerr << "Error" << ": Compression of data failed. Error is '"
           << zError(result) << "'.\n";

      finish();

      return -1;
    }

    //
    // Add more bytes to the buffer.
    //

    if (newAvailOut < thresholdSize_)
    {
      newAvailOut = thresholdSize_;
    }

    #ifdef TEST
    *logofs << "ProxyTransport: Need to add " << newAvailOut
            << " bytes to the compress buffer in write.\n"
            << logofs_flush;
    #endif
  }

  //
  // Totals for the whole call.
  //

  diffTotalIn  = w_stream_.total_in  - saveTotalIn;
  diffTotalOut = w_stream_.total_out - saveTotalOut;

  #ifdef TEST

  *logofs << "ProxyTransport: Compressed data from "
          << diffTotalIn << " to " << diffTotalOut
          << " bytes.\n" << logofs_flush;

  if (diffTotalIn != (int) size)
  {
    #ifdef PANIC
    *logofs << "ProxyTransport: PANIC! Bytes provided to ZLIB stream "
            << "should be " << size << " but they look to be "
            << diffTotalIn << ".\n" << logofs_flush;
    #endif
  }

  #endif

  //
  // Find out what we have to do with the
  // produced data.
  //

  if (type == write_immediate)
  {
    //
    // If user requested an immediate write we
    // flushed the ZLIB buffer. We can now reset
    // the counter and write data to socket.
    //

    flush_ = 0;

    #ifdef TEST
    *logofs << "ProxyTransport: Write buffer for proxy FD#" << fd_
            << " has data for " << w_buffer_.length_ << " bytes.\n"
            << logofs_flush;

    *logofs << "ProxyTransport: Start is " << w_buffer_.start_ 
            << " length is " << w_buffer_.length_ << " flush is "
            << flush_ << " size is " << w_buffer_.data_.size()
            << " capacity is " << w_buffer_.data_.capacity()
            << ".\n" << logofs_flush;
    #endif

    //
    // Alternatively may try to write only if
    // the socket is not blocked.
    //
    // if (w_buffer_.length_ > 0 && blocked_ == 0)
    // {
    //   ...
    // }
    //

    if (w_buffer_.length_ > 0)

    {
      #ifdef TEST
      *logofs << "ProxyTransport: Writing " << w_buffer_.length_
              << " bytes of produced data to FD#" << fd_ << ".\n"
              << logofs_flush;
      #endif

      //
      // A failure here returns before the
      // statistics below are updated.
      //

      int result = Transport::flush();

      if (result < 0)
      {
        return -1;
      }
    }
  }
  else
  {
    //
    // We haven't flushed the ZLIB compression
    // buffer, so user will have to call proxy
    // transport's flush explicitly.
    //

    flush_ += diffTotalIn;
  }

  //
  // We either wrote the data or added it to the
  // write buffer. It's convenient to update the
  // counters at this stage to get the current
  // bitrate earlier.
  //

  statistics -> addCompressedBytes(diffTotalIn, diffTotalOut);

  statistics -> addBytesOut(diffTotalOut);

  statistics -> updateBitrate(diffTotalOut);

  FlushCallback(diffTotalOut);

  #ifdef TEST
  *logofs << "ProxyTransport: Write buffer for proxy FD#" << fd_
          << " has data for " << w_buffer_.length_ << " bytes.\n"
          << logofs_flush;

  *logofs << "ProxyTransport: Start is " << w_buffer_.start_ 
          << " length is " << w_buffer_.length_ << " flush is "
          << flush_ << " size is " << w_buffer_.data_.size()
          << " capacity is " << w_buffer_.data_.capacity()
          << ".\n" << logofs_flush;
  #endif

  //
  // The caller is told that all the input was
  // accepted, regardless of the compressed size.
  //

  return size;
}

//
// Write data to its file descriptor.
//
// If flush_ is 0 or control -> LocalStreamCompression is 0
// the call is forwarded to Transport::flush().
//
// Otherwise deflate() is run with Z_SYNC_FLUSH and no new
// input, appending whatever it produces to w_buffer_. Then
// flush_ is cleared, Transport::flush() is called and, if
// that succeeds, the statistics and the flush callback are
// updated with the byte counts measured here.
//
// Returns the result of Transport::flush(), or -1 on error.
//

int ProxyTransport::flush()
{
  //
  // If there is no compression or we already compressed
  // outgoing data and just need to write it to socket
  // because of previous incomplete writes then revert
  // to plain socket management.
  //

  if (flush_ == 0 || control -> LocalStreamCompression == 0)
  {
    int result = Transport::flush();

    if (result < 0)
    {
      return -1;
    }

    return result;
  }

  #ifdef DEBUG
  *logofs << "ProxyTransport: Going to flush compression on "
          << "proxy FD#" << fd_ << ".\n" << logofs_flush;
  #endif

  #ifdef TEST
  *logofs << "ProxyTransport: Flush counter for proxy FD#" << fd_
          << " is " << flush_ << " bytes.\n" << logofs_flush;
  #endif

  //
  // Flush ZLIB stream into the write buffer.
  //
  // The save* values are kept for the whole call to
  // compute the totals at the end. The old* values
  // are refreshed after each deflate() to obtain the
  // bytes consumed and produced by a single pass.
  //

  int saveTotalIn  = w_stream_.total_in;
  int saveTotalOut = w_stream_.total_out;

  int oldTotalIn  = saveTotalIn;
  int oldTotalOut = saveTotalOut;

  int diffTotalOut;
  int diffTotalIn;

  #ifdef INSPECT
  *logofs << "ProxyTransport: oldTotalIn = " << oldTotalIn
          << ".\n" << logofs_flush;
  #endif

  #ifdef INSPECT
  *logofs << "ProxyTransport: oldTotalOut = " << oldTotalOut
          << ".\n" << logofs_flush;
  #endif

  //
  // No new input is given to the stream. As avail_in
  // is 0 the input pointer is only a placeholder.
  //

  w_stream_.next_in  = w_buffer_.data_.begin() + w_buffer_.start_ + w_buffer_.length_;
  w_stream_.avail_in = 0;

  //
  // Let ZLIB use all the space already
  // available in the buffer.
  //

  unsigned int newAvailOut = w_buffer_.data_.size() - w_buffer_.start_ -
                                 w_buffer_.length_;

  #ifdef DEBUG
  *logofs << "ProxyTransport: Initial flush buffer is "
          << newAvailOut << " bytes.\n" << logofs_flush;
  #endif

  for (;;)
  {
    #ifdef INSPECT
    *logofs << "\nProxyTransport: Running the flush loop.\n"
            << logofs_flush;
    #endif

    #ifdef INSPECT
    *logofs << "ProxyTransport: w_buffer_.length_ = "
            << w_buffer_.length_ << ".\n"
            << logofs_flush;
    #endif

    #ifdef INSPECT
    *logofs << "ProxyTransport: w_buffer_.data_.size() = "
            << w_buffer_.data_.size() << ".\n"
            << logofs_flush;
    #endif

    #ifdef INSPECT
    *logofs << "ProxyTransport: newAvailOut = "
            << newAvailOut << ".\n"
            << logofs_flush;
    #endif

    //
    // Presumably resize() guarantees that at least
    // newAvailOut bytes can be appended past the
    // current content. Confirm against its definition.
    //

    if (resize(w_buffer_, newAvailOut) < 0)
    {
      return -1;
    }

    #ifdef INSPECT
    *logofs << "ProxyTransport: w_buffer_.data_.size() = "
            << w_buffer_.data_.size() << ".\n"
            << logofs_flush;
    #endif

    #ifdef INSPECT
    *logofs << "ProxyTransport: w_stream_.next_in = "
            << (void *) w_stream_.next_in << ".\n"
            << logofs_flush;
    #endif

    #ifdef INSPECT
    *logofs << "ProxyTransport: w_stream_.avail_in = "
            << w_stream_.avail_in << ".\n"
            << logofs_flush;
    #endif

    //
    // The output pointer is recomputed at each pass,
    // after the resize() above, so that it always
    // refers to the end of the data already stored.
    //

    w_stream_.next_out  = w_buffer_.data_.begin() + w_buffer_.start_ +
                              w_buffer_.length_;

    w_stream_.avail_out = newAvailOut;

    #ifdef INSPECT
    *logofs << "ProxyTransport: w_stream_.next_out = "
            << (void *) w_stream_.next_out << ".\n"
            << logofs_flush;
    #endif

    #ifdef INSPECT
    *logofs << "ProxyTransport: w_stream_.avail_out = "
            << w_stream_.avail_out << ".\n"
            << logofs_flush;
    #endif

    int result = deflate(&w_stream_, Z_SYNC_FLUSH);

    #ifdef INSPECT
    *logofs << "ProxyTransport: Called deflate() result is "
            << result << ".\n" << logofs_flush;
    #endif

    #ifdef INSPECT
    *logofs << "ProxyTransport: w_stream_.avail_in = "
            << w_stream_.avail_in << ".\n"
            << logofs_flush;
    #endif

    #ifdef INSPECT
    *logofs << "ProxyTransport: w_stream_.avail_out = "
            << w_stream_.avail_out << ".\n"
            << logofs_flush;
    #endif

    #ifdef INSPECT
    *logofs << "ProxyTransport: w_stream_.total_in = "
            << w_stream_.total_in << ".\n"
            << logofs_flush;
    #endif

    #ifdef INSPECT
    *logofs << "ProxyTransport: w_stream_.total_out = "
            << w_stream_.total_out << ".\n"
            << logofs_flush;
    #endif

    //
    // Bytes consumed and produced by this pass only.
    //

    diffTotalOut = w_stream_.total_out - oldTotalOut;
    diffTotalIn  = w_stream_.total_in  - oldTotalIn;

    #ifdef INSPECT
    *logofs << "ProxyTransport: diffTotalIn = "
            << diffTotalIn << ".\n"
            << logofs_flush;
    #endif

    #ifdef INSPECT
    *logofs << "ProxyTransport: diffTotalOut = "
            << diffTotalOut << ".\n"
            << logofs_flush;
    #endif

    #ifdef INSPECT
    *logofs << "ProxyTransport: w_buffer_.length_ = "
            << w_buffer_.length_ << ".\n"
            << logofs_flush;
    #endif

    w_buffer_.length_ += diffTotalOut;

    #ifdef INSPECT
    *logofs << "ProxyTransport: w_buffer_.length_ = "
            << w_buffer_.length_ << ".\n"
            << logofs_flush;
    #endif

    oldTotalOut = w_stream_.total_out;
    oldTotalIn  = w_stream_.total_in;

    //
    // The loop ends when there is no input left and
    // ZLIB left some room in the output. With Z_OK
    // and any other condition another pass is run
    // with a bigger output area.
    //

    if (result == Z_OK)
    {
      if (w_stream_.avail_in == 0 && w_stream_.avail_out > 0)
      {
        break;
      }
    }
    else if (result == Z_BUF_ERROR && w_stream_.avail_out > 0 &&
                 w_stream_.avail_in == 0)
    {
      #ifdef TEST
      *logofs << "ProxyTransport: WARNING! Raised Z_BUF_ERROR flushing data.\n"
              << logofs_flush;
      #endif

      break;
    }
    else
    {
      #ifdef PANIC
      *logofs << "ProxyTransport: PANIC! Flush of compressed data failed. "
              << "Error is '" << zError(result) << "'.\n"
              << logofs_flush;
      #endif

      cerr << "Error" << ": Flush of compressed data failed. Error is '"
           << zError(result) << "'.\n";

      finish();

      return -1;
    }

    //
    // Add more bytes to the buffer.
    //

    if (newAvailOut < thresholdSize_)
    {
      newAvailOut = thresholdSize_;
    }

    #ifdef TEST
    *logofs << "ProxyTransport: Need to add " << newAvailOut
            << " bytes to the compress buffer in flush.\n"
            << logofs_flush;
    #endif
  }

  //
  // Totals for the whole call.
  //

  diffTotalIn  = w_stream_.total_in  - saveTotalIn;
  diffTotalOut = w_stream_.total_out - saveTotalOut;

  #ifdef TEST
  *logofs << "ProxyTransport: Compressed flush data from "
          << diffTotalIn << " to " << diffTotalOut
          << " bytes.\n" << logofs_flush;
  #endif

  //
  // Time to move data from the write
  // buffer to the real link. 
  //

  #ifdef DEBUG
  *logofs << "ProxyTransport: Reset flush counter for proxy FD#"
          << fd_ << ".\n" << logofs_flush;
  #endif

  flush_ = 0;

  #ifdef TEST
  *logofs << "ProxyTransport: Write buffer for proxy FD#" << fd_
          << " has data for " << w_buffer_.length_ << " bytes.\n"
          << logofs_flush;

  *logofs << "ProxyTransport: Start is " << w_buffer_.start_ 
          << " length is " << w_buffer_.length_ << " flush is "
          << flush_ << " size is " << w_buffer_.data_.size()
          << " capacity is " << w_buffer_.data_.capacity()
          << ".\n" << logofs_flush;
  #endif

  //
  // A failure here returns before the
  // statistics below are updated.
  //

  int result = Transport::flush();

  if (result < 0)
  {
    return -1;
  }

  //
  // Update all the counters.
  //

  statistics -> addCompressedBytes(diffTotalIn, diffTotalOut);

  statistics -> addBytesOut(diffTotalOut);

  statistics -> updateBitrate(diffTotalOut);

  FlushCallback(diffTotalOut);

  return result;
}

//
// Hand the unread content of the read buffer to the caller
// without copying it. On return data refers to the bytes
// inside the buffer, or is NULL if nothing is pending. The
// return value is the number of bytes made available.
//

unsigned int ProxyTransport::getPending(unsigned char *&data)
{
  //
  // Deal first with the case of
  // an empty read buffer.
  //

  if (r_buffer_.length_ <= 0)
  {
    #ifdef TEST
    *logofs << "ProxyTransport: WARNING! No pending data "
            << "for proxy FD#" << fd_ << ".\n"
            << logofs_flush;
    #endif

    data = NULL;

    return 0;
  }

  //
  // The caller gets a pointer straight into
  // the read buffer and is responsible for
  // consuming the data before the buffer is
  // used again.
  //

  const unsigned int pending = r_buffer_.length_;

  data = r_buffer_.data_.begin() + r_buffer_.start_;

  #ifdef DEBUG
  *logofs << "ProxyTransport: Returning " << pending
          << " pending bytes from proxy FD#" << fd_
          << ".\n" << logofs_flush;
  #endif

  //
  // Mark the buffer as empty.
  //

  r_buffer_.start_  = 0;
  r_buffer_.length_ = 0;

  //
  // Dropping the ownership prevents
  // the deletion of the buffer.
  //

  owner_ = 0;

  return pending;
}

//
// Clear the status flags and the flush counter, reset the
// ZLIB streams selected by the control parameters and reset
// the buffers. The read buffer is only touched while this
// object is still flagged as its owner.
//

void ProxyTransport::fullReset()
{
  flush_   = 0;
  finish_  = 0;
  blocked_ = 0;

  //
  // The inflate stream follows the remote
  // compression setting, the deflate stream
  // the local one.
  //

  if (control -> RemoteStreamCompression != 0)
  {
    inflateReset(&r_stream_);
  }

  if (control -> LocalStreamCompression != 0)
  {
    deflateReset(&w_stream_);
  }

  //
  // Skip the read buffer if the ownership
  // was given up in getPending().
  //

  if (owner_ == 1)
  {
    Transport::fullReset(r_buffer_);
  }

  Transport::fullReset(w_buffer_);
}

//
// Create the memory-to-memory transport for the given
// descriptor. The read buffer starts empty with storage
// for initialSize_ bytes and is flagged as owned by this
// object. If THREADS is defined the read and write
// mutexes are created as error-checking mutexes.
//

AgentTransport::AgentTransport(int fd) : Transport(fd)
{
  #ifdef TEST
  *logofs << "AgentTransport: Going to create agent transport "
          << "for FD#" << fd_ << ".\n" << logofs_flush;
  #endif

  type_ = transport_agent;

  //
  // Set up the read buffer.
  //

  r_buffer_.length_ = 0;
  r_buffer_.start_  = 0;

  r_buffer_.data_.resize(initialSize_);

  //
  // For now we own the buffer.
  //

  owner_ = 1;

  //
  // Set up the mutexes.
  //

  #ifdef THREADS

  pthread_mutexattr_t m_attributes;

  pthread_mutexattr_init(&m_attributes);

  //
  // Select the mutex type through the standard
  // interface. Writing to the private members of
  // the attribute object only works with old
  // versions of the C library.
  //

  pthread_mutexattr_settype(&m_attributes, PTHREAD_MUTEX_ERRORCHECK);

  //
  // The pthread functions report failures with
  // their return value and don't set errno.
  //

  const int readResult = pthread_mutex_init(&m_read_, &m_attributes);

  if (readResult != 0)
  {
    #ifdef TEST
    *logofs << "AgentTransport: Child: Creation of read mutex failed. "
            << "Error is " << readResult << " '" << strerror(readResult)
            << "'.\n" << logofs_flush;
    #endif
  }

  const int writeResult = pthread_mutex_init(&m_write_, &m_attributes);

  if (writeResult != 0)
  {
    #ifdef TEST
    *logofs << "AgentTransport: Child: Creation of write mutex failed. "
            << "Error is " << writeResult << " '" << strerror(writeResult)
            << "'.\n" << logofs_flush;
    #endif
  }

  //
  // The attributes are not needed anymore
  // once the mutexes have been created.
  //

  pthread_mutexattr_destroy(&m_attributes);

  #endif

  #ifdef REFERENCES
  *logofs << "AgentTransport: Child: Created new object at " 
          << this << " out of " << ++references_ 
          << " allocated references.\n" << logofs_flush;
  #endif
}

//
// Destroy the agent transport. If THREADS is defined
// each mutex is unlocked and then destroyed.
//

AgentTransport::~AgentTransport()
{
  #ifdef TEST
  *logofs << "AgentTransport: Going to destroy derived class "
          << "for FD#" << fd_ << ".\n" << logofs_flush;
  #endif

  #ifdef THREADS

  //
  // Get rid of the read mutex.
  //

  pthread_mutex_unlock(&m_read_);
  pthread_mutex_destroy(&m_read_);

  //
  // Do the same with the write mutex.
  //

  pthread_mutex_unlock(&m_write_);
  pthread_mutex_destroy(&m_write_);

  #endif

  #ifdef REFERENCES
  *logofs << "AgentTransport: Child: Deleted object at " 
          << this << " out of " << --references_ 
          << " allocated references.\n" << logofs_flush;
  #endif
}

//
// Read data enqueued by the other thread.
//
// The content of the read buffer is copied to data only
// if size is big enough to hold all of it. In this case
// the number of bytes copied is returned. If the buffer
// is empty, or size is smaller than the pending length,
// EAGAIN is set and -1 is returned.
//

int AgentTransport::read(unsigned char *data, unsigned int size)
{
  #ifdef THREADS

  lockRead();

  #endif

  #ifdef DEBUG
  *logofs << "AgentTransport: Child: Going to read " << size
          << " bytes from " << "FD#" << fd_ << ".\n"
          << logofs_flush;
  #endif

  int result = -1;

  if (r_buffer_.length_ <= 0)
  {
    //
    // Nothing was enqueued yet.
    //

    #ifdef DEBUG
    *logofs << "AgentTransport: Child: No data can be got "
            << "from read buffer for FD#" << fd_ << ".\n"
            << logofs_flush;
    #endif

    ESET(EAGAIN);
  }
  else if ((int) size < r_buffer_.length_)
  {
    //
    // The destination can't hold all
    // the pending data.
    //

    #ifdef TEST
    *logofs << "AgentTransport: WARNING! Forcing a retry with "
            << r_buffer_.length_ << " bytes pending and "
            << size << " in the buffer.\n"
            << logofs_flush;
    #endif

    ESET(EAGAIN);
  }
  else
  {
    result = ((int) size < r_buffer_.length_ ?
                  ((int) size) : r_buffer_.length_);

    memcpy(data, r_buffer_.data_.begin() + r_buffer_.start_, result);

    #ifdef TEST
    *logofs << "AgentTransport: Child: Going to immediately return "
            << result << " bytes from FD#" << fd_ << ".\n"
            << logofs_flush;
    #endif

    #ifdef DUMP

    *logofs << "AgentTransport: Child: Dumping content of read data.\n"
            << logofs_flush;

    DumpData(data, result);

    #endif

    //
    // Account for the bytes given to the
    // caller and rewind the buffer when
    // nothing is left.
    //

    r_buffer_.length_ -= result;

    if (r_buffer_.length_ != 0)
    {
      r_buffer_.start_ += result;

      #ifdef TEST
      *logofs << "AgentTransport: Child: There are still "
              << r_buffer_.length_  << " bytes in read buffer for "
              << "FD#" << fd_ << ".\n" << logofs_flush;
      #endif
    }
    else
    {
      r_buffer_.start_ = 0;
    }
  }

  #ifdef THREADS

  unlockRead();

  #endif

  return result;
}

//
// Write data to buffer so that the other
// thread can get it.
//
// The data is appended to the write buffer and size is
// returned. If the buffer can't be resized finish() is
// called, EPIPE is set and -1 is returned. The type
// parameter is not used.
//

int AgentTransport::write(T_write type, const unsigned char *data, const unsigned int size)
{
  #ifdef THREADS

  lockWrite();

  #endif

  //
  // The data is simply added at the end of
  // the write buffer. No check is made on
  // the size limits set for this type of
  // transport.
  //

  #ifdef TEST
  *logofs << "AgentTransport: Child: Going to append " << size
          << " bytes to write buffer for " << "FD#" << fd_
          << ".\n" << logofs_flush;
  #endif

  int result;

  if (resize(w_buffer_, size) >= 0)
  {
    unsigned char *target = w_buffer_.data_.begin() + w_buffer_.start_ +
                                w_buffer_.length_;

    memmove(target, data, size);

    w_buffer_.length_ += size;

    #ifdef DUMP

    *logofs << "AgentTransport: Child: Dumping content of written data.\n"
            << logofs_flush;

    DumpData(data, size);

    #endif

    #ifdef TEST
    *logofs << "AgentTransport: Child: Write buffer for FD#" << fd_
            << " has data for " << w_buffer_.length_ << " bytes.\n"
            << logofs_flush;

    *logofs << "AgentTransport: Child: Start is " << w_buffer_.start_ 
            << " length is " << w_buffer_.length_ << " size is "
            << w_buffer_.data_.size() << " capacity is "
            << w_buffer_.data_.capacity() << ".\n"
            << logofs_flush;
    #endif

    result = size;
  }
  else
  {
    //
    // There is no room for the data. Close
    // the transport and report a broken pipe.
    //

    finish();

    ESET(EPIPE);

    result = -1;
  }

  //
  // Release the write buffer.
  //

  #ifdef THREADS

  unlockWrite();

  #endif

  return result;
}

//
// Flushing makes no sense for the memory-to-memory
// transport. The call is reported as an error and
// HandleAbort() is invoked.
//

int AgentTransport::flush()
{
  #ifdef PANIC
  *logofs << "AgentTransport: Child: PANIC! Called flush() for "
          << "memory to memory transport on " << "FD#" << fd_
          << ".\n" << logofs_flush;
  #endif

  cerr << "Error" << ": Called flush() for "
       << "memory to memory transport on "
       << "FD#" << fd_ << ".\n";

  //
  // NOTE(review): no value is returned after this
  // call, so HandleAbort() is presumably declared
  // as not returning. To be confirmed.
  //

  HandleAbort();
}

//
// Draining is not possible with the memory-to-memory
// transport, as the agent may need multiple loops to
// read the data enqueued for it. The call is reported
// as an error and HandleAbort() is invoked. Both the
// parameters are ignored.
//

int AgentTransport::drain(int limit, int timeout)
{
  #ifdef PANIC
  *logofs << "AgentTransport: Child: PANIC! Called drain() for "
          << "memory to memory transport on " << "FD#" << fd_
          << ".\n" << logofs_flush;
  #endif

  cerr << "Error" << ": Called drain() for "
       << "memory to memory transport on "
       << "FD#" << fd_ << ".\n";

  //
  // NOTE(review): no value is returned after this
  // call, so HandleAbort() is presumably declared
  // as not returning. To be confirmed.
  //

  HandleAbort();
}

//
// Hand the unread content of the read buffer to the caller
// without copying it. On return data refers to the bytes
// inside the buffer, or is NULL if nothing is pending. The
// return value is the number of bytes made available. If
// THREADS is defined the read mutex is held while the
// buffer is inspected.
//

unsigned int AgentTransport::getPending(unsigned char *&data)
{
  #ifdef THREADS

  lockRead();

  #endif

  if (r_buffer_.length_ <= 0)
  {
    #ifdef TEST
    *logofs << "AgentTransport: WARNING! No pending data "
            << "for FD#" << fd_ << ".\n" << logofs_flush;
    #endif

    #ifdef THREADS

    unlockRead();

    #endif

    data = NULL;

    return 0;
  }

  //
  // The caller gets a pointer straight
  // into the read buffer.
  //

  const unsigned int pending = r_buffer_.length_;

  data = r_buffer_.data_.begin() + r_buffer_.start_;

  #ifdef DEBUG
  *logofs << "AgentTransport: Child: Returning " << pending
          << " pending bytes from FD#" << fd_
          << ".\n" << logofs_flush;
  #endif

  //
  // Mark the buffer as empty.
  //

  r_buffer_.start_  = 0;
  r_buffer_.length_ = 0;

  #ifdef THREADS

  unlockRead();

  #endif

  //
  // Dropping the ownership prevents
  // the deletion of the buffer.
  //

  owner_ = 0;

  return pending;
}

//
// Clear the status flags and reset the buffers. The read
// buffer is only touched while this object is still flagged
// as its owner. If THREADS is defined both the mutexes are
// held during the reset and released before returning.
//

void AgentTransport::fullReset()
{
  #ifdef THREADS

  lockRead();
  lockWrite();

  #endif

  #ifdef TEST
  *logofs << "AgentTransport: Child: Resetting transport "
          << "for FD#" << fd_ << ".\n" << logofs_flush;
  #endif

  blocked_ = 0;
  finish_  = 0;

  if (owner_ == 1)
  {
    Transport::fullReset(r_buffer_);
  }

  Transport::fullReset(w_buffer_);

  //
  // Release the mutexes in reverse order. Leaving
  // them locked would make any later operation on
  // the buffers fail or block forever.
  //

  #ifdef THREADS

  unlockWrite();
  unlockRead();

  #endif
}

//
// Append size bytes taken from data to the read buffer.
// Returns the number of bytes stored. Returns -1 with
// EPIPE set if the transport is finishing, or -1 after
// calling finish() if the buffer can't be resized. If
// THREADS is defined the read mutex is held during the
// operation and released on every return path.
//

int AgentTransport::enqueue(const char *data, const int size)
{
  #ifdef THREADS

  lockRead();

  #endif

  if (finish_ == 1)
  {
    #if defined(PARENT) && defined(TEST)
    *logofs << "AgentTransport: Parent: Returning EPIPE in "
            << "write for finishing FD#" << fd_ << ".\n"
           << logofs_flush;
    #endif

    //
    // Don't leave the read mutex locked. The
    // error is set after the unlock so that
    // it can't be overridden.
    //

    #ifdef THREADS

    unlockRead();

    #endif

    ESET(EPIPE);

    return -1;
  }

  //
  // Always allow the agent to write
  // all its data.
  //

  int toPut = size;

  #if defined(PARENT) && defined(TEST)
  *logofs << "AgentTransport: Parent: Going to put " << toPut
          << " bytes into read buffer for FD#" << fd_
          << ". Buffer length is " << r_buffer_.length_
          << ".\n" << logofs_flush;
  #endif

  if (resize(r_buffer_, toPut) < 0)
  {
    finish();

    #ifdef THREADS

    unlockRead();

    #endif

    return -1;
  }

  memcpy(r_buffer_.data_.begin() + r_buffer_.start_ + r_buffer_.length_, data, toPut);

  r_buffer_.length_ += toPut;

  #if defined(DUMP) && defined(PARENT)

  *logofs << "AgentTransport: Parent: Dumping content of enqueued data.\n"
          << logofs_flush;

  DumpData((const unsigned char *) data, toPut);

  #endif

  #if defined(PARENT) && defined(TEST)
  *logofs << "AgentTransport: Parent: Read buffer for FD#" << fd_
          << " has now data for " << r_buffer_.length_
          << " bytes.\n" << logofs_flush;

  *logofs << "AgentTransport: Parent: Start is " << r_buffer_.start_ 
          << " length is " << r_buffer_.length_ << " size is "
          << r_buffer_.data_.size() << " capacity is "
          << r_buffer_.data_.capacity() << ".\n"
          << logofs_flush;
  #endif

  #ifdef THREADS

  unlockRead();

  #endif

  return toPut;
}

//
// Move up to size bytes from the write buffer to data.
// Returns the number of bytes copied. With an empty
// buffer it returns 0 if the transport is finishing,
// else -1 with EAGAIN set. If THREADS is defined the
// write mutex is held during the operation and released
// on every return path.
//

int AgentTransport::dequeue(char *data, int size)
{
  #ifdef THREADS

  lockWrite();

  #endif

  if (w_buffer_.length_ == 0)
  {
    if (finish_ == 1)
    {
      #if defined(PARENT) && defined(TEST)
      *logofs << "AgentTransport: Parent: Returning 0 in read "
              << "for finishing FD#" << fd_ << ".\n"
              << logofs_flush;
      #endif

      //
      // Don't leave the write mutex locked.
      //

      #ifdef THREADS

      unlockWrite();

      #endif

      return 0;
    }

    #if defined(PARENT) && defined(TEST)
    *logofs << "AgentTransport: Parent: No data can be read "
            << "from write buffer for FD#" << fd_ << ".\n"
            << logofs_flush;
    #endif

    ESET(EAGAIN);

    #ifdef THREADS

    unlockWrite();

    #endif

    return -1;
  }

  //
  // Return as many bytes as possible.
  //

  int toGet = ((int) size > w_buffer_.length_ ? w_buffer_.length_ : size);

  #if defined(PARENT) && defined(TEST)
  *logofs << "AgentTransport: Parent: Going to get " << toGet
          << " bytes from write buffer for FD#" << fd_ << ".\n"
          << logofs_flush;
  #endif

  memcpy(data, w_buffer_.data_.begin() + w_buffer_.start_, toGet);

  //
  // Skip the bytes given to the caller.
  //

  w_buffer_.start_  += toGet;
  w_buffer_.length_ -= toGet;

  #if defined(DUMP) && defined(PARENT)

  *logofs << "AgentTransport: Parent: Dumping content of dequeued data.\n"
          << logofs_flush;

  DumpData((const unsigned char *) data, toGet);

  #endif

  #if defined(PARENT) && defined(TEST)
  *logofs << "AgentTransport: Parent: Write buffer for FD#" << fd_
          << " has now data for " << length() << " bytes.\n"
          << logofs_flush;

  *logofs << "AgentTransport: Parent: Start is " << w_buffer_.start_ 
          << " length is " << w_buffer_.length_ << " size is "
          << w_buffer_.data_.size() << " capacity is "
          << w_buffer_.data_.capacity() << ".\n"
          << logofs_flush;
  #endif

  #ifdef THREADS

  unlockWrite();

  #endif

  return toGet;
}

//
// Tell how many bytes are stored in the write buffer.
// If the transport is finishing EPIPE is set and -1
// is returned instead.
//

int AgentTransport::dequeuable()
{
  if (finish_ != 1)
  {
    #if defined(PARENT) && defined(TEST)
    *logofs << "AgentTransport: Parent: Returning "
            << w_buffer_.length_ << " as data readable "
            << "from read buffer for FD#" << fd_ << ".\n"
            << logofs_flush;
    #endif

    return w_buffer_.length_;
  }

  #if defined(PARENT) && defined(TEST)
  *logofs << "AgentTransport: Parent: Returning EPIPE in "
          << "readable for finishing FD#" << fd_
          << ".\n" << logofs_flush;
  #endif

  ESET(EPIPE);

  return -1;
}

#ifdef THREADS

//
// Lock the read mutex. Returns 0 on success or the
// error number returned by pthread_mutex_lock().
//

int AgentTransport::lockRead()
{
  for (;;)
  {
    const int result = pthread_mutex_lock(&m_read_);

    if (result == 0)
    {
      #ifdef DEBUG
      *logofs << "AgentTransport: Read mutex locked by thread id "
              << pthread_self() << ".\n" << logofs_flush;
      #endif

      return 0;
    }

    //
    // The pthread functions report the error with
    // their return value and don't set errno.
    // Checking errno could make the loop spin
    // forever on a stale EINTR.
    //

    if (result == EINTR)
    {
      continue;
    }

    #ifdef WARNING
    *logofs << "AgentTransport: WARNING! Locking of read mutex by thread id "
            << pthread_self() << " returned " << result << ". Error is '"
            << strerror(result) << "'.\n" << logofs_flush;
    #endif

    return result;
  }
}

//
// Lock the write mutex. Returns 0 on success or the
// error number returned by pthread_mutex_lock().
//

int AgentTransport::lockWrite()
{
  for (;;)
  {
    const int result = pthread_mutex_lock(&m_write_);

    if (result == 0)
    {
      #ifdef DEBUG
      *logofs << "AgentTransport: Write mutex locked by thread id "
              << pthread_self() << ".\n" << logofs_flush;
      #endif

      return 0;
    }

    //
    // The pthread functions report the error with
    // their return value and don't set errno.
    // Checking errno could make the loop spin
    // forever on a stale EINTR.
    //

    if (result == EINTR)
    {
      continue;
    }

    #ifdef WARNING
    *logofs << "AgentTransport: WARNING! Locking of write mutex by thread id "
            << pthread_self() << " returned " << result << ". Error is '"
            << strerror(result) << "'.\n" << logofs_flush;
    #endif

    return result;
  }
}

//
// Unlock the read mutex. Returns 0 on success or the
// error number returned by pthread_mutex_unlock().
//

int AgentTransport::unlockRead()
{
  for (;;)
  {
    const int result = pthread_mutex_unlock(&m_read_);

    if (result == 0)
    {
      #ifdef DEBUG
      *logofs << "AgentTransport: Read mutex unlocked by thread id "
              << pthread_self() << ".\n" << logofs_flush;
      #endif

      return 0;
    }

    //
    // The pthread functions report the error with
    // their return value and don't set errno.
    // Checking errno could make the loop spin
    // forever on a stale EINTR.
    //

    if (result == EINTR)
    {
      continue;
    }

    #ifdef WARNING
    *logofs << "AgentTransport: WARNING! Unlocking of read mutex by thread id "
            << pthread_self() << " returned " << result << ". Error is '"
            << strerror(result) << "'.\n" << logofs_flush;
    #endif

    return result;
  }
}

//
// Unlock the write mutex. Returns 0 on success or the
// error number returned by pthread_mutex_unlock().
//

int AgentTransport::unlockWrite()
{
  for (;;)
  {
    const int result = pthread_mutex_unlock(&m_write_);

    if (result == 0)
    {
      #ifdef DEBUG
      *logofs << "AgentTransport: Write mutex unlocked by thread id "
              << pthread_self() << ".\n" << logofs_flush;
      #endif

      return 0;
    }

    //
    // The pthread functions report the error with
    // their return value and don't set errno.
    // Checking errno could make the loop spin
    // forever on a stale EINTR.
    //

    if (result == EINTR)
    {
      continue;
    }

    #ifdef WARNING
    *logofs << "AgentTransport: WARNING! Unlocking of write mutex by thread id "
            << pthread_self() << " returned " << result << ". Error is '"
            << strerror(result) << "'.\n" << logofs_flush;
    #endif

    return result;
  }
}

#endif