diff options
Diffstat (limited to 'nxcomp/Proxy.cpp')
-rw-r--r-- | nxcomp/Proxy.cpp | 6450 |
1 files changed, 6450 insertions, 0 deletions
diff --git a/nxcomp/Proxy.cpp b/nxcomp/Proxy.cpp new file mode 100644 index 000000000..3b4df7eb6 --- /dev/null +++ b/nxcomp/Proxy.cpp @@ -0,0 +1,6450 @@ +/**************************************************************************/ +/* */ +/* Copyright (c) 2001, 2011 NoMachine, http://www.nomachine.com/. */ +/* */ +/* NXCOMP, NX protocol compression and NX extensions to this software */ +/* are copyright of NoMachine. Redistribution and use of the present */ +/* software is allowed according to terms specified in the file LICENSE */ +/* which comes in the source distribution. */ +/* */ +/* Check http://www.nomachine.com/licensing.html for applicability. */ +/* */ +/* NX and NoMachine are trademarks of Medialogic S.p.A. */ +/* */ +/* All rights reserved. */ +/* */ +/**************************************************************************/ + +#include <stdio.h> +#include <unistd.h> +#include <stdlib.h> +#include <sys/types.h> +#include <sys/stat.h> + +#include "Misc.h" + +#if defined(__CYGWIN32__) || defined(__APPLE__) || defined(__FreeBSD__) || defined(__sun) +#include <netinet/in_systm.h> +#endif + +#ifndef __CYGWIN32__ +#include <sys/un.h> +#endif + +#include <netinet/in.h> +#include <netinet/ip.h> +#include <netinet/tcp.h> + +#if defined(__EMX__ ) || defined(__CYGWIN32__) + +struct sockaddr_un +{ + u_short sun_family; + char sun_path[108]; +}; + +#endif + +#include "NXalert.h" +#include "NXvars.h" + +#include "Proxy.h" + +#include "Socket.h" +#include "Channel.h" +#include "Statistics.h" + +#include "ClientChannel.h" +#include "ServerChannel.h" +#include "GenericChannel.h" + +// +// We need to adjust some values related +// to these messages at the time the mes- +// sage stores are reconfigured. +// + +#include "PutImage.h" +#include "ChangeGC.h" +#include "PolyFillRectangle.h" +#include "PutPackedImage.h" + +// +// This is from the main loop. +// + +extern void CleanupListeners(); + +// +// Default size of string buffers. +// + +#define DEFAULT_STRING_LENGTH 512 + +// +// Set the verbosity level. You also need +// to define DUMP in Misc.cpp if DUMP is +// defined here. +// + +#define WARNING +#define PANIC +#undef TEST +#undef DEBUG +#undef DUMP + +// +// Log the important tracepoints related +// to writing packets to the peer proxy. +// + +#undef FLUSH + +// +// Log the operations related to splits. +// + +#undef SPLIT + +// +// Log the operations related to sending +// and receiving the control tokens. +// + +#undef TOKEN + +// +// Log the operations related to setting +// the token limits. +// + +#undef LIMIT + +// +// Log a warning if no data is written by +// the proxy within a timeout. +// + +#undef TIME + +// +// Log the operation related to generating +// the ping message at idle time. +// + +#undef PING + +Proxy::Proxy(int fd) + + : transport_(new ProxyTransport(fd)), fd_(fd), readBuffer_(transport_) +{ + for (int channelId = 0; + channelId < CONNECTIONS_LIMIT; + channelId++) + { + channels_[channelId] = NULL; + transports_[channelId] = NULL; + congestions_[channelId] = 0; + + fdMap_[channelId] = nothing; + channelMap_[channelId] = nothing; + } + + inputChannel_ = nothing; + outputChannel_ = nothing; + + controlLength_ = 0; + + operation_ = operation_in_negotiation; + + draining_ = 0; + priority_ = 0; + finish_ = 0; + shutdown_ = 0; + congestion_ = 0; + + timer_ = 0; + alert_ = 0; + + agent_ = nothing; + + // + // Set null timeouts. This will require + // a new link configuration. + // + + timeouts_.split = 0; + timeouts_.motion = 0; + + timeouts_.readTs = getTimestamp(); + timeouts_.writeTs = getTimestamp(); + + timeouts_.loopTs = getTimestamp(); + timeouts_.pingTs = getTimestamp(); + timeouts_.alertTs = nullTimestamp(); + timeouts_.loadTs = nullTimestamp(); + + timeouts_.splitTs = nullTimestamp(); + timeouts_.motionTs = nullTimestamp(); + + // + // Initialize the token counters. This + // will require a new link configuration. + // + + for (int i = token_control; i <= token_data; i++) + { + tokens_[i].size = 0; + tokens_[i].limit = 0; + + tokens_[i].bytes = 0; + tokens_[i].remaining = 0; + } + + tokens_[token_control].request = code_control_token_request; + tokens_[token_control].reply = code_control_token_reply; + tokens_[token_control].type = token_control; + + tokens_[token_split].request = code_split_token_request; + tokens_[token_split].reply = code_split_token_reply; + tokens_[token_split].type = token_split; + + tokens_[token_data].request = code_data_token_request; + tokens_[token_data].reply = code_data_token_reply; + tokens_[token_data].type = token_data; + + currentStatistics_ = NULL; + + // + // Create compressor and decompressor + // for image and data payload. + // + + compressor_ = new StaticCompressor(control -> LocalDataCompressionLevel, + control -> LocalDataCompressionThreshold); + + // + // Create object storing NX specific + // opcodes. + // + + opcodeStore_ = new OpcodeStore(); + + // + // Create the message stores. + // + + clientStore_ = new ClientStore(compressor_); + serverStore_ = new ServerStore(compressor_); + + // + // Older proxies will refuse to store + // messages bigger than 262144 bytes. + // + + if (control -> isProtoStep7() == 0) + { + #ifdef TEST + *logofs << "Proxy: WARNING! Limiting the maximum " + << "message size to " << 262144 << ".\n" + << logofs_flush; + #endif + + control -> MaximumMessageSize = 262144; + } + + clientCache_ = new ClientCache(); + serverCache_ = new ServerCache(); + + if (clientCache_ == NULL || serverCache_ == NULL) + { + #ifdef PANIC + *logofs << "Proxy: PANIC! Failed to create the channel cache.\n" + << logofs_flush; + #endif + + cerr << "Error" << ": Failed to create the channel cache.\n"; + + HandleCleanup(); + } + + // + // Prepare for image decompression. + // + + UnpackInit(); + + #ifdef DEBUG + *logofs << "Proxy: Created new object at " << this + << ".\n" << logofs_flush; + #endif +} + +Proxy::~Proxy() +{ + for (int channelId = 0; + channelId < CONNECTIONS_LIMIT; + channelId++) + { + if (channels_[channelId] != NULL) + { + deallocateTransport(channelId); + + delete channels_[channelId]; + channels_[channelId] = NULL; + } + } + + delete transport_; + delete compressor_; + + // + // Delete storage shared among channels. + // + + delete opcodeStore_; + + delete clientStore_; + delete serverStore_; + + delete clientCache_; + delete serverCache_; + + // + // Get rid of the image decompression + // resources. + // + + UnpackDestroy(); + + #ifdef DEBUG + *logofs << "Proxy: Deleted proxy object at " << this + << ".\n" << logofs_flush; + #endif +} + +int Proxy::setOperational() +{ + #ifdef TEST + *logofs << "Proxy: Entering operational mode.\n" + << logofs_flush; + #endif + + operation_ = operation_in_messages; + + return 1; +} + +int Proxy::setReadDescriptors(fd_set *fdSet, int &fdMax, T_timestamp &tsMax) +{ + // + // Set the initial timeout to the time of + // the next ping. If the congestion count + // is greater than zero, anyway, use a + // shorter timeout to force a congestion + // update. + // + + if (agent_ != nothing && congestions_[agent_] == 0 && + statistics -> getCongestionInFrame() >= 1 && + tokens_[token_control].remaining >= + (tokens_[token_control].limit - 1)) + { + setMinTimestamp(tsMax, control -> IdleTimeout); + + #ifdef TEST + *logofs << "Proxy: Initial timeout is " << tsMax.tv_sec + << " S and " << (double) tsMax.tv_usec / + 1000 << " Ms with congestion " + << statistics -> getCongestionInFrame() + << ".\n" << logofs_flush; + #endif + } + else + { + setMinTimestamp(tsMax, control -> PingTimeout); + + #ifdef TEST + *logofs << "Proxy: Initial timeout is " << tsMax.tv_sec + << " S and " << (double) tsMax.tv_usec / + 1000 << " Ms.\n" << logofs_flush; + #endif + } + + int fd = -1; + + if (isTimeToRead() == 1) + { + // + // If we don't have split tokens available + // don't set the timeout. + // + + if (tokens_[token_split].remaining > 0 && + isTimestamp(timeouts_.splitTs) == 1) + { + int diffTs = getTimeToNextSplit(); + + #if defined(TEST) || defined(INFO) || \ + defined(FLUSH) || defined(SPLIT) + + if (diffTimestamp(timeouts_.splitTs, + getTimestamp()) > timeouts_.split) + { + *logofs << "Proxy: FLUSH! SPLIT! WARNING! Running with " + << diffTimestamp(timeouts_.splitTs, getTimestamp()) + << " Ms elapsed since the last split.\n" + << logofs_flush; + } + + *logofs << "Proxy: FLUSH! SPLIT! Requesting timeout of " + << diffTs << " Ms as there are splits to send.\n" + << logofs_flush; + + #endif + + setMinTimestamp(tsMax, diffTs); + } + #if defined(TEST) || defined(INFO) + else if (isTimestamp(timeouts_.splitTs) == 1) + { + *logofs << "Proxy: WARNING! Not requesting a split " + << "timeout with " << tokens_[token_split].remaining + << " split tokens remaining.\n" << logofs_flush; + } + #endif + + // + // Loop through the valid channels and set + // the descriptors selected for read and + // the timeout. + // + + T_list &channelList = activeChannels_.getList(); + + for (T_list::iterator j = channelList.begin(); + j != channelList.end(); j++) + { + int channelId = *j; + + if (channels_[channelId] == NULL) + { + continue; + } + + fd = getFd(channelId); + + if (channels_[channelId] -> getFinish() == 0 && + (channels_[channelId] -> getType() == channel_x11 || + tokens_[token_data].remaining > 0) && + congestions_[channelId] == 0) + { + FD_SET(fd, fdSet); + + if (fd >= fdMax) + { + fdMax = fd + 1; + } + + #ifdef TEST + *logofs << "Proxy: Descriptor FD#" << fd + << " selected for read with buffer length " + << transports_[channelId] -> length() + << ".\n" << logofs_flush; + #endif + + // + // Wakeup the proxy if there are motion + // events to flush. + // + + if (isTimestamp(timeouts_.motionTs) == 1) + { + int diffTs = getTimeToNextMotion(); + + #if defined(TEST) || defined(INFO) + + if (diffTimestamp(timeouts_.motionTs, + getTimestamp()) > timeouts_.motion) + { + *logofs << "Proxy: FLUSH! WARNING! Running with " + << diffTimestamp(timeouts_.motionTs, getTimestamp()) + << " Ms elapsed since the last motion.\n" + << logofs_flush; + } + + *logofs << "Proxy: FLUSH! Requesting timeout of " + << diffTs << " Ms as FD#" << fd << " has motion " + << "events to send.\n" << logofs_flush; + + #endif + + setMinTimestamp(tsMax, diffTs); + } + } + #if defined(TEST) || defined(INFO) + else + { + if (channels_[channelId] -> getType() != channel_x11 && + tokens_[token_data].remaining <= 0) + { + *logofs << "Proxy: WARNING! Descriptor FD#" << fd + << " not selected for read with " + << tokens_[token_data].remaining << " data " + << "tokens remaining.\n" << logofs_flush; + } + } + #endif + } + } + #if defined(TEST) || defined(INFO) + else + { + *logofs << "Proxy: WARNING! Disabled reading from channels.\n" + << logofs_flush; + + *logofs << "Proxy: WARNING! Congestion is " << congestion_ + << " pending " << transport_ -> pending() << " blocked " + << transport_ -> blocked() << " length " << transport_ -> + length() << ".\n" << logofs_flush; + } + #endif + + // + // Include the proxy descriptor. + // + + FD_SET(fd_, fdSet); + + if (fd_ >= fdMax) + { + fdMax = fd_ + 1; + } + + #ifdef TEST + *logofs << "Proxy: Proxy descriptor FD#" << fd_ + << " selected for read with buffer length " + << transport_ -> length() << ".\n" + << logofs_flush; + #endif + + return 1; +} + +// +// Add to the mask the file descriptors of all +// X connections to write to. +// + +int Proxy::setWriteDescriptors(fd_set *fdSet, int &fdMax, T_timestamp &tsMax) +{ + int fd = -1; + + T_list &channelList = activeChannels_.getList(); + + for (T_list::iterator j = channelList.begin(); + j != channelList.end(); j++) + { + int channelId = *j; + + if (channels_[channelId] != NULL) + { + fd = getFd(channelId); + + if (transports_[channelId] -> length() > 0) + { + FD_SET(fd, fdSet); + + #ifdef TEST + *logofs << "Proxy: Descriptor FD#" << fd << " selected " + << "for write with blocked " << transports_[channelId] -> + blocked() << " and length " << transports_[channelId] -> + length() << ".\n" << logofs_flush; + #endif + + if (fd >= fdMax) + { + fdMax = fd + 1; + } + } + #ifdef TEST + else + { + *logofs << "Proxy: Descriptor FD#" << fd << " not selected " + << "for write with blocked " << transports_[channelId] -> + blocked() << " and length " << transports_[channelId] -> + length() << ".\n" << logofs_flush; + } + #endif + + #if defined(TEST) || defined(INFO) + + if (transports_[channelId] -> getType() != + transport_agent && transports_[channelId] -> + length() > 0 && transports_[channelId] -> + blocked() != 1) + { + *logofs << "Proxy: PANIC! Descriptor FD#" << fd + << " has data to write but blocked flag is " + << transports_[channelId] -> blocked() + << ".\n" << logofs_flush; + + cerr << "Error" << ": Descriptor FD#" << fd + << " has data to write but blocked flag is " + << transports_[channelId] -> blocked() + << ".\n"; + + HandleCleanup(); + } + + #endif + } + } + + // + // Check if the proxy transport has data + // from a previous blocking write. + // + + if (transport_ -> blocked() == 1) + { + FD_SET(fd_, fdSet); + + #ifdef TEST + *logofs << "Proxy: Proxy descriptor FD#" + << fd_ << " selected for write. Blocked is " + << transport_ -> blocked() << " length is " + << transport_ -> length() << ".\n" + << logofs_flush; + #endif + + if (fd_ >= fdMax) + { + fdMax = fd_ + 1; + } + } + #ifdef TEST + else + { + *logofs << "Proxy: Proxy descriptor FD#" + << fd_ << " not selected for write. Blocked is " + << transport_ -> blocked() << " length is " + << transport_ -> length() << ".\n" + << logofs_flush; + } + #endif + + // + // We are entering the main select. Save + // the timestamp of the last loop so that + // we can detect the clock drifts. + // + + timeouts_.loopTs = getTimestamp(); + + return 1; +} + +int Proxy::getChannels(T_channel_type type) +{ + int channels = 0; + + T_list &channelList = activeChannels_.getList(); + + for (T_list::iterator j = channelList.begin(); + j != channelList.end(); j++) + { + int channelId = *j; + + if (channels_[channelId] != NULL && + (type == channel_none || + type == channels_[channelId] -> + getType())) + { + channels++; + } + } + + return channels; +} + +T_channel_type Proxy::getType(int fd) +{ + int channelId = getChannel(fd); + + if (channelId < 0 || channels_[channelId] == NULL) + { + return channel_none; + } + + return channels_[channelId] -> getType(); +} + +const char *Proxy::getTypeName(T_channel_type type) +{ + switch (type) + { + case channel_x11: + { + return "X"; + } + case channel_cups: + { + return "CUPS"; + } + case channel_smb: + { + return "SMB"; + } + case channel_media: + { + return "media"; + } + case channel_http: + { + return "HTTP"; + } + case channel_font: + { + return "font"; + } + case channel_slave: + { + return "slave"; + } + default: + { + return "unknown"; + } + } +} + +const char *Proxy::getComputerName() +{ + // + // Strangely enough, under some Windows OSes SMB + // service doesn't bind to localhost. Fall back + // to localhost if can't find computer name in + // the environment. In future we should try to + // bind to localhost and then try the other IPs. + // + + const char *hostname = NULL; + + #ifdef __CYGWIN32__ + + hostname = getenv("COMPUTERNAME"); + + #endif + + if (hostname == NULL) + { + hostname = "localhost"; + } + + return hostname; +} + +// +// Handle data from channels selected for read. +// + +int Proxy::handleRead(int &resultFds, fd_set &readSet) +{ + #ifdef DEBUG + *logofs << "Proxy: Checking descriptors selected for read.\n" + << logofs_flush; + #endif + + T_list &channelList = activeChannels_.getList(); + + for (T_list::iterator j = channelList.begin(); + j != channelList.end(); j++) + { + #ifdef DEBUG + *logofs << "Proxy: Looping with current channel " + << *j << ".\n" << logofs_flush; + #endif + + int fd = getFd(*j); + + if (fd >= 0 && resultFds > 0 && FD_ISSET(fd, &readSet)) + { + #ifdef DEBUG + *logofs << "Proxy: Going to read messages from FD#" + << fd << ".\n" << logofs_flush; + #endif + + int result = handleRead(fd); + + if (result < 0) + { + #ifdef TEST + *logofs << "Proxy: Failure reading messages from FD#" + << fd << ".\n" << logofs_flush; + #endif + + return -1; + } + + #ifdef DEBUG + *logofs << "Proxy: Clearing the read descriptor " + << "for FD#" << fd << ".\n" << logofs_flush; + #endif + + FD_CLR(fd, &readSet); + + resultFds--; + } + } + + if (resultFds > 0 && FD_ISSET(fd_, &readSet)) + { + #ifdef DEBUG + *logofs << "Proxy: Going to read messages from " + << "proxy FD#" << fd_ << ".\n" + << logofs_flush; + #endif + + if (handleRead() < 0) + { + #ifdef TEST + *logofs << "Proxy: Failure reading from proxy FD#" + << fd_ << ".\n" << logofs_flush; + #endif + + return -1; + } + + #ifdef DEBUG + *logofs << "Proxy: Clearing the read descriptor " + << "for proxy FD#" << fd_ << ".\n" + << logofs_flush; + #endif + + FD_CLR(fd_, &readSet); + + resultFds--; + } + + return 1; +} + +// +// Perform flush on descriptors selected for write. +// + +int Proxy::handleFlush(int &resultFds, fd_set &writeSet) +{ + #ifdef DEBUG + *logofs << "Proxy: Checking descriptors selected for write.\n" + << logofs_flush; + #endif + + if (resultFds > 0 && FD_ISSET(fd_, &writeSet)) + { + #ifdef TEST + *logofs << "Proxy: FLUSH! Proxy descriptor FD#" << fd_ + << " reported to be writable.\n" + << logofs_flush; + #endif + + if (handleFlush() < 0) + { + #ifdef TEST + *logofs << "Proxy: Failure flushing the writable " + << "proxy FD#" << fd_ << ".\n" + << logofs_flush; + #endif + + return -1; + } + + #ifdef DEBUG + *logofs << "Proxy: Clearing the write descriptor " + << "for proxy FD#" << fd_ << ".\n" + << logofs_flush; + #endif + + FD_CLR(fd_, &writeSet); + + resultFds--; + } + + T_list &channelList = activeChannels_.getList(); + + for (T_list::iterator j = channelList.begin(); + resultFds > 0 && j != channelList.end(); j++) + { + #ifdef DEBUG + *logofs << "Proxy: Looping with current channel " + << *j << ".\n" << logofs_flush; + #endif + + int fd = getFd(*j); + + if (fd >= 0 && FD_ISSET(fd, &writeSet)) + { + #ifdef TEST + *logofs << "Proxy: X descriptor FD#" << fd + << " reported to be writable.\n" + << logofs_flush; + #endif + + // + // It can happen that, in handling reads, we + // have destroyed the buffer associated to a + // closed socket, so don't complain about + // the errors. + // + + handleFlush(fd); + + // + // Clear the descriptor from the mask so + // we don't confuse the agent if it's + // not checking only its own descriptors. + // + + #ifdef DEBUG + *logofs << "Proxy: Clearing the write descriptor " + << "for FD#" << fd << ".\n" + << logofs_flush; + #endif + + FD_CLR(fd, &writeSet); + + resultFds--; + } + } + + return 1; +} + +int Proxy::handleRead() +{ + #if defined(TEST) || defined(INFO) + *logofs << "Proxy: Decoding data from proxy FD#" + << fd_ << ".\n" << logofs_flush; + #endif + + // + // Decode all the available messages from + // the remote proxy until is not possible + // to read more. + // + + for (;;) + { + int result = readBuffer_.readMessage(); + + #if defined(TEST) || defined(DEBUG) || defined(INFO) + *logofs << "Proxy: Read result on proxy FD#" << fd_ + << " is " << result << ".\n" + << logofs_flush; + #endif + + if (result < 0) + { + if (shutdown_ == 0) + { + if (finish_ == 0) + { + #ifdef PANIC + *logofs << "Proxy: PANIC! Failure reading from the " + << "peer proxy on FD#" << fd_ << ".\n" + << logofs_flush; + #endif + + cerr << "Error" << ": Failure reading from the " + << "peer proxy.\n"; + } + } + #ifdef TEST + else + { + *logofs << "Proxy: Closure of the proxy link detected " + << "after clean shutdown.\n" << logofs_flush; + } + #endif + + priority_ = 0; + finish_ = 1; + congestion_ = 0; + + return -1; + } + else if (result == 0) + { + #if defined(TEST) || defined(DEBUG) || defined(INFO) + *logofs << "Proxy: No data read from proxy FD#" + << fd_ << "\n" << logofs_flush; + #endif + + return 0; + } + + // + // We read some data from the remote. If we set + // the congestion flag because we couldn't read + // before the timeout and have tokens available, + // then reset the congestion flag. + // + + if (congestion_ == 1 && + tokens_[token_control].remaining > 0) + { + #if defined(TEST) || defined(INFO) + *logofs << "Proxy: Exiting congestion due to " + << "proxy data with " << tokens_[token_control].remaining + << " tokens.\n" << logofs_flush; + #endif + + congestion_ = 0; + } + + // + // Set the timestamp of the last read + // operation from the remote proxy and + // enable again showing the 'no data + // received' dialog at the next timeout. + // + + timeouts_.readTs = getTimestamp(); + + if (alert_ != 0) + { + #if defined(TEST) || defined(INFO) + *logofs << "Proxy: Displacing the dialog " + << "for proxy FD#" << fd_ << ".\n" + << logofs_flush; + #endif + + HandleAlert(DISPLACE_MESSAGE_ALERT, 1); + } + + timeouts_.alertTs = nullTimestamp(); + + #if defined(TEST) || defined(INFO) + *logofs << "Proxy: Getting messages from proxy FD#" << fd_ + << " with " << readBuffer_.getLength() << " bytes " + << "in the read buffer.\n" << logofs_flush; + #endif + + unsigned int controlLength; + unsigned int dataLength; + + const unsigned char *message; + + while ((message = readBuffer_.getMessage(controlLength, dataLength)) != NULL) + { + statistics -> addFrameIn(); + + if (controlLength == 3 && *message == 0 && + *(message + 1) < code_last_tag) + { + if (handleControlFromProxy(message) < 0) + { + return -1; + } + } + else if (operation_ == operation_in_messages) + { + int channelId = inputChannel_; + + #if defined(TEST) || defined(INFO) + *logofs << "Proxy: Identified message of " << dataLength + << " bytes for FD#" << getFd(channelId) << " channel ID#" + << channelId << ".\n" << logofs_flush; + #endif + + if (channelId >= 0 && channelId < CONNECTIONS_LIMIT && + channels_[channelId] != NULL) + { + int finish = channels_[channelId] -> getFinish(); + + #ifdef WARNING + + if (finish == 1) + { + *logofs << "Proxy: WARNING! Handling data for finishing " + << "FD#" << getFd(channelId) << " channel ID#" + << channelId << ".\n" << logofs_flush; + } + + #endif + + // + // We need to decode all the data to preserve + // the consistency of the cache, so can't re- + // turn as soon as the first error is encount- + // ered. Check if this is the first time that + // the failure is detected. + // + + int result = channels_[channelId] -> handleWrite(message, dataLength); + + if (result < 0 && finish == 0) + { + #ifdef TEST + *logofs << "Proxy: Failed to write proxy data to FD#" + << getFd(channelId) << " channel ID#" + << channelId << ".\n" << logofs_flush; + #endif + + if (handleFinish(channelId) < 0) + { + return -1; + } + } + + // + // Check if we have splits or motion + // events to send. + // + + setSplitTimeout(channelId); + setMotionTimeout(channelId); + } + #ifdef WARNING + else + { + *logofs << "Proxy: WARNING! Received data for " + << "invalid channel ID#" << channelId + << ".\n" << logofs_flush; + } + #endif + } + else if (operation_ == operation_in_statistics) + { + #ifdef TEST + *logofs << "Proxy: Received statistics data from remote proxy.\n" + << logofs_flush; + #endif + + if (handleStatisticsFromProxy(message, dataLength) < 0) + { + return -1; + } + + operation_ = operation_in_messages; + } + else if (operation_ == operation_in_negotiation) + { + #ifdef TEST + *logofs << "Proxy: Received new negotiation data from remote proxy.\n" + << logofs_flush; + #endif + + if (handleNegotiationFromProxy(message, dataLength) < 0) + { + return -1; + } + } + + // + // if (controlLength == 3 && *message == 0 && ...) ... + // else if (operation_ == operation_in_statistics) ... + // else if (operation_ == operation_in_messages) ... + // else if (operation_ == operation_in_negotiation) ... + // else ... + // + + else + { + #ifdef PANIC + *logofs << "Proxy: PANIC! Unrecognized message received on proxy FD#" + << fd_ << ".\n" << logofs_flush; + #endif + + cerr << "Error" << ": Unrecognized message received on proxy FD#" + << fd_ << ".\n"; + + return -1; + } + + } // while ((message = readBuffer_.getMessage(controlLength, dataLength)) != NULL) ... + + // + // Reset the read buffer. + // + + readBuffer_.fullReset(); + + // + // Give up if no data is readable. + // + + if (transport_ -> readable() == 0) + { + break; + } + + } // End of for (;;) ... + + return 1; +} + +int Proxy::handleControlFromProxy(const unsigned char *message) +{ + #if defined(TEST) || defined(INFO) + *logofs << "Proxy: Received message '" << DumpControl(*(message + 1)) + << "' at " << strMsTimestamp() << " with data ID#" + << (int) *(message + 2) << ".\n" << logofs_flush; + #endif + + T_channel_type channelType = channel_none; + + switch (*(message + 1)) + { + case code_switch_connection: + { + int channelId = *(message + 2); + + // + // If channel is invalid further messages will + // be ignored. The acknowledged shutdown of + // channels should prevent this. + // + + inputChannel_ = channelId; + + break; + } + case code_begin_congestion: + { + // + // Set the congestion state for the + // channel reported by the remote. + // + + int channelId = *(message + 2); + + if (channels_[channelId] != NULL) + { + congestions_[channelId] = 1; + + #if defined(TEST) || defined(INFO) + *logofs << "Proxy: Received a begin congestion " + << "for channel id ID#" << channelId + << ".\n" << logofs_flush; + #endif + + if (channelId == agent_ && congestions_[agent_] != 0) + { + #if defined(TEST) || defined(INFO) + *logofs << "Proxy: Forcing an update of the congestion " + << "counter with agent congested.\n" + << logofs_flush; + #endif + + statistics -> updateCongestion(-tokens_[token_control].remaining, + tokens_[token_control].limit); + } + } + #ifdef WARNING + else + { + *logofs << "Proxy: WARNING! Received a begin congestion " + << "for invalid channel id ID#" << channelId + << ".\n" << logofs_flush; + } + #endif + + break; + } + case code_end_congestion: + { + // + // Attend again to the channel. + // + + int channelId = *(message + 2); + + if (channels_[channelId] != NULL) + { + congestions_[channelId] = 0; + + #if defined(TEST) || defined(INFO) + *logofs << "Proxy: Received an end congestion " + << "for channel id ID#" << channelId + << ".\n" << logofs_flush; + #endif + + if (channelId == agent_ && congestions_[agent_] != 0) + { + #if defined(TEST) || defined(INFO) + *logofs << "Proxy: Forcing an update of the congestion " + << "counter with agent decongested.\n" + << logofs_flush; + #endif + + statistics -> updateCongestion(tokens_[token_control].remaining, + tokens_[token_control].limit); + } + } + #ifdef WARNING + else + { + *logofs << "Proxy: WARNING! Received an end congestion " + << "for invalid channel id ID#" << channelId + << ".\n" << logofs_flush; + } + #endif + + break; + } + case code_control_token_request: + { + T_proxy_token &token = tokens_[token_control]; + + if (handleTokenFromProxy(token, *(message + 2)) < 0) + { + return -1; + } + + break; + } + case code_split_token_request: + { + T_proxy_token &token = tokens_[token_split]; + + if (handleTokenFromProxy(token, *(message + 2)) < 0) + { + return -1; + } + + break; + } + case code_data_token_request: + { + T_proxy_token &token = tokens_[token_data]; + + if (handleTokenFromProxy(token, *(message + 2)) < 0) + { + return -1; + } + + break; + } + case code_control_token_reply: + { + T_proxy_token &token = tokens_[token_control]; + + if (handleTokenReplyFromProxy(token, *(message + 2)) < 0) + { + return -1; + } + + break; + } + case code_split_token_reply: + { + T_proxy_token &token = tokens_[token_split]; + + if (handleTokenReplyFromProxy(token, *(message + 2)) < 0) + { + return -1; + } + + break; + } + case code_data_token_reply: + { + T_proxy_token &token = tokens_[token_data]; + + if (handleTokenReplyFromProxy(token, *(message + 2)) < 0) + { + return -1; + } + + break; + } + case code_new_x_connection: + { + // + // Opening the channel is handled later. + // + + channelType = channel_x11; + + break; + } + case code_new_cups_connection: + { + channelType = channel_cups; + + break; + } + case code_new_aux_connection: + { + // + // Starting from version 1.5.0 we create real X + // connections for the keyboard channel. We need + // to refuse old auxiliary X connections because + // they would be unable to leverage the new fake + // authorization cookie. + // + + #ifdef WARNING + *logofs << "Proxy: WARNING! Can't open outdated auxiliary X " + << "channel for code " << *(message + 1) << ".\n" + << logofs_flush; + #endif + + cerr << "Warning" << ": Can't open outdated auxiliary X " + << "channel for code " << *(message + 1) << ".\n"; + + if (handleControl(code_drop_connection, *(message + 2)) < 0) + { + return -1; + } + + break; + } + case code_new_smb_connection: + { + channelType = channel_smb; + + break; + } + case code_new_media_connection: + { + channelType = channel_media; + + break; + } + case code_new_http_connection: + { + channelType = channel_http; + + break; + } + case code_new_font_connection: + { + channelType = channel_font; + + break; + } + case code_new_slave_connection: + { + channelType = channel_slave; + + break; + } + case code_drop_connection: + { + int channelId = *(message + 2); + + if (channelId >= 0 && channelId < CONNECTIONS_LIMIT && + channels_[channelId] != NULL) + { + handleDropFromProxy(channelId); + } + #ifdef WARNING + else + { + *logofs << "Proxy: WARNING! Received a drop message " + << "for invalid channel id ID#" << channelId + << ".\n" << logofs_flush; + } + #endif + + break; + } + case code_finish_connection: + { + int channelId = *(message + 2); + + if (channelId >= 0 && channelId < CONNECTIONS_LIMIT && + channels_[channelId] != NULL) + { + // + // Force the finish state on the channel. + // We can receive this message while in + // the read loop, so we only mark the + // channel for deletion. + // + + #ifdef TEST + *logofs << "Proxy: Received a finish message for FD#" + << getFd(channelId) << " channel ID#" + << channelId << ".\n" << logofs_flush; + #endif + + handleFinishFromProxy(channelId); + } + #ifdef WARNING + else + { + *logofs << "Proxy: WARNING! Received a finish message " + << "for invalid channel id ID#" << channelId + << ".\n" << logofs_flush; + } + #endif + + break; + } + case code_finish_listeners: + { + // + // This is from the main loop. + // + + #ifdef TEST + *logofs << "Proxy: Closing down all local listeners.\n" + << logofs_flush; + #endif + + CleanupListeners(); + + finish_ = 1; + + break; + } + case code_reset_request: + { + #ifdef PANIC + *logofs << "Proxy: PANIC! Proxy reset not supported " + << "in this version.\n" << logofs_flush; + #endif + + cerr << "Error" << ": Proxy reset not supported " + << "in this version.\n"; + + HandleCleanup(); + } + case code_shutdown_request: + { + // + // Time to rest in peace. + // + + shutdown_ = 1; + + break; + } + case code_load_request: + { + if (handleLoadFromProxy() < 0) + { + return -1; + } + + break; + } + case code_save_request: + { + // + // Don't abort the connection + // if can't write to disk. + // + + handleSaveFromProxy(); + + break; + } + case code_statistics_request: + { + int type = *(message + 2); + + if (handleStatisticsFromProxy(type) < 0) + { + return -1; + } + + break; + } + case code_statistics_reply: + { + operation_ = operation_in_statistics; + + break; + } + case code_alert_request: + { + HandleAlert(*(message + 2), 1); + + break; + } + case code_sync_request: + { + int channelId = *(message + 2); + + if (handleSyncFromProxy(channelId) < 0) + { + return -1; + } + + break; + } + case code_sync_reply: + { + // + // We are not the one that issued + // the request. + // + + #if defined(TEST) || defined(INFO) + *logofs << "Proxy: PANIC! Received an unexpected " + << "synchronization reply.\n" + << logofs_flush; + #endif + + cerr << "Error" << ": Received an unexpected " + << "synchronization reply.\n"; + + HandleCleanup(); + } + default: + { + #ifdef PANIC + *logofs << "Proxy: PANIC! Received bad control message number " + << (unsigned int) *(message + 1) << " with attribute " + << (unsigned int) *(message + 2) << ".\n" << logofs_flush; + #endif + + cerr << "Error" << ": Received bad control message number " + << (unsigned int) *(message + 1) << " with attribute " + << (unsigned int) *(message + 2) << ".\n"; + + HandleCleanup(); + } + + } // End of switch (*(message + 1)) ... + + if (channelType == channel_none) + { + return 1; + } + + // + // Handle the channel allocation that we + // left from the main switch case. + // + + int channelId = *(message + 2); + + // + // Check if the channel has been dropped. + // + + if (channels_[channelId] != NULL && + (channels_[channelId] -> getDrop() == 1 || + channels_[channelId] -> getClosing() == 1)) + { + #ifdef TEST + *logofs << "Proxy: Dropping the descriptor FD#" + << getFd(channelId) << " channel ID#" + << channelId << ".\n" << logofs_flush; + #endif + + handleDrop(channelId); + } + + // + // Check if the channel is in the valid + // range. + // + + int result = checkChannelMap(channelId); + + if (result >= 0) + { + result = handleNewConnectionFromProxy(channelType, channelId); + } + + if (result < 0) + { + // + // Realization of new channel failed. + // Send channel shutdown message to + // the peer proxy. + // + + if (handleControl(code_drop_connection, channelId) < 0) + { + return -1; + } + } + else + { + int fd = getFd(channelId); + + if (getReadable(fd) > 0) + { + #ifdef TEST + *logofs << "Proxy: Trying to read immediately " + << "from descriptor FD#" << fd << ".\n" + << logofs_flush; + #endif + + if (handleRead(fd) < 0) + { + return -1; + } + } + #ifdef TEST + *logofs << "Proxy: Nothing to read immediately " + << "from descriptor FD#" << fd << ".\n" + << logofs_flush; + #endif + } + + return 1; +} + +int Proxy::handleRead(int fd, const char *data, int size) +{ + #if defined(TEST) || defined(INFO) + *logofs << "Proxy: Handling data for connection on FD#" + << fd << ".\n" << logofs_flush; + #endif + + if (canRead(fd) == 0) + { + #if defined(TEST) || defined(INFO) + + if (getChannel(fd) < 0) + { + *logofs << "Proxy: PANIC! Can't read from invalid FD#" + << fd << ".\n" << logofs_flush; + + HandleCleanup(); + } + else + { + *logofs << "Proxy: WARNING! Read method called for FD#" + << fd << " but operation is not possible.\n" + << logofs_flush; + } + + #endif + + return 0; + } + + int channelId = getChannel(fd); + + // + // Let the channel object read all the new data from + // its file descriptor, isolate messages, compress + // those messages, and append the compressed form to + // the encode buffer. + // + + #if defined(TEST) || defined(INFO) + *logofs << "Proxy: Reading messages from FD#" << fd + << " channel ID#" << channelId << ".\n" + << logofs_flush; + #endif + + int result = channels_[channelId] -> handleRead(encodeBuffer_, (const unsigned char *) data, + (unsigned int) size); + + // + // Even in the case of a failure, write the produced + // data to the proxy connection. To keep the stores + // synchronized, the remote side needs to decode any + // message encoded by this side, also if the X socket + // was closed in the meanwhile. If this is the case, + // the decompressed output will be silently discarded. + // + + if (result < 0) + { + #ifdef TEST + *logofs << "Proxy: Failed to read data from connection FD#" + << fd << " channel ID#" << channelId << ".\n" + << logofs_flush; + #endif + + if (handleFinish(channelId) < 0) + { + return -1; + } + } + + // + // Check if there are new splits or + // motion events to send. + // + + setSplitTimeout(channelId); + setMotionTimeout(channelId); + + return 1; +} + +int Proxy::handleEvents() +{ + #ifdef TEST + *logofs << "Proxy: Going to check the events on channels.\n" + << logofs_flush; + #endif + + // + // Check if we can safely write to the + // proxy link. + // + + int read = isTimeToRead(); + + // + // Loop on channels and send the pending + // events. We must copy the list because + // channels can be removed in the middle + // of the loop. + // + + T_list channelList = activeChannels_.copyList(); + + for (T_list::iterator j = channelList.begin(); + j != channelList.end(); j++) + { + int channelId = *j; + + if (channels_[channelId] == NULL) + { + continue; + } + + // + // Check if we need to drop the channel. + // + + if (channels_[channelId] -> getDrop() == 1 || + channels_[channelId] -> getClosing() == 1) + { + #ifdef TEST + *logofs << "Proxy: Dropping the descriptor FD#" + << getFd(channelId) << " channel ID#" + << channelId << ".\n" << logofs_flush; + #endif + + if (handleDrop(channelId) < 0) + { + return -1; + } + + continue; + } + else if (channels_[channelId] -> getFinish() == 1) + { + #ifdef TEST + *logofs << "Proxy: Skipping finishing " + << "descriptor FD#" << getFd(channelId) + << " channel ID#" << channelId << ".\n" + << logofs_flush; + #endif + + continue; + } + + // + // If the proxy link or the channel is + // in congestion state, don't handle + // the further events. + // + + if (read == 0 || congestions_[channelId] == 1) + { + #ifdef TEST + + if (read == 0) + { + *logofs << "Proxy: Can't handle events for FD#" + << getFd(channelId) << " channel ID#" + << channelId << " with proxy not available.\n" + << logofs_flush; + } + else + { + *logofs << "Proxy: Can't handle events for FD#" + << getFd(channelId) << " channel ID#" + << channelId << " with channel congested.\n" + << logofs_flush; + } + + #endif + + continue; + } + + // + // Handle the timeouts on the channel + // operations. + // + + int result = 0; + + // + // Handle the motion events. + // + + if (result >= 0 && channels_[channelId] -> needMotion() == 1) + { + if (isTimeToMotion() == 1) + { + #if defined(TEST) || defined(INFO) || defined(FLUSH) + + *logofs << "Proxy: FLUSH! Motion timeout expired after " + << diffTimestamp(timeouts_.motionTs, getTimestamp()) + << " Ms.\n" << logofs_flush; + + #endif + + result = channels_[channelId] -> handleMotion(encodeBuffer_); + + #ifdef TEST + + if (result < 0) + { + *logofs << "Proxy: Failed to handle motion events for FD#" + << getFd(channelId) << " channel ID#" << channelId + << ".\n" << logofs_flush; + } + + #endif + + timeouts_.motionTs = nullTimestamp(); + + setMotionTimeout(channelId); + } + #if defined(TEST) || defined(INFO) + else if (isTimestamp(timeouts_.motionTs) == 1) + { + *logofs << "Proxy: Running with " + << diffTimestamp(timeouts_.motionTs, getTimestamp()) + << " Ms elapsed since the last motion.\n" + << logofs_flush; + } + #endif + } + + if (result >= 0 && channels_[channelId] -> needSplit() == 1) + { + // + // Check if it is time to send more splits + // and how many bytes are going to be sent. + // + + if (isTimeToSplit() == 1) + { + #if defined(TEST) || defined(INFO) || defined(SPLIT) + *logofs << "Proxy: SPLIT! Split timeout expired after " + << diffTimestamp(timeouts_.splitTs, getTimestamp()) + << " Ms.\n" << logofs_flush; + #endif + + #if defined(TEST) || defined(INFO) || defined(SPLIT) + + *logofs << "Proxy: SPLIT! Encoding splits for FD#" + << getFd(channelId) << " at " << strMsTimestamp() + << " with " << clientStore_ -> getSplitTotalStorageSize() + << " total bytes and " << control -> SplitDataPacketLimit + << " bytes " << "to write.\n" + << logofs_flush; + + #endif + + result = channels_[channelId] -> handleSplit(encodeBuffer_); + + #ifdef TEST + + if (result < 0) + { + *logofs << "Proxy: Failed to handle splits for FD#" + << getFd(channelId) << " channel ID#" << channelId + << ".\n" << logofs_flush; + } + + #endif + + timeouts_.splitTs = nullTimestamp(); + + setSplitTimeout(channelId); + } + #if defined(TEST) || defined(INFO) || defined(SPLIT) + else if (channels_[channelId] -> needSplit() == 1 && + isTimestamp(timeouts_.splitTs) == 0) + { + *logofs << "Proxy: SPLIT! WARNING! Channel for FD#" + << getFd(channelId) << " has split to send but " + << "there is no timeout.\n" << logofs_flush; + } + else if (isTimestamp(timeouts_.splitTs) == 1) + { + *logofs << "Proxy: SPLIT! Running with " + << diffTimestamp(timeouts_.splitTs, getTimestamp()) + << " Ms elapsed since the last split.\n" + << logofs_flush; + } + #endif + } + + if (result < 0) + { + #ifdef TEST + *logofs << "Proxy: Error handling events for FD#" + << getFd(channelId) << " channel ID#" + << channelId << ".\n" << logofs_flush; + #endif + + if (handleFinish(channelId) < 0) + { + return -1; + } + } + } + + return 1; +} + +int Proxy::handleFrame(T_frame_type type) +{ + // + // Write any outstanding control message, followed by the + // content of the encode buffer, to the proxy transport. + // + // This code assumes that the encode buffer data is at an + // offset several bytes from start of the buffer, so that + // the length header and any necessary control bytes can + // be inserted in front of the data already in the buffer. + // This is the easiest way to encapsulate header and data + // together in a single frame. + // + // The way framing is implemented is very efficient but + // inherently limited and does not allow for getting the + // best performance, especially when running over a fast + // link. Framing should be rewritten to include the length + // of the packets in a fixed size header and, possibly, + // to incapsulate the control messages and the channel's + // data in a pseudo X protocol message, so that the proxy + // itself would be treated like any other channel. + // + + #if defined(TEST) || defined(INFO) + + if (congestion_ == 1) + { + // + // This can happen because there may be control + // messages to send, like a proxy shutdown mes- + // sage or a statistics request. All the other + // cases should be considered an error. + // + + #ifdef WARNING + *logofs << "Proxy: WARNING! Data is to be sent while " + << "congestion is " << congestion_ << ".\n" + << logofs_flush; + #endif + } + + #endif + + // + // Check if there is any data available on + // the socket. Recent Linux kernels are very + // picky. They require that we read often or + // they assume that the process is non-inter- + // active. + // + + if (handleAsyncEvents() < 0) + { + return -1; + } + + // + // Check if this is a ping, not a data frame. + // + + if (type == frame_ping) + { + if (handleToken(frame_ping) < 0) + { + return -1; + } + } + + unsigned int dataLength = encodeBuffer_.getLength(); + + #ifdef DEBUG + *logofs << "Proxy: Data length is " << dataLength + << " control length is " << controlLength_ + << ".\n" << logofs_flush; + #endif + + if (dataLength > 0) + { + // + // If this is a generic channel we need + // to add the completion bits. Data can + // also have been encoded because of a + // statistics request, even if no output + // channel was currently selected. + // + + if (outputChannel_ != -1) + { + #if defined(TEST) || defined(INFO) + + if (channels_[outputChannel_] == NULL) + { + *logofs << "Proxy: PANIC! A new frame was requested " + << "but the channel is invalid.\n" + << logofs_flush; + + HandleCleanup(); + } + + #endif + + channels_[outputChannel_] -> handleCompletion(encodeBuffer_); + + dataLength = encodeBuffer_.getLength(); + } + } + else if (controlLength_ == 0) + { + #if defined(TEST) || defined(INFO) + + *logofs << "Proxy: PANIC! A new frame was requested " + << "but there is no data to write.\n" + << logofs_flush; + + HandleCleanup(); + + #endif + + return 0; + } + + #ifdef DEBUG + *logofs << "Proxy: Data length is now " << dataLength + << " control length is " << controlLength_ + << ".\n" << logofs_flush; + #endif + + // + // Check if this frame needs to carry a new + // token request. + // + + if (type == frame_data) + { + if (handleToken(frame_data) < 0) + { + return -1; + } + } + + #ifdef DEBUG + *logofs << "Proxy: Adding a new frame for the remote proxy.\n" + << logofs_flush; + #endif + + unsigned char temp[5]; + + unsigned int lengthLength = 0; + unsigned int shift = dataLength; + + while (shift) + { + temp[lengthLength++] = (unsigned char) (shift & 0x7f); + + shift >>= 7; + } + + unsigned char *data = encodeBuffer_.getData(); + + unsigned char *outputMessage = data - (controlLength_ + lengthLength); + + unsigned char *nextDest = outputMessage; + + for (int i = 0; i < controlLength_; i++) + { + *nextDest++ = controlCodes_[i]; + } + + for (int j = lengthLength - 1; j > 0; j--) + { + *nextDest++ = (temp[j] | 0x80); + } + + if (lengthLength) + { + *nextDest++ = temp[0]; + } + + unsigned int outputLength = dataLength + controlLength_ + lengthLength; + + #if defined(TEST) || defined(INFO) + *logofs << "Proxy: Produced plain output for " << dataLength << "+" + << controlLength_ << "+" << lengthLength << " out of " + << outputLength << " bytes.\n" << logofs_flush; + #endif + + #if defined(TEST) || defined(INFO) || defined(FLUSH) || defined(TIME) + + T_timestamp nowTs = getTimestamp(); + + *logofs << "Proxy: FLUSH! Immediate with blocked " << transport_ -> + blocked() << " length " << transport_ -> length() + << " new " << outputLength << " flushable " << transport_ -> + flushable() << " tokens " << tokens_[token_control].remaining + << " after " << diffTimestamp(timeouts_.writeTs, nowTs) + << " Ms.\n" << logofs_flush; + + *logofs << "Proxy: FLUSH! Immediate flush to proxy FD#" << fd_ + << " of " << outputLength << " bytes at " << strMsTimestamp() + << " with priority " << priority_ << ".\n" << logofs_flush; + + *logofs << "Proxy: FLUSH! Current bitrate is " + << statistics -> getBitrateInShortFrame() << " with " + << statistics -> getBitrateInLongFrame() << " in the " + << "long frame and top " << statistics -> + getTopBitrate() << ".\n" << logofs_flush; + #endif + + statistics -> addWriteOut(); + + int result = transport_ -> write(write_immediate, outputMessage, outputLength); + + #ifdef TIME + + if (diffTimestamp(timeouts_.writeTs, nowTs) > 50) + { + *logofs << "Proxy: WARNING! TIME! Data written to proxy FD#" + << fd_ << " at " << strMsTimestamp() << " after " + << diffTimestamp(timeouts_.writeTs, nowTs) + << " Ms.\n" << logofs_flush; + } + + #endif + + #ifdef DUMP + *logofs << "Proxy: Sent " << outputLength << " bytes of data " + << "with checksum "; + + DumpChecksum(outputMessage, outputLength); + + *logofs << " on proxy FD#" << fd_ << ".\n" << logofs_flush; + #endif + + #ifdef DUMP + *logofs << "Proxy: Partial checksums are:\n"; + + DumpBlockChecksums(outputMessage, outputLength, 256); + + *logofs << logofs_flush; + #endif + + // + // Clean up the encode buffer and + // bring it to the initial size. + // + + encodeBuffer_.fullReset(); + + // + // Close the connection if we got + // an error. + // + + if (result < 0) + { + #ifdef TEST + *logofs << "Proxy: Failed write to proxy FD#" + << fd_ << ".\n" << logofs_flush; + #endif + + return -1; + } + + // + // Account for the data frame and the + // framing overhead. + // + + if (dataLength > 0) + { + statistics -> addFrameOut(); + } + + statistics -> addFramingBits((controlLength_ + lengthLength) << 3); + + controlLength_ = 0; + + // + // Reset all buffers, counters and the + // priority flag. + // + + handleResetFlush(); + + // + // Check if more data became available + // after writing. + // + + if (handleAsyncEvents() < 0) + { + return -1; + } + + // + // Drain the proxy link if we are in + // congestion state. + // + // if (needDrain() == 1 && draining_ == 0) + // { + // if (handleDrain() < 0) + // { + // return -1; + // } + // } + // + + return result; +} + +int Proxy::handleFlush() +{ + // + // We can have data in the encode buffer or + // control bytes to send. In the case make + // up a new frame. + // + + if (encodeBuffer_.getLength() + controlLength_ > 0) + { + #if defined(TEST) || defined(INFO) + *logofs << "Proxy: Flushing data in the encode buffer.\n" + << logofs_flush; + #endif + + priority_ = 1; + + if (handleFrame(frame_data) < 0) + { + return -1; + } + } + + // + // Check if we have something to write. + // + + if (transport_ -> length() + transport_ -> flushable() == 0) + { + #if defined(TEST) || defined(INFO) + *logofs << "Proxy: Nothing else to flush for proxy FD#" + << fd_ << ".\n" << logofs_flush; + #endif + + return 0; + } + + #if defined(TEST) || defined(INFO) + + if (transport_ -> blocked() == 0) + { + #ifdef PANIC + *logofs << "Proxy: PANIC! Proxy descriptor FD#" << fd_ + << " has data to flush but the transport " + << "is not blocked.\n" << logofs_flush; + #endif + + cerr << "Error" << ": Proxy descriptor FD#" << fd_ + << " has data to flush but the transport " + << "is not blocked.\n"; + + HandleCleanup(); + } + + #endif + + #if defined(TEST) || defined(INFO) || defined(FLUSH) + *logofs << "Proxy: FLUSH! Deferred with blocked " << transport_ -> + blocked() << " length " << transport_ -> length() + << " flushable " << transport_ -> flushable() << " tokens " + << tokens_[token_control].remaining << ".\n" + << logofs_flush; + + *logofs << "Proxy: FLUSH! Deferred flush to proxy FD#" << fd_ + << " of " << transport_ -> length() + transport_ -> + flushable() << " bytes at " << strMsTimestamp() + << " with priority " << priority_ << ".\n" + << logofs_flush; + + *logofs << "Proxy: FLUSH! Current bitrate is " + << statistics -> getBitrateInShortFrame() << " with " + << statistics -> getBitrateInLongFrame() << " in the " + << "long frame and top " << statistics -> + getTopBitrate() << ".\n" << logofs_flush; + #endif + + statistics -> addWriteOut(); + + int result = transport_ -> flush(); + + if (result < 0) + { + return -1; + } + + // + // Reset the counters and update the + // timestamp of the last write. + // + + handleResetFlush(); + + return result; +} + +int Proxy::handleDrain() +{ + // + // If the proxy is run in the same process + // as SSH, we can't block or the program + // would not have a chance to read or write + // its data. + // + + if (control -> LinkEncrypted == 1) + { + return 0; + } + + if (needDrain() == 0 || draining_ == 1) + { + #if defined(TEST) || defined(INFO) + + if (draining_ == 1) + { + *logofs << "Proxy: WARNING! Already draining proxy FD#" + << fd_ << " at " << strMsTimestamp() << ".\n" + << logofs_flush; + } + else + { + *logofs << "Proxy: WARNING! No need to drain proxy FD#" + << fd_ << " with congestion " << congestion_ + << " length " << transport_ -> length() + << " and blocked " << transport_ -> blocked() + << ".\n" << logofs_flush; + } + + #endif + + return 0; + } + + draining_ = 1; + + #if defined(TEST) || defined(INFO) + *logofs << "Proxy: Going to drain the proxy FD#" << fd_ + << " at " << strMsTimestamp() << ".\n" + << logofs_flush; + #endif + + int timeout = control -> PingTimeout / 2; + + T_timestamp startTs = getNewTimestamp(); + + T_timestamp nowTs = startTs; + + int remaining; + int result; + + // + // Keep draining the proxy socket while + // reading the incoming messages until + // the timeout is expired. + // + + for (;;) + { + remaining = timeout - diffTimestamp(startTs, nowTs); + + if (remaining <= 0) + { + #if defined(TEST) || defined(INFO) + *logofs << "Proxy: Timeout raised while draining " + << "FD#" << fd_ << " at " << strMsTimestamp() + << " after " << diffTimestamp(startTs, nowTs) + << " Ms.\n" << logofs_flush; + #endif + + result = 0; + + goto ProxyDrainEnd; + } + + if (transport_ -> length() > 0) + { + #if defined(TEST) || defined(INFO) + *logofs << "Proxy: Trying to write to FD#" << fd_ + << " at " << strMsTimestamp() << " with length " + << transport_ -> length() << " and " + << remaining << " Ms remaining.\n" + << logofs_flush; + #endif + + result = transport_ -> drain(0, remaining); + + if (result == -1) + { + result = -1; + + goto ProxyDrainEnd; + } + else if (result == 0 && transport_ -> readable() > 0) + { + #if defined(TEST) || defined(INFO) + *logofs << "Proxy: Decoding more data from proxy FD#" + << fd_ << " at " << strMsTimestamp() << " with " + << transport_ -> length() << " bytes to write and " + << transport_ -> readable() << " readable.\n" + << logofs_flush; + #endif + + if (handleRead() < 0) + { + result = -1; + + goto ProxyDrainEnd; + } + } + #if defined(TEST) || defined(INFO) + else if (result == 1) + { + *logofs << "Proxy: Transport for proxy FD#" << fd_ + << " drained down to " << transport_ -> length() + << " bytes.\n" << logofs_flush; + } + #endif + } + else + { + #if defined(TEST) || defined(INFO) + *logofs << "Proxy: Waiting for more data from proxy " + << "FD#" << fd_ << " at " << strMsTimestamp() + << " with " << remaining << " Ms remaining.\n" + << logofs_flush; + #endif + + + result = transport_ -> wait(remaining); + + if (result == -1) + { + result = -1; + + goto ProxyDrainEnd; + } + else if (result > 0) + { + #if defined(TEST) || defined(INFO) + *logofs << "Proxy: Decoding more data from proxy FD#" + << fd_ << " at " << strMsTimestamp() << " with " + << transport_ -> readable() << " bytes readable.\n" + << logofs_flush; + #endif + + if (handleRead() < 0) + { + result = -1; + + goto ProxyDrainEnd; + } + } + } + + // + // Check if we finally got the tokens + // that would allow us to come out of + // the congestion state. + // + + if (needDrain() == 0) + { + #if defined(TEST) || defined(INFO) + *logofs << "Proxy: Got decongestion for proxy FD#" + << fd_ << " at " << strMsTimestamp() << " after " + << diffTimestamp(startTs, getTimestamp()) + << " Ms.\n" << logofs_flush; + #endif + + result = 1; + + goto ProxyDrainEnd; + } + + nowTs = getNewTimestamp(); + } + +ProxyDrainEnd: + + draining_ = 0; + + return result; +} + +int Proxy::handleFlush(int fd) +{ + int channelId = getChannel(fd); + + if (channelId < 0 || channels_[channelId] == NULL) + { + #ifdef TEST + *logofs << "Proxy: WARNING! Skipping flush on invalid " + << "descriptor FD#" << fd << " channel ID#" + << channelId << ".\n" << logofs_flush; + #endif + + return 0; + } + else if (channels_[channelId] -> getFinish() == 1) + { + #ifdef TEST + *logofs << "Proxy: Skipping flush on finishing " + << "descriptor FD#" << fd << " channel ID#" + << channelId << ".\n" << logofs_flush; + #endif + + return 0; + } + + #ifdef TEST + *logofs << "Proxy: Going to flush FD#" << fd + << " with blocked " << transports_[channelId] -> blocked() + << " length " << transports_[channelId] -> length() + << ".\n" << logofs_flush; + #endif + + if (channels_[channelId] -> handleFlush() < 0) + { + #ifdef TEST + *logofs << "Proxy: Failed to flush data to FD#" + << getFd(channelId) << " channel ID#" + << channelId << ".\n" << logofs_flush; + #endif + + handleFinish(channelId); + + return -1; + } + + return 1; +} + +int Proxy::handleStatistics(int type, ostream *stream) +{ + if (stream == NULL || control -> EnableStatistics == 0) + { + #ifdef WARNING + *logofs << "Proxy: WARNING! Cannot produce statistics " + << " for proxy FD#" << fd_ << ". Invalid settings " + << "for statistics or stream.\n" << logofs_flush; + #endif + + return 0; + } + else if (currentStatistics_ != NULL) + { + // + // Need to update the stream pointer as the + // previous one could have been destroyed. + // + + #ifdef WARNING + *logofs << "Proxy: WARNING! Replacing stream while producing " + << "statistics in stream at " << currentStatistics_ + << " for proxy FD#" << fd_ << ".\n" + << logofs_flush; + #endif + } + + currentStatistics_ = stream; + + // + // Get statistics of remote peer. + // + + if (handleControl(code_statistics_request, type) < 0) + { + return -1; + } + + return 1; +} + +int Proxy::handleStatisticsFromProxy(int type) +{ + if (needFlush() == 1) + { + #if defined(TEST) || defined(INFO) || defined(FLUSH) + *logofs << "Proxy: WARNING! Data for the previous " + << "channel ID#" << outputChannel_ + << " flushed in statistics.\n" + << logofs_flush; + #endif + + if (handleFrame(frame_data) < 0) + { + return -1; + } + } + + if (control -> EnableStatistics == 1) + { + // + // Allocate a buffer for the output. + // + + char *buffer = new char[STATISTICS_LENGTH]; + + *buffer = '\0'; + + if (control -> ProxyMode == proxy_client) + { + #ifdef TEST + *logofs << "Proxy: Producing " + << (type == TOTAL_STATS ? "total" : "partial") + << " client statistics for proxy FD#" + << fd_ << ".\n" << logofs_flush; + #endif + + statistics -> getClientProtocolStats(type, buffer); + + statistics -> getClientOverallStats(type, buffer); + } + else + { + #ifdef TEST + *logofs << "Proxy: Producing " + << (type == TOTAL_STATS ? "total" : "partial") + << " server statistics for proxy FD#" + << fd_ << ".\n" << logofs_flush; + #endif + + statistics -> getServerProtocolStats(type, buffer); + } + + if (type == PARTIAL_STATS) + { + statistics -> resetPartialStats(); + } + + unsigned int length = strlen((char *) buffer) + 1; + + encodeBuffer_.encodeValue(type, 8); + + encodeBuffer_.encodeValue(length, 32); + + #ifdef TEST + *logofs << "Proxy: Encoding " << length + << " bytes of statistics data for proxy FD#" + << fd_ << ".\n" << logofs_flush; + #endif + + encodeBuffer_.encodeMemory((unsigned char *) buffer, length); + + // + // Account statistics data as framing bits. + // + + statistics -> addFramingBits(length << 3); + + delete [] buffer; + } + else + { + #ifdef WARNING + *logofs << "Proxy: WARNING! Got statistics request " + << "but local statistics are disabled.\n" + << logofs_flush; + #endif + + cerr << "Warning" << ": Got statistics request " + << "but local statistics are disabled.\n"; + + type = NO_STATS; + + encodeBuffer_.encodeValue(type, 8); + + #ifdef TEST + *logofs << "Proxy: Sending error code to remote proxy on FD#" + << fd_ << ".\n" << logofs_flush; + #endif + } + + // + // The next write will flush the statistics + // data and the control message. + // + + if (handleControl(code_statistics_reply, type) < 0) + { + return -1; + } + + return 1; +} + +int Proxy::handleStatisticsFromProxy(const unsigned char *message, unsigned int length) +{ + if (currentStatistics_ == NULL) + { + #ifdef WARNING + *logofs << "Proxy: WARNING! Unexpected statistics data received " + << "from remote proxy on FD#" << fd_ << ".\n" + << logofs_flush; + #endif + + cerr << "Warning" << ": Unexpected statistics data received " + << "from remote proxy.\n"; + + return 0; + } + + // + // Allocate the decode buffer and at least + // the 'type' field to see if there was an + // error. + // + + DecodeBuffer decodeBuffer(message, length); + + unsigned int type; + + decodeBuffer.decodeValue(type, 8); + + if (type == NO_STATS) + { + #ifdef PANIC + *logofs << "Proxy: PANIC! Couldn't get statistics from remote " + << "proxy on FD#" << fd_ << ".\n" << logofs_flush; + #endif + + cerr << "Error" << ": Couldn't get statistics from remote proxy.\n"; + } + else if (type != TOTAL_STATS && type != PARTIAL_STATS) + { + #ifdef PANIC + *logofs << "Proxy: PANIC! Cannot produce statistics " + << "with qualifier '" << type << "'.\n" + << logofs_flush; + #endif + + cerr << "Error" << ": Cannot produce statistics " + << "with qualifier '" << type << "'.\n"; + + return -1; + } + else + { + unsigned int size; + + decodeBuffer.decodeValue(size, 32); + + char *buffer = new char[STATISTICS_LENGTH]; + + *buffer = '\0'; + + if (control -> EnableStatistics == 1) + { + if (control -> ProxyMode == proxy_client) + { + #ifdef TEST + *logofs << "Proxy: Finalizing " + << (type == TOTAL_STATS ? "total" : "partial") + << " client statistics for proxy FD#" + << fd_ << ".\n" << logofs_flush; + #endif + + statistics -> getClientCacheStats(type, buffer); + + #ifdef TEST + *logofs << "Proxy: Decoding " << size + << " bytes of statistics data for proxy FD#" + << fd_ << ".\n" << logofs_flush; + #endif + + strncat(buffer, (char *) decodeBuffer.decodeMemory(size), size); + + statistics -> getClientProtocolStats(type, buffer); + + statistics -> getClientOverallStats(type, buffer); + } + else + { + #ifdef TEST + *logofs << "Proxy: Finalizing " + << (type == TOTAL_STATS ? "total" : "partial") + << " server statistics for proxy FD#" + << fd_ << ".\n" << logofs_flush; + #endif + + statistics -> getServerCacheStats(type, buffer); + + statistics -> getServerProtocolStats(type, buffer); + + #ifdef TEST + *logofs << "Proxy: Decoding " << size + << " bytes of statistics data for proxy FD#" + << fd_ << ".\n" << logofs_flush; + #endif + + strncat(buffer, (char *) decodeBuffer.decodeMemory(size), size); + } + + if (type == PARTIAL_STATS) + { + statistics -> resetPartialStats(); + } + + *currentStatistics_ << buffer; + + // + // Mark the end of text to help external parsing. + // + + *currentStatistics_ << '\4'; + + *currentStatistics_ << flush; + } + else + { + // + // It can be that statistics were enabled at the time + // we issued the request (otherwise we could not have + // set the stream), but now they have been disabled + // by user. We must decode statistics data if we want + // to keep the connection. + // + + #ifdef TEST + *logofs << "Proxy: Discarding " << size + << " bytes of statistics data for proxy FD#" + << fd_ << ".\n" << logofs_flush; + #endif + + strncat(buffer, (char *) decodeBuffer.decodeMemory(size), size); + } + + delete [] buffer; + } + + currentStatistics_ = NULL; + + return 1; +} + +int Proxy::handleNegotiation(const unsigned char *message, unsigned int length) +{ + #ifdef PANIC + *logofs << "Proxy: PANIC! Writing data during proxy " + << "negotiation is not implemented.\n" + << logofs_flush; + #endif + + cerr << "Error" << ": Writing data during proxy " + << "negotiation is not implemented.\n"; + + return -1; +} + +int Proxy::handleNegotiationFromProxy(const unsigned char *message, unsigned int length) +{ + #ifdef PANIC + *logofs << "Proxy: PANIC! Reading data during proxy " + << "negotiation is not implemented.\n" + << logofs_flush; + #endif + + cerr << "Error" << ": Reading data during proxy " + << "negotiation is not implemented.\n"; + + return -1; +} + +int Proxy::handleAlert(int alert) +{ + if (handleControl(code_alert_request, alert) < 0) + { + return -1; + } + + return 1; +} + +int Proxy::handleCloseConnection(int clientFd) +{ + int channelId = getChannel(clientFd); + + if (channels_[channelId] != NULL && + channels_[channelId] -> getFinish() == 0) + { + #ifdef TEST + *logofs << "Proxy: Closing down the channel for FD#" + << clientFd << ".\n" << logofs_flush; + #endif + + if (handleFinish(channelId) < 0) + { + return -1; + } + + return 1; + } + + return 0; +} + +int Proxy::handleCloseAllXConnections() +{ + #ifdef TEST + *logofs << "Proxy: Closing down any remaining X channel.\n" + << logofs_flush; + #endif + + T_list &channelList = activeChannels_.getList(); + + for (T_list::iterator j = channelList.begin(); + j != channelList.end(); j++) + { + int channelId = *j; + + if (channels_[channelId] != NULL && + channels_[channelId] -> getType() == channel_x11 && + channels_[channelId] -> getFinish() == 0) + { + #ifdef TEST + *logofs << "Proxy: Closing down the channel for FD#" + << getFd(channelId) << ".\n" << logofs_flush; + #endif + + if (handleFinish(channelId) < 0) + { + return -1; + } + } + } + + return 1; +} + +int Proxy::handleCloseAllListeners() +{ + if (control -> isProtoStep7() == 1) + { + if (finish_ == 0) + { + #ifdef TEST + *logofs << "Proxy: Closing down all remote listeners.\n" + << logofs_flush; + #endif + + if (handleControl(code_finish_listeners) < 0) + { + return -1; + } + + finish_ = 1; + } + } + else + { + #ifdef TEST + *logofs << "Proxy: WARNING! Not sending unsupported " + << "'code_finish_listeners' message.\n" + << logofs_flush; + #endif + + finish_ = 1; + } + + return 1; +} + +void Proxy::handleResetAlert() +{ + if (alert_ != 0) + { + #ifdef TEST + *logofs << "Proxy: The proxy alert '" << alert_ + << "' was displaced.\n" << logofs_flush; + #endif + + alert_ = 0; + } + + T_list &channelList = activeChannels_.getList(); + + for (T_list::iterator j = channelList.begin(); + j != channelList.end(); j++) + { + int channelId = *j; + + if (channels_[channelId] != NULL) + { + channels_[channelId] -> handleResetAlert(); + } + } +} + +int Proxy::handleFinish(int channelId) +{ + // + // Send any outstanding encoded data and + // do any finalization needed on the + // channel. + // + + if (needFlush(channelId) == 1) + { + if (channels_[channelId] -> getFinish() == 1) + { + #ifdef WARNING + *logofs << "Proxy: WARNING! The finishing channel ID#" + << channelId << " has data to flush.\n" + << logofs_flush; + #endif + } + + #if defined(TEST) || defined(INFO) || defined(FLUSH) + *logofs << "Proxy: WARNING! Flushing data for the " + << "finishing channel ID#" << channelId + << ".\n" << logofs_flush; + #endif + + if (handleFrame(frame_data) < 0) + { + return -1; + } + } + + // + // Reset the congestion state and the + // timeouts, if needed. + // + + congestions_[channelId] = 0; + + setSplitTimeout(channelId); + setMotionTimeout(channelId); + + if (channels_[channelId] -> getFinish() == 0) + { + channels_[channelId] -> handleFinish(); + + // + // Force a failure in the case somebody + // would try to read from the channel. + // + + shutdown(getFd(channelId), SHUT_RD); + + // + // If the failure was not originated by + // the remote, send a channel shutdown + // message. + // + + if (channels_[channelId] -> getClosing() == 0) + { + #ifdef TEST + *logofs << "Proxy: Finishing channel for FD#" + << getFd(channelId) << " channel ID#" + << channelId << " because of failure.\n" + << logofs_flush; + #endif + + if (handleControl(code_finish_connection, channelId) < 0) + { + return -1; + } + } + } + + return 1; +} + +int Proxy::handleFinishFromProxy(int channelId) +{ + // + // Check if this channel has pending + // data to send. + // + + if (needFlush(channelId) == 1) + { + #if defined(TEST) || defined(INFO) || defined(FLUSH) + *logofs << "Proxy: WARNING! Flushing data for the " + << "finishing channel ID#" << channelId + << ".\n" << logofs_flush; + #endif + + if (handleFrame(frame_data) < 0) + { + return -1; + } + } + + // + // Mark the channel. We will free its + // resources at the next loop and will + // send the drop message to the remote. + // + + if (channels_[channelId] -> getClosing() == 0) + { + #ifdef TEST + *logofs << "Proxy: Marking channel for FD#" + << getFd(channelId) << " channel ID#" + << channelId << " as closing.\n" + << logofs_flush; + #endif + + channels_[channelId] -> handleClosing(); + } + + if (channels_[channelId] -> getFinish() == 0) + { + #ifdef TEST + *logofs << "Proxy: Finishing channel for FD#" + << getFd(channelId) << " channel ID#" + << channelId << " because of proxy.\n" + << logofs_flush; + #endif + + channels_[channelId] -> handleFinish(); + } + + if (handleFinish(channelId) < 0) + { + return -1; + } + + return 1; +} + +int Proxy::handleDropFromProxy(int channelId) +{ + // + // Only mark the channel. + // + + #ifdef TEST + *logofs << "Proxy: Marking channel for FD#" + << getFd(channelId) << " channel ID#" + << channelId << " as being dropped.\n" + << logofs_flush; + #endif + + if (channels_[channelId] -> getDrop() == 0) + { + channels_[channelId] -> handleDrop(); + } + + return 1; +} + +// +// Close the channel and deallocate all its +// resources. +// + +int Proxy::handleDrop(int channelId) +{ + // + // Check if this channel has pending + // data to send. + // + + if (needFlush(channelId) == 1) + { + if (channels_[channelId] -> getFinish() == 1) + { + #ifdef WARNING + *logofs << "Proxy: WARNING! The dropping channel ID#" + << channelId << " has data to flush.\n" + << logofs_flush; + #endif + } + + #if defined(TEST) || defined(INFO) || defined(FLUSH) + *logofs << "Proxy: WARNING! Flushing data for the " + << "dropping channel ID#" << channelId + << ".\n" << logofs_flush; + #endif + + if (handleFrame(frame_data) < 0) + { + return -1; + } + } + + #ifdef TEST + *logofs << "Proxy: Dropping channel for FD#" + << getFd(channelId) << " channel ID#" + << channelId << ".\n" << logofs_flush; + #endif + + if (channels_[channelId] -> getFinish() == 0) + { + #ifdef WARNING + *logofs << "Proxy: WARNING! The channel for FD#" + << getFd(channelId) << " channel ID#" + << channelId << " was not marked as " + << "finishing.\n" << logofs_flush; + #endif + + cerr << "Warning" << ": The channel for FD#" + << getFd(channelId) << " channel ID#" + << channelId << " was not marked as " + << "finishing.\n"; + + channels_[channelId] -> handleFinish(); + } + + // + // Send the channel shutdown message + // to the peer proxy. + // + + if (channels_[channelId] -> getClosing() == 1) + { + if (handleControl(code_drop_connection, channelId) < 0) + { + return -1; + } + } + + // + // Get rid of the channel. + // + + if (channels_[channelId] -> getType() != channel_x11) + { + #ifdef TEST + *logofs << "Proxy: Closed connection to " + << getTypeName(channels_[channelId] -> getType()) + << " server.\n" << logofs_flush; + #endif + + cerr << "Info" << ": Closed connection to " + << getTypeName(channels_[channelId] -> getType()) + << " server.\n"; + } + + delete channels_[channelId]; + channels_[channelId] = NULL; + + cleanupChannelMap(channelId); + + // + // Get rid of the transport. + // + + deallocateTransport(channelId); + + congestions_[channelId] = 0; + + decreaseChannels(channelId); + + // + // Check if the channel was the + // one currently selected for + // output. + // + + if (outputChannel_ == channelId) + { + outputChannel_ = -1; + } + + return 1; +} + +// +// Send an empty message to the remote peer +// to verify if the link is alive and let +// the remote proxy detect a congestion. +// + +int Proxy::handlePing() +{ + T_timestamp nowTs = getTimestamp(); + + #if defined(DEBUG) || defined(PING) + + *logofs << "Proxy: Checking ping at " + << strMsTimestamp(nowTs) << logofs_flush; + + *logofs << " with last loop at " + << strMsTimestamp(timeouts_.loopTs) << ".\n" + << logofs_flush; + + *logofs << "Proxy: Last bytes in at " + << strMsTimestamp(timeouts_.readTs) << logofs_flush; + + *logofs << " last bytes out at " + << strMsTimestamp(timeouts_.writeTs) << ".\n" + << logofs_flush; + + *logofs << "Proxy: Last ping at " + << strMsTimestamp(timeouts_.pingTs) << ".\n" + << logofs_flush; + + #endif + + // + // Be sure we take into account any clock drift. This + // can be caused by the user changing the system timer + // or by small adjustments introduced by the operating + // system making the clock go backward. + // + + if (checkDiffTimestamp(timeouts_.loopTs, nowTs) == 0) + { + #ifdef WARNING + *logofs << "Proxy: WARNING! Detected drift in system " + << "timer. Resetting to current time.\n" + << logofs_flush; + #endif + + timeouts_.pingTs = nowTs; + timeouts_.readTs = nowTs; + timeouts_.writeTs = nowTs; + } + + // + // Check timestamp of last read from remote proxy. It can + // happen that we stayed in the main loop long enough to + // have idle timeout expired, for example if the proxy was + // stopped and restarted or because of an extremely high + // load of the system. In this case we don't complain if + // there is something new to read from the remote. + // + + int diffIn = diffTimestamp(timeouts_.readTs, nowTs); + + if (diffIn >= (control -> PingTimeout * 2) - + control -> LatencyTimeout) + { + // + // Force a read to detect whether the remote proxy + // aborted the connection. + // + + int result = handleRead(); + + if (result < 0) + { + #if defined(TEST) || defined(INFO) || defined(PING) + *logofs << "Proxy: WARNING! Detected shutdown waiting " + << "for the ping after " << diffIn / 1000 + << " seconds.\n" << logofs_flush; + #endif + + return -1; + } + else if (result > 0) + { + diffIn = diffTimestamp(timeouts_.readTs, nowTs); + + if (handleFlush() < 0) + { + return -1; + } + } + } + + if (diffIn >= (control -> PingTimeout * 2) - + control -> LatencyTimeout) + { + #if defined(TEST) || defined(INFO) || defined(PING) + *logofs << "Proxy: Detected congestion at " + << strMsTimestamp() << " with " << diffIn / 1000 + << " seconds since the last read.\n" + << logofs_flush; + #endif + + // + // There are two types of proxy congestion. The first, + // affecting the ability of the proxy to write the + // encoded data to the network, is controlled by the + // congestion_ flag. The flag is raised when no data + // is received from the remote proxy within a timeout. + // On the X client side, the flag is also raised when + // the proxy runs out of tokens. + // + + if (control -> ProxyMode == proxy_server) + { + // + // At X server side we must return to read data + // from the channels after a while, because we + // need to give a chance to the channel to read + // the key sequence CTRL+ALT+SHIFT+ESC. + // + + if (congestion_ == 0) + { + #if defined(TEST) || defined(INFO) + *logofs << "Proxy: Forcibly entering congestion due to " + << "timeout with " << tokens_[token_control].remaining + << " tokens.\n" << logofs_flush; + #endif + + congestion_ = 1; + } + else + { + #if defined(TEST) || defined(INFO) + *logofs << "Proxy: Forcibly exiting congestion due to " + << "timeout with " << tokens_[token_control].remaining + << " tokens.\n" << logofs_flush; + #endif + + congestion_ = 0; + } + } + else + { + #if defined(TEST) || defined(INFO) + + if (congestion_ == 0) + { + *logofs << "Proxy: Entering congestion due to timeout " + << "with " << tokens_[token_control].remaining + << " tokens.\n" << logofs_flush; + } + + #endif + + congestion_ = 1; + } + + if (control -> ProxyTimeout > 0 && + diffIn >= (control -> ProxyTimeout - + control -> LatencyTimeout)) + { + #ifdef PANIC + *logofs << "Proxy: PANIC! No data received from " + << "remote proxy on FD#" << fd_ << " within " + << (diffIn + control -> LatencyTimeout) / 1000 + << " seconds.\n" << logofs_flush; + #endif + + cerr << "Error" << ": No data received from remote " + << "proxy within " << (diffIn + control -> + LatencyTimeout) / 1000 << " seconds.\n"; + + HandleAbort(); + } + else + { + #if defined(TEST) || defined(INFO) + *logofs << "Proxy: WARNING! No data received from " + << "remote proxy on FD#" << fd_ << " since " + << diffIn << " Ms.\n" << logofs_flush; + #endif + + if (control -> ProxyTimeout > 0 && + isTimestamp(timeouts_.alertTs) == 0 && + diffIn >= (control -> ProxyTimeout - + control -> LatencyTimeout) / 4) + { + // + // If we are in the middle of a shutdown + // procedure but the remote is not resp- + // onding, force the closure of the link. + // + + if (finish_ != 0) + { + #ifdef PANIC + *logofs << "Proxy: PANIC! No response received from " + << "the remote proxy on FD#" << fd_ << " while " + << "waiting for the shutdown.\n" + << logofs_flush; + #endif + + cerr << "Error" << ": No response received from remote " + << "proxy while waiting for the shutdown.\n"; + + HandleAbort(); + } + else + { + cerr << "Warning" << ": No data received from remote " + << "proxy within " << (diffIn + control -> + LatencyTimeout) / 1000 << " seconds.\n"; + + if (alert_ == 0) + { + if (control -> ProxyMode == proxy_client) + { + alert_ = CLOSE_DEAD_PROXY_CONNECTION_CLIENT_ALERT; + } + else + { + alert_ = CLOSE_DEAD_PROXY_CONNECTION_SERVER_ALERT; + } + + HandleAlert(alert_, 1); + } + + timeouts_.alertTs = nowTs; + } + } + } + } + + // + // Check if we need to update the congestion + // counter. + // + + int diffOut = diffTimestamp(timeouts_.writeTs, nowTs); + + if (agent_ != nothing && congestions_[agent_] == 0 && + statistics -> getCongestionInFrame() >= 1 && + diffOut >= (control -> IdleTimeout - + control -> LatencyTimeout * 5)) + { + #if defined(TEST) || defined(INFO) || defined(PING) + *logofs << "Proxy: Forcing an update of the " + << "congestion counter after timeout.\n" + << logofs_flush; + #endif + + statistics -> updateCongestion(tokens_[token_control].remaining, + tokens_[token_control].limit); + } + + // + // Send a new token if we didn't send any data to + // the remote for longer than the ping timeout. + // The client side sends a token, the server side + // responds with a token reply. + // + // VMWare virtual machines can have the system + // timer deadly broken. Try to send a ping regard- + // less we are the client or the server proxy to + // force a write by the remote. + // + + if (control -> ProxyMode == proxy_client || + diffIn >= (control -> PingTimeout * 4) - + control -> LatencyTimeout) + { + // + // We need to send a new ping even if we didn't + // receive anything from the remote within the + // ping timeout. The server side will respond + // to our ping, so we use the ping to force the + // remote end to send some data. + // + + if (diffIn >= (control -> PingTimeout - + control -> LatencyTimeout * 5) || + diffOut >= (control -> PingTimeout - + control -> LatencyTimeout * 5)) + { + int diffPing = diffTimestamp(timeouts_.pingTs, nowTs); + + if (diffPing < 0 || diffPing >= (control -> PingTimeout - + control -> LatencyTimeout * 5)) + { + #if defined(TEST) || defined(INFO) || defined(PING) + *logofs << "Proxy: Sending a new ping at " << strMsTimestamp() + << " with " << tokens_[token_control].remaining + << " tokens and elapsed in " << diffIn << " out " + << diffOut << " ping " << diffPing + << ".\n" << logofs_flush; + #endif + + if (handleFrame(frame_ping) < 0) + { + return -1; + } + + timeouts_.pingTs = nowTs; + } + #if defined(TEST) || defined(INFO) || defined(PING) + else + { + *logofs << "Proxy: Not sending a new ping with " + << "elapsed in " << diffIn << " out " + << diffOut << " ping " << diffPing + << ".\n" << logofs_flush; + } + #endif + } + } + + return 1; +} + +int Proxy::handleSyncFromProxy(int channelId) +{ + #if defined(TEST) || defined(INFO) + *logofs << "Proxy: WARNING! Received a synchronization " + << "request from the remote proxy.\n" + << logofs_flush; + #endif + + if (handleControl(code_sync_reply, channelId) < 0) + { + return -1; + } + + return 1; +} + +int Proxy::handleResetStores() +{ + // + // Recreate the message stores. + // + + delete clientStore_; + delete serverStore_; + + clientStore_ = new ClientStore(compressor_); + serverStore_ = new ServerStore(compressor_); + + timeouts_.loadTs = nullTimestamp(); + + // + // Replace message stores in channels. + // + + T_list &channelList = activeChannels_.getList(); + + for (T_list::iterator j = channelList.begin(); + j != channelList.end(); j++) + { + int channelId = *j; + + if (channels_[channelId] != NULL) + { + if (channels_[channelId] -> setStores(clientStore_, serverStore_) < 0) + { + #ifdef PANIC + *logofs << "Proxy: PANIC! Failed to replace message stores in " + << "channel for FD#" << getFd(channelId) << ".\n" + << logofs_flush; + #endif + + cerr << "Error" << ": Failed to replace message stores in " + << "channel for FD#" << getFd(channelId) << ".\n"; + + return -1; + } + #ifdef TEST + else + { + *logofs << "Proxy: Replaced message stores in channel " + << "for FD#" << getFd(channelId) << ".\n" + << logofs_flush; + } + #endif + } + } + + return 1; +} + +int Proxy::handleResetPersistentCache() +{ + char *fullName = new char[strlen(control -> PersistentCachePath) + + strlen(control -> PersistentCacheName) + 2]; + + strcpy(fullName, control -> PersistentCachePath); + strcat(fullName, "/"); + strcat(fullName, control -> PersistentCacheName); + + #ifdef TEST + *logofs << "Proxy: Going to remove persistent cache file '" + << fullName << "'\n" << logofs_flush; + #endif + + unlink(fullName); + + delete [] fullName; + + delete [] control -> PersistentCacheName; + + control -> PersistentCacheName = NULL; + + return 1; +} + +void Proxy::handleResetFlush() +{ + #ifdef TEST + *logofs << "Proxy: Going to reset flush counters " + << "for proxy FD#" << fd_ << ".\n" + << logofs_flush; + #endif + + // + // Reset the proxy priority flag. + // + + priority_ = 0; + + // + // Restore buffers to their initial + // size. + // + + transport_ -> partialReset(); + + // + // Update the timestamp of the last + // write operation performed on the + // socket. + // + + timeouts_.writeTs = getTimestamp(); +} + +int Proxy::handleFinish() +{ + // + // Reset the timestamps to give the proxy + // another chance to show the 'no response' + // dialog if the shutdown message doesn't + // come in time. + // + + timeouts_.readTs = getTimestamp(); + + timeouts_.alertTs = nullTimestamp(); + + finish_ = 1; + + return 1; +} + +int Proxy::handleShutdown() +{ + // + // Send shutdown message to remote proxy. + // + + shutdown_ = 1; + + handleControl(code_shutdown_request); + + #ifdef TEST + *logofs << "Proxy: Starting shutdown procedure " + << "for proxy FD#" << fd_ << ".\n" + << logofs_flush; + #endif + + // + // Ensure that all the data accumulated + // in the transport buffer is flushed + // to the network layer. + // + + for (int i = 0; i < 100; i++) + { + if (canFlush() == 1) + { + handleFlush(); + } + else + { + break; + } + + usleep(100000); + } + + // + // Now wait for the network layers to + // consume all the data. + // + + for (int i = 0; i < 100; i++) + { + if (transport_ -> queued() <= 0) + { + break; + } + + usleep(100000); + } + + // + // Give time to the remote end to read + // the shutdown message and close the + // connection. + // + + transport_ -> wait(10000); + + #ifdef TEST + *logofs << "Proxy: Ending shutdown procedure " + << "for proxy FD#" << fd_ << ".\n" + << logofs_flush; + #endif + + return 1; +} + +int Proxy::handleChannelConfiguration() +{ + if (activeChannels_.getSize() == 0) + { + #ifdef TEST + *logofs << "Proxy: Going to initialize the static " + << "members in channels for proxy FD#" + << fd_ << ".\n" << logofs_flush; + #endif + + Channel::setReferences(); + + ClientChannel::setReferences(); + ServerChannel::setReferences(); + + GenericChannel::setReferences(); + } + + return 1; +} + +int Proxy::handleSocketConfiguration() +{ + // + // Set linger mode on proxy to correctly + // get shutdown notification. + // + + SetLingerTimeout(fd_, 30); + + // + // Set keep-alive on socket so that if remote link + // terminates abnormally (as killed hard or because + // of a power-off) process will get a SIGPIPE. In + // practice this is useless as proxies already ping + // each other every few seconds. + // + + if (control -> OptionProxyKeepAlive == 1) + { + SetKeepAlive(fd_); + } + + // + // Set 'priority' flag at TCP layer for path + // proxy-to-proxy. Look at IPTOS_LOWDELAY in + // man 7 ip. + // + + if (control -> OptionProxyLowDelay == 1) + { + SetLowDelay(fd_); + } + + // + // Update size of TCP send and receive buffers. + // + + if (control -> OptionProxySendBuffer != -1) + { + SetSendBuffer(fd_, control -> OptionProxySendBuffer); + } + + if (control -> OptionProxyReceiveBuffer != -1) + { + SetReceiveBuffer(fd_, control -> OptionProxyReceiveBuffer); + } + + // + // Update TCP_NODELAY settings. Note that on old Linux + // kernels turning off the Nagle algorithm didn't work + // when proxy was run through a PPP link. Trying to do + // so caused the kernel to stop delivering data to us + // if a serious network congestion was encountered. + // + + if (control -> ProxyMode == proxy_client) + { + if (control -> OptionProxyClientNoDelay != -1) + { + SetNoDelay(fd_, control -> OptionProxyClientNoDelay); + } + } + else + { + if (control -> OptionProxyServerNoDelay != -1) + { + SetNoDelay(fd_, control -> OptionProxyServerNoDelay); + } + } + + return 1; +} + +int Proxy::handleLinkConfiguration() +{ + #ifdef TEST + *logofs << "Proxy: Propagating parameters to " + << "channels' read buffers.\n" + << logofs_flush; + #endif + + T_list &channelList = activeChannels_.getList(); + + for (T_list::iterator j = channelList.begin(); + j != channelList.end(); j++) + { + int channelId = *j; + + if (channels_[channelId] != NULL) + { + channels_[channelId] -> handleConfiguration(); + } + } + + #ifdef TEST + *logofs << "Proxy: Propagating parameters to " + << "proxy buffers.\n" + << logofs_flush; + #endif + + readBuffer_.setSize(control -> ProxyInitialReadSize, + control -> ProxyMaximumBufferSize); + + encodeBuffer_.setSize(control -> TransportProxyBufferSize, + control -> TransportProxyBufferThreshold, + control -> TransportMaximumBufferSize); + + transport_ -> setSize(control -> TransportProxyBufferSize, + control -> TransportProxyBufferThreshold, + control -> TransportMaximumBufferSize); + + #ifdef TEST + *logofs << "Proxy: Configuring the proxy timeouts.\n" + << logofs_flush; + #endif + + timeouts_.split = control -> SplitTimeout; + timeouts_.motion = control -> MotionTimeout; + + #ifdef TEST + *logofs << "Proxy: Configuring the proxy tokens.\n" + << logofs_flush; + #endif + + tokens_[token_control].size = control -> TokenSize; + tokens_[token_control].limit = control -> TokenLimit; + + if (tokens_[token_control].limit < 1) + { + tokens_[token_control].limit = 1; + } + + #if defined(TEST) || defined(INFO) || defined(LIMIT) + *logofs << "Proxy: TOKEN! LIMIT! Setting token [" + << DumpToken(token_control) << "] size to " + << tokens_[token_control].size << " and limit to " + << tokens_[token_control].limit << ".\n" + << logofs_flush; + #endif + + tokens_[token_split].size = control -> TokenSize; + tokens_[token_split].limit = control -> TokenLimit / 2; + + if (tokens_[token_split].limit < 1) + { + tokens_[token_split].limit = 1; + } + + #if defined(TEST) || defined(INFO) || defined(LIMIT) + *logofs << "Proxy: TOKEN! LIMIT! Setting token [" + << DumpToken(token_split) << "] size to " + << tokens_[token_split].size << " and limit to " + << tokens_[token_split].limit << ".\n" + << logofs_flush; + #endif + + tokens_[token_data].size = control -> TokenSize; + tokens_[token_data].limit = control -> TokenLimit / 4; + + if (tokens_[token_data].limit < 1) + { + tokens_[token_data].limit = 1; + } + + #if defined(TEST) || defined(INFO) || defined(LIMIT) + *logofs << "Proxy: TOKEN! LIMIT! Setting token [" + << DumpToken(token_data) << "] size to " + << tokens_[token_data].size << " and limit to " + << tokens_[token_data].limit << ".\n" + << logofs_flush; + #endif + + for (int i = token_control; i <= token_data; i++) + { + tokens_[i].remaining = tokens_[i].limit; + } + + #if defined(TEST) || defined(INFO) || defined(LIMIT) + *logofs << "Proxy: LIMIT! Using client bitrate " + << "limit " << control -> ClientBitrateLimit + << " server bitrate limit " << control -> + ServerBitrateLimit << " with local limit " + << control -> LocalBitrateLimit << ".\n" + << logofs_flush; + #endif + + // + // Set the other parameters based on + // the token size. + // + + int base = control -> TokenSize; + + control -> SplitDataThreshold = base * 4; + control -> SplitDataPacketLimit = base / 2; + + #if defined(TEST) || defined(INFO) + *logofs << "Proxy: LIMIT! Setting split data threshold " + << "to " << control -> SplitDataThreshold + << " split packet limit to " << control -> + SplitDataPacketLimit << " with base " + << base << ".\n" << logofs_flush; + #endif + + // + // Set the number of bytes read from the + // data channels at each loop. This will + // basically determine the maximum band- + // width available for the generic chan- + // nels. + // + + control -> GenericInitialReadSize = base / 2; + control -> GenericMaximumBufferSize = base / 2; + + #if defined(TEST) || defined(INFO) + *logofs << "Proxy: LIMIT! Setting generic channel " + << "initial read size to " << control -> + GenericInitialReadSize << " maximum read " + << "size to " << control -> GenericMaximumBufferSize + << " with base " << base << ".\n" + << logofs_flush; + #endif + + return 1; +} + +int Proxy::handleCacheConfiguration() +{ + #ifdef TEST + *logofs << "Proxy: Configuring cache according to pack parameters.\n" + << logofs_flush; + #endif + + // + // Further adjust the cache parameters. If + // packing of the images is enabled, reduce + // the size available for plain images. + // + + if (control -> SessionMode == session_agent) + { + if (control -> PackMethod != NO_PACK) + { + clientStore_ -> getRequestStore(X_PutImage) -> + cacheThreshold = PUTIMAGE_CACHE_THRESHOLD_IF_PACKED; + + clientStore_ -> getRequestStore(X_PutImage) -> + cacheLowerThreshold = PUTIMAGE_CACHE_LOWER_THRESHOLD_IF_PACKED; + } + } + + // + // If this is a shadow session increase the + // size of the image cache. + // + + if (control -> SessionMode == session_shadow) + { + if (control -> PackMethod != NO_PACK) + { + clientStore_ -> getRequestStore(X_NXPutPackedImage) -> + cacheThreshold = PUTPACKEDIMAGE_CACHE_THRESHOLD_IF_PACKED_SHADOW; + + clientStore_ -> getRequestStore(X_NXPutPackedImage) -> + cacheLowerThreshold = PUTPACKEDIMAGE_CACHE_LOWER_THRESHOLD_IF_PACKED_SHADOW; + } + else + { + clientStore_ -> getRequestStore(X_PutImage) -> + cacheThreshold = PUTIMAGE_CACHE_THRESHOLD_IF_SHADOW; + + clientStore_ -> getRequestStore(X_PutImage) -> + cacheLowerThreshold = PUTIMAGE_CACHE_LOWER_THRESHOLD_IF_SHADOW; + } + } + + return 1; +} + +int Proxy::handleSaveStores() +{ + // + // Save content of stores on disk. + // + + char *cacheToAdopt = NULL; + + if (control -> PersistentCacheEnableSave) + { + #ifdef TEST + *logofs << "Proxy: Going to save content of client store.\n" + << logofs_flush; + #endif + + cacheToAdopt = handleSaveAllStores(control -> PersistentCachePath); + } + #ifdef TEST + else + { + if (control -> ProxyMode == proxy_client) + { + *logofs << "Proxy: Saving persistent cache to disk disabled.\n" + << logofs_flush; + } + else + { + *logofs << "Proxy: PANIC! Protocol violation in command save.\n" + << logofs_flush; + + cerr << "Error" << ": Protocol violation in command save.\n"; + + HandleCleanup(); + } + } + #endif + + if (cacheToAdopt != NULL) + { + // + // Do we have a cache already? + // + + if (control -> PersistentCacheName != NULL) + { + // + // Check if old and new cache are the same. + // In this case don't remove the old cache. + // + + if (strcasecmp(control -> PersistentCacheName, cacheToAdopt) != 0) + { + handleResetPersistentCache(); + } + + delete [] control -> PersistentCacheName; + } + + #ifdef TEST + *logofs << "Proxy: Setting current persistent cache file to '" + << cacheToAdopt << "'\n" << logofs_flush; + #endif + + control -> PersistentCacheName = cacheToAdopt; + + return 1; + } + #ifdef TEST + else + { + *logofs << "Proxy: No cache file produced from message stores.\n" + << logofs_flush; + } + #endif + + // + // It can be that we didn't generate a new cache + // because store was too small or persistent cache + // was disabled. This is not an error. + // + + return 0; +} + +int Proxy::handleLoadStores() +{ + // + // Restore the content of the client store + // from disk if a valid cache was negotiated + // at session startup. + // + + if (control -> PersistentCacheEnableLoad == 1 && + control -> PersistentCachePath != NULL && + control -> PersistentCacheName != NULL) + { + #ifdef TEST + *logofs << "Proxy: Going to load content of client store.\n" + << logofs_flush; + #endif + + // + // Returns the same string passed as name of + // the cache, or NULL if it was not possible + // to load the cache from disk. + // + + if (handleLoadAllStores(control -> PersistentCachePath, + control -> PersistentCacheName) == NULL) + { + // + // The corrupted cache should have been + // removed from disk. Get rid of the + // reference so we don't try to delete + // it once again. + // + + if (control -> PersistentCacheName != NULL) + { + delete [] control -> PersistentCacheName; + } + + control -> PersistentCacheName = NULL; + + return -1; + } + + // + // Set timestamp of last time cache + // was loaded from data on disk. + // + + timeouts_.loadTs = getTimestamp(); + + return 1; + } + #ifdef TEST + else + { + if (control -> ProxyMode == proxy_client) + { + *logofs << "Proxy: Loading of cache disabled or no cache file selected.\n" + << logofs_flush; + } + else + { + *logofs << "Proxy: PANIC! Protocol violation in command load.\n" + << logofs_flush; + + cerr << "Error" << ": Protocol violation in command load.\n"; + + HandleCleanup(); + } + } + #endif + + return 0; +} + +int Proxy::handleControl(T_proxy_code code, int data) +{ + // + // Send the given control messages + // to the remote proxy. + // + + #if defined(TEST) || defined(INFO) + + if (data != -1) + { + if (code == code_control_token_reply || + code == code_split_token_reply || + code == code_data_token_reply) + { + *logofs << "Proxy: TOKEN! Sending message '" << DumpControl(code) + << "' at " << strMsTimestamp() << " with count " + << data << ".\n" << logofs_flush; + } + else + { + *logofs << "Proxy: Sending message '" << DumpControl(code) + << "' at " << strMsTimestamp() << " with data ID#" + << data << ".\n" << logofs_flush; + } + } + else + { + *logofs << "Proxy: Sending message '" << DumpControl(code) + << "' at " << strMsTimestamp() << ".\n" + << logofs_flush; + } + + #endif + + // + // Add the control message and see if the + // data has to be flushed immediately. + // + + if (addControlCodes(code, data) < 0) + { + return -1; + } + + switch (code) + { + // + // Append the first data read from the opened + // channel to the control code. + // + + case code_new_x_connection: + case code_new_cups_connection: + case code_new_aux_connection: + case code_new_smb_connection: + case code_new_media_connection: + case code_new_http_connection: + case code_new_font_connection: + case code_new_slave_connection: + + // + // Do we send the token reply immediately? + // The control messages are put at the begin- + // ning of the of the encode buffer, so we may + // reply to multiple tokens before having the + // chance of handling the actual frame data. + // On the other hand, the sooner we reply, the + // sooner the remote proxy is restarted. + // + + case code_control_token_reply: + case code_split_token_reply: + case code_data_token_reply: + { + break; + } + + // + // Also send the congestion control codes + // immediately. + // + // case code_begin_congestion: + // case code_end_congestion: + // + + default: + { + priority_ = 1; + + break; + } + } + + if (priority_ == 1) + { + if (handleFrame(frame_data) < 0) + { + return -1; + } + } + + return 1; +} + +int Proxy::handleSwitch(int channelId) +{ + // + // If data is for a different channel than last + // selected for output, prepend to the data the + // new channel id. + // + + #ifdef DEBUG + *logofs << "Proxy: Requested a switch with " + << "current channel ID#" << outputChannel_ + << " new channel ID#" << channelId << ".\n" + << logofs_flush; + #endif + + if (channelId != outputChannel_) + { + if (needFlush() == 1) + { + #if defined(TEST) || defined(INFO) || defined(FLUSH) + *logofs << "Proxy: WARNING! Flushing data for the " + << "previous channel ID#" << outputChannel_ + << ".\n" << logofs_flush; + #endif + + if (handleFrame(frame_data) < 0) + { + return -1; + } + } + + #if defined(TEST) || defined(INFO) + *logofs << "Proxy: Sending message '" + << DumpControl(code_switch_connection) << "' at " + << strMsTimestamp() << " with FD#" << getFd(channelId) + << " channel ID#" << channelId << ".\n" + << logofs_flush; + #endif + + if (addControlCodes(code_switch_connection, channelId) < 0) + { + return -1; + } + + outputChannel_ = channelId; + } + + return 1; +} + +int Proxy::addTokenCodes(T_proxy_token &token) +{ + #if defined(TEST) || defined(INFO) || defined(TOKEN) + *logofs << "Proxy: TOKEN! Sending token [" + << DumpToken(token.type) << "] with " + << token.bytes << " bytes accumulated size " + << token.size << " and " << token.remaining + << " available.\n" << logofs_flush; + #endif + + // + // Give a 'weight' to the token. The tokens + // remaining can become negative if we sent + // a packet that was exceptionally big. + // + + int count = 0; + + if (control -> isProtoStep7() == 1) + { + count = token.bytes / token.size; + + if (count > 255) + { + count = 255; + } + } + + // + // Force a count of 1, for example + // if this is a ping. + // + + if (count < 1) + { + count = 1; + + token.bytes = 0; + } + else + { + // + // Let the next token account for the + // remaining bytes. + // + + token.bytes %= token.size; + } + + #if defined(TEST) || defined(INFO) || defined(TOKEN) + *logofs << "Proxy: Sending message '" + << DumpControl(token.request) << "' at " + << strMsTimestamp() << " with count " << count + << ".\n" << logofs_flush; + #endif + + controlCodes_[controlLength_++] = 0; + controlCodes_[controlLength_++] = (unsigned char) token.request; + controlCodes_[controlLength_++] = (unsigned char) count; + + statistics -> addFrameOut(); + + token.remaining -= count; + + return 1; +} + +int Proxy::handleToken(T_frame_type type) +{ + #if defined(TEST) || defined(INFO) || defined(TOKEN) + *logofs << "Proxy: TOKEN! Checking tokens with " + << "frame type ["; + + *logofs << (type == frame_ping ? "frame_ping" : "frame_data"); + + *logofs << "] with stream ratio " << statistics -> + getStreamRatio() << ".\n" << logofs_flush; + #endif + + if (type == frame_data) + { + if (control -> isProtoStep7() == 1) + { + // + // Send a distinct token for each data type. + // We don't want to slow down the sending of + // the X events, X replies and split confir- + // mation events on the X server side, so + // take care only of the generic data token. + // + + if (control -> ProxyMode == proxy_client) + { + statistics -> updateControlToken(tokens_[token_control].bytes); + + if (tokens_[token_control].bytes > tokens_[token_control].size) + { + if (addTokenCodes(tokens_[token_control]) < 0) + { + return -1; + } + + #if defined(TEST) || defined(INFO) || defined(TOKEN) + + T_proxy_token &token = tokens_[token_control]; + + *logofs << "Proxy: TOKEN! Token class [" + << DumpToken(token.type) << "] has now " + << token.bytes << " bytes accumulated and " + << token.remaining << " tokens remaining.\n" + << logofs_flush; + #endif + } + + statistics -> updateSplitToken(tokens_[token_split].bytes); + + if (tokens_[token_split].bytes > tokens_[token_split].size) + { + if (addTokenCodes(tokens_[token_split]) < 0) + { + return -1; + } + + #if defined(TEST) || defined(INFO) || defined(TOKEN) + + T_proxy_token &token = tokens_[token_split]; + + *logofs << "Proxy: TOKEN! Token class [" + << DumpToken(token.type) << "] has now " + << token.bytes << " bytes accumulated and " + << token.remaining << " tokens remaining.\n" + << logofs_flush; + #endif + } + } + + statistics -> updateDataToken(tokens_[token_data].bytes); + + if (tokens_[token_data].bytes > tokens_[token_data].size) + { + if (addTokenCodes(tokens_[token_data]) < 0) + { + return -1; + } + + #if defined(TEST) || defined(INFO) || defined(TOKEN) + + T_proxy_token &token = tokens_[token_data]; + + *logofs << "Proxy: TOKEN! Token class [" + << DumpToken(token.type) << "] has now " + << token.bytes << " bytes accumulated and " + << token.remaining << " tokens remaining.\n" + << logofs_flush; + #endif + } + } + else + { + // + // Sum everything to the control token. + // + + if (control -> ProxyMode == proxy_client) + { + statistics -> updateControlToken(tokens_[token_control].bytes); + statistics -> updateSplitToken(tokens_[token_control].bytes); + statistics -> updateDataToken(tokens_[token_control].bytes); + + if (tokens_[token_control].bytes > tokens_[token_control].size) + { + if (addTokenCodes(tokens_[token_control]) < 0) + { + return -1; + } + + #if defined(TEST) || defined(INFO) || defined(TOKEN) + + T_proxy_token &token = tokens_[token_control]; + + *logofs << "Proxy: TOKEN! Token class [" + << DumpToken(token.type) << "] has now " + << token.bytes << " bytes accumulated and " + << token.remaining << " tokens remaining.\n" + << logofs_flush; + #endif + } + } + } + } + else + { + if (addTokenCodes(tokens_[token_control]) < 0) + { + return -1; + } + + // + // Reset all counters on a ping. + // + + tokens_[token_control].bytes = 0; + tokens_[token_split].bytes = 0; + tokens_[token_data].bytes = 0; + + #if defined(TEST) || defined(INFO) || defined(TOKEN) + + T_proxy_token &token = tokens_[token_control]; + + *logofs << "Proxy: TOKEN! Token class [" + << DumpToken(token.type) << "] has now " + << token.bytes << " bytes accumulated and " + << token.remaining << " tokens remaining.\n" + << logofs_flush; + #endif + } + + // + // Check if we have entered in + // congestion state. + // + + if (congestion_ == 0 && + tokens_[token_control].remaining <= 0) + { + #if defined(TEST) || defined(INFO) + *logofs << "Proxy: Entering congestion with " + << tokens_[token_control].remaining + << " tokens remaining.\n" << logofs_flush; + #endif + + congestion_ = 1; + } + + statistics -> updateCongestion(tokens_[token_control].remaining, + tokens_[token_control].limit); + + return 1; +} + +int Proxy::handleTokenFromProxy(T_proxy_token &token, int count) +{ + #if defined(TEST) || defined(INFO) || defined(TOKEN) + *logofs << "Proxy: TOKEN! Received token [" + << DumpToken(token.type) << "] request at " + << strMsTimestamp() << " with count " + << count << ".\n" << logofs_flush; + #endif + + if (control -> isProtoStep7() == 0) + { + if (control -> ProxyMode == proxy_client || + token.request != code_control_token_request) + { + #ifdef PANIC + *logofs << "Proxy: PANIC! Invalid token request received from remote.\n" + << logofs_flush; + #endif + + cerr << "Error" << ": Invalid token request received from remote.\n"; + + HandleCleanup(); + } + } + + // + // Add our token reply. + // + + if (handleControl(token.reply, count) < 0) + { + return -1; + } + + return 1; +} + +int Proxy::handleTokenReplyFromProxy(T_proxy_token &token, int count) +{ + #if defined(TEST) || defined(INFO) || defined(TOKEN) + *logofs << "Proxy: TOKEN! Received token [" + << DumpToken(token.type) << "] reply at " + << strMsTimestamp() << " with count " << count + << ".\n" << logofs_flush; + #endif + + // + // Increment the available tokens. + // + + if (control -> isProtoStep7() == 0) + { + if (token.reply != code_control_token_reply) + { + #ifdef PANIC + *logofs << "Proxy: PANIC! Invalid token reply received from remote.\n" + << logofs_flush; + #endif + + cerr << "Error" << ": Invalid token reply received from remote.\n"; + + HandleCleanup(); + } + + count = 1; + } + + token.remaining += count; + + if (token.remaining > token.limit) + { + #ifdef PANIC + *logofs << "Proxy: PANIC! Token overflow handling messages.\n" + << logofs_flush; + #endif + + cerr << "Error" << ": Token overflow handling messages.\n"; + + HandleCleanup(); + } + + #if defined(TEST) || defined(INFO) || defined(TOKEN) + *logofs << "Proxy: TOKEN! Token class [" + << DumpToken(token.type) << "] has now " << token.bytes + << " bytes accumulated and " << token.remaining + << " tokens remaining.\n" << logofs_flush; + #endif + + // + // Check if we can jump out of the + // congestion state. + // + + if (congestion_ == 1 && + tokens_[token_control].remaining > 0) + { + #if defined(TEST) || defined(INFO) + *logofs << "Proxy: Exiting congestion with " + << tokens_[token_control].remaining + << " tokens remaining.\n" << logofs_flush; + #endif + + congestion_ = 0; + } + + statistics -> updateCongestion(tokens_[token_control].remaining, + tokens_[token_control].limit); + + return 1; +} + +void Proxy::handleFailOnSave(const char *fullName, const char *failContext) const +{ + #ifdef WARNING + *logofs << "Proxy: WARNING! Error saving stores to cache file " + << "in context [" << failContext << "].\n" << logofs_flush; + #endif + + cerr << "Warning" << ": Error saving stores to cache file " + << "in context [" << failContext << "].\n"; + + #ifdef WARNING + *logofs << "Proxy: WARNING! Removing invalid cache '" + << fullName << "'.\n" << logofs_flush; + #endif + + cerr << "Warning" << ": Removing invalid cache '" + << fullName << "'.\n"; + + unlink(fullName); +} + +void Proxy::handleFailOnLoad(const char *fullName, const char *failContext) const +{ + #ifdef WARNING + *logofs << "Proxy: WARNING! Error loading stores from cache file " + << "in context [" << failContext << "].\n" << logofs_flush; + #endif + + cerr << "Warning" << ": Error loading stores from cache file " + << "in context [" << failContext << "].\n"; + + #ifdef WARNING + *logofs << "Proxy: WARNING! Removing invalid cache '" + << fullName << "'.\n" << logofs_flush; + #endif + + cerr << "Warning" << ": Removing invalid cache '" + << fullName << "'.\n"; + + unlink(fullName); +} + +int Proxy::handleSaveVersion(unsigned char *buffer, int &major, + int &minor, int &patch) const +{ + if (control -> isProtoStep8() == 1) + { + major = 3; + minor = 0; + patch = 0; + } + else if (control -> isProtoStep7() == 1) + { + major = 2; + minor = 0; + patch = 0; + } + else + { + major = 1; + minor = 4; + patch = 0; + } + + *(buffer + 0) = major; + *(buffer + 1) = minor; + + PutUINT(patch, buffer + 2, storeBigEndian()); + + return 1; +} + +int Proxy::handleLoadVersion(const unsigned char *buffer, int &major, + int &minor, int &patch) const +{ + major = *(buffer + 0); + minor = *(buffer + 1); + + patch = GetUINT(buffer + 2, storeBigEndian()); + + // + // Force the proxy to discard the + // incompatible caches. + // + + if (control -> isProtoStep8() == 1) + { + if (major < 3) + { + return -1; + } + } + else if (control -> isProtoStep7() == 1) + { + if (major < 2) + { + return -1; + } + } + else + { + if (major != 1 && minor != 4) + { + return -1; + } + } + + return 1; +} + +char *Proxy::handleSaveAllStores(const char *savePath) const +{ + int cumulativeSize = MessageStore::getCumulativeTotalStorageSize(); + + if (cumulativeSize < control -> PersistentCacheThreshold) + { + #ifdef TEST + *logofs << "Proxy: Cache not saved as size is " + << cumulativeSize << " with threshold set to " + << control -> PersistentCacheThreshold + << ".\n" << logofs_flush; + #endif + + return NULL; + } + else if (savePath == NULL) + { + #ifdef PANIC + *logofs << "Proxy: PANIC! No name provided for save path.\n" + << logofs_flush; + #endif + + cerr << "Error" << ": No name provided for save path.\n"; + + return NULL; + } + + #ifdef TEST + *logofs << "Proxy: Going to save content of message stores.\n" + << logofs_flush; + #endif + + // + // Our parent process is likely going to terminate. + // Until we finish saving cache we must ignore its + // SIGIPE. + // + + DisableSignals(); + + ofstream *cachefs = NULL; + + md5_state_t *md5StateStream = NULL; + md5_byte_t *md5DigestStream = NULL; + + md5_state_t *md5StateClient = NULL; + md5_byte_t *md5DigestClient = NULL; + + char *tempName = NULL; + + char md5String[MD5_LENGTH * 2 + 2]; + + char fullName[strlen(savePath) + MD5_LENGTH * 2 + 4]; + + if (control -> ProxyMode == proxy_client) + { + tempName = tempnam(savePath, "Z-C-"); + } + else + { + tempName = tempnam(savePath, "Z-S-"); + } + + // + // Change the mask to make the file only + // readable by the user, then restore the + // old mask. + // + + mode_t fileMode = umask(0077); + + cachefs = new ofstream(tempName, ios::out | ios::binary); + + umask(fileMode); + + if (tempName == NULL || cachefs == NULL) + { + #ifdef PANIC + *logofs << "Proxy: PANIC! Can't create temporary file in '" + << savePath << "'.\n" << logofs_flush; + #endif + + cerr << "Error" << ": Can't create temporary file in '" + << savePath << "'.\n"; + + if (tempName != NULL) + { + free(tempName); + } + + if (cachefs != NULL) + { + delete cachefs; + } + + EnableSignals(); + + return NULL; + } + + md5StateStream = new md5_state_t(); + md5DigestStream = new md5_byte_t[MD5_LENGTH]; + + md5_init(md5StateStream); + + // + // First write the proxy version. + // + + unsigned char version[4]; + + int major; + int minor; + int patch; + + handleSaveVersion(version, major, minor, patch); + + #ifdef TEST + *logofs << "Proxy: Saving cache using version '" + << major << "." << minor << "." << patch + << "'.\n" << logofs_flush; + #endif + + if (PutData(cachefs, version, 4) < 0) + { + handleFailOnSave(tempName, "A"); + + delete cachefs; + + delete md5StateStream; + delete [] md5DigestStream; + + free(tempName); + + EnableSignals(); + + return NULL; + } + + // + // Make space for the calculated MD5 so we + // can later rewind the file and write it + // at this position. + // + + if (PutData(cachefs, md5DigestStream, MD5_LENGTH) < 0) + { + handleFailOnSave(tempName, "B"); + + delete cachefs; + + delete md5StateStream; + delete [] md5DigestStream; + + free(tempName); + + EnableSignals(); + + return NULL; + } + + md5StateClient = new md5_state_t(); + md5DigestClient = new md5_byte_t[MD5_LENGTH]; + + md5_init(md5StateClient); + + #ifdef DUMP + + ofstream *cacheDump = NULL; + + ofstream *tempfs = (ofstream*) logofs; + + char cacheDumpName[DEFAULT_STRING_LENGTH]; + + if (control -> ProxyMode == proxy_client) + { + snprintf(cacheDumpName, DEFAULT_STRING_LENGTH - 1, + "%s/client-cache-dump", control -> TempPath); + } + else + { + snprintf(cacheDumpName, DEFAULT_STRING_LENGTH - 1, + "%s/server-cache-dump", control -> TempPath); + } + + *(cacheDumpName + DEFAULT_STRING_LENGTH - 1) = '\0'; + + mode_t fileMode = umask(0077); + + cacheDump = new ofstream(cacheDumpName, ios::out); + + umask(fileMode); + + logofs = cacheDump; + + #endif + + // + // Use the virtual method of the concrete proxy class. + // + + int allSaved = handleSaveAllStores(cachefs, md5StateStream, md5StateClient); + + #ifdef DUMP + + logofs = tempfs; + + delete cacheDump; + + #endif + + if (allSaved == 0) + { + handleFailOnSave(tempName, "C"); + + delete cachefs; + + delete md5StateStream; + delete [] md5DigestStream; + + delete md5StateClient; + delete [] md5DigestClient; + + free(tempName); + + EnableSignals(); + + return NULL; + } + + md5_finish(md5StateClient, md5DigestClient); + + for (unsigned int i = 0; i < MD5_LENGTH; i++) + { + sprintf(md5String + (i * 2), "%02X", md5DigestClient[i]); + } + + strcpy(fullName, (control -> ProxyMode == proxy_client) ? "C-" : "S-"); + + strcat(fullName, md5String); + + md5_append(md5StateStream, (const md5_byte_t *) fullName, strlen(fullName)); + md5_finish(md5StateStream, md5DigestStream); + + // + // Go to the beginning of file plus + // the integer where we wrote our + // proxy version. + // + + cachefs -> seekp(4); + + if (PutData(cachefs, md5DigestStream, MD5_LENGTH) < 0) + { + handleFailOnSave(tempName, "D"); + + delete cachefs; + + delete md5StateStream; + delete [] md5DigestStream; + + delete md5StateClient; + delete [] md5DigestClient; + + free(tempName); + + EnableSignals(); + + return NULL; + } + + delete cachefs; + + // + // Copy the resulting cache name without + // the path so that we can return it to + // the caller. + // + + char *cacheName = new char[MD5_LENGTH * 2 + 4]; + + strcpy(cacheName, fullName); + + // + // Add the path to the full name and move + // the cache into the path. + // + + strcpy(fullName, savePath); + strcat(fullName, (control -> ProxyMode == proxy_client) ? "/C-" : "/S-"); + strcat(fullName, md5String); + + #ifdef TEST + *logofs << "Proxy: Renaming cache file from '" + << tempName << "' to '" << fullName + << "'.\n" << logofs_flush; + #endif + + rename(tempName, fullName); + + delete md5StateStream; + delete [] md5DigestStream; + + delete md5StateClient; + delete [] md5DigestClient; + + free(tempName); + + // + // Restore the original handlers. + // + + EnableSignals(); + + #ifdef TEST + *logofs << "Proxy: Successfully saved cache file '" + << cacheName << "'.\n" << logofs_flush; + #endif + + // + // This must be enabled only for test + // because it requires that client and + // server reside on the same machine. + // + + if (control -> PersistentCacheCheckOnShutdown == 1 && + control -> ProxyMode == proxy_server) + { + #if defined(TEST) || defined(INFO) + *logofs << "Proxy: MATCH! Checking if the file '" + << fullName << "' matches a client cache.\n" + << logofs_flush; + #endif + + strcpy(fullName, savePath); + strcat(fullName, "/C-"); + strcat(fullName, md5String); + + struct stat fileStat; + + if (stat(fullName, &fileStat) != 0) + { + #ifdef PANIC + *logofs << "Proxy: PANIC! Can't find a client cache " + << "with name '" << fullName << "'.\n" + << logofs_flush; + #endif + + cerr << "Error" << ": Can't find a client cache " + << "with name '" << fullName << "'.\n"; + + HandleShutdown(); + } + + #if defined(TEST) || defined(INFO) + *logofs << "Proxy: MATCH! Client cache '" << fullName + << "' matches the local cache.\n" + << logofs_flush; + #endif + } + + return cacheName; +} + +const char *Proxy::handleLoadAllStores(const char *loadPath, const char *loadName) const +{ + #ifdef TEST + *logofs << "Proxy: Going to load content of message stores.\n" + << logofs_flush; + #endif + + // + // Until we finish loading cache we + // must at least ignore any SIGIPE. + // + + DisableSignals(); + + if (loadPath == NULL || loadName == NULL) + { + #ifdef PANIC + *logofs << "Proxy: PANIC! No path or no file name provided for cache to restore.\n" + << logofs_flush; + #endif + + cerr << "Error" << ": No path or no file name provided for cache to restore.\n"; + + EnableSignals(); + + return NULL; + } + else if (strlen(loadName) != MD5_LENGTH * 2 + 2) + { + #ifdef PANIC + *logofs << "Proxy: PANIC! Bad file name provided for cache to restore.\n" + << logofs_flush; + #endif + + cerr << "Error" << ": Bad file name provided for cache to restore.\n"; + + EnableSignals(); + + return NULL; + } + + istream *cachefs = NULL; + char md5String[(MD5_LENGTH * 2) + 2]; + md5_byte_t md5FromFile[MD5_LENGTH]; + + char *cacheName = new char[strlen(loadPath) + strlen(loadName) + 3]; + + strcpy(cacheName, loadPath); + strcat(cacheName, "/"); + strcat(cacheName, loadName); + + #ifdef TEST + *logofs << "Proxy: Name of cache file is '" + << cacheName << "'.\n" << logofs_flush; + #endif + + cachefs = new ifstream(cacheName, ios::in | ios::binary); + + unsigned char version[4]; + + if (cachefs == NULL || GetData(cachefs, version, 4) < 0) + { + #ifdef PANIC + *logofs << "Proxy: PANIC! Can't read cache file '" + << cacheName << "'.\n" << logofs_flush;; + #endif + + handleFailOnLoad(cacheName, "A"); + + delete cachefs; + + delete [] cacheName; + + EnableSignals(); + + return NULL; + } + + int major; + int minor; + int patch; + + if (handleLoadVersion(version, major, minor, patch) < 0) + { + #ifdef PANIC + *logofs << "Proxy: WARNING! Incompatible version '" + << major << "." << minor << "." << patch + << "' in cache file '" << cacheName + << "'.\n" << logofs_flush; + #endif + + cerr << "Warning" << ": Incompatible version '" + << major << "." << minor << "." << patch + << "' in cache file '" << cacheName + << "'.\n" << logofs_flush; + + if (control -> ProxyMode == proxy_server) + { + handleFailOnLoad(cacheName, "B"); + } + else + { + // + // Simply remove the cache file. + // + + unlink(cacheName); + } + + delete cachefs; + + delete [] cacheName; + + EnableSignals(); + + return NULL; + } + + #ifdef TEST + *logofs << "Proxy: Reading from cache file version '" + << major << "." << minor << "." << patch + << "'.\n" << logofs_flush; + #endif + + if (GetData(cachefs, md5FromFile, MD5_LENGTH) < 0) + { + #ifdef PANIC + *logofs << "Proxy: PANIC! No checksum in cache file '" + << loadName << "'.\n" << logofs_flush; + #endif + + handleFailOnLoad(cacheName, "C"); + + delete cachefs; + + delete [] cacheName; + + EnableSignals(); + + return NULL; + } + + md5_state_t *md5StateStream = NULL; + md5_byte_t *md5DigestStream = NULL; + + md5StateStream = new md5_state_t(); + md5DigestStream = new md5_byte_t[MD5_LENGTH]; + + md5_init(md5StateStream); + + // + // Use the virtual method of the proxy class. + // + + if (handleLoadAllStores(cachefs, md5StateStream) < 0) + { + handleFailOnLoad(cacheName, "D"); + + delete cachefs; + + delete md5StateStream; + delete [] md5DigestStream; + + delete [] cacheName; + + EnableSignals(); + + return NULL; + } + + md5_append(md5StateStream, (const md5_byte_t *) loadName, strlen(loadName)); + md5_finish(md5StateStream, md5DigestStream); + + for (int i = 0; i < MD5_LENGTH; i++) + { + if (md5DigestStream[i] != md5FromFile[i]) + { + #ifdef PANIC + + *logofs << "Proxy: PANIC! Bad checksum for cache file '" + << cacheName << "'.\n" << logofs_flush; + + for (unsigned int i = 0; i < MD5_LENGTH; i++) + { + sprintf(md5String + (i * 2), "%02X", md5FromFile[i]); + } + + *logofs << "Proxy: PANIC! Saved checksum is '" + << md5String << "'.\n" << logofs_flush; + + for (unsigned int i = 0; i < MD5_LENGTH; i++) + { + sprintf(md5String + (i * 2),"%02X", md5DigestStream[i]); + } + + *logofs << "Proxy: PANIC! Calculated checksum is '" + << md5String << "'.\n" << logofs_flush; + + #endif + + handleFailOnLoad(cacheName, "E"); + + delete cachefs; + + delete md5StateStream; + delete [] md5DigestStream; + + delete [] cacheName; + + EnableSignals(); + + return NULL; + } + } + + delete cachefs; + + delete md5StateStream; + delete [] md5DigestStream; + + delete [] cacheName; + + // + // Restore the original handlers. + // + + EnableSignals(); + + #ifdef TEST + *logofs << "Proxy: Successfully loaded cache file '" + << loadName << "'.\n" << logofs_flush; + #endif + + // + // Return the string provided by caller. + // + + return loadName; +} + +int Proxy::allocateChannelMap(int fd) +{ + // + // We assume that the fd is lower than + // the maximum allowed number. This is + // checked at the time the connection + // is accepted. + // + + if (fd < 0 || fd >= CONNECTIONS_LIMIT) + { + #ifdef PANIC + *logofs << "Proxy: PANIC! Internal error allocating " + << "new channel with FD#" << fd_ << ".\n" + << logofs_flush; + #endif + + cerr << "Error" << ": Internal error allocating " + << "new channel with FD#" << fd_ << ".\n"; + + HandleCleanup(); + } + + for (int channelId = 0; + channelId < CONNECTIONS_LIMIT; + channelId++) + { + if (checkLocalChannelMap(channelId) == 1 && + fdMap_[channelId] == -1) + { + fdMap_[channelId] = fd; + channelMap_[fd] = channelId; + + #ifdef TEST + *logofs << "Proxy: Allocated new channel ID#" + << channelId << " with FD#" << fd << ".\n" + << logofs_flush; + #endif + + return channelId; + } + } + + // + // No available channel is remaining. + // + + #ifdef TEST + *logofs << "Proxy: WARNING! Can't allocate a new channel " + << "for FD#" << fd_ << ".\n" << logofs_flush; + #endif + + return -1; +} + +int Proxy::checkChannelMap(int channelId) +{ + // + // To be acceptable, the channel id must + // be an id that is not possible to use + // to allocate channels at this side. + // + + if (checkLocalChannelMap(channelId) == 1) + { + #ifdef PANIC + *logofs << "Proxy: PANIC! Can't open a new channel " + << "with invalid ID#" << channelId << ".\n" + << logofs_flush; + #endif + + cerr << "Error" << ": Can't open a new channel " + << "with invalid ID#" << channelId << ".\n"; + + return -1; + } + else if (channels_[channelId] != NULL) + { + #ifdef PANIC + *logofs << "Proxy: PANIC! Can't open a new channel " + << "over an existing ID#" << channelId + << " with FD#" << getFd(channelId) + << ".\n" << logofs_flush; + #endif + + cerr << "Error" << ": Can't open a new channel " + << "over an existing ID#" << channelId + << " with FD#" << getFd(channelId) + << ".\n"; + + return -1; + } + + return 1; +} + +int Proxy::assignChannelMap(int channelId, int fd) +{ + // + // We assume that the fd is lower than + // the maximum allowed number. This is + // checked at the time the connection + // is accepted. + // + + if (channelId < 0 || channelId >= CONNECTIONS_LIMIT || + fd < 0 || fd >= CONNECTIONS_LIMIT) + { + #ifdef PANIC + *logofs << "Proxy: PANIC! Internal error assigning " + << "new channel with FD#" << fd_ << ".\n" + << logofs_flush; + #endif + + cerr << "Error" << ": Internal error assigning " + << "new channel with FD#" << fd_ << ".\n"; + + HandleCleanup(); + } + + fdMap_[channelId] = fd; + channelMap_[fd] = channelId; + + return 1; +} + +void Proxy::cleanupChannelMap(int channelId) +{ + int fd = fdMap_[channelId]; + + if (fd != -1) + { + fdMap_[channelId] = -1; + channelMap_[fd] = -1; + } +} + +int Proxy::addControlCodes(T_proxy_code code, int data) +{ + // + // Flush the encode buffer plus all the outstanding + // control codes if there is not enough space for + // the new control message. We need to ensure that + // there are further bytes available, in the case + // we will need to add more token control messages. + // + + if (controlLength_ + 3 > CONTROL_CODES_THRESHOLD) + { + #ifdef WARNING + *logofs << "Proxy: WARNING! Flushing control messages " + << "while sending code '" << DumpControl(code) + << "'.\n" << logofs_flush; + #endif + + if (handleFlush() < 0) + { + return -1; + } + } + + controlCodes_[controlLength_++] = 0; + controlCodes_[controlLength_++] = (unsigned char) code; + controlCodes_[controlLength_++] = (unsigned char) (data == -1 ? 0 : data); + + // + // Account for the control frame. + // + + statistics -> addFrameOut(); + + return 1; +} + +void Proxy::setSplitTimeout(int channelId) +{ + int needed = channels_[channelId] -> needSplit(); + + if (needed != isTimestamp(timeouts_.splitTs)) + { + if (needed == 1) + { + #if defined(TEST) || defined(INFO) || defined(SPLIT) + *logofs << "Proxy: SPLIT! Allocating split timestamp at " + << strMsTimestamp() << ".\n" << logofs_flush; + #endif + + timeouts_.splitTs = getTimestamp(); + } + else + { + T_list &channelList = activeChannels_.getList(); + + for (T_list::iterator j = channelList.begin(); + j != channelList.end(); j++) + { + int channelId = *j; + + if (channels_[channelId] != NULL && + channels_[channelId] -> needSplit() == 1) + { + #ifdef TEST + *logofs << "Proxy: SPLIT! Channel for FD#" + << getFd(channelId) << " still needs splits.\n" + << logofs_flush; + #endif + + return; + } + } + + #if defined(TEST) || defined(INFO) || defined(SPLIT) + *logofs << "Proxy: SPLIT! Resetting split timestamp at " + << strMsTimestamp() << ".\n" << logofs_flush; + #endif + + timeouts_.splitTs = nullTimestamp(); + } + } +} + +void Proxy::setMotionTimeout(int channelId) +{ + int needed = channels_[channelId] -> needMotion(); + + if (needed != isTimestamp(timeouts_.motionTs)) + { + if (channels_[channelId] -> needMotion() == 1) + { + #if defined(TEST) || defined(INFO) || defined(SPLIT) + *logofs << "Proxy: Allocating motion timestamp at " + << strMsTimestamp() << ".\n" << logofs_flush; + #endif + + timeouts_.motionTs = getTimestamp(); + } + else + { + T_list &channelList = activeChannels_.getList(); + + for (T_list::iterator j = channelList.begin(); + j != channelList.end(); j++) + { + int channelId = *j; + + if (channels_[channelId] != NULL && + channels_[channelId] -> needMotion() == 1) + { + #ifdef TEST + *logofs << "Proxy: SPLIT! Channel for FD#" + << getFd(channelId) << " still needs motions.\n" + << logofs_flush; + #endif + + return; + } + } + + #if defined(TEST) || defined(INFO) || defined(SPLIT) + *logofs << "Proxy: Resetting motion timestamp at " + << strMsTimestamp() << ".\n" << logofs_flush; + #endif + + timeouts_.motionTs = nullTimestamp(); + } + } +} + +void Proxy::increaseChannels(int channelId) +{ + #ifdef TEST + *logofs << "Proxy: Adding channel " << channelId + << " to the list of active channels.\n" + << logofs_flush; + #endif + + activeChannels_.add(channelId); + + #ifdef TEST + *logofs << "Proxy: There are " << activeChannels_.getSize() + << " allocated channels for proxy FD#" << fd_ + << ".\n" << logofs_flush; + #endif +} + +void Proxy::decreaseChannels(int channelId) +{ + #ifdef TEST + *logofs << "Proxy: Removing channel " << channelId + << " from the list of active channels.\n" + << logofs_flush; + #endif + + activeChannels_.remove(channelId); + + #ifdef TEST + *logofs << "Proxy: There are " << activeChannels_.getSize() + << " allocated channels for proxy FD#" << fd_ + << ".\n" << logofs_flush; + #endif +} + +int Proxy::allocateTransport(int channelFd, int channelId) +{ + if (transports_[channelId] == NULL) + { + transports_[channelId] = new Transport(channelFd); + + if (transports_[channelId] == NULL) + { + #ifdef PANIC + *logofs << "Proxy: PANIC! Can't allocate transport for " + << "channel id " << channelId << ".\n" + << logofs_flush; + #endif + + cerr << "Error" << ": Can't allocate transport for " + << "channel id " << channelId << ".\n"; + + return -1; + } + } + else if (transports_[channelId] -> + getType() != transport_agent) + { + #ifdef PANIC + *logofs << "Proxy: PANIC! Transport for channel id " + << channelId << " should be null.\n" + << logofs_flush; + #endif + + cerr << "Error" << ": Transport for channel id " + << channelId << " should be null.\n"; + + return -1; + } + + return 1; +} + +int Proxy::deallocateTransport(int channelId) +{ + // + // Transport for the agent connection + // is passed from the outside when + // creating the channel. + // + + if (transports_[channelId] -> + getType() != transport_agent) + { + delete transports_[channelId]; + } + + transports_[channelId] = NULL; + + return 1; +} + +int Proxy::handleNewGenericConnection(int clientFd, T_channel_type type, const char *label) +{ + int channelId = allocateChannelMap(clientFd); + + if (channelId == -1) + { + #ifdef PANIC + *logofs << "Proxy: PANIC! Maximum mumber of available " + << "channels exceeded.\n" << logofs_flush; + #endif + + cerr << "Error" << ": Maximum mumber of available " + << "channels exceeded.\n"; + + return -1; + } + + #ifdef TEST + *logofs << "Proxy: Channel for " << label << " descriptor " + << "FD#" << clientFd << " mapped to ID#" + << channelId << ".\n" + << logofs_flush; + #endif + + // + // Turn queuing off for path server-to-proxy. + // + + SetNoDelay(clientFd, 1); + + if (allocateTransport(clientFd, channelId) < 0) + { + return -1; + } + + switch (type) + { + case channel_cups: + { + channels_[channelId] = new CupsChannel(transports_[channelId], compressor_); + + break; + } + case channel_smb: + { + channels_[channelId] = new SmbChannel(transports_[channelId], compressor_); + + break; + } + case channel_media: + { + channels_[channelId] = new MediaChannel(transports_[channelId], compressor_); + + break; + } + case channel_http: + { + channels_[channelId] = new HttpChannel(transports_[channelId], compressor_); + + break; + } + case channel_font: + { + channels_[channelId] = new FontChannel(transports_[channelId], compressor_); + + break; + } + default: + { + channels_[channelId] = new SlaveChannel(transports_[channelId], compressor_); + + break; + } + } + + if (channels_[channelId] == NULL) + { + deallocateTransport(channelId); + + return -1; + } + + #ifdef TEST + *logofs << "Proxy: Accepted new connection to " + << label << " server.\n" << logofs_flush; + #endif + + cerr << "Info" << ": Accepted new connection to " + << label << " server.\n"; + + increaseChannels(channelId); + + switch (type) + { + case channel_cups: + { + if (handleControl(code_new_cups_connection, channelId) < 0) + { + return -1; + } + + break; + } + case channel_smb: + { + if (handleControl(code_new_smb_connection, channelId) < 0) + { + return -1; + } + + break; + } + case channel_media: + { + if (handleControl(code_new_media_connection, channelId) < 0) + { + return -1; + } + + break; + } + case channel_http: + { + if (handleControl(code_new_http_connection, channelId) < 0) + { + return -1; + } + + break; + } + case channel_font: + { + if (handleControl(code_new_font_connection, channelId) < 0) + { + return -1; + } + + break; + } + default: + { + if (handleControl(code_new_slave_connection, channelId) < 0) + { + return -1; + } + + break; + } + } + + channels_[channelId] -> handleConfiguration(); + + return 1; +} + +int Proxy::handleNewSlaveConnection(int clientFd) +{ + if (control -> isProtoStep7() == 1) + { + return handleNewGenericConnection(clientFd, channel_slave, "slave"); + } + else + { + #ifdef TEST + *logofs << "Proxy: WARNING! Not sending unsupported " + << "'code_new_slave_connection' message.\n" + << logofs_flush; + #endif + + return -1; + } +} + +int Proxy::handleNewGenericConnectionFromProxy(int channelId, T_channel_type type, + const char *hostname, int port, const char *label) +{ + if (port <= 0) + { + // + // This happens when user has disabled + // forwarding of the specific service. + // + + #ifdef WARNING + *logofs << "Proxy: WARNING! Refusing attempted connection " + << "to " << label << " server.\n" << logofs_flush; + #endif + + cerr << "Warning" << ": Refusing attempted connection " + << "to " << label << " server.\n"; + + return -1; + } + + const char *serverHost = hostname; + int serverAddrFamily = AF_INET; + sockaddr *serverAddr = NULL; + unsigned int serverAddrLength = 0; + + #ifdef TEST + *logofs << "Proxy: Connecting to " << label + << " server '" << serverHost << "' on TCP port '" + << port << "'.\n" << logofs_flush; + #endif + + int serverIPAddr = GetHostAddress(serverHost); + + if (serverIPAddr == 0) + { + #ifdef PANIC + *logofs << "Proxy: PANIC! Unknown " << label + << " server host '" << serverHost << "'.\n" + << logofs_flush; + #endif + + cerr << "Error" << ": Unknown " << label + << " server host '" << serverHost + << "'.\n"; + + return -1; + } + + sockaddr_in *serverAddrTCP = new sockaddr_in; + + serverAddrTCP -> sin_family = AF_INET; + serverAddrTCP -> sin_port = htons(port); + serverAddrTCP -> sin_addr.s_addr = serverIPAddr; + + serverAddr = (sockaddr *) serverAddrTCP; + serverAddrLength = sizeof(sockaddr_in); + + // + // Connect to the requested server. + // + + int serverFd = socket(serverAddrFamily, SOCK_STREAM, PF_UNSPEC); + + if (serverFd < 0) + { + #ifdef PANIC + *logofs << "Proxy: PANIC! Call to socket failed. " + << "Error is " << EGET() << " '" << ESTR() + << "'.\n" << logofs_flush; + #endif + + cerr << "Error" << ": Call to socket failed. " + << "Error is " << EGET() << " '" << ESTR() + << "'.\n"; + + delete serverAddrTCP; + + return -1; + } + else if (connect(serverFd, serverAddr, serverAddrLength) < 0) + { + #ifdef WARNING + *logofs << "Proxy: WARNING! Connection to " << label + << " server '" << serverHost << ":" << port + << "' failed with error '" << ESTR() << "'.\n" + << logofs_flush; + #endif + + cerr << "Warning" << ": Connection to " << label + << " server '" << serverHost << ":" << port + << "' failed with error '" << ESTR() << "'.\n"; + + close(serverFd); + + delete serverAddrTCP; + + return -1; + } + + delete serverAddrTCP; + + if (handlePostConnectionFromProxy(channelId, serverFd, type, label) < 0) + { + return -1; + } + + #ifdef TEST + *logofs << "Proxy: Forwarded new connection to " + << label << " server on port '" << port + << "'.\n" << logofs_flush; + #endif + + cerr << "Info" << ": Forwarded new connection to " + << label << " server on port '" << port + << "'.\n"; + + return 1; +} + +int Proxy::handleNewGenericConnectionFromProxy(int channelId, T_channel_type type, + const char *hostname, const char *path, const char *label) +{ + if (path == NULL || *path == '\0' ) + { + // + // This happens when user has disabled + // forwarding of the specific service. + // + + #ifdef WARNING + *logofs << "Proxy: WARNING! Refusing attempted connection " + << "to " << label << " server.\n" << logofs_flush; + #endif + + cerr << "Warning" << ": Refusing attempted connection " + << "to " << label << " server.\n"; + + return -1; + } + + sockaddr_un serverAddrUnix; + + unsigned int serverAddrLength = sizeof(sockaddr_un); + + int serverAddrFamily = AF_UNIX; + + serverAddrUnix.sun_family = AF_UNIX; + + const int serverAddrNameLength = 108; + + strncpy(serverAddrUnix.sun_path, path, serverAddrNameLength); + + *(serverAddrUnix.sun_path + serverAddrNameLength - 1) = '\0'; + + #ifdef TEST + *logofs << "Proxy: Connecting to " << label << " server " + << "on Unix port '" << path << "'.\n" << logofs_flush; + #endif + + // + // Connect to the requested server. + // + + int serverFd = socket(serverAddrFamily, SOCK_STREAM, PF_UNSPEC); + + if (serverFd < 0) + { + #ifdef PANIC + *logofs << "Proxy: PANIC! Call to socket failed. " + << "Error is " << EGET() << " '" << ESTR() + << "'.\n" << logofs_flush; + #endif + + cerr << "Error" << ": Call to socket failed. " + << "Error is " << EGET() << " '" << ESTR() + << "'.\n"; + + return -1; + } + else if (connect(serverFd, (sockaddr *) &serverAddrUnix, serverAddrLength) < 0) + { + #ifdef WARNING + *logofs << "Proxy: WARNING! Connection to " << label + << " server on Unix port '" << path << "' failed " + << "with error " << EGET() << ", '" << ESTR() << "'.\n" + << logofs_flush; + #endif + + cerr << "Warning" << ": Connection to " << label + << " server on Unix port '" << path << "' failed " + << "with error " << EGET() << ", '" << ESTR() << "'.\n"; + + close(serverFd); + + return -1; + } + + if (handlePostConnectionFromProxy(channelId, serverFd, type, label) < 0) + { + return -1; + } + + #ifdef TEST + *logofs << "Proxy: Forwarded new connection to " + << label << " server on Unix port '" << path + << "'.\n" << logofs_flush; + #endif + + cerr << "Info" << ": Forwarded new connection to " + << label << " server on Unix port '" << path + << "'.\n"; + + return 1; +} + +int Proxy::handleNewSlaveConnectionFromProxy(int channelId) +{ + // + // Implementation is incomplete. Opening a + // slave channel should let the proxy fork + // a new client and pass to it the channel + // descriptors. For now we make the channel + // fail immediately. + // + // #include <fcntl.h> + // #include <sys/types.h> + // #include <sys/stat.h> + // + // char *slaveServer = "/dev/null"; + // + // #ifdef TEST + // *logofs << "Proxy: Opening file '" << slaveServer + // << "'.\n" << logofs_flush; + // #endif + // + // int serverFd = open(slaveServer, O_RDWR); + // + // if (handlePostConnectionFromProxy(channelId, serverFd, channel_slave, "slave") < 0) + // { + // return -1; + // } + // + + #ifdef WARNING + *logofs << "Proxy: Refusing new slave connection for " + << "channel ID#" << channelId << "\n" + << logofs_flush; + #endif + + cerr << "Warning" << ": Refusing new slave connection for " + << "channel ID#" << channelId << "\n"; + + return -1; +} + +int Proxy::handlePostConnectionFromProxy(int channelId, int serverFd, + T_channel_type type, const char *label) +{ + // + // Turn queuing off for path proxy-to-server. + // + + SetNoDelay(serverFd, 1); + + assignChannelMap(channelId, serverFd); + + #ifdef TEST + *logofs << "Proxy: Descriptor FD#" << serverFd + << " mapped to channel ID#" << channelId << ".\n" + << logofs_flush; + #endif + + if (allocateTransport(serverFd, channelId) < 0) + { + return -1; + } + + switch (type) + { + case channel_cups: + { + channels_[channelId] = new CupsChannel(transports_[channelId], compressor_); + break; + } + case channel_smb: + { + channels_[channelId] = new SmbChannel(transports_[channelId], compressor_); + + break; + } + case channel_media: + { + channels_[channelId] = new MediaChannel(transports_[channelId], compressor_); + + break; + } + case channel_http: + { + channels_[channelId] = new HttpChannel(transports_[channelId], compressor_); + + break; + } + case channel_font: + { + channels_[channelId] = new FontChannel(transports_[channelId], compressor_); + + break; + } + default: + { + channels_[channelId] = new SlaveChannel(transports_[channelId], compressor_); + + break; + } + } + + if (channels_[channelId] == NULL) + { + deallocateTransport(channelId); + + return -1; + } + + increaseChannels(channelId); + + channels_[channelId] -> handleConfiguration(); + + return 1; +} |