diff options
author | Mike Gabriel <mike.gabriel@das-netzwerkteam.de> | 2017-06-30 20:13:51 +0200 |
---|---|---|
committer | Mike Gabriel <mike.gabriel@das-netzwerkteam.de> | 2017-07-26 10:12:43 +0200 |
commit | f76c82403888bb498973ec974dbfd20e4edb02fe (patch) | |
tree | be0cb6c112d9d9fb46387fbd114727510197ddec /nxcomp/Proxy.cpp | |
parent | 9193d11eeeea933e293acd5e0f03fa4e9887186b (diff) | |
download | nx-libs-f76c82403888bb498973ec974dbfd20e4edb02fe.tar.gz nx-libs-f76c82403888bb498973ec974dbfd20e4edb02fe.tar.bz2 nx-libs-f76c82403888bb498973ec974dbfd20e4edb02fe.zip |
nxcomp: Switch to autoreconf.
Diffstat (limited to 'nxcomp/Proxy.cpp')
-rw-r--r-- | nxcomp/Proxy.cpp | 6525 |
1 files changed, 0 insertions, 6525 deletions
diff --git a/nxcomp/Proxy.cpp b/nxcomp/Proxy.cpp deleted file mode 100644 index 3a7a42362..000000000 --- a/nxcomp/Proxy.cpp +++ /dev/null @@ -1,6525 +0,0 @@ -/**************************************************************************/ -/* */ -/* Copyright (c) 2001, 2011 NoMachine (http://www.nomachine.com) */ -/* Copyright (c) 2008-2014 Oleksandr Shneyder <o.shneyder@phoca-gmbh.de> */ -/* Copyright (c) 2014-2016 Ulrich Sibiller <uli42@gmx.de> */ -/* Copyright (c) 2014-2016 Mihai Moldovan <ionic@ionic.de> */ -/* Copyright (c) 2011-2016 Mike Gabriel <mike.gabriel@das-netzwerkteam.de>*/ -/* Copyright (c) 2015-2016 Qindel Group (http://www.qindel.com) */ -/* */ -/* NXCOMP, NX protocol compression and NX extensions to this software */ -/* are copyright of the aforementioned persons and companies. */ -/* */ -/* Redistribution and use of the present software is allowed according */ -/* to terms specified in the file LICENSE.nxcomp which comes in the */ -/* source distribution. */ -/* */ -/* All rights reserved. */ -/* */ -/* NOTE: This software has received contributions from various other */ -/* contributors, only the core maintainers and supporters are listed as */ -/* copyright holders. Please contact us, if you feel you should be listed */ -/* as copyright holder, as well. */ -/* */ -/**************************************************************************/ - -#include <cstdio> -#include <unistd.h> -#include <cstdlib> -#include <string.h> -#include <sys/types.h> -#include <sys/stat.h> -#include <signal.h> - -#ifdef ANDROID -#include <netinet/in.h> -#include <netinet/ip.h> -#include <netinet/tcp.h> -#endif - -#include "Misc.h" - -#if defined(__CYGWIN32__) || defined(__APPLE__) || defined(__FreeBSD__) || defined(__sun) -#include <netinet/in_systm.h> -#endif - -#ifndef ANDROID -#include <netinet/in.h> -#include <netinet/ip.h> -#include <netinet/tcp.h> -#endif - -#include "NXalert.h" -#include "NXvars.h" - -#include "Proxy.h" - -#include "Socket.h" -#include "Channel.h" -#include "Statistics.h" - -#include "ClientChannel.h" -#include "ServerChannel.h" -#include "GenericChannel.h" -#include "ChannelEndPoint.h" - -// -// We need to adjust some values related -// to these messages at the time the mes- -// sage stores are reconfigured. -// - -#include "PutImage.h" -#include "ChangeGC.h" -#include "PolyFillRectangle.h" -#include "PutPackedImage.h" - -// -// This is from the main loop. -// - -extern void CleanupListeners(); - -extern int HandleChild(int); - -// -// Default size of string buffers. -// - -#define DEFAULT_STRING_LENGTH 512 - -// -// Set the verbosity level. You also need -// to define DUMP in Misc.cpp if DUMP is -// defined here. -// - -#define WARNING -#define PANIC -#undef TEST -#undef DEBUG -#undef DUMP - -// -// Log the important tracepoints related -// to writing packets to the peer proxy. -// - -#undef FLUSH - -// -// Log the operations related to splits. -// - -#undef SPLIT - -// -// Log the operations related to sending -// and receiving the control tokens. -// - -#undef TOKEN - -// -// Log the operations related to setting -// the token limits. -// - -#undef LIMIT - -// -// Log a warning if no data is written by -// the proxy within a timeout. -// - -#undef TIME - -// -// Log the operation related to generating -// the ping message at idle time. -// - -#undef PING - -Proxy::Proxy(int fd) - - : transport_(new ProxyTransport(fd)), fd_(fd), readBuffer_(transport_) -{ - for (int channelId = 0; - channelId < CONNECTIONS_LIMIT; - channelId++) - { - channels_[channelId] = NULL; - transports_[channelId] = NULL; - congestions_[channelId] = 0; - - fdMap_[channelId] = nothing; - channelMap_[channelId] = nothing; - slavePidMap_[channelId] = nothing; - } - - inputChannel_ = nothing; - outputChannel_ = nothing; - - controlLength_ = 0; - - operation_ = operation_in_negotiation; - - draining_ = 0; - priority_ = 0; - finish_ = 0; - shutdown_ = 0; - congestion_ = 0; - - timer_ = 0; - alert_ = 0; - - agent_ = nothing; - - // - // Set null timeouts. This will require - // a new link configuration. - // - - timeouts_.split = 0; - timeouts_.motion = 0; - - timeouts_.readTs = getTimestamp(); - timeouts_.writeTs = getTimestamp(); - - timeouts_.loopTs = getTimestamp(); - timeouts_.pingTs = getTimestamp(); - timeouts_.alertTs = nullTimestamp(); - timeouts_.loadTs = nullTimestamp(); - - timeouts_.splitTs = nullTimestamp(); - timeouts_.motionTs = nullTimestamp(); - - // - // Initialize the token counters. This - // will require a new link configuration. - // - - for (int i = token_control; i <= token_data; i++) - { - tokens_[i].size = 0; - tokens_[i].limit = 0; - - tokens_[i].bytes = 0; - tokens_[i].remaining = 0; - } - - tokens_[token_control].request = code_control_token_request; - tokens_[token_control].reply = code_control_token_reply; - tokens_[token_control].type = token_control; - - tokens_[token_split].request = code_split_token_request; - tokens_[token_split].reply = code_split_token_reply; - tokens_[token_split].type = token_split; - - tokens_[token_data].request = code_data_token_request; - tokens_[token_data].reply = code_data_token_reply; - tokens_[token_data].type = token_data; - - currentStatistics_ = NULL; - - // - // Create compressor and decompressor - // for image and data payload. - // - - compressor_ = new StaticCompressor(control -> LocalDataCompressionLevel, - control -> LocalDataCompressionThreshold); - - // - // Create object storing NX specific - // opcodes. - // - - opcodeStore_ = new OpcodeStore(); - - // - // Create the message stores. - // - - clientStore_ = new ClientStore(compressor_); - serverStore_ = new ServerStore(compressor_); - - clientCache_ = new ClientCache(); - serverCache_ = new ServerCache(); - - if (clientCache_ == NULL || serverCache_ == NULL) - { - #ifdef PANIC - *logofs << "Proxy: PANIC! Failed to create the channel cache.\n" - << logofs_flush; - #endif - - cerr << "Error" << ": Failed to create the channel cache.\n"; - - HandleCleanup(); - } - - // - // Prepare for image decompression. - // - - UnpackInit(); - - #ifdef DEBUG - *logofs << "Proxy: Created new object at " << this - << ".\n" << logofs_flush; - #endif -} - -Proxy::~Proxy() -{ - for (int channelId = 0; - channelId < CONNECTIONS_LIMIT; - channelId++) - { - if (channels_[channelId] != NULL) - { - deallocateTransport(channelId); - - delete channels_[channelId]; - channels_[channelId] = NULL; - } - } - - // - // Kill all active slave channel children, and - // give them 5 seconds to exit nicely. - - #ifdef DEBUG - *logofs << "Proxy: Killing active slaves" << endl; - #endif - - int slave_count = 999; - int loop_count = 0; - - while(slave_count > 0 && loop_count < 50) - { - slave_count = 0; - - for (int channelId = 0; channelId<CONNECTIONS_LIMIT; channelId++) - { - int pid = slavePidMap_[channelId]; - - if (pid > 1) { - slave_count++; - - #ifdef DEBUG - *logofs << "Proxy: Active slave with pid " << pid << logofs_flush; - #endif - - if ( loop_count == 0 ) - { - #ifdef DEBUG - *logofs << "Proxy: Sending SIGTERM to " << pid << logofs_flush; - #endif - kill(pid, SIGTERM); - } - else if ( loop_count == 25 ) - { - #ifdef DEBUG - *logofs << "Proxy: Sending SIGKILL to " << pid << logofs_flush; - #endif - kill(pid, SIGKILL); - } - - if (HandleChild(pid)) - { - #ifdef DEBUG - *logofs << "Proxy: Slave " << pid << " terminated" << logofs_flush; - #endif - slavePidMap_[channelId] = nothing; - } - } - } - - if ( slave_count > 0 ) - { - cerr << "Proxy: Error: Failed to kill all slave channel processes. " << slave_count << " processes still remaining." << endl; - } - - usleep(200000); - loop_count++; - } - - delete transport_; - delete compressor_; - - // - // Delete storage shared among channels. - // - - delete opcodeStore_; - - delete clientStore_; - delete serverStore_; - - delete clientCache_; - delete serverCache_; - - // - // Get rid of the image decompression - // resources. - // - - UnpackDestroy(); - - #ifdef DEBUG - *logofs << "Proxy: Deleted proxy object at " << this - << ".\n" << logofs_flush; - #endif -} - -int Proxy::setOperational() -{ - #ifdef TEST - *logofs << "Proxy: Entering operational mode.\n" - << logofs_flush; - #endif - - operation_ = operation_in_messages; - - return 1; -} - -int Proxy::setReadDescriptors(fd_set *fdSet, int &fdMax, T_timestamp &tsMax) -{ - // - // Set the initial timeout to the time of - // the next ping. If the congestion count - // is greater than zero, anyway, use a - // shorter timeout to force a congestion - // update. - // - - if (agent_ != nothing && congestions_[agent_] == 0 && - statistics -> getCongestionInFrame() >= 1 && - tokens_[token_control].remaining >= - (tokens_[token_control].limit - 1)) - { - setMinTimestamp(tsMax, control -> IdleTimeout); - - #ifdef TEST - *logofs << "Proxy: Initial timeout is " << tsMax.tv_sec - << " S and " << (double) tsMax.tv_usec / - 1000 << " Ms with congestion " - << statistics -> getCongestionInFrame() - << ".\n" << logofs_flush; - #endif - } - else - { - setMinTimestamp(tsMax, control -> PingTimeout); - - #ifdef TEST - *logofs << "Proxy: Initial timeout is " << tsMax.tv_sec - << " S and " << (double) tsMax.tv_usec / - 1000 << " Ms.\n" << logofs_flush; - #endif - } - - int fd = -1; - - if (isTimeToRead() == 1) - { - // - // If we don't have split tokens available - // don't set the timeout. - // - - if (tokens_[token_split].remaining > 0 && - isTimestamp(timeouts_.splitTs) == 1) - { - int diffTs = getTimeToNextSplit(); - - #if defined(TEST) || defined(INFO) || \ - defined(FLUSH) || defined(SPLIT) - - if (diffTimestamp(timeouts_.splitTs, - getTimestamp()) > timeouts_.split) - { - *logofs << "Proxy: FLUSH! SPLIT! WARNING! Running with " - << diffTimestamp(timeouts_.splitTs, getTimestamp()) - << " Ms elapsed since the last split.\n" - << logofs_flush; - } - - *logofs << "Proxy: FLUSH! SPLIT! Requesting timeout of " - << diffTs << " Ms as there are splits to send.\n" - << logofs_flush; - - #endif - - setMinTimestamp(tsMax, diffTs); - } - #if defined(TEST) || defined(INFO) - else if (isTimestamp(timeouts_.splitTs) == 1) - { - *logofs << "Proxy: WARNING! Not requesting a split " - << "timeout with " << tokens_[token_split].remaining - << " split tokens remaining.\n" << logofs_flush; - } - #endif - - // - // Loop through the valid channels and set - // the descriptors selected for read and - // the timeout. - // - - T_list &channelList = activeChannels_.getList(); - - for (T_list::iterator j = channelList.begin(); - j != channelList.end(); j++) - { - int channelId = *j; - - if (channels_[channelId] == NULL) - { - continue; - } - - fd = getFd(channelId); - - if (channels_[channelId] -> getFinish() == 0 && - (channels_[channelId] -> getType() == channel_x11 || - tokens_[token_data].remaining > 0) && - congestions_[channelId] == 0) - { - FD_SET(fd, fdSet); - - if (fd >= fdMax) - { - fdMax = fd + 1; - } - - #ifdef TEST - *logofs << "Proxy: Descriptor FD#" << fd - << " selected for read with buffer length " - << transports_[channelId] -> length() - << ".\n" << logofs_flush; - #endif - - // - // Wakeup the proxy if there are motion - // events to flush. - // - - if (isTimestamp(timeouts_.motionTs) == 1) - { - int diffTs = getTimeToNextMotion(); - - #if defined(TEST) || defined(INFO) - - if (diffTimestamp(timeouts_.motionTs, - getTimestamp()) > timeouts_.motion) - { - *logofs << "Proxy: FLUSH! WARNING! Running with " - << diffTimestamp(timeouts_.motionTs, getTimestamp()) - << " Ms elapsed since the last motion.\n" - << logofs_flush; - } - - *logofs << "Proxy: FLUSH! Requesting timeout of " - << diffTs << " Ms as FD#" << fd << " has motion " - << "events to send.\n" << logofs_flush; - - #endif - - setMinTimestamp(tsMax, diffTs); - } - } - #if defined(TEST) || defined(INFO) - else - { - if (channels_[channelId] -> getType() != channel_x11 && - tokens_[token_data].remaining <= 0) - { - *logofs << "Proxy: WARNING! Descriptor FD#" << fd - << " not selected for read with " - << tokens_[token_data].remaining << " data " - << "tokens remaining.\n" << logofs_flush; - } - } - #endif - } - } - #if defined(TEST) || defined(INFO) - else - { - *logofs << "Proxy: WARNING! Disabled reading from channels.\n" - << logofs_flush; - - *logofs << "Proxy: WARNING! Congestion is " << congestion_ - << " pending " << transport_ -> pending() << " blocked " - << transport_ -> blocked() << " length " << transport_ -> - length() << ".\n" << logofs_flush; - } - #endif - - // - // Include the proxy descriptor. - // - - FD_SET(fd_, fdSet); - - if (fd_ >= fdMax) - { - fdMax = fd_ + 1; - } - - #ifdef TEST - *logofs << "Proxy: Proxy descriptor FD#" << fd_ - << " selected for read with buffer length " - << transport_ -> length() << ".\n" - << logofs_flush; - #endif - - return 1; -} - -// -// Add to the mask the file descriptors of all -// X connections to write to. -// - -int Proxy::setWriteDescriptors(fd_set *fdSet, int &fdMax, T_timestamp &tsMax) -{ - int fd = -1; - - T_list &channelList = activeChannels_.getList(); - - for (T_list::iterator j = channelList.begin(); - j != channelList.end(); j++) - { - int channelId = *j; - - if (channels_[channelId] != NULL) - { - fd = getFd(channelId); - - if (transports_[channelId] -> length() > 0) - { - FD_SET(fd, fdSet); - - #ifdef TEST - *logofs << "Proxy: Descriptor FD#" << fd << " selected " - << "for write with blocked " << transports_[channelId] -> - blocked() << " and length " << transports_[channelId] -> - length() << ".\n" << logofs_flush; - #endif - - if (fd >= fdMax) - { - fdMax = fd + 1; - } - } - #ifdef TEST - else - { - *logofs << "Proxy: Descriptor FD#" << fd << " not selected " - << "for write with blocked " << transports_[channelId] -> - blocked() << " and length " << transports_[channelId] -> - length() << ".\n" << logofs_flush; - } - #endif - - #if defined(TEST) || defined(INFO) - - if (transports_[channelId] -> getType() != - transport_agent && transports_[channelId] -> - length() > 0 && transports_[channelId] -> - blocked() != 1) - { - *logofs << "Proxy: PANIC! Descriptor FD#" << fd - << " has data to write but blocked flag is " - << transports_[channelId] -> blocked() - << ".\n" << logofs_flush; - - cerr << "Error" << ": Descriptor FD#" << fd - << " has data to write but blocked flag is " - << transports_[channelId] -> blocked() - << ".\n"; - - HandleCleanup(); - } - - #endif - } - } - - // - // Check if the proxy transport has data - // from a previous blocking write. - // - - if (transport_ -> blocked() == 1) - { - FD_SET(fd_, fdSet); - - #ifdef TEST - *logofs << "Proxy: Proxy descriptor FD#" - << fd_ << " selected for write. Blocked is " - << transport_ -> blocked() << " length is " - << transport_ -> length() << ".\n" - << logofs_flush; - #endif - - if (fd_ >= fdMax) - { - fdMax = fd_ + 1; - } - } - #ifdef TEST - else - { - *logofs << "Proxy: Proxy descriptor FD#" - << fd_ << " not selected for write. Blocked is " - << transport_ -> blocked() << " length is " - << transport_ -> length() << ".\n" - << logofs_flush; - } - #endif - - // - // We are entering the main select. Save - // the timestamp of the last loop so that - // we can detect the clock drifts. - // - - timeouts_.loopTs = getTimestamp(); - - return 1; -} - -int Proxy::getChannels(T_channel_type type) -{ - int channels = 0; - - T_list &channelList = activeChannels_.getList(); - - for (T_list::iterator j = channelList.begin(); - j != channelList.end(); j++) - { - int channelId = *j; - - if (channels_[channelId] != NULL && - (type == channel_none || - type == channels_[channelId] -> - getType())) - { - channels++; - } - } - - return channels; -} - -T_channel_type Proxy::getType(int fd) -{ - int channelId = getChannel(fd); - - if (channelId < 0 || channels_[channelId] == NULL) - { - return channel_none; - } - - return channels_[channelId] -> getType(); -} - -const char *Proxy::getTypeName(T_channel_type type) -{ - switch (type) - { - case channel_x11: - { - return "X"; - } - case channel_cups: - { - return "CUPS"; - } - case channel_smb: - { - return "SMB"; - } - case channel_media: - { - return "media"; - } - case channel_http: - { - return "HTTP"; - } - case channel_font: - { - return "font"; - } - case channel_slave: - { - return "slave"; - } - default: - { - return "unknown"; - } - } -} - -const char *Proxy::getComputerName() -{ - // - // Strangely enough, under some Windows OSes SMB - // service doesn't bind to localhost. Fall back - // to localhost if can't find computer name in - // the environment. In future we should try to - // bind to localhost and then try the other IPs. - // - - const char *hostname = NULL; - - #ifdef __CYGWIN32__ - - hostname = getenv("COMPUTERNAME"); - - #endif - - if (hostname == NULL) - { - hostname = "localhost"; - } - - return hostname; -} - -// -// Handle data from channels selected for read. -// - -int Proxy::handleRead(int &resultFds, fd_set &readSet) -{ - #ifdef DEBUG - *logofs << "Proxy: Checking descriptors selected for read.\n" - << logofs_flush; - #endif - - T_list &channelList = activeChannels_.getList(); - - for (T_list::iterator j = channelList.begin(); - j != channelList.end(); j++) - { - #ifdef DEBUG - *logofs << "Proxy: Looping with current channel " - << *j << ".\n" << logofs_flush; - #endif - - int fd = getFd(*j); - - if (fd >= 0 && resultFds > 0 && FD_ISSET(fd, &readSet)) - { - #ifdef DEBUG - *logofs << "Proxy: Going to read messages from FD#" - << fd << ".\n" << logofs_flush; - #endif - - int result = handleRead(fd); - - if (result < 0) - { - #ifdef TEST - *logofs << "Proxy: Failure reading messages from FD#" - << fd << ".\n" << logofs_flush; - #endif - - return -1; - } - - #ifdef DEBUG - *logofs << "Proxy: Clearing the read descriptor " - << "for FD#" << fd << ".\n" << logofs_flush; - #endif - - FD_CLR(fd, &readSet); - - resultFds--; - } - } - - if (resultFds > 0 && FD_ISSET(fd_, &readSet)) - { - #ifdef DEBUG - *logofs << "Proxy: Going to read messages from " - << "proxy FD#" << fd_ << ".\n" - << logofs_flush; - #endif - - if (handleRead() < 0) - { - #ifdef TEST - *logofs << "Proxy: Failure reading from proxy FD#" - << fd_ << ".\n" << logofs_flush; - #endif - - return -1; - } - - #ifdef DEBUG - *logofs << "Proxy: Clearing the read descriptor " - << "for proxy FD#" << fd_ << ".\n" - << logofs_flush; - #endif - - FD_CLR(fd_, &readSet); - - resultFds--; - } - - return 1; -} - -// -// Perform flush on descriptors selected for write. -// - -int Proxy::handleFlush(int &resultFds, fd_set &writeSet) -{ - #ifdef DEBUG - *logofs << "Proxy: Checking descriptors selected for write.\n" - << logofs_flush; - #endif - - if (resultFds > 0 && FD_ISSET(fd_, &writeSet)) - { - #ifdef TEST - *logofs << "Proxy: FLUSH! Proxy descriptor FD#" << fd_ - << " reported to be writable.\n" - << logofs_flush; - #endif - - if (handleFlush() < 0) - { - #ifdef TEST - *logofs << "Proxy: Failure flushing the writable " - << "proxy FD#" << fd_ << ".\n" - << logofs_flush; - #endif - - return -1; - } - - #ifdef DEBUG - *logofs << "Proxy: Clearing the write descriptor " - << "for proxy FD#" << fd_ << ".\n" - << logofs_flush; - #endif - - FD_CLR(fd_, &writeSet); - - resultFds--; - } - - T_list &channelList = activeChannels_.getList(); - - for (T_list::iterator j = channelList.begin(); - resultFds > 0 && j != channelList.end(); j++) - { - #ifdef DEBUG - *logofs << "Proxy: Looping with current channel " - << *j << ".\n" << logofs_flush; - #endif - - int fd = getFd(*j); - - if (fd >= 0 && FD_ISSET(fd, &writeSet)) - { - #ifdef TEST - *logofs << "Proxy: X descriptor FD#" << fd - << " reported to be writable.\n" - << logofs_flush; - #endif - - // - // It can happen that, in handling reads, we - // have destroyed the buffer associated to a - // closed socket, so don't complain about - // the errors. - // - - handleFlush(fd); - - // - // Clear the descriptor from the mask so - // we don't confuse the agent if it's - // not checking only its own descriptors. - // - - #ifdef DEBUG - *logofs << "Proxy: Clearing the write descriptor " - << "for FD#" << fd << ".\n" - << logofs_flush; - #endif - - FD_CLR(fd, &writeSet); - - resultFds--; - } - } - - return 1; -} - -int Proxy::handleRead() -{ - #if defined(TEST) || defined(INFO) - *logofs << "Proxy: Decoding data from proxy FD#" - << fd_ << ".\n" << logofs_flush; - #endif - - // - // Decode all the available messages from - // the remote proxy until is not possible - // to read more. - // - - for (;;) - { - int result = readBuffer_.readMessage(); - - #if defined(TEST) || defined(DEBUG) || defined(INFO) - *logofs << "Proxy: Read result on proxy FD#" << fd_ - << " is " << result << ".\n" - << logofs_flush; - #endif - - if (result < 0) - { - if (shutdown_ == 0) - { - if (finish_ == 0) - { - #ifdef PANIC - *logofs << "Proxy: PANIC! Failure reading from the " - << "peer proxy on FD#" << fd_ << ".\n" - << logofs_flush; - #endif - - cerr << "Error" << ": Failure reading from the " - << "peer proxy.\n"; - } - } - #ifdef TEST - else - { - *logofs << "Proxy: Closure of the proxy link detected " - << "after clean shutdown.\n" << logofs_flush; - } - #endif - - priority_ = 0; - finish_ = 1; - congestion_ = 0; - - return -1; - } - else if (result == 0) - { - #if defined(TEST) || defined(DEBUG) || defined(INFO) - *logofs << "Proxy: No data read from proxy FD#" - << fd_ << "\n" << logofs_flush; - #endif - - return 0; - } - - // - // We read some data from the remote. If we set - // the congestion flag because we couldn't read - // before the timeout and have tokens available, - // then reset the congestion flag. - // - - if (congestion_ == 1 && - tokens_[token_control].remaining > 0) - { - #if defined(TEST) || defined(INFO) - *logofs << "Proxy: Exiting congestion due to " - << "proxy data with " << tokens_[token_control].remaining - << " tokens.\n" << logofs_flush; - #endif - - congestion_ = 0; - } - - // - // Set the timestamp of the last read - // operation from the remote proxy and - // enable again showing the 'no data - // received' dialog at the next timeout. - // - - timeouts_.readTs = getTimestamp(); - - if (alert_ != 0) - { - #if defined(TEST) || defined(INFO) - *logofs << "Proxy: Displacing the dialog " - << "for proxy FD#" << fd_ << ".\n" - << logofs_flush; - #endif - - HandleAlert(DISPLACE_MESSAGE_ALERT, 1); - } - - timeouts_.alertTs = nullTimestamp(); - - #if defined(TEST) || defined(INFO) - *logofs << "Proxy: Getting messages from proxy FD#" << fd_ - << " with " << readBuffer_.getLength() << " bytes " - << "in the read buffer.\n" << logofs_flush; - #endif - - unsigned int controlLength; - unsigned int dataLength; - - const unsigned char *message; - - while ((message = readBuffer_.getMessage(controlLength, dataLength)) != NULL) - { - statistics -> addFrameIn(); - - if (controlLength == 3 && *message == 0 && - *(message + 1) < code_last_tag) - { - if (handleControlFromProxy(message) < 0) - { - return -1; - } - } - else if (operation_ == operation_in_messages) - { - int channelId = inputChannel_; - - #if defined(TEST) || defined(INFO) - *logofs << "Proxy: Identified message of " << dataLength - << " bytes for FD#" << getFd(channelId) << " channel ID#" - << channelId << ".\n" << logofs_flush; - #endif - - if (channelId >= 0 && channelId < CONNECTIONS_LIMIT && - channels_[channelId] != NULL) - { - int finish = channels_[channelId] -> getFinish(); - - #ifdef WARNING - - if (finish == 1) - { - *logofs << "Proxy: WARNING! Handling data for finishing " - << "FD#" << getFd(channelId) << " channel ID#" - << channelId << ".\n" << logofs_flush; - } - - #endif - - // - // We need to decode all the data to preserve - // the consistency of the cache, so can't re- - // turn as soon as the first error is encount- - // ered. Check if this is the first time that - // the failure is detected. - // - - int result = channels_[channelId] -> handleWrite(message, dataLength); - - if (result < 0 && finish == 0) - { - #ifdef TEST - *logofs << "Proxy: Failed to write proxy data to FD#" - << getFd(channelId) << " channel ID#" - << channelId << ".\n" << logofs_flush; - #endif - - if (handleFinish(channelId) < 0) - { - return -1; - } - } - - // - // Check if we have splits or motion - // events to send. - // - - setSplitTimeout(channelId); - setMotionTimeout(channelId); - } - #ifdef WARNING - else - { - *logofs << "Proxy: WARNING! Received data for " - << "invalid channel ID#" << channelId - << ".\n" << logofs_flush; - } - #endif - } - else if (operation_ == operation_in_statistics) - { - #ifdef TEST - *logofs << "Proxy: Received statistics data from remote proxy.\n" - << logofs_flush; - #endif - - if (handleStatisticsFromProxy(message, dataLength) < 0) - { - return -1; - } - - operation_ = operation_in_messages; - } - else if (operation_ == operation_in_negotiation) - { - #ifdef TEST - *logofs << "Proxy: Received new negotiation data from remote proxy.\n" - << logofs_flush; - #endif - - if (handleNegotiationFromProxy(message, dataLength) < 0) - { - return -1; - } - } - - // - // if (controlLength == 3 && *message == 0 && ...) ... - // else if (operation_ == operation_in_statistics) ... - // else if (operation_ == operation_in_messages) ... - // else if (operation_ == operation_in_negotiation) ... - // else ... - // - - else - { - #ifdef PANIC - *logofs << "Proxy: PANIC! Unrecognized message received on proxy FD#" - << fd_ << ".\n" << logofs_flush; - #endif - - cerr << "Error" << ": Unrecognized message received on proxy FD#" - << fd_ << ".\n"; - - return -1; - } - - } // while ((message = readBuffer_.getMessage(controlLength, dataLength)) != NULL) ... - - // - // Reset the read buffer. - // - - readBuffer_.fullReset(); - - // - // Give up if no data is readable. - // - - if (transport_ -> readable() == 0) - { - break; - } - - } // End of for (;;) ... - - return 1; -} - -int Proxy::handleControlFromProxy(const unsigned char *message) -{ - #if defined(TEST) || defined(INFO) - *logofs << "Proxy: Received message '" << DumpControl(*(message + 1)) - << "' at " << strMsTimestamp() << " with data ID#" - << (int) *(message + 2) << ".\n" << logofs_flush; - #endif - - T_channel_type channelType = channel_none; - - switch (*(message + 1)) - { - case code_switch_connection: - { - int channelId = *(message + 2); - - // - // If channel is invalid further messages will - // be ignored. The acknowledged shutdown of - // channels should prevent this. - // - - inputChannel_ = channelId; - - break; - } - case code_begin_congestion: - { - // - // Set the congestion state for the - // channel reported by the remote. - // - - int channelId = *(message + 2); - - if (channels_[channelId] != NULL) - { - congestions_[channelId] = 1; - - #if defined(TEST) || defined(INFO) - *logofs << "Proxy: Received a begin congestion " - << "for channel id ID#" << channelId - << ".\n" << logofs_flush; - #endif - - if (channelId == agent_ && congestions_[agent_] != 0) - { - #if defined(TEST) || defined(INFO) - *logofs << "Proxy: Forcing an update of the congestion " - << "counter with agent congested.\n" - << logofs_flush; - #endif - - statistics -> updateCongestion(-tokens_[token_control].remaining, - tokens_[token_control].limit); - } - } - #ifdef WARNING - else - { - *logofs << "Proxy: WARNING! Received a begin congestion " - << "for invalid channel id ID#" << channelId - << ".\n" << logofs_flush; - } - #endif - - break; - } - case code_end_congestion: - { - // - // Attend again to the channel. - // - - int channelId = *(message + 2); - - if (channels_[channelId] != NULL) - { - congestions_[channelId] = 0; - - #if defined(TEST) || defined(INFO) - *logofs << "Proxy: Received an end congestion " - << "for channel id ID#" << channelId - << ".\n" << logofs_flush; - #endif - - if (channelId == agent_ && congestions_[agent_] != 0) - { - #if defined(TEST) || defined(INFO) - *logofs << "Proxy: Forcing an update of the congestion " - << "counter with agent decongested.\n" - << logofs_flush; - #endif - - statistics -> updateCongestion(tokens_[token_control].remaining, - tokens_[token_control].limit); - } - } - #ifdef WARNING - else - { - *logofs << "Proxy: WARNING! Received an end congestion " - << "for invalid channel id ID#" << channelId - << ".\n" << logofs_flush; - } - #endif - - break; - } - case code_control_token_request: - { - T_proxy_token &token = tokens_[token_control]; - - if (handleTokenFromProxy(token, *(message + 2)) < 0) - { - return -1; - } - - break; - } - case code_split_token_request: - { - T_proxy_token &token = tokens_[token_split]; - - if (handleTokenFromProxy(token, *(message + 2)) < 0) - { - return -1; - } - - break; - } - case code_data_token_request: - { - T_proxy_token &token = tokens_[token_data]; - - if (handleTokenFromProxy(token, *(message + 2)) < 0) - { - return -1; - } - - break; - } - case code_control_token_reply: - { - T_proxy_token &token = tokens_[token_control]; - - if (handleTokenReplyFromProxy(token, *(message + 2)) < 0) - { - return -1; - } - - break; - } - case code_split_token_reply: - { - T_proxy_token &token = tokens_[token_split]; - - if (handleTokenReplyFromProxy(token, *(message + 2)) < 0) - { - return -1; - } - - break; - } - case code_data_token_reply: - { - T_proxy_token &token = tokens_[token_data]; - - if (handleTokenReplyFromProxy(token, *(message + 2)) < 0) - { - return -1; - } - - break; - } - case code_new_x_connection: - { - // - // Opening the channel is handled later. - // - - channelType = channel_x11; - - break; - } - case code_new_cups_connection: - { - channelType = channel_cups; - - break; - } - case code_new_aux_connection: - { - // - // Starting from version 1.5.0 we create real X - // connections for the keyboard channel. We need - // to refuse old auxiliary X connections because - // they would be unable to leverage the new fake - // authorization cookie. - // - - #ifdef WARNING - *logofs << "Proxy: WARNING! Can't open outdated auxiliary X " - << "channel for code " << *(message + 1) << ".\n" - << logofs_flush; - #endif - - cerr << "Warning" << ": Can't open outdated auxiliary X " - << "channel for code " << *(message + 1) << ".\n"; - - if (handleControl(code_drop_connection, *(message + 2)) < 0) - { - return -1; - } - - break; - } - case code_new_smb_connection: - { - channelType = channel_smb; - - break; - } - case code_new_media_connection: - { - channelType = channel_media; - - break; - } - case code_new_http_connection: - { - channelType = channel_http; - - break; - } - case code_new_font_connection: - { - channelType = channel_font; - - break; - } - case code_new_slave_connection: - { - channelType = channel_slave; - - break; - } - case code_drop_connection: - { - int channelId = *(message + 2); - - if (channelId >= 0 && channelId < CONNECTIONS_LIMIT && - channels_[channelId] != NULL) - { - handleDropFromProxy(channelId); - } - #ifdef WARNING - else - { - *logofs << "Proxy: WARNING! Received a drop message " - << "for invalid channel id ID#" << channelId - << ".\n" << logofs_flush; - } - #endif - - break; - } - case code_finish_connection: - { - int channelId = *(message + 2); - - if (channelId >= 0 && channelId < CONNECTIONS_LIMIT && - channels_[channelId] != NULL) - { - // - // Force the finish state on the channel. - // We can receive this message while in - // the read loop, so we only mark the - // channel for deletion. - // - - #ifdef TEST - *logofs << "Proxy: Received a finish message for FD#" - << getFd(channelId) << " channel ID#" - << channelId << ".\n" << logofs_flush; - #endif - - handleFinishFromProxy(channelId); - } - #ifdef WARNING - else - { - *logofs << "Proxy: WARNING! Received a finish message " - << "for invalid channel id ID#" << channelId - << ".\n" << logofs_flush; - } - #endif - - break; - } - case code_finish_listeners: - { - // - // This is from the main loop. - // - - #ifdef TEST - *logofs << "Proxy: Closing down all local listeners.\n" - << logofs_flush; - #endif - - CleanupListeners(); - - finish_ = 1; - - break; - } - case code_reset_request: - { - #ifdef PANIC - *logofs << "Proxy: PANIC! Proxy reset not supported " - << "in this version.\n" << logofs_flush; - #endif - - cerr << "Error" << ": Proxy reset not supported " - << "in this version.\n"; - - HandleCleanup(); - } - case code_shutdown_request: - { - // - // Time to rest in peace. - // - - shutdown_ = 1; - - break; - } - case code_load_request: - { - if (handleLoadFromProxy() < 0) - { - return -1; - } - - break; - } - case code_save_request: - { - // - // Don't abort the connection - // if can't write to disk. - // - - handleSaveFromProxy(); - - break; - } - case code_statistics_request: - { - int type = *(message + 2); - - if (handleStatisticsFromProxy(type) < 0) - { - return -1; - } - - break; - } - case code_statistics_reply: - { - operation_ = operation_in_statistics; - - break; - } - case code_alert_request: - { - HandleAlert(*(message + 2), 1); - - break; - } - case code_sync_request: - { - int channelId = *(message + 2); - - if (handleSyncFromProxy(channelId) < 0) - { - return -1; - } - - break; - } - case code_sync_reply: - { - // - // We are not the one that issued - // the request. - // - - #if defined(TEST) || defined(INFO) - *logofs << "Proxy: PANIC! Received an unexpected " - << "synchronization reply.\n" - << logofs_flush; - #endif - - cerr << "Error" << ": Received an unexpected " - << "synchronization reply.\n"; - - HandleCleanup(); - } - default: - { - #ifdef PANIC - *logofs << "Proxy: PANIC! Received bad control message number " - << (unsigned int) *(message + 1) << " with attribute " - << (unsigned int) *(message + 2) << ".\n" << logofs_flush; - #endif - - cerr << "Error" << ": Received bad control message number " - << (unsigned int) *(message + 1) << " with attribute " - << (unsigned int) *(message + 2) << ".\n"; - - HandleCleanup(); - } - - } // End of switch (*(message + 1)) ... - - if (channelType == channel_none) - { - return 1; - } - - // - // Handle the channel allocation that we - // left from the main switch case. - // - - int channelId = *(message + 2); - - // - // Check if the channel has been dropped. - // - - if (channels_[channelId] != NULL && - (channels_[channelId] -> getDrop() == 1 || - channels_[channelId] -> getClosing() == 1)) - { - #ifdef TEST - *logofs << "Proxy: Dropping the descriptor FD#" - << getFd(channelId) << " channel ID#" - << channelId << ".\n" << logofs_flush; - #endif - - handleDrop(channelId); - } - - // - // Check if the channel is in the valid - // range. - // - - int result = checkChannelMap(channelId); - - if (result >= 0) - { - result = handleNewConnectionFromProxy(channelType, channelId); - } - - if (result < 0) - { - // - // Realization of new channel failed. - // Send channel shutdown message to - // the peer proxy. - // - - if (handleControl(code_drop_connection, channelId) < 0) - { - return -1; - } - } - else - { - int fd = getFd(channelId); - - if (getReadable(fd) > 0) - { - #ifdef TEST - *logofs << "Proxy: Trying to read immediately " - << "from descriptor FD#" << fd << ".\n" - << logofs_flush; - #endif - - if (handleRead(fd) < 0) - { - return -1; - } - } - #ifdef TEST - *logofs << "Proxy: Nothing to read immediately " - << "from descriptor FD#" << fd << ".\n" - << logofs_flush; - #endif - } - - return 1; -} - -int Proxy::handleRead(int fd, const char *data, int size) -{ - #if defined(TEST) || defined(INFO) - *logofs << "Proxy: Handling data for connection on FD#" - << fd << ".\n" << logofs_flush; - #endif - - if (canRead(fd) == 0) - { - #if defined(TEST) || defined(INFO) - - if (getChannel(fd) < 0) - { - *logofs << "Proxy: PANIC! Can't read from invalid FD#" - << fd << ".\n" << logofs_flush; - - HandleCleanup(); - } - else - { - *logofs << "Proxy: WARNING! Read method called for FD#" - << fd << " but operation is not possible.\n" - << logofs_flush; - } - - #endif - - return 0; - } - - int channelId = getChannel(fd); - - // - // Let the channel object read all the new data from - // its file descriptor, isolate messages, compress - // those messages, and append the compressed form to - // the encode buffer. - // - - #if defined(TEST) || defined(INFO) - *logofs << "Proxy: Reading messages from FD#" << fd - << " channel ID#" << channelId << ".\n" - << logofs_flush; - #endif - - int result = channels_[channelId] -> handleRead(encodeBuffer_, (const unsigned char *) data, - (unsigned int) size); - - // - // Even in the case of a failure, write the produced - // data to the proxy connection. To keep the stores - // synchronized, the remote side needs to decode any - // message encoded by this side, also if the X socket - // was closed in the meanwhile. If this is the case, - // the decompressed output will be silently discarded. - // - - if (result < 0) - { - #ifdef TEST - *logofs << "Proxy: Failed to read data from connection FD#" - << fd << " channel ID#" << channelId << ".\n" - << logofs_flush; - #endif - - if (handleFinish(channelId) < 0) - { - return -1; - } - } - - // - // Check if there are new splits or - // motion events to send. - // - - setSplitTimeout(channelId); - setMotionTimeout(channelId); - - return 1; -} - -int Proxy::handleEvents() -{ - #ifdef TEST - *logofs << "Proxy: Going to check the events on channels.\n" - << logofs_flush; - #endif - - // - // Check if we can safely write to the - // proxy link. - // - - int read = isTimeToRead(); - - // - // Loop on channels and send the pending - // events. We must copy the list because - // channels can be removed in the middle - // of the loop. - // - - T_list channelList = activeChannels_.copyList(); - - for (T_list::iterator j = channelList.begin(); - j != channelList.end(); j++) - { - int channelId = *j; - - if (channels_[channelId] == NULL) - { - continue; - } - - // - // Check if we need to drop the channel. - // - - if (channels_[channelId] -> getDrop() == 1 || - channels_[channelId] -> getClosing() == 1) - { - #ifdef TEST - *logofs << "Proxy: Dropping the descriptor FD#" - << getFd(channelId) << " channel ID#" - << channelId << ".\n" << logofs_flush; - #endif - - if (handleDrop(channelId) < 0) - { - return -1; - } - - continue; - } - else if (channels_[channelId] -> getFinish() == 1) - { - #ifdef TEST - *logofs << "Proxy: Skipping finishing " - << "descriptor FD#" << getFd(channelId) - << " channel ID#" << channelId << ".\n" - << logofs_flush; - #endif - - continue; - } - - // - // If the proxy link or the channel is - // in congestion state, don't handle - // the further events. - // - - if (read == 0 || congestions_[channelId] == 1) - { - #ifdef TEST - - if (read == 0) - { - *logofs << "Proxy: Can't handle events for FD#" - << getFd(channelId) << " channel ID#" - << channelId << " with proxy not available.\n" - << logofs_flush; - } - else - { - *logofs << "Proxy: Can't handle events for FD#" - << getFd(channelId) << " channel ID#" - << channelId << " with channel congested.\n" - << logofs_flush; - } - - #endif - - continue; - } - - // - // Handle the timeouts on the channel - // operations. - // - - int result = 0; - - // - // Handle the motion events. - // - - if (result >= 0 && channels_[channelId] -> needMotion() == 1) - { - if (isTimeToMotion() == 1) - { - #if defined(TEST) || defined(INFO) || defined(FLUSH) - - *logofs << "Proxy: FLUSH! Motion timeout expired after " - << diffTimestamp(timeouts_.motionTs, getTimestamp()) - << " Ms.\n" << logofs_flush; - - #endif - - result = channels_[channelId] -> handleMotion(encodeBuffer_); - - #ifdef TEST - - if (result < 0) - { - *logofs << "Proxy: Failed to handle motion events for FD#" - << getFd(channelId) << " channel ID#" << channelId - << ".\n" << logofs_flush; - } - - #endif - - timeouts_.motionTs = nullTimestamp(); - - setMotionTimeout(channelId); - } - #if defined(TEST) || defined(INFO) - else if (isTimestamp(timeouts_.motionTs) == 1) - { - *logofs << "Proxy: Running with " - << diffTimestamp(timeouts_.motionTs, getTimestamp()) - << " Ms elapsed since the last motion.\n" - << logofs_flush; - } - #endif - } - - if (result >= 0 && channels_[channelId] -> needSplit() == 1) - { - // - // Check if it is time to send more splits - // and how many bytes are going to be sent. - // - - if (isTimeToSplit() == 1) - { - #if defined(TEST) || defined(INFO) || defined(SPLIT) - *logofs << "Proxy: SPLIT! Split timeout expired after " - << diffTimestamp(timeouts_.splitTs, getTimestamp()) - << " Ms.\n" << logofs_flush; - #endif - - #if defined(TEST) || defined(INFO) || defined(SPLIT) - - *logofs << "Proxy: SPLIT! Encoding splits for FD#" - << getFd(channelId) << " at " << strMsTimestamp() - << " with " << clientStore_ -> getSplitTotalStorageSize() - << " total bytes and " << control -> SplitDataPacketLimit - << " bytes " << "to write.\n" - << logofs_flush; - - #endif - - result = channels_[channelId] -> handleSplit(encodeBuffer_); - - #ifdef TEST - - if (result < 0) - { - *logofs << "Proxy: Failed to handle splits for FD#" - << getFd(channelId) << " channel ID#" << channelId - << ".\n" << logofs_flush; - } - - #endif - - timeouts_.splitTs = nullTimestamp(); - - setSplitTimeout(channelId); - } - #if defined(TEST) || defined(INFO) || defined(SPLIT) - else if (channels_[channelId] -> needSplit() == 1 && - isTimestamp(timeouts_.splitTs) == 0) - { - *logofs << "Proxy: SPLIT! WARNING! Channel for FD#" - << getFd(channelId) << " has split to send but " - << "there is no timeout.\n" << logofs_flush; - } - else if (isTimestamp(timeouts_.splitTs) == 1) - { - *logofs << "Proxy: SPLIT! Running with " - << diffTimestamp(timeouts_.splitTs, getTimestamp()) - << " Ms elapsed since the last split.\n" - << logofs_flush; - } - #endif - } - - if (result < 0) - { - #ifdef TEST - *logofs << "Proxy: Error handling events for FD#" - << getFd(channelId) << " channel ID#" - << channelId << ".\n" << logofs_flush; - #endif - - if (handleFinish(channelId) < 0) - { - return -1; - } - } - } - - return 1; -} - -int Proxy::handleFrame(T_frame_type type) -{ - // - // Write any outstanding control message, followed by the - // content of the encode buffer, to the proxy transport. - // - // This code assumes that the encode buffer data is at an - // offset several bytes from start of the buffer, so that - // the length header and any necessary control bytes can - // be inserted in front of the data already in the buffer. - // This is the easiest way to encapsulate header and data - // together in a single frame. - // - // The way framing is implemented is very efficient but - // inherently limited and does not allow for getting the - // best performance, especially when running over a fast - // link. Framing should be rewritten to include the length - // of the packets in a fixed size header and, possibly, - // to incapsulate the control messages and the channel's - // data in a pseudo X protocol message, so that the proxy - // itself would be treated like any other channel. - // - - #if defined(TEST) || defined(INFO) - - if (congestion_ == 1) - { - // - // This can happen because there may be control - // messages to send, like a proxy shutdown mes- - // sage or a statistics request. All the other - // cases should be considered an error. - // - - #ifdef WARNING - *logofs << "Proxy: WARNING! Data is to be sent while " - << "congestion is " << congestion_ << ".\n" - << logofs_flush; - #endif - } - - #endif - - // - // Check if there is any data available on - // the socket. Recent Linux kernels are very - // picky. They require that we read often or - // they assume that the process is non-inter- - // active. - // - - if (handleAsyncEvents() < 0) - { - return -1; - } - - // - // Check if this is a ping, not a data frame. - // - - if (type == frame_ping) - { - if (handleToken(frame_ping) < 0) - { - return -1; - } - } - - unsigned int dataLength = encodeBuffer_.getLength(); - - #ifdef DEBUG - *logofs << "Proxy: Data length is " << dataLength - << " control length is " << controlLength_ - << ".\n" << logofs_flush; - #endif - - if (dataLength > 0) - { - // - // If this is a generic channel we need - // to add the completion bits. Data can - // also have been encoded because of a - // statistics request, even if no output - // channel was currently selected. - // - - if (outputChannel_ != -1) - { - #if defined(TEST) || defined(INFO) - - if (channels_[outputChannel_] == NULL) - { - *logofs << "Proxy: PANIC! A new frame was requested " - << "but the channel is invalid.\n" - << logofs_flush; - - HandleCleanup(); - } - - #endif - - channels_[outputChannel_] -> handleCompletion(encodeBuffer_); - - dataLength = encodeBuffer_.getLength(); - } - } - else if (controlLength_ == 0) - { - #if defined(TEST) || defined(INFO) - - *logofs << "Proxy: PANIC! A new frame was requested " - << "but there is no data to write.\n" - << logofs_flush; - - HandleCleanup(); - - #endif - - return 0; - } - - #ifdef DEBUG - *logofs << "Proxy: Data length is now " << dataLength - << " control length is " << controlLength_ - << ".\n" << logofs_flush; - #endif - - // - // Check if this frame needs to carry a new - // token request. - // - - if (type == frame_data) - { - if (handleToken(frame_data) < 0) - { - return -1; - } - } - - #ifdef DEBUG - *logofs << "Proxy: Adding a new frame for the remote proxy.\n" - << logofs_flush; - #endif - - unsigned char temp[5]; - - unsigned int lengthLength = 0; - unsigned int shift = dataLength; - - while (shift) - { - temp[lengthLength++] = (unsigned char) (shift & 0x7f); - - shift >>= 7; - } - - unsigned char *data = encodeBuffer_.getData(); - - unsigned char *outputMessage = data - (controlLength_ + lengthLength); - - unsigned char *nextDest = outputMessage; - - for (int i = 0; i < controlLength_; i++) - { - *nextDest++ = controlCodes_[i]; - } - - for (int j = lengthLength - 1; j > 0; j--) - { - *nextDest++ = (temp[j] | 0x80); - } - - if (lengthLength) - { - *nextDest++ = temp[0]; - } - - unsigned int outputLength = dataLength + controlLength_ + lengthLength; - - #if defined(TEST) || defined(INFO) - *logofs << "Proxy: Produced plain output for " << dataLength << "+" - << controlLength_ << "+" << lengthLength << " out of " - << outputLength << " bytes.\n" << logofs_flush; - #endif - - #if defined(TEST) || defined(INFO) || defined(FLUSH) || defined(TIME) - - T_timestamp nowTs = getTimestamp(); - - *logofs << "Proxy: FLUSH! Immediate with blocked " << transport_ -> - blocked() << " length " << transport_ -> length() - << " new " << outputLength << " flushable " << transport_ -> - flushable() << " tokens " << tokens_[token_control].remaining - << " after " << diffTimestamp(timeouts_.writeTs, nowTs) - << " Ms.\n" << logofs_flush; - - *logofs << "Proxy: FLUSH! Immediate flush to proxy FD#" << fd_ - << " of " << outputLength << " bytes at " << strMsTimestamp() - << " with priority " << priority_ << ".\n" << logofs_flush; - - *logofs << "Proxy: FLUSH! Current bitrate is " - << statistics -> getBitrateInShortFrame() << " with " - << statistics -> getBitrateInLongFrame() << " in the " - << "long frame and top " << statistics -> - getTopBitrate() << ".\n" << logofs_flush; - #endif - - statistics -> addWriteOut(); - - int result = transport_ -> write(write_immediate, outputMessage, outputLength); - - #ifdef TIME - - if (diffTimestamp(timeouts_.writeTs, nowTs) > 50) - { - *logofs << "Proxy: WARNING! TIME! Data written to proxy FD#" - << fd_ << " at " << strMsTimestamp() << " after " - << diffTimestamp(timeouts_.writeTs, nowTs) - << " Ms.\n" << logofs_flush; - } - - #endif - - #ifdef DUMP - *logofs << "Proxy: Sent " << outputLength << " bytes of data " - << "with checksum "; - - DumpChecksum(outputMessage, outputLength); - - *logofs << " on proxy FD#" << fd_ << ".\n" << logofs_flush; - #endif - - #ifdef DUMP - *logofs << "Proxy: Partial checksums are:\n"; - - DumpBlockChecksums(outputMessage, outputLength, 256); - - *logofs << logofs_flush; - #endif - - // - // Clean up the encode buffer and - // bring it to the initial size. - // - - encodeBuffer_.fullReset(); - - // - // Close the connection if we got - // an error. - // - - if (result < 0) - { - #ifdef TEST - *logofs << "Proxy: Failed write to proxy FD#" - << fd_ << ".\n" << logofs_flush; - #endif - - return -1; - } - - // - // Account for the data frame and the - // framing overhead. - // - - if (dataLength > 0) - { - statistics -> addFrameOut(); - } - - statistics -> addFramingBits((controlLength_ + lengthLength) << 3); - - controlLength_ = 0; - - // - // Reset all buffers, counters and the - // priority flag. - // - - handleResetFlush(); - - // - // Check if more data became available - // after writing. - // - - if (handleAsyncEvents() < 0) - { - return -1; - } - - // - // Drain the proxy link if we are in - // congestion state. - // - // if (needDrain() == 1 && draining_ == 0) - // { - // if (handleDrain() < 0) - // { - // return -1; - // } - // } - // - - return result; -} - -int Proxy::handleFlush() -{ - // - // We can have data in the encode buffer or - // control bytes to send. In the case make - // up a new frame. - // - - if (encodeBuffer_.getLength() + controlLength_ > 0) - { - #if defined(TEST) || defined(INFO) - *logofs << "Proxy: Flushing data in the encode buffer.\n" - << logofs_flush; - #endif - - priority_ = 1; - - if (handleFrame(frame_data) < 0) - { - return -1; - } - } - - // - // Check if we have something to write. - // - - if (transport_ -> length() + transport_ -> flushable() == 0) - { - #if defined(TEST) || defined(INFO) - *logofs << "Proxy: Nothing else to flush for proxy FD#" - << fd_ << ".\n" << logofs_flush; - #endif - - return 0; - } - - #if defined(TEST) || defined(INFO) - - if (transport_ -> blocked() == 0) - { - #ifdef PANIC - *logofs << "Proxy: PANIC! Proxy descriptor FD#" << fd_ - << " has data to flush but the transport " - << "is not blocked.\n" << logofs_flush; - #endif - - cerr << "Error" << ": Proxy descriptor FD#" << fd_ - << " has data to flush but the transport " - << "is not blocked.\n"; - - HandleCleanup(); - } - - #endif - - #if defined(TEST) || defined(INFO) || defined(FLUSH) - *logofs << "Proxy: FLUSH! Deferred with blocked " << transport_ -> - blocked() << " length " << transport_ -> length() - << " flushable " << transport_ -> flushable() << " tokens " - << tokens_[token_control].remaining << ".\n" - << logofs_flush; - - *logofs << "Proxy: FLUSH! Deferred flush to proxy FD#" << fd_ - << " of " << transport_ -> length() + transport_ -> - flushable() << " bytes at " << strMsTimestamp() - << " with priority " << priority_ << ".\n" - << logofs_flush; - - *logofs << "Proxy: FLUSH! Current bitrate is " - << statistics -> getBitrateInShortFrame() << " with " - << statistics -> getBitrateInLongFrame() << " in the " - << "long frame and top " << statistics -> - getTopBitrate() << ".\n" << logofs_flush; - #endif - - statistics -> addWriteOut(); - - int result = transport_ -> flush(); - - if (result < 0) - { - return -1; - } - - // - // Reset the counters and update the - // timestamp of the last write. - // - - handleResetFlush(); - - return result; -} - -int Proxy::handleDrain() -{ - // - // If the proxy is run in the same process - // as SSH, we can't block or the program - // would not have a chance to read or write - // its data. - // - - if (control -> LinkEncrypted == 1) - { - return 0; - } - - if (needDrain() == 0 || draining_ == 1) - { - #if defined(TEST) || defined(INFO) - - if (draining_ == 1) - { - *logofs << "Proxy: WARNING! Already draining proxy FD#" - << fd_ << " at " << strMsTimestamp() << ".\n" - << logofs_flush; - } - else - { - *logofs << "Proxy: WARNING! No need to drain proxy FD#" - << fd_ << " with congestion " << congestion_ - << " length " << transport_ -> length() - << " and blocked " << transport_ -> blocked() - << ".\n" << logofs_flush; - } - - #endif - - return 0; - } - - draining_ = 1; - - #if defined(TEST) || defined(INFO) - *logofs << "Proxy: Going to drain the proxy FD#" << fd_ - << " at " << strMsTimestamp() << ".\n" - << logofs_flush; - #endif - - int timeout = control -> PingTimeout / 2; - - T_timestamp startTs = getNewTimestamp(); - - T_timestamp nowTs = startTs; - - int remaining; - int result; - - // - // Keep draining the proxy socket while - // reading the incoming messages until - // the timeout is expired. - // - - for (;;) - { - remaining = timeout - diffTimestamp(startTs, nowTs); - - if (remaining <= 0) - { - #if defined(TEST) || defined(INFO) - *logofs << "Proxy: Timeout raised while draining " - << "FD#" << fd_ << " at " << strMsTimestamp() - << " after " << diffTimestamp(startTs, nowTs) - << " Ms.\n" << logofs_flush; - #endif - - result = 0; - - goto ProxyDrainEnd; - } - - if (transport_ -> length() > 0) - { - #if defined(TEST) || defined(INFO) - *logofs << "Proxy: Trying to write to FD#" << fd_ - << " at " << strMsTimestamp() << " with length " - << transport_ -> length() << " and " - << remaining << " Ms remaining.\n" - << logofs_flush; - #endif - - result = transport_ -> drain(0, remaining); - - if (result == -1) - { - result = -1; - - goto ProxyDrainEnd; - } - else if (result == 0 && transport_ -> readable() > 0) - { - #if defined(TEST) || defined(INFO) - *logofs << "Proxy: Decoding more data from proxy FD#" - << fd_ << " at " << strMsTimestamp() << " with " - << transport_ -> length() << " bytes to write and " - << transport_ -> readable() << " readable.\n" - << logofs_flush; - #endif - - if (handleRead() < 0) - { - result = -1; - - goto ProxyDrainEnd; - } - } - #if defined(TEST) || defined(INFO) - else if (result == 1) - { - *logofs << "Proxy: Transport for proxy FD#" << fd_ - << " drained down to " << transport_ -> length() - << " bytes.\n" << logofs_flush; - } - #endif - } - else - { - #if defined(TEST) || defined(INFO) - *logofs << "Proxy: Waiting for more data from proxy " - << "FD#" << fd_ << " at " << strMsTimestamp() - << " with " << remaining << " Ms remaining.\n" - << logofs_flush; - #endif - - - result = transport_ -> wait(remaining); - - if (result == -1) - { - result = -1; - - goto ProxyDrainEnd; - } - else if (result > 0) - { - #if defined(TEST) || defined(INFO) - *logofs << "Proxy: Decoding more data from proxy FD#" - << fd_ << " at " << strMsTimestamp() << " with " - << transport_ -> readable() << " bytes readable.\n" - << logofs_flush; - #endif - - if (handleRead() < 0) - { - result = -1; - - goto ProxyDrainEnd; - } - } - } - - // - // Check if we finally got the tokens - // that would allow us to come out of - // the congestion state. - // - - if (needDrain() == 0) - { - #if defined(TEST) || defined(INFO) - *logofs << "Proxy: Got decongestion for proxy FD#" - << fd_ << " at " << strMsTimestamp() << " after " - << diffTimestamp(startTs, getTimestamp()) - << " Ms.\n" << logofs_flush; - #endif - - result = 1; - - goto ProxyDrainEnd; - } - - nowTs = getNewTimestamp(); - } - -ProxyDrainEnd: - - draining_ = 0; - - return result; -} - -int Proxy::handleFlush(int fd) -{ - int channelId = getChannel(fd); - - if (channelId < 0 || channels_[channelId] == NULL) - { - #ifdef TEST - *logofs << "Proxy: WARNING! Skipping flush on invalid " - << "descriptor FD#" << fd << " channel ID#" - << channelId << ".\n" << logofs_flush; - #endif - - return 0; - } - else if (channels_[channelId] -> getFinish() == 1) - { - #ifdef TEST - *logofs << "Proxy: Skipping flush on finishing " - << "descriptor FD#" << fd << " channel ID#" - << channelId << ".\n" << logofs_flush; - #endif - - return 0; - } - - #ifdef TEST - *logofs << "Proxy: Going to flush FD#" << fd - << " with blocked " << transports_[channelId] -> blocked() - << " length " << transports_[channelId] -> length() - << ".\n" << logofs_flush; - #endif - - if (channels_[channelId] -> handleFlush() < 0) - { - #ifdef TEST - *logofs << "Proxy: Failed to flush data to FD#" - << getFd(channelId) << " channel ID#" - << channelId << ".\n" << logofs_flush; - #endif - - handleFinish(channelId); - - return -1; - } - - return 1; -} - -int Proxy::handleStatistics(int type, ostream *stream) -{ - if (stream == NULL || control -> EnableStatistics == 0) - { - #ifdef WARNING - *logofs << "Proxy: WARNING! Cannot produce statistics " - << " for proxy FD#" << fd_ << ". Invalid settings " - << "for statistics or stream.\n" << logofs_flush; - #endif - - return 0; - } - else if (currentStatistics_ != NULL) - { - // - // Need to update the stream pointer as the - // previous one could have been destroyed. - // - - #ifdef WARNING - *logofs << "Proxy: WARNING! Replacing stream while producing " - << "statistics in stream at " << currentStatistics_ - << " for proxy FD#" << fd_ << ".\n" - << logofs_flush; - #endif - } - - currentStatistics_ = stream; - - // - // Get statistics of remote peer. - // - - if (handleControl(code_statistics_request, type) < 0) - { - return -1; - } - - return 1; -} - -int Proxy::handleStatisticsFromProxy(int type) -{ - if (needFlush() == 1) - { - #if defined(TEST) || defined(INFO) || defined(FLUSH) - *logofs << "Proxy: WARNING! Data for the previous " - << "channel ID#" << outputChannel_ - << " flushed in statistics.\n" - << logofs_flush; - #endif - - if (handleFrame(frame_data) < 0) - { - return -1; - } - } - - if (control -> EnableStatistics == 1) - { - // - // Allocate a buffer for the output. - // - - char *buffer = new char[STATISTICS_LENGTH]; - - *buffer = '\0'; - - if (control -> ProxyMode == proxy_client) - { - #ifdef TEST - *logofs << "Proxy: Producing " - << (type == TOTAL_STATS ? "total" : "partial") - << " client statistics for proxy FD#" - << fd_ << ".\n" << logofs_flush; - #endif - - statistics -> getClientProtocolStats(type, buffer); - - statistics -> getClientOverallStats(type, buffer); - } - else - { - #ifdef TEST - *logofs << "Proxy: Producing " - << (type == TOTAL_STATS ? "total" : "partial") - << " server statistics for proxy FD#" - << fd_ << ".\n" << logofs_flush; - #endif - - statistics -> getServerProtocolStats(type, buffer); - } - - if (type == PARTIAL_STATS) - { - statistics -> resetPartialStats(); - } - - unsigned int length = strlen((char *) buffer) + 1; - - encodeBuffer_.encodeValue(type, 8); - - encodeBuffer_.encodeValue(length, 32); - - #ifdef TEST - *logofs << "Proxy: Encoding " << length - << " bytes of statistics data for proxy FD#" - << fd_ << ".\n" << logofs_flush; - #endif - - encodeBuffer_.encodeMemory((unsigned char *) buffer, length); - - // - // Account statistics data as framing bits. - // - - statistics -> addFramingBits(length << 3); - - delete [] buffer; - } - else - { - #ifdef WARNING - *logofs << "Proxy: WARNING! Got statistics request " - << "but local statistics are disabled.\n" - << logofs_flush; - #endif - - cerr << "Warning" << ": Got statistics request " - << "but local statistics are disabled.\n"; - - type = NO_STATS; - - encodeBuffer_.encodeValue(type, 8); - - #ifdef TEST - *logofs << "Proxy: Sending error code to remote proxy on FD#" - << fd_ << ".\n" << logofs_flush; - #endif - } - - // - // The next write will flush the statistics - // data and the control message. - // - - if (handleControl(code_statistics_reply, type) < 0) - { - return -1; - } - - return 1; -} - -int Proxy::handleStatisticsFromProxy(const unsigned char *message, unsigned int length) -{ - if (currentStatistics_ == NULL) - { - #ifdef WARNING - *logofs << "Proxy: WARNING! Unexpected statistics data received " - << "from remote proxy on FD#" << fd_ << ".\n" - << logofs_flush; - #endif - - cerr << "Warning" << ": Unexpected statistics data received " - << "from remote proxy.\n"; - - return 0; - } - - // - // Allocate the decode buffer and at least - // the 'type' field to see if there was an - // error. - // - - DecodeBuffer decodeBuffer(message, length); - - unsigned int type; - - decodeBuffer.decodeValue(type, 8); - - if (type == NO_STATS) - { - #ifdef PANIC - *logofs << "Proxy: PANIC! Couldn't get statistics from remote " - << "proxy on FD#" << fd_ << ".\n" << logofs_flush; - #endif - - cerr << "Error" << ": Couldn't get statistics from remote proxy.\n"; - } - else if (type != TOTAL_STATS && type != PARTIAL_STATS) - { - #ifdef PANIC - *logofs << "Proxy: PANIC! Cannot produce statistics " - << "with qualifier '" << type << "'.\n" - << logofs_flush; - #endif - - cerr << "Error" << ": Cannot produce statistics " - << "with qualifier '" << type << "'.\n"; - - return -1; - } - else - { - unsigned int size; - - decodeBuffer.decodeValue(size, 32); - - char *buffer = new char[STATISTICS_LENGTH]; - - *buffer = '\0'; - - if (control -> EnableStatistics == 1) - { - if (control -> ProxyMode == proxy_client) - { - #ifdef TEST - *logofs << "Proxy: Finalizing " - << (type == TOTAL_STATS ? "total" : "partial") - << " client statistics for proxy FD#" - << fd_ << ".\n" << logofs_flush; - #endif - - statistics -> getClientCacheStats(type, buffer); - - #ifdef TEST - *logofs << "Proxy: Decoding " << size - << " bytes of statistics data for proxy FD#" - << fd_ << ".\n" << logofs_flush; - #endif - - strncat(buffer, (char *) decodeBuffer.decodeMemory(size), size); - - statistics -> getClientProtocolStats(type, buffer); - - statistics -> getClientOverallStats(type, buffer); - } - else - { - #ifdef TEST - *logofs << "Proxy: Finalizing " - << (type == TOTAL_STATS ? "total" : "partial") - << " server statistics for proxy FD#" - << fd_ << ".\n" << logofs_flush; - #endif - - statistics -> getServerCacheStats(type, buffer); - - statistics -> getServerProtocolStats(type, buffer); - - #ifdef TEST - *logofs << "Proxy: Decoding " << size - << " bytes of statistics data for proxy FD#" - << fd_ << ".\n" << logofs_flush; - #endif - - strncat(buffer, (char *) decodeBuffer.decodeMemory(size), size); - } - - if (type == PARTIAL_STATS) - { - statistics -> resetPartialStats(); - } - - *currentStatistics_ << buffer; - - // - // Mark the end of text to help external parsing. - // - - *currentStatistics_ << '\4'; - - *currentStatistics_ << flush; - } - else - { - // - // It can be that statistics were enabled at the time - // we issued the request (otherwise we could not have - // set the stream), but now they have been disabled - // by user. We must decode statistics data if we want - // to keep the connection. - // - - #ifdef TEST - *logofs << "Proxy: Discarding " << size - << " bytes of statistics data for proxy FD#" - << fd_ << ".\n" << logofs_flush; - #endif - - strncat(buffer, (char *) decodeBuffer.decodeMemory(size), size); - } - - delete [] buffer; - } - - currentStatistics_ = NULL; - - return 1; -} - -int Proxy::handleNegotiation(const unsigned char *message, unsigned int length) -{ - #ifdef PANIC - *logofs << "Proxy: PANIC! Writing data during proxy " - << "negotiation is not implemented.\n" - << logofs_flush; - #endif - - cerr << "Error" << ": Writing data during proxy " - << "negotiation is not implemented.\n"; - - return -1; -} - -int Proxy::handleNegotiationFromProxy(const unsigned char *message, unsigned int length) -{ - #ifdef PANIC - *logofs << "Proxy: PANIC! Reading data during proxy " - << "negotiation is not implemented.\n" - << logofs_flush; - #endif - - cerr << "Error" << ": Reading data during proxy " - << "negotiation is not implemented.\n"; - - return -1; -} - -int Proxy::handleAlert(int alert) -{ - if (handleControl(code_alert_request, alert) < 0) - { - return -1; - } - - return 1; -} - -int Proxy::handleCloseConnection(int clientFd) -{ - int channelId = getChannel(clientFd); - - if (channels_[channelId] != NULL && - channels_[channelId] -> getFinish() == 0) - { - #ifdef TEST - *logofs << "Proxy: Closing down the channel for FD#" - << clientFd << ".\n" << logofs_flush; - #endif - - if (handleFinish(channelId) < 0) - { - return -1; - } - - return 1; - } - - return 0; -} - -int Proxy::handleCloseAllXConnections() -{ - #ifdef TEST - *logofs << "Proxy: Closing down any remaining X channel.\n" - << logofs_flush; - #endif - - T_list &channelList = activeChannels_.getList(); - - for (T_list::iterator j = channelList.begin(); - j != channelList.end(); j++) - { - int channelId = *j; - - if (channels_[channelId] != NULL && - channels_[channelId] -> getType() == channel_x11 && - channels_[channelId] -> getFinish() == 0) - { - #ifdef TEST - *logofs << "Proxy: Closing down the channel for FD#" - << getFd(channelId) << ".\n" << logofs_flush; - #endif - - if (handleFinish(channelId) < 0) - { - return -1; - } - } - } - - return 1; -} - -int Proxy::handleCloseAllListeners() -{ - // Since ProtoStep7 (#issue 108) - if (finish_ == 0) - { - #ifdef TEST - *logofs << "Proxy: Closing down all remote listeners.\n" - << logofs_flush; - #endif - - if (handleControl(code_finish_listeners) < 0) - { - return -1; - } - - finish_ = 1; - } - - return 1; -} - -void Proxy::handleResetAlert() -{ - if (alert_ != 0) - { - #ifdef TEST - *logofs << "Proxy: The proxy alert '" << alert_ - << "' was displaced.\n" << logofs_flush; - #endif - - alert_ = 0; - } - - T_list &channelList = activeChannels_.getList(); - - for (T_list::iterator j = channelList.begin(); - j != channelList.end(); j++) - { - int channelId = *j; - - if (channels_[channelId] != NULL) - { - channels_[channelId] -> handleResetAlert(); - } - } -} - -int Proxy::handleFinish(int channelId) -{ - // - // Send any outstanding encoded data and - // do any finalization needed on the - // channel. - // - - if (needFlush(channelId) == 1) - { - if (channels_[channelId] -> getFinish() == 1) - { - #ifdef WARNING - *logofs << "Proxy: WARNING! The finishing channel ID#" - << channelId << " has data to flush.\n" - << logofs_flush; - #endif - } - - #if defined(TEST) || defined(INFO) || defined(FLUSH) - *logofs << "Proxy: WARNING! Flushing data for the " - << "finishing channel ID#" << channelId - << ".\n" << logofs_flush; - #endif - - if (handleFrame(frame_data) < 0) - { - return -1; - } - } - - // - // Reset the congestion state and the - // timeouts, if needed. - // - - congestions_[channelId] = 0; - - setSplitTimeout(channelId); - setMotionTimeout(channelId); - - if (channels_[channelId] -> getFinish() == 0) - { - channels_[channelId] -> handleFinish(); - - // - // Force a failure in the case somebody - // would try to read from the channel. - // - - shutdown(getFd(channelId), SHUT_RD); - - // - // If the failure was not originated by - // the remote, send a channel shutdown - // message. - // - - if (channels_[channelId] -> getClosing() == 0) - { - #ifdef TEST - *logofs << "Proxy: Finishing channel for FD#" - << getFd(channelId) << " channel ID#" - << channelId << " because of failure.\n" - << logofs_flush; - #endif - - if (handleControl(code_finish_connection, channelId) < 0) - { - return -1; - } - } - } - - return 1; -} - -int Proxy::handleFinishFromProxy(int channelId) -{ - // - // Check if this channel has pending - // data to send. - // - - if (needFlush(channelId) == 1) - { - #if defined(TEST) || defined(INFO) || defined(FLUSH) - *logofs << "Proxy: WARNING! Flushing data for the " - << "finishing channel ID#" << channelId - << ".\n" << logofs_flush; - #endif - - if (handleFrame(frame_data) < 0) - { - return -1; - } - } - - // - // Mark the channel. We will free its - // resources at the next loop and will - // send the drop message to the remote. - // - - if (channels_[channelId] -> getClosing() == 0) - { - #ifdef TEST - *logofs << "Proxy: Marking channel for FD#" - << getFd(channelId) << " channel ID#" - << channelId << " as closing.\n" - << logofs_flush; - #endif - - channels_[channelId] -> handleClosing(); - } - - if (channels_[channelId] -> getFinish() == 0) - { - #ifdef TEST - *logofs << "Proxy: Finishing channel for FD#" - << getFd(channelId) << " channel ID#" - << channelId << " because of proxy.\n" - << logofs_flush; - #endif - - channels_[channelId] -> handleFinish(); - } - - if (handleFinish(channelId) < 0) - { - return -1; - } - - return 1; -} - -int Proxy::handleDropFromProxy(int channelId) -{ - // - // Only mark the channel. - // - - #ifdef TEST - *logofs << "Proxy: Marking channel for FD#" - << getFd(channelId) << " channel ID#" - << channelId << " as being dropped.\n" - << logofs_flush; - #endif - - if (channels_[channelId] -> getDrop() == 0) - { - channels_[channelId] -> handleDrop(); - } - - return 1; -} - -// -// Close the channel and deallocate all its -// resources. -// - -int Proxy::handleDrop(int channelId) -{ - // - // Check if this channel has pending - // data to send. - // - - if (needFlush(channelId) == 1) - { - if (channels_[channelId] -> getFinish() == 1) - { - #ifdef WARNING - *logofs << "Proxy: WARNING! The dropping channel ID#" - << channelId << " has data to flush.\n" - << logofs_flush; - #endif - } - - #if defined(TEST) || defined(INFO) || defined(FLUSH) - *logofs << "Proxy: WARNING! Flushing data for the " - << "dropping channel ID#" << channelId - << ".\n" << logofs_flush; - #endif - - if (handleFrame(frame_data) < 0) - { - return -1; - } - } - - #ifdef TEST - *logofs << "Proxy: Dropping channel for FD#" - << getFd(channelId) << " channel ID#" - << channelId << ".\n" << logofs_flush; - #endif - - if (channels_[channelId] -> getFinish() == 0) - { - #ifdef WARNING - *logofs << "Proxy: WARNING! The channel for FD#" - << getFd(channelId) << " channel ID#" - << channelId << " was not marked as " - << "finishing.\n" << logofs_flush; - #endif - - cerr << "Warning" << ": The channel for FD#" - << getFd(channelId) << " channel ID#" - << channelId << " was not marked as " - << "finishing.\n"; - - channels_[channelId] -> handleFinish(); - } - - // - // Send the channel shutdown message - // to the peer proxy. - // - - if (channels_[channelId] -> getClosing() == 1) - { - if (handleControl(code_drop_connection, channelId) < 0) - { - return -1; - } - } - - // - // Get rid of the channel. - // - - if (channels_[channelId] -> getType() != channel_x11) - { - #ifdef TEST - *logofs << "Proxy: Closed connection to " - << getTypeName(channels_[channelId] -> getType()) - << " server.\n" << logofs_flush; - #endif - - cerr << "Info" << ": Closed connection to " - << getTypeName(channels_[channelId] -> getType()) - << " server.\n"; - } - - delete channels_[channelId]; - channels_[channelId] = NULL; - - cleanupChannelMap(channelId); - - // - // Get rid of the transport. - // - - deallocateTransport(channelId); - - congestions_[channelId] = 0; - - decreaseChannels(channelId); - - // - // Check if the channel was the - // one currently selected for - // output. - // - - if (outputChannel_ == channelId) - { - outputChannel_ = -1; - } - - return 1; -} - -// -// Send an empty message to the remote peer -// to verify if the link is alive and let -// the remote proxy detect a congestion. -// - -int Proxy::handlePing() -{ - T_timestamp nowTs = getTimestamp(); - - #if defined(DEBUG) || defined(PING) - - *logofs << "Proxy: Checking ping at " - << strMsTimestamp(nowTs) << logofs_flush; - - *logofs << " with last loop at " - << strMsTimestamp(timeouts_.loopTs) << ".\n" - << logofs_flush; - - *logofs << "Proxy: Last bytes in at " - << strMsTimestamp(timeouts_.readTs) << logofs_flush; - - *logofs << " last bytes out at " - << strMsTimestamp(timeouts_.writeTs) << ".\n" - << logofs_flush; - - *logofs << "Proxy: Last ping at " - << strMsTimestamp(timeouts_.pingTs) << ".\n" - << logofs_flush; - - #endif - - // - // Be sure we take into account any clock drift. This - // can be caused by the user changing the system timer - // or by small adjustments introduced by the operating - // system making the clock go backward. - // - - if (checkDiffTimestamp(timeouts_.loopTs, nowTs) == 0) - { - #ifdef WARNING - *logofs << "Proxy: WARNING! Detected drift in system " - << "timer. Resetting to current time.\n" - << logofs_flush; - #endif - - timeouts_.pingTs = nowTs; - timeouts_.readTs = nowTs; - timeouts_.writeTs = nowTs; - } - - // - // Check timestamp of last read from remote proxy. It can - // happen that we stayed in the main loop long enough to - // have idle timeout expired, for example if the proxy was - // stopped and restarted or because of an extremely high - // load of the system. In this case we don't complain if - // there is something new to read from the remote. - // - - int diffIn = diffTimestamp(timeouts_.readTs, nowTs); - - if (diffIn >= (control -> PingTimeout * 2) - - control -> LatencyTimeout) - { - // - // Force a read to detect whether the remote proxy - // aborted the connection. - // - - int result = handleRead(); - - if (result < 0) - { - #if defined(TEST) || defined(INFO) || defined(PING) - *logofs << "Proxy: WARNING! Detected shutdown waiting " - << "for the ping after " << diffIn / 1000 - << " seconds.\n" << logofs_flush; - #endif - - return -1; - } - else if (result > 0) - { - diffIn = diffTimestamp(timeouts_.readTs, nowTs); - - if (handleFlush() < 0) - { - return -1; - } - } - } - - if (diffIn >= (control -> PingTimeout * 2) - - control -> LatencyTimeout) - { - #if defined(TEST) || defined(INFO) || defined(PING) - *logofs << "Proxy: Detected congestion at " - << strMsTimestamp() << " with " << diffIn / 1000 - << " seconds since the last read.\n" - << logofs_flush; - #endif - - // - // There are two types of proxy congestion. The first, - // affecting the ability of the proxy to write the - // encoded data to the network, is controlled by the - // congestion_ flag. The flag is raised when no data - // is received from the remote proxy within a timeout. - // On the X client side, the flag is also raised when - // the proxy runs out of tokens. - // - - if (control -> ProxyMode == proxy_server) - { - // - // At X server side we must return to read data - // from the channels after a while, because we - // need to give a chance to the channel to read - // the key sequence CTRL+ALT+SHIFT+ESC. - // - - if (congestion_ == 0) - { - #if defined(TEST) || defined(INFO) - *logofs << "Proxy: Forcibly entering congestion due to " - << "timeout with " << tokens_[token_control].remaining - << " tokens.\n" << logofs_flush; - #endif - - congestion_ = 1; - } - else - { - #if defined(TEST) || defined(INFO) - *logofs << "Proxy: Forcibly exiting congestion due to " - << "timeout with " << tokens_[token_control].remaining - << " tokens.\n" << logofs_flush; - #endif - - congestion_ = 0; - } - } - else - { - #if defined(TEST) || defined(INFO) - - if (congestion_ == 0) - { - *logofs << "Proxy: Entering congestion due to timeout " - << "with " << tokens_[token_control].remaining - << " tokens.\n" << logofs_flush; - } - - #endif - - congestion_ = 1; - } - - if (control -> ProxyTimeout > 0 && - diffIn >= (control -> ProxyTimeout - - control -> LatencyTimeout)) - { - #ifdef PANIC - *logofs << "Proxy: PANIC! No data received from " - << "remote proxy on FD#" << fd_ << " within " - << (diffIn + control -> LatencyTimeout) / 1000 - << " seconds.\n" << logofs_flush; - #endif - - cerr << "Error" << ": No data received from remote " - << "proxy within " << (diffIn + control -> - LatencyTimeout) / 1000 << " seconds.\n"; - - HandleAbort(); - } - else - { - #if defined(TEST) || defined(INFO) - *logofs << "Proxy: WARNING! No data received from " - << "remote proxy on FD#" << fd_ << " since " - << diffIn << " Ms.\n" << logofs_flush; - #endif - - if (control -> ProxyTimeout > 0 && - isTimestamp(timeouts_.alertTs) == 0 && - diffIn >= (control -> ProxyTimeout - - control -> LatencyTimeout) / 4) - { - // - // If we are in the middle of a shutdown - // procedure but the remote is not resp- - // onding, force the closure of the link. - // - - if (finish_ != 0) - { - #ifdef PANIC - *logofs << "Proxy: PANIC! No response received from " - << "the remote proxy on FD#" << fd_ << " while " - << "waiting for the shutdown.\n" - << logofs_flush; - #endif - - cerr << "Error" << ": No response received from remote " - << "proxy while waiting for the shutdown.\n"; - - HandleAbort(); - } - else - { - cerr << "Warning" << ": No data received from remote " - << "proxy within " << (diffIn + control -> - LatencyTimeout) / 1000 << " seconds.\n"; - - if (alert_ == 0) - { - if (control -> ProxyMode == proxy_client) - { - alert_ = CLOSE_DEAD_PROXY_CONNECTION_CLIENT_ALERT; - } - else - { - alert_ = CLOSE_DEAD_PROXY_CONNECTION_SERVER_ALERT; - } - - HandleAlert(alert_, 1); - } - - timeouts_.alertTs = nowTs; - } - } - } - } - - // - // Check if we need to update the congestion - // counter. - // - - int diffOut = diffTimestamp(timeouts_.writeTs, nowTs); - - if (agent_ != nothing && congestions_[agent_] == 0 && - statistics -> getCongestionInFrame() >= 1 && - diffOut >= (control -> IdleTimeout - - control -> LatencyTimeout * 5)) - { - #if defined(TEST) || defined(INFO) || defined(PING) - *logofs << "Proxy: Forcing an update of the " - << "congestion counter after timeout.\n" - << logofs_flush; - #endif - - statistics -> updateCongestion(tokens_[token_control].remaining, - tokens_[token_control].limit); - } - - // - // Send a new token if we didn't send any data to - // the remote for longer than the ping timeout. - // The client side sends a token, the server side - // responds with a token reply. - // - // VMWare virtual machines can have the system - // timer deadly broken. Try to send a ping regard- - // less we are the client or the server proxy to - // force a write by the remote. - // - - if (control -> ProxyMode == proxy_client || - diffIn >= (control -> PingTimeout * 4) - - control -> LatencyTimeout) - { - // - // We need to send a new ping even if we didn't - // receive anything from the remote within the - // ping timeout. The server side will respond - // to our ping, so we use the ping to force the - // remote end to send some data. - // - - if (diffIn >= (control -> PingTimeout - - control -> LatencyTimeout * 5) || - diffOut >= (control -> PingTimeout - - control -> LatencyTimeout * 5)) - { - int diffPing = diffTimestamp(timeouts_.pingTs, nowTs); - - if (diffPing < 0 || diffPing >= (control -> PingTimeout - - control -> LatencyTimeout * 5)) - { - #if defined(TEST) || defined(INFO) || defined(PING) - *logofs << "Proxy: Sending a new ping at " << strMsTimestamp() - << " with " << tokens_[token_control].remaining - << " tokens and elapsed in " << diffIn << " out " - << diffOut << " ping " << diffPing - << ".\n" << logofs_flush; - #endif - - if (handleFrame(frame_ping) < 0) - { - return -1; - } - - timeouts_.pingTs = nowTs; - } - #if defined(TEST) || defined(INFO) || defined(PING) - else - { - *logofs << "Proxy: Not sending a new ping with " - << "elapsed in " << diffIn << " out " - << diffOut << " ping " << diffPing - << ".\n" << logofs_flush; - } - #endif - } - } - - return 1; -} - -int Proxy::handleSyncFromProxy(int channelId) -{ - #if defined(TEST) || defined(INFO) - *logofs << "Proxy: WARNING! Received a synchronization " - << "request from the remote proxy.\n" - << logofs_flush; - #endif - - if (handleControl(code_sync_reply, channelId) < 0) - { - return -1; - } - - return 1; -} - -int Proxy::handleResetStores() -{ - // - // Recreate the message stores. - // - - delete clientStore_; - delete serverStore_; - - clientStore_ = new ClientStore(compressor_); - serverStore_ = new ServerStore(compressor_); - - timeouts_.loadTs = nullTimestamp(); - - // - // Replace message stores in channels. - // - - T_list &channelList = activeChannels_.getList(); - - for (T_list::iterator j = channelList.begin(); - j != channelList.end(); j++) - { - int channelId = *j; - - if (channels_[channelId] != NULL) - { - if (channels_[channelId] -> setStores(clientStore_, serverStore_) < 0) - { - #ifdef PANIC - *logofs << "Proxy: PANIC! Failed to replace message stores in " - << "channel for FD#" << getFd(channelId) << ".\n" - << logofs_flush; - #endif - - cerr << "Error" << ": Failed to replace message stores in " - << "channel for FD#" << getFd(channelId) << ".\n"; - - return -1; - } - #ifdef TEST - else - { - *logofs << "Proxy: Replaced message stores in channel " - << "for FD#" << getFd(channelId) << ".\n" - << logofs_flush; - } - #endif - } - } - - return 1; -} - -int Proxy::handleResetPersistentCache() -{ - char *fullName = new char[strlen(control -> PersistentCachePath) + - strlen(control -> PersistentCacheName) + 2]; - - strcpy(fullName, control -> PersistentCachePath); - strcat(fullName, "/"); - strcat(fullName, control -> PersistentCacheName); - - #ifdef TEST - *logofs << "Proxy: Going to remove persistent cache file '" - << fullName << "'\n" << logofs_flush; - #endif - - unlink(fullName); - - delete [] fullName; - - delete [] control -> PersistentCacheName; - - control -> PersistentCacheName = NULL; - - return 1; -} - -void Proxy::handleResetFlush() -{ - #ifdef TEST - *logofs << "Proxy: Going to reset flush counters " - << "for proxy FD#" << fd_ << ".\n" - << logofs_flush; - #endif - - // - // Reset the proxy priority flag. - // - - priority_ = 0; - - // - // Restore buffers to their initial - // size. - // - - transport_ -> partialReset(); - - // - // Update the timestamp of the last - // write operation performed on the - // socket. - // - - timeouts_.writeTs = getTimestamp(); -} - -int Proxy::handleFinish() -{ - // - // Reset the timestamps to give the proxy - // another chance to show the 'no response' - // dialog if the shutdown message doesn't - // come in time. - // - - timeouts_.readTs = getTimestamp(); - - timeouts_.alertTs = nullTimestamp(); - - finish_ = 1; - - return 1; -} - -int Proxy::handleShutdown() -{ - // - // Send shutdown message to remote proxy. - // - - shutdown_ = 1; - - handleControl(code_shutdown_request); - - #ifdef TEST - *logofs << "Proxy: Starting shutdown procedure " - << "for proxy FD#" << fd_ << ".\n" - << logofs_flush; - #endif - - // - // Ensure that all the data accumulated - // in the transport buffer is flushed - // to the network layer. - // - - for (int i = 0; i < 100; i++) - { - if (canFlush() == 1) - { - handleFlush(); - } - else - { - break; - } - - usleep(100000); - } - - // - // Now wait for the network layers to - // consume all the data. - // - - for (int i = 0; i < 100; i++) - { - if (transport_ -> queued() <= 0) - { - break; - } - - usleep(100000); - } - - // - // Give time to the remote end to read - // the shutdown message and close the - // connection. - // - - transport_ -> wait(10000); - - #ifdef TEST - *logofs << "Proxy: Ending shutdown procedure " - << "for proxy FD#" << fd_ << ".\n" - << logofs_flush; - #endif - - return 1; -} - -int Proxy::handleChannelConfiguration() -{ - if (activeChannels_.getSize() == 0) - { - #ifdef TEST - *logofs << "Proxy: Going to initialize the static " - << "members in channels for proxy FD#" - << fd_ << ".\n" << logofs_flush; - #endif - - Channel::setReferences(); - - ClientChannel::setReferences(); - ServerChannel::setReferences(); - - GenericChannel::setReferences(); - } - - return 1; -} - -int Proxy::handleSocketConfiguration() -{ - // - // Set linger mode on proxy to correctly - // get shutdown notification. - // - - SetLingerTimeout(fd_, 30); - - // - // Set keep-alive on socket so that if remote link - // terminates abnormally (as killed hard or because - // of a power-off) process will get a SIGPIPE. In - // practice this is useless as proxies already ping - // each other every few seconds. - // - - if (control -> OptionProxyKeepAlive == 1) - { - SetKeepAlive(fd_); - } - - // - // Set 'priority' flag at TCP layer for path - // proxy-to-proxy. Look at IPTOS_LOWDELAY in - // man 7 ip. - // - - if (control -> OptionProxyLowDelay == 1) - { - SetLowDelay(fd_); - } - - // - // Update size of TCP send and receive buffers. - // - - if (control -> OptionProxySendBuffer != -1) - { - SetSendBuffer(fd_, control -> OptionProxySendBuffer); - } - - if (control -> OptionProxyReceiveBuffer != -1) - { - SetReceiveBuffer(fd_, control -> OptionProxyReceiveBuffer); - } - - // - // Update TCP_NODELAY settings. Note that on old Linux - // kernels turning off the Nagle algorithm didn't work - // when proxy was run through a PPP link. Trying to do - // so caused the kernel to stop delivering data to us - // if a serious network congestion was encountered. - // - - if (control -> ProxyMode == proxy_client) - { - if (control -> OptionProxyClientNoDelay != -1) - { - SetNoDelay(fd_, control -> OptionProxyClientNoDelay); - } - } - else - { - if (control -> OptionProxyServerNoDelay != -1) - { - SetNoDelay(fd_, control -> OptionProxyServerNoDelay); - } - } - - return 1; -} - -int Proxy::handleLinkConfiguration() -{ - #ifdef TEST - *logofs << "Proxy: Propagating parameters to " - << "channels' read buffers.\n" - << logofs_flush; - #endif - - T_list &channelList = activeChannels_.getList(); - - for (T_list::iterator j = channelList.begin(); - j != channelList.end(); j++) - { - int channelId = *j; - - if (channels_[channelId] != NULL) - { - channels_[channelId] -> handleConfiguration(); - } - } - - #ifdef TEST - *logofs << "Proxy: Propagating parameters to " - << "proxy buffers.\n" - << logofs_flush; - #endif - - readBuffer_.setSize(control -> ProxyInitialReadSize, - control -> ProxyMaximumBufferSize); - - encodeBuffer_.setSize(control -> TransportProxyBufferSize, - control -> TransportProxyBufferThreshold, - control -> TransportMaximumBufferSize); - - transport_ -> setSize(control -> TransportProxyBufferSize, - control -> TransportProxyBufferThreshold, - control -> TransportMaximumBufferSize); - - #ifdef TEST - *logofs << "Proxy: Configuring the proxy timeouts.\n" - << logofs_flush; - #endif - - timeouts_.split = control -> SplitTimeout; - timeouts_.motion = control -> MotionTimeout; - - #ifdef TEST - *logofs << "Proxy: Configuring the proxy tokens.\n" - << logofs_flush; - #endif - - tokens_[token_control].size = control -> TokenSize; - tokens_[token_control].limit = control -> TokenLimit; - - if (tokens_[token_control].limit < 1) - { - tokens_[token_control].limit = 1; - } - - #if defined(TEST) || defined(INFO) || defined(LIMIT) - *logofs << "Proxy: TOKEN! LIMIT! Setting token [" - << DumpToken(token_control) << "] size to " - << tokens_[token_control].size << " and limit to " - << tokens_[token_control].limit << ".\n" - << logofs_flush; - #endif - - tokens_[token_split].size = control -> TokenSize; - tokens_[token_split].limit = control -> TokenLimit / 2; - - if (tokens_[token_split].limit < 1) - { - tokens_[token_split].limit = 1; - } - - #if defined(TEST) || defined(INFO) || defined(LIMIT) - *logofs << "Proxy: TOKEN! LIMIT! Setting token [" - << DumpToken(token_split) << "] size to " - << tokens_[token_split].size << " and limit to " - << tokens_[token_split].limit << ".\n" - << logofs_flush; - #endif - - tokens_[token_data].size = control -> TokenSize; - tokens_[token_data].limit = control -> TokenLimit / 4; - - if (tokens_[token_data].limit < 1) - { - tokens_[token_data].limit = 1; - } - - #if defined(TEST) || defined(INFO) || defined(LIMIT) - *logofs << "Proxy: TOKEN! LIMIT! Setting token [" - << DumpToken(token_data) << "] size to " - << tokens_[token_data].size << " and limit to " - << tokens_[token_data].limit << ".\n" - << logofs_flush; - #endif - - for (int i = token_control; i <= token_data; i++) - { - tokens_[i].remaining = tokens_[i].limit; - } - - #if defined(TEST) || defined(INFO) || defined(LIMIT) - *logofs << "Proxy: LIMIT! Using client bitrate " - << "limit " << control -> ClientBitrateLimit - << " server bitrate limit " << control -> - ServerBitrateLimit << " with local limit " - << control -> LocalBitrateLimit << ".\n" - << logofs_flush; - #endif - - // - // Set the other parameters based on - // the token size. - // - - int base = control -> TokenSize; - - control -> SplitDataThreshold = base * 4; - control -> SplitDataPacketLimit = base / 2; - - #if defined(TEST) || defined(INFO) - *logofs << "Proxy: LIMIT! Setting split data threshold " - << "to " << control -> SplitDataThreshold - << " split packet limit to " << control -> - SplitDataPacketLimit << " with base " - << base << ".\n" << logofs_flush; - #endif - - // - // Set the number of bytes read from the - // data channels at each loop. This will - // basically determine the maximum band- - // width available for the generic chan- - // nels. - // - - control -> GenericInitialReadSize = base / 2; - control -> GenericMaximumBufferSize = base / 2; - - #if defined(TEST) || defined(INFO) - *logofs << "Proxy: LIMIT! Setting generic channel " - << "initial read size to " << control -> - GenericInitialReadSize << " maximum read " - << "size to " << control -> GenericMaximumBufferSize - << " with base " << base << ".\n" - << logofs_flush; - #endif - - return 1; -} - -int Proxy::handleCacheConfiguration() -{ - #ifdef TEST - *logofs << "Proxy: Configuring cache according to pack parameters.\n" - << logofs_flush; - #endif - - // - // Further adjust the cache parameters. If - // packing of the images is enabled, reduce - // the size available for plain images. - // - - if (control -> SessionMode == session_agent) - { - if (control -> PackMethod != NO_PACK) - { - clientStore_ -> getRequestStore(X_PutImage) -> - cacheThreshold = PUTIMAGE_CACHE_THRESHOLD_IF_PACKED; - - clientStore_ -> getRequestStore(X_PutImage) -> - cacheLowerThreshold = PUTIMAGE_CACHE_LOWER_THRESHOLD_IF_PACKED; - } - } - - // - // If this is a shadow session increase the - // size of the image cache. - // - - if (control -> SessionMode == session_shadow) - { - if (control -> PackMethod != NO_PACK) - { - clientStore_ -> getRequestStore(X_NXPutPackedImage) -> - cacheThreshold = PUTPACKEDIMAGE_CACHE_THRESHOLD_IF_PACKED_SHADOW; - - clientStore_ -> getRequestStore(X_NXPutPackedImage) -> - cacheLowerThreshold = PUTPACKEDIMAGE_CACHE_LOWER_THRESHOLD_IF_PACKED_SHADOW; - } - else - { - clientStore_ -> getRequestStore(X_PutImage) -> - cacheThreshold = PUTIMAGE_CACHE_THRESHOLD_IF_SHADOW; - - clientStore_ -> getRequestStore(X_PutImage) -> - cacheLowerThreshold = PUTIMAGE_CACHE_LOWER_THRESHOLD_IF_SHADOW; - } - } - - return 1; -} - -int Proxy::handleSaveStores() -{ - // - // Save content of stores on disk. - // - - char *cacheToAdopt = NULL; - - // - // Set to false the indicator for cumulative store - // size too small - // - bool isTooSmall = false; - - if (control -> PersistentCacheEnableSave) - { - #ifdef TEST - *logofs << "Proxy: Going to save content of client store.\n" - << logofs_flush; - #endif - - cacheToAdopt = handleSaveAllStores(control -> PersistentCachePath, isTooSmall); - } - #ifdef TEST - else - { - if (control -> ProxyMode == proxy_client) - { - *logofs << "Proxy: Saving persistent cache to disk disabled.\n" - << logofs_flush; - } - else - { - *logofs << "Proxy: PANIC! Protocol violation in command save.\n" - << logofs_flush; - - cerr << "Error" << ": Protocol violation in command save.\n"; - - HandleCleanup(); - } - } - #endif - - if (cacheToAdopt != NULL) - { - // - // Do we have a cache already? - // - - if (control -> PersistentCacheName != NULL) - { - // - // Check if old and new cache are the same. - // In this case don't remove the old cache. - // - - if (strcasecmp(control -> PersistentCacheName, cacheToAdopt) != 0) - { - handleResetPersistentCache(); - } - - delete [] control -> PersistentCacheName; - } - - #ifdef TEST - *logofs << "Proxy: Setting current persistent cache file to '" - << cacheToAdopt << "'\n" << logofs_flush; - #endif - - control -> PersistentCacheName = cacheToAdopt; - - return 1; - } - else - { - #ifdef TEST - *logofs << "Proxy: No cache file produced from message stores.\n" - << logofs_flush; - #endif - - // - // It can be that we didn't generate a new cache - // because store was too small or persistent cache - // was disabled. This is not an error. - // - - if (control -> PersistentCacheEnableSave && !isTooSmall) - { - return -1; - } - else - { - return 0; - } - } -} - -int Proxy::handleLoadStores() -{ - // - // Restore the content of the client store - // from disk if a valid cache was negotiated - // at session startup. - // - - if (control -> PersistentCacheEnableLoad == 1 && - control -> PersistentCachePath != NULL && - control -> PersistentCacheName != NULL) - { - #ifdef TEST - *logofs << "Proxy: Going to load content of client store.\n" - << logofs_flush; - #endif - - // - // Returns the same string passed as name of - // the cache, or NULL if it was not possible - // to load the cache from disk. - // - - if (handleLoadAllStores(control -> PersistentCachePath, - control -> PersistentCacheName) == NULL) - { - // - // The corrupted cache should have been - // removed from disk. Get rid of the - // reference so we don't try to delete - // it once again. - // - - if (control -> PersistentCacheName != NULL) - { - delete [] control -> PersistentCacheName; - } - - control -> PersistentCacheName = NULL; - - return -1; - } - - // - // Set timestamp of last time cache - // was loaded from data on disk. - // - - timeouts_.loadTs = getTimestamp(); - - return 1; - } - #ifdef TEST - else - { - if (control -> ProxyMode == proxy_client) - { - *logofs << "Proxy: Loading of cache disabled or no cache file selected.\n" - << logofs_flush; - } - else - { - *logofs << "Proxy: PANIC! Protocol violation in command load.\n" - << logofs_flush; - - cerr << "Error" << ": Protocol violation in command load.\n"; - - HandleCleanup(); - } - } - #endif - - return 0; -} - -int Proxy::handleControl(T_proxy_code code, int data) -{ - // - // Send the given control messages - // to the remote proxy. - // - - #if defined(TEST) || defined(INFO) - - if (data != -1) - { - if (code == code_control_token_reply || - code == code_split_token_reply || - code == code_data_token_reply) - { - *logofs << "Proxy: TOKEN! Sending message '" << DumpControl(code) - << "' at " << strMsTimestamp() << " with count " - << data << ".\n" << logofs_flush; - } - else - { - *logofs << "Proxy: Sending message '" << DumpControl(code) - << "' at " << strMsTimestamp() << " with data ID#" - << data << ".\n" << logofs_flush; - } - } - else - { - *logofs << "Proxy: Sending message '" << DumpControl(code) - << "' at " << strMsTimestamp() << ".\n" - << logofs_flush; - } - - #endif - - // - // Add the control message and see if the - // data has to be flushed immediately. - // - - if (addControlCodes(code, data) < 0) - { - return -1; - } - - switch (code) - { - // - // Append the first data read from the opened - // channel to the control code. - // - - case code_new_x_connection: - case code_new_cups_connection: - case code_new_aux_connection: - case code_new_smb_connection: - case code_new_media_connection: - case code_new_http_connection: - case code_new_font_connection: - case code_new_slave_connection: - - // - // Do we send the token reply immediately? - // The control messages are put at the begin- - // ning of the of the encode buffer, so we may - // reply to multiple tokens before having the - // chance of handling the actual frame data. - // On the other hand, the sooner we reply, the - // sooner the remote proxy is restarted. - // - - case code_control_token_reply: - case code_split_token_reply: - case code_data_token_reply: - { - break; - } - - // - // Also send the congestion control codes - // immediately. - // - // case code_begin_congestion: - // case code_end_congestion: - // - - default: - { - priority_ = 1; - - break; - } - } - - if (priority_ == 1) - { - if (handleFrame(frame_data) < 0) - { - return -1; - } - } - - return 1; -} - -int Proxy::handleSwitch(int channelId) -{ - // - // If data is for a different channel than last - // selected for output, prepend to the data the - // new channel id. - // - - #ifdef DEBUG - *logofs << "Proxy: Requested a switch with " - << "current channel ID#" << outputChannel_ - << " new channel ID#" << channelId << ".\n" - << logofs_flush; - #endif - - if (channelId != outputChannel_) - { - if (needFlush() == 1) - { - #if defined(TEST) || defined(INFO) || defined(FLUSH) - *logofs << "Proxy: WARNING! Flushing data for the " - << "previous channel ID#" << outputChannel_ - << ".\n" << logofs_flush; - #endif - - if (handleFrame(frame_data) < 0) - { - return -1; - } - } - - #if defined(TEST) || defined(INFO) - *logofs << "Proxy: Sending message '" - << DumpControl(code_switch_connection) << "' at " - << strMsTimestamp() << " with FD#" << getFd(channelId) - << " channel ID#" << channelId << ".\n" - << logofs_flush; - #endif - - if (addControlCodes(code_switch_connection, channelId) < 0) - { - return -1; - } - - outputChannel_ = channelId; - } - - return 1; -} - -int Proxy::addTokenCodes(T_proxy_token &token) -{ - #if defined(TEST) || defined(INFO) || defined(TOKEN) - *logofs << "Proxy: TOKEN! Sending token [" - << DumpToken(token.type) << "] with " - << token.bytes << " bytes accumulated size " - << token.size << " and " << token.remaining - << " available.\n" << logofs_flush; - #endif - - // - // Give a 'weight' to the token. The tokens - // remaining can become negative if we sent - // a packet that was exceptionally big. - // - - int count = 0; - - // Since ProtoStep7 (#issue 108) - count = token.bytes / token.size; - - // - // Force a count of 1, for example - // if this is a ping. - // - - if (count < 1) - { - count = 1; - - token.bytes = 0; - } - else - { - // Since ProtoStep7 (#issue 108) - if (count > 255) - { - count = 255; - } - - // - // Let the next token account for the - // remaining bytes. - // - - token.bytes %= token.size; - } - - #if defined(TEST) || defined(INFO) || defined(TOKEN) - *logofs << "Proxy: Sending message '" - << DumpControl(token.request) << "' at " - << strMsTimestamp() << " with count " << count - << ".\n" << logofs_flush; - #endif - - controlCodes_[controlLength_++] = 0; - controlCodes_[controlLength_++] = (unsigned char) token.request; - controlCodes_[controlLength_++] = (unsigned char) count; - - statistics -> addFrameOut(); - - token.remaining -= count; - - return 1; -} - -int Proxy::handleToken(T_frame_type type) -{ - #if defined(TEST) || defined(INFO) || defined(TOKEN) - *logofs << "Proxy: TOKEN! Checking tokens with " - << "frame type ["; - - *logofs << (type == frame_ping ? "frame_ping" : "frame_data"); - - *logofs << "] with stream ratio " << statistics -> - getStreamRatio() << ".\n" << logofs_flush; - #endif - - if (type == frame_data) - { - // - // Since ProtoStep7 (#issue 108) - // - - // Send a distinct token for each data type. - // We don't want to slow down the sending of - // the X events, X replies and split confir- - // mation events on the X server side, so - // take care only of the generic data token. - // - - if (control -> ProxyMode == proxy_client) - { - statistics -> updateControlToken(tokens_[token_control].bytes); - - if (tokens_[token_control].bytes > tokens_[token_control].size) - { - if (addTokenCodes(tokens_[token_control]) < 0) - { - return -1; - } - - #if defined(TEST) || defined(INFO) || defined(TOKEN) - - T_proxy_token &token = tokens_[token_control]; - - *logofs << "Proxy: TOKEN! Token class [" - << DumpToken(token.type) << "] has now " - << token.bytes << " bytes accumulated and " - << token.remaining << " tokens remaining.\n" - << logofs_flush; - #endif - } - - statistics -> updateSplitToken(tokens_[token_split].bytes); - - if (tokens_[token_split].bytes > tokens_[token_split].size) - { - if (addTokenCodes(tokens_[token_split]) < 0) - { - return -1; - } - - #if defined(TEST) || defined(INFO) || defined(TOKEN) - - T_proxy_token &token = tokens_[token_split]; - - *logofs << "Proxy: TOKEN! Token class [" - << DumpToken(token.type) << "] has now " - << token.bytes << " bytes accumulated and " - << token.remaining << " tokens remaining.\n" - << logofs_flush; - #endif - } - } - - statistics -> updateDataToken(tokens_[token_data].bytes); - - if (tokens_[token_data].bytes > tokens_[token_data].size) - { - if (addTokenCodes(tokens_[token_data]) < 0) - { - return -1; - } - - #if defined(TEST) || defined(INFO) || defined(TOKEN) - - T_proxy_token &token = tokens_[token_data]; - - *logofs << "Proxy: TOKEN! Token class [" - << DumpToken(token.type) << "] has now " - << token.bytes << " bytes accumulated and " - << token.remaining << " tokens remaining.\n" - << logofs_flush; - #endif - } - } - else - { - if (addTokenCodes(tokens_[token_control]) < 0) - { - return -1; - } - - // - // Reset all counters on a ping. - // - - tokens_[token_control].bytes = 0; - tokens_[token_split].bytes = 0; - tokens_[token_data].bytes = 0; - - #if defined(TEST) || defined(INFO) || defined(TOKEN) - - T_proxy_token &token = tokens_[token_control]; - - *logofs << "Proxy: TOKEN! Token class [" - << DumpToken(token.type) << "] has now " - << token.bytes << " bytes accumulated and " - << token.remaining << " tokens remaining.\n" - << logofs_flush; - #endif - } - - // - // Check if we have entered in - // congestion state. - // - - if (congestion_ == 0 && - tokens_[token_control].remaining <= 0) - { - #if defined(TEST) || defined(INFO) - *logofs << "Proxy: Entering congestion with " - << tokens_[token_control].remaining - << " tokens remaining.\n" << logofs_flush; - #endif - - congestion_ = 1; - } - - statistics -> updateCongestion(tokens_[token_control].remaining, - tokens_[token_control].limit); - - return 1; -} - -int Proxy::handleTokenFromProxy(T_proxy_token &token, int count) -{ - #if defined(TEST) || defined(INFO) || defined(TOKEN) - *logofs << "Proxy: TOKEN! Received token [" - << DumpToken(token.type) << "] request at " - << strMsTimestamp() << " with count " - << count << ".\n" << logofs_flush; - #endif - - // - // Since ProtoStep7 (#issue 108) with no limitations - // concerning invalid token requests at this point - // - - // - // Add our token reply. - // - - if (handleControl(token.reply, count) < 0) - { - return -1; - } - - return 1; -} - -int Proxy::handleTokenReplyFromProxy(T_proxy_token &token, int count) -{ - #if defined(TEST) || defined(INFO) || defined(TOKEN) - *logofs << "Proxy: TOKEN! Received token [" - << DumpToken(token.type) << "] reply at " - << strMsTimestamp() << " with count " << count - << ".\n" << logofs_flush; - #endif - - // - // Since ProtoStep7 (#issue 108) with no limitations - // concerning invalid token requests at this point - // - - // - // Increment the available tokens. - // - - token.remaining += count; - - if (token.remaining > token.limit) - { - #ifdef PANIC - *logofs << "Proxy: PANIC! Token overflow handling messages.\n" - << logofs_flush; - #endif - - cerr << "Error" << ": Token overflow handling messages.\n"; - - HandleCleanup(); - } - - #if defined(TEST) || defined(INFO) || defined(TOKEN) - *logofs << "Proxy: TOKEN! Token class [" - << DumpToken(token.type) << "] has now " << token.bytes - << " bytes accumulated and " << token.remaining - << " tokens remaining.\n" << logofs_flush; - #endif - - // - // Check if we can jump out of the - // congestion state. - // - - if (congestion_ == 1 && - tokens_[token_control].remaining > 0) - { - #if defined(TEST) || defined(INFO) - *logofs << "Proxy: Exiting congestion with " - << tokens_[token_control].remaining - << " tokens remaining.\n" << logofs_flush; - #endif - - congestion_ = 0; - } - - statistics -> updateCongestion(tokens_[token_control].remaining, - tokens_[token_control].limit); - - return 1; -} - -void Proxy::handleFailOnSave(const char *fullName, const char *failContext) const -{ - #ifdef WARNING - *logofs << "Proxy: WARNING! Error saving stores to cache file " - << "in context [" << failContext << "].\n" << logofs_flush; - #endif - - cerr << "Warning" << ": Error saving stores to cache file " - << "in context [" << failContext << "].\n"; - - #ifdef WARNING - *logofs << "Proxy: WARNING! Removing invalid cache '" - << fullName << "'.\n" << logofs_flush; - #endif - - cerr << "Warning" << ": Removing invalid cache '" - << fullName << "'.\n"; - - unlink(fullName); -} - -void Proxy::handleFailOnLoad(const char *fullName, const char *failContext) const -{ - #ifdef WARNING - *logofs << "Proxy: WARNING! Error loading stores from cache file " - << "in context [" << failContext << "].\n" << logofs_flush; - #endif - - cerr << "Warning" << ": Error loading stores from cache file " - << "in context [" << failContext << "].\n"; - - #ifdef WARNING - *logofs << "Proxy: WARNING! Removing invalid cache '" - << fullName << "'.\n" << logofs_flush; - #endif - - cerr << "Warning" << ": Removing invalid cache '" - << fullName << "'.\n"; - - unlink(fullName); -} - -int Proxy::handleSaveVersion(unsigned char *buffer, int &major, - int &minor, int &patch) const -{ - // Since ProtoStep8 (#issue 108) - major = 3; - minor = 0; - patch = 0; - - *(buffer + 0) = major; - *(buffer + 1) = minor; - - PutUINT(patch, buffer + 2, storeBigEndian()); - - return 1; -} - -int Proxy::handleLoadVersion(const unsigned char *buffer, int &major, - int &minor, int &patch) const -{ - major = *(buffer + 0); - minor = *(buffer + 1); - - patch = GetUINT(buffer + 2, storeBigEndian()); - - // - // Force the proxy to discard the - // incompatible caches. - // - - // Since ProtoStep8 (#issue 108) - if (major < 3) - { - return -1; - } - - return 1; -} - -char *Proxy::handleSaveAllStores(const char *savePath, bool & isTooSmall) const -{ - isTooSmall = false; - - int cumulativeSize = MessageStore::getCumulativeTotalStorageSize(); - - if (cumulativeSize < control -> PersistentCacheThreshold) - { - #ifdef TEST - *logofs << "Proxy: Cache not saved as size is " - << cumulativeSize << " with threshold set to " - << control -> PersistentCacheThreshold - << ".\n" << logofs_flush; - #endif - - // - // Cumulative store size is smaller than threshold - // so the indicator is set to true - // - - isTooSmall = true; - - return NULL; - } - else if (savePath == NULL) - { - #ifdef PANIC - *logofs << "Proxy: PANIC! No name provided for save path.\n" - << logofs_flush; - #endif - - cerr << "Error" << ": No name provided for save path.\n"; - - return NULL; - } - - #ifdef TEST - *logofs << "Proxy: Going to save content of message stores.\n" - << logofs_flush; - #endif - - // - // Our parent process is likely going to terminate. - // Until we finish saving cache we must ignore its - // SIGIPE. - // - - DisableSignals(); - - ofstream *cachefs = NULL; - - md5_state_t *md5StateStream = NULL; - md5_byte_t *md5DigestStream = NULL; - - md5_state_t *md5StateClient = NULL; - md5_byte_t *md5DigestClient = NULL; - - char md5String[MD5_LENGTH * 2 + 2]; - - char fullName[strlen(savePath) + MD5_LENGTH * 2 + 4]; - - // - // Prepare the template for the temporary file - // - - const char* const uniqueTemplate = "XXXXXX"; - char tempName[strlen(savePath) + strlen("/") + 4 + strlen(uniqueTemplate) + 1]; - - snprintf(tempName, sizeof tempName, "%s/%s%s", - savePath, - control -> ProxyMode == proxy_client ? - "Z-C-" : - "Z-S-", - uniqueTemplate); - - #ifdef TEST - *logofs << "Proxy: Generating temporary file with template '" - << tempName << "'.\n" << logofs_flush; - #endif - - // - // Change the mask to make the file only - // readable by the user, then restore the - // old mask. - // - - mode_t fileMode = umask(0077); - - // - // Generate a unique temporary filename from tempName - // and then create and open the file - // - - int fdTemp = mkstemp(tempName); - if (fdTemp == -1) - { - #ifdef PANIC - *logofs << "Proxy: PANIC! Can't create temporary file in '" - << savePath << "'. Cause = " << strerror(errno) << ".\n" << logofs_flush; - #endif - - cerr << "Error" << ": Can't create temporary file in '" - << savePath << "'. Cause = " << strerror(errno) << ".\n"; - - umask(fileMode); - - EnableSignals(); - - return NULL; - } - - #ifdef TEST - *logofs << "Proxy: Saving cache to file '" - << tempName << "'.\n" << logofs_flush; - #endif - - // - // Create and open the output stream for the new temporary - // file - // - - cachefs = new (std::nothrow) ofstream(tempName, ios::out | ios::binary); - if ((cachefs == NULL) || cachefs->fail()) - { - #ifdef PANIC - *logofs << "Proxy: PANIC! Can't create stream for temporary file '" - << tempName << "'.\n" << logofs_flush; - #endif - - cerr << "Error" << ": Can't create stream for temporary file '" - << tempName << "'.\n"; - - close(fdTemp); - unlink(tempName); - - umask(fileMode); - - EnableSignals(); - - return NULL; - } - - // - // Close the file descriptor returned by mkstemp - // and restore the old mask - // - - close(fdTemp); - umask(fileMode); - - md5StateStream = new md5_state_t(); - md5DigestStream = new md5_byte_t[MD5_LENGTH]; - - md5_init(md5StateStream); - - // - // First write the proxy version. - // - - unsigned char version[4]; - - int major; - int minor; - int patch; - - handleSaveVersion(version, major, minor, patch); - - #ifdef TEST - *logofs << "Proxy: Saving cache using version '" - << major << "." << minor << "." << patch - << "'.\n" << logofs_flush; - #endif - - if (PutData(cachefs, version, 4) < 0) - { - handleFailOnSave(tempName, "A"); - - delete cachefs; - - delete md5StateStream; - delete [] md5DigestStream; - - EnableSignals(); - - return NULL; - } - - // - // Make space for the calculated MD5 so we - // can later rewind the file and write it - // at this position. - // - - if (PutData(cachefs, md5DigestStream, MD5_LENGTH) < 0) - { - handleFailOnSave(tempName, "B"); - - delete cachefs; - - delete md5StateStream; - delete [] md5DigestStream; - - EnableSignals(); - - return NULL; - } - - md5StateClient = new md5_state_t(); - md5DigestClient = new md5_byte_t[MD5_LENGTH]; - - md5_init(md5StateClient); - - #ifdef DUMP - - ofstream *cacheDump = NULL; - - ofstream *tempfs = (ofstream*) logofs; - - char cacheDumpName[DEFAULT_STRING_LENGTH]; - - if (control -> ProxyMode == proxy_client) - { - snprintf(cacheDumpName, DEFAULT_STRING_LENGTH - 1, - "%s/client-cache-dump", control -> TempPath); - } - else - { - snprintf(cacheDumpName, DEFAULT_STRING_LENGTH - 1, - "%s/server-cache-dump", control -> TempPath); - } - - *(cacheDumpName + DEFAULT_STRING_LENGTH - 1) = '\0'; - - fileMode = umask(0077); - - cacheDump = new ofstream(cacheDumpName, ios::out); - - umask(fileMode); - - logofs = cacheDump; - - #endif - - // - // Use the virtual method of the concrete proxy class. - // - - int allSaved = handleSaveAllStores(cachefs, md5StateStream, md5StateClient); - - #ifdef DUMP - - logofs = tempfs; - - delete cacheDump; - - #endif - - if (allSaved == -1) - { - handleFailOnSave(tempName, "C"); - - delete cachefs; - - delete md5StateStream; - delete [] md5DigestStream; - - delete md5StateClient; - delete [] md5DigestClient; - - EnableSignals(); - - return NULL; - } - - md5_finish(md5StateClient, md5DigestClient); - - for (unsigned int i = 0; i < MD5_LENGTH; i++) - { - sprintf(md5String + (i * 2), "%02X", md5DigestClient[i]); - } - - strcpy(fullName, (control -> ProxyMode == proxy_client) ? "C-" : "S-"); - - strcat(fullName, md5String); - - md5_append(md5StateStream, (const md5_byte_t *) fullName, strlen(fullName)); - md5_finish(md5StateStream, md5DigestStream); - - // - // Go to the beginning of file plus - // the integer where we wrote our - // proxy version. - // - - cachefs -> seekp(4); - - if (PutData(cachefs, md5DigestStream, MD5_LENGTH) < 0) - { - handleFailOnSave(tempName, "D"); - - delete cachefs; - - delete md5StateStream; - delete [] md5DigestStream; - - delete md5StateClient; - delete [] md5DigestClient; - - EnableSignals(); - - return NULL; - } - - delete cachefs; - - // - // Copy the resulting cache name without - // the path so that we can return it to - // the caller. - // - - char *cacheName = new char[MD5_LENGTH * 2 + 4]; - - strcpy(cacheName, fullName); - - // - // Add the path to the full name and move - // the cache into the path. - // - - strcpy(fullName, savePath); - strcat(fullName, (control -> ProxyMode == proxy_client) ? "/C-" : "/S-"); - strcat(fullName, md5String); - - #ifdef TEST - *logofs << "Proxy: Renaming cache file from '" - << tempName << "' to '" << fullName - << "'.\n" << logofs_flush; - #endif - - rename(tempName, fullName); - - delete md5StateStream; - delete [] md5DigestStream; - - delete md5StateClient; - delete [] md5DigestClient; - - // - // Restore the original handlers. - // - - EnableSignals(); - - #ifdef TEST - *logofs << "Proxy: Successfully saved cache file '" - << cacheName << "'.\n" << logofs_flush; - #endif - - // - // This must be enabled only for test - // because it requires that client and - // server reside on the same machine. - // - - if (control -> PersistentCacheCheckOnShutdown == 1 && - control -> ProxyMode == proxy_server) - { - #if defined(TEST) || defined(INFO) - *logofs << "Proxy: MATCH! Checking if the file '" - << fullName << "' matches a client cache.\n" - << logofs_flush; - #endif - - strcpy(fullName, savePath); - strcat(fullName, "/C-"); - strcat(fullName, md5String); - - struct stat fileStat; - - if (stat(fullName, &fileStat) != 0) - { - #ifdef PANIC - *logofs << "Proxy: PANIC! Can't find a client cache " - << "with name '" << fullName << "'.\n" - << logofs_flush; - #endif - - cerr << "Error" << ": Can't find a client cache " - << "with name '" << fullName << "'.\n"; - - HandleShutdown(); - } - - #if defined(TEST) || defined(INFO) - *logofs << "Proxy: MATCH! Client cache '" << fullName - << "' matches the local cache.\n" - << logofs_flush; - #endif - } - - return cacheName; -} - -const char *Proxy::handleLoadAllStores(const char *loadPath, const char *loadName) const -{ - #ifdef TEST - *logofs << "Proxy: Going to load content of message stores.\n" - << logofs_flush; - #endif - - // - // Until we finish loading cache we - // must at least ignore any SIGIPE. - // - - DisableSignals(); - - if (loadPath == NULL || loadName == NULL) - { - #ifdef PANIC - *logofs << "Proxy: PANIC! No path or no file name provided for cache to restore.\n" - << logofs_flush; - #endif - - cerr << "Error" << ": No path or no file name provided for cache to restore.\n"; - - EnableSignals(); - - return NULL; - } - else if (strlen(loadName) != MD5_LENGTH * 2 + 2) - { - #ifdef PANIC - *logofs << "Proxy: PANIC! Bad file name provided for cache to restore.\n" - << logofs_flush; - #endif - - cerr << "Error" << ": Bad file name provided for cache to restore.\n"; - - EnableSignals(); - - return NULL; - } - - istream *cachefs = NULL; - char md5String[(MD5_LENGTH * 2) + 2]; - md5_byte_t md5FromFile[MD5_LENGTH]; - - char *cacheName = new char[strlen(loadPath) + strlen(loadName) + 3]; - - strcpy(cacheName, loadPath); - strcat(cacheName, "/"); - strcat(cacheName, loadName); - - #ifdef TEST - *logofs << "Proxy: Name of cache file is '" - << cacheName << "'.\n" << logofs_flush; - #endif - - cachefs = new ifstream(cacheName, ios::in | ios::binary); - - unsigned char version[4]; - - if (cachefs == NULL || GetData(cachefs, version, 4) < 0) - { - #ifdef PANIC - *logofs << "Proxy: PANIC! Can't read cache file '" - << cacheName << "'.\n" << logofs_flush;; - #endif - - handleFailOnLoad(cacheName, "A"); - - delete cachefs; - - delete [] cacheName; - - EnableSignals(); - - return NULL; - } - - int major; - int minor; - int patch; - - if (handleLoadVersion(version, major, minor, patch) < 0) - { - #ifdef PANIC - *logofs << "Proxy: WARNING! Incompatible version '" - << major << "." << minor << "." << patch - << "' in cache file '" << cacheName - << "'.\n" << logofs_flush; - #endif - - cerr << "Warning" << ": Incompatible version '" - << major << "." << minor << "." << patch - << "' in cache file '" << cacheName - << "'.\n" << logofs_flush; - - if (control -> ProxyMode == proxy_server) - { - handleFailOnLoad(cacheName, "B"); - } - else - { - // - // Simply remove the cache file. - // - - unlink(cacheName); - } - - delete cachefs; - - delete [] cacheName; - - EnableSignals(); - - return NULL; - } - - #ifdef TEST - *logofs << "Proxy: Reading from cache file version '" - << major << "." << minor << "." << patch - << "'.\n" << logofs_flush; - #endif - - if (GetData(cachefs, md5FromFile, MD5_LENGTH) < 0) - { - #ifdef PANIC - *logofs << "Proxy: PANIC! No checksum in cache file '" - << loadName << "'.\n" << logofs_flush; - #endif - - handleFailOnLoad(cacheName, "C"); - - delete cachefs; - - delete [] cacheName; - - EnableSignals(); - - return NULL; - } - - md5_state_t *md5StateStream = NULL; - md5_byte_t *md5DigestStream = NULL; - - md5StateStream = new md5_state_t(); - md5DigestStream = new md5_byte_t[MD5_LENGTH]; - - md5_init(md5StateStream); - - // - // Use the virtual method of the proxy class. - // - - if (handleLoadAllStores(cachefs, md5StateStream) < 0) - { - handleFailOnLoad(cacheName, "D"); - - delete cachefs; - - delete md5StateStream; - delete [] md5DigestStream; - - delete [] cacheName; - - EnableSignals(); - - return NULL; - } - - md5_append(md5StateStream, (const md5_byte_t *) loadName, strlen(loadName)); - md5_finish(md5StateStream, md5DigestStream); - - for (int i = 0; i < MD5_LENGTH; i++) - { - if (md5DigestStream[i] != md5FromFile[i]) - { - #ifdef PANIC - - *logofs << "Proxy: PANIC! Bad checksum for cache file '" - << cacheName << "'.\n" << logofs_flush; - - for (unsigned int i = 0; i < MD5_LENGTH; i++) - { - sprintf(md5String + (i * 2), "%02X", md5FromFile[i]); - } - - *logofs << "Proxy: PANIC! Saved checksum is '" - << md5String << "'.\n" << logofs_flush; - - for (unsigned int i = 0; i < MD5_LENGTH; i++) - { - sprintf(md5String + (i * 2),"%02X", md5DigestStream[i]); - } - - *logofs << "Proxy: PANIC! Calculated checksum is '" - << md5String << "'.\n" << logofs_flush; - - #endif - - handleFailOnLoad(cacheName, "E"); - - delete cachefs; - - delete md5StateStream; - delete [] md5DigestStream; - - delete [] cacheName; - - EnableSignals(); - - return NULL; - } - } - - delete cachefs; - - delete md5StateStream; - delete [] md5DigestStream; - - delete [] cacheName; - - // - // Restore the original handlers. - // - - EnableSignals(); - - #ifdef TEST - *logofs << "Proxy: Successfully loaded cache file '" - << loadName << "'.\n" << logofs_flush; - #endif - - // - // Return the string provided by caller. - // - - return loadName; -} - -int Proxy::allocateChannelMap(int fd) -{ - // - // We assume that the fd is lower than - // the maximum allowed number. This is - // checked at the time the connection - // is accepted. - // - - if (fd < 0 || fd >= CONNECTIONS_LIMIT) - { - #ifdef PANIC - *logofs << "Proxy: PANIC! Internal error allocating " - << "new channel with FD#" << fd_ << ".\n" - << logofs_flush; - #endif - - cerr << "Error" << ": Internal error allocating " - << "new channel with FD#" << fd_ << ".\n"; - - HandleCleanup(); - } - - for (int channelId = 0; - channelId < CONNECTIONS_LIMIT; - channelId++) - { - if (checkLocalChannelMap(channelId) == 1 && - fdMap_[channelId] == -1) - { - fdMap_[channelId] = fd; - channelMap_[fd] = channelId; - - #ifdef TEST - *logofs << "Proxy: Allocated new channel ID#" - << channelId << " with FD#" << fd << ".\n" - << logofs_flush; - #endif - - return channelId; - } - } - - // - // No available channel is remaining. - // - - #ifdef TEST - *logofs << "Proxy: WARNING! Can't allocate a new channel " - << "for FD#" << fd_ << ".\n" << logofs_flush; - #endif - - return -1; -} - -int Proxy::checkChannelMap(int channelId) -{ - // - // To be acceptable, the channel id must - // be an id that is not possible to use - // to allocate channels at this side. - // - - if (checkLocalChannelMap(channelId) == 1) - { - #ifdef PANIC - *logofs << "Proxy: PANIC! Can't open a new channel " - << "with invalid ID#" << channelId << ".\n" - << logofs_flush; - #endif - - cerr << "Error" << ": Can't open a new channel " - << "with invalid ID#" << channelId << ".\n"; - - return -1; - } - else if (channels_[channelId] != NULL) - { - #ifdef PANIC - *logofs << "Proxy: PANIC! Can't open a new channel " - << "over an existing ID#" << channelId - << " with FD#" << getFd(channelId) - << ".\n" << logofs_flush; - #endif - - cerr << "Error" << ": Can't open a new channel " - << "over an existing ID#" << channelId - << " with FD#" << getFd(channelId) - << ".\n"; - - return -1; - } - - return 1; -} - -int Proxy::assignChannelMap(int channelId, int fd) -{ - // - // We assume that the fd is lower than - // the maximum allowed number. This is - // checked at the time the connection - // is accepted. - // - - if (channelId < 0 || channelId >= CONNECTIONS_LIMIT || - fd < 0 || fd >= CONNECTIONS_LIMIT) - { - #ifdef PANIC - *logofs << "Proxy: PANIC! Internal error assigning " - << "new channel with FD#" << fd_ << ".\n" - << logofs_flush; - #endif - - cerr << "Error" << ": Internal error assigning " - << "new channel with FD#" << fd_ << ".\n"; - - HandleCleanup(); - } - - fdMap_[channelId] = fd; - channelMap_[fd] = channelId; - - return 1; -} - -void Proxy::cleanupChannelMap(int channelId) -{ - int fd = fdMap_[channelId]; - - if (fd != -1) - { - fdMap_[channelId] = -1; - channelMap_[fd] = -1; - } -} - -int Proxy::addControlCodes(T_proxy_code code, int data) -{ - // - // Flush the encode buffer plus all the outstanding - // control codes if there is not enough space for - // the new control message. We need to ensure that - // there are further bytes available, in the case - // we will need to add more token control messages. - // - - if (controlLength_ + 3 > CONTROL_CODES_THRESHOLD) - { - #ifdef WARNING - *logofs << "Proxy: WARNING! Flushing control messages " - << "while sending code '" << DumpControl(code) - << "'.\n" << logofs_flush; - #endif - - if (handleFlush() < 0) - { - return -1; - } - } - - controlCodes_[controlLength_++] = 0; - controlCodes_[controlLength_++] = (unsigned char) code; - controlCodes_[controlLength_++] = (unsigned char) (data == -1 ? 0 : data); - - // - // Account for the control frame. - // - - statistics -> addFrameOut(); - - return 1; -} - -void Proxy::setSplitTimeout(int channelId) -{ - int needed = channels_[channelId] -> needSplit(); - - if (needed != isTimestamp(timeouts_.splitTs)) - { - if (needed == 1) - { - #if defined(TEST) || defined(INFO) || defined(SPLIT) - *logofs << "Proxy: SPLIT! Allocating split timestamp at " - << strMsTimestamp() << ".\n" << logofs_flush; - #endif - - timeouts_.splitTs = getTimestamp(); - } - else - { - T_list &channelList = activeChannels_.getList(); - - for (T_list::iterator j = channelList.begin(); - j != channelList.end(); j++) - { - int channelId = *j; - - if (channels_[channelId] != NULL && - channels_[channelId] -> needSplit() == 1) - { - #ifdef TEST - *logofs << "Proxy: SPLIT! Channel for FD#" - << getFd(channelId) << " still needs splits.\n" - << logofs_flush; - #endif - - return; - } - } - - #if defined(TEST) || defined(INFO) || defined(SPLIT) - *logofs << "Proxy: SPLIT! Resetting split timestamp at " - << strMsTimestamp() << ".\n" << logofs_flush; - #endif - - timeouts_.splitTs = nullTimestamp(); - } - } -} - -void Proxy::setMotionTimeout(int channelId) -{ - int needed = channels_[channelId] -> needMotion(); - - if (needed != isTimestamp(timeouts_.motionTs)) - { - if (channels_[channelId] -> needMotion() == 1) - { - #if defined(TEST) || defined(INFO) || defined(SPLIT) - *logofs << "Proxy: Allocating motion timestamp at " - << strMsTimestamp() << ".\n" << logofs_flush; - #endif - - timeouts_.motionTs = getTimestamp(); - } - else - { - T_list &channelList = activeChannels_.getList(); - - for (T_list::iterator j = channelList.begin(); - j != channelList.end(); j++) - { - int channelId = *j; - - if (channels_[channelId] != NULL && - channels_[channelId] -> needMotion() == 1) - { - #ifdef TEST - *logofs << "Proxy: SPLIT! Channel for FD#" - << getFd(channelId) << " still needs motions.\n" - << logofs_flush; - #endif - - return; - } - } - - #if defined(TEST) || defined(INFO) || defined(SPLIT) - *logofs << "Proxy: Resetting motion timestamp at " - << strMsTimestamp() << ".\n" << logofs_flush; - #endif - - timeouts_.motionTs = nullTimestamp(); - } - } -} - -void Proxy::increaseChannels(int channelId) -{ - #ifdef TEST - *logofs << "Proxy: Adding channel " << channelId - << " to the list of active channels.\n" - << logofs_flush; - #endif - - activeChannels_.add(channelId); - - #ifdef TEST - *logofs << "Proxy: There are " << activeChannels_.getSize() - << " allocated channels for proxy FD#" << fd_ - << ".\n" << logofs_flush; - #endif -} - -void Proxy::decreaseChannels(int channelId) -{ - #ifdef TEST - *logofs << "Proxy: Removing channel " << channelId - << " from the list of active channels.\n" - << logofs_flush; - #endif - - activeChannels_.remove(channelId); - - #ifdef TEST - *logofs << "Proxy: There are " << activeChannels_.getSize() - << " allocated channels for proxy FD#" << fd_ - << ".\n" << logofs_flush; - #endif -} - -int Proxy::allocateTransport(int channelFd, int channelId) -{ - if (transports_[channelId] == NULL) - { - transports_[channelId] = new Transport(channelFd); - - if (transports_[channelId] == NULL) - { - #ifdef PANIC - *logofs << "Proxy: PANIC! Can't allocate transport for " - << "channel id " << channelId << ".\n" - << logofs_flush; - #endif - - cerr << "Error" << ": Can't allocate transport for " - << "channel id " << channelId << ".\n"; - - return -1; - } - } - else if (transports_[channelId] -> - getType() != transport_agent) - { - #ifdef PANIC - *logofs << "Proxy: PANIC! Transport for channel id " - << channelId << " should be null.\n" - << logofs_flush; - #endif - - cerr << "Error" << ": Transport for channel id " - << channelId << " should be null.\n"; - - return -1; - } - - return 1; -} - -int Proxy::deallocateTransport(int channelId) -{ - // - // Transport for the agent connection - // is passed from the outside when - // creating the channel. - // - - if (transports_[channelId] -> - getType() != transport_agent) - { - delete transports_[channelId]; - } - - transports_[channelId] = NULL; - - return 1; -} - -int Proxy::handleNewGenericConnection(int clientFd, T_channel_type type, const char *label) -{ - int channelId = allocateChannelMap(clientFd); - - if (channelId == -1) - { - #ifdef PANIC - *logofs << "Proxy: PANIC! Maximum number of available " - << "channels exceeded.\n" << logofs_flush; - #endif - - cerr << "Error" << ": Maximum number of available " - << "channels exceeded.\n"; - - return -1; - } - - #ifdef TEST - *logofs << "Proxy: Channel for " << label << " descriptor " - << "FD#" << clientFd << " mapped to ID#" - << channelId << ".\n" - << logofs_flush; - #endif - - // - // Turn queuing off for path server-to-proxy. - // - - SetNoDelay(clientFd, 1); - - if (allocateTransport(clientFd, channelId) < 0) - { - return -1; - } - - switch (type) - { - case channel_cups: - { - channels_[channelId] = new CupsChannel(transports_[channelId], compressor_); - - break; - } - case channel_smb: - { - channels_[channelId] = new SmbChannel(transports_[channelId], compressor_); - - break; - } - case channel_media: - { - channels_[channelId] = new MediaChannel(transports_[channelId], compressor_); - - break; - } - case channel_http: - { - channels_[channelId] = new HttpChannel(transports_[channelId], compressor_); - - break; - } - case channel_font: - { - channels_[channelId] = new FontChannel(transports_[channelId], compressor_); - - break; - } - default: - { - channels_[channelId] = new SlaveChannel(transports_[channelId], compressor_); - - break; - } - } - - if (channels_[channelId] == NULL) - { - deallocateTransport(channelId); - - return -1; - } - - #ifdef TEST - *logofs << "Proxy: Accepted new connection to " - << label << " server.\n" << logofs_flush; - #endif - - cerr << "Info" << ": Accepted new connection to " - << label << " server.\n"; - - increaseChannels(channelId); - - switch (type) - { - case channel_cups: - { - if (handleControl(code_new_cups_connection, channelId) < 0) - { - return -1; - } - - break; - } - case channel_smb: - { - if (handleControl(code_new_smb_connection, channelId) < 0) - { - return -1; - } - - break; - } - case channel_media: - { - if (handleControl(code_new_media_connection, channelId) < 0) - { - return -1; - } - - break; - } - case channel_http: - { - if (handleControl(code_new_http_connection, channelId) < 0) - { - return -1; - } - - break; - } - case channel_font: - { - if (handleControl(code_new_font_connection, channelId) < 0) - { - return -1; - } - - break; - } - default: - { - if (handleControl(code_new_slave_connection, channelId) < 0) - { - return -1; - } - - break; - } - } - - channels_[channelId] -> handleConfiguration(); - - return 1; -} - -int Proxy::handleNewSlaveConnection(int clientFd) -{ - // Since ProtoStep7 (#issue 108) - return handleNewGenericConnection(clientFd, channel_slave, "slave"); -} - - - -int Proxy::handleNewGenericConnectionFromProxy(int channelId, T_channel_type type, - ChannelEndPoint &endPoint, const char *label) -{ - char *unixPath, *host; - long port; - - if (endPoint.getUnixPath(&unixPath)) { - return handleNewGenericConnectionFromProxyUnix(channelId, type, unixPath, label); - } - - if (endPoint.getTCPHostAndPort(&host, &port)) { - return handleNewGenericConnectionFromProxyTCP(channelId, type, host, port, label); - } - - #ifdef WARNING - *logofs << "Proxy: WARNING! Refusing attempted connection " - << "to " << label << " server.\n" << logofs_flush; - #endif - - cerr << "Warning" << ": Refusing attempted connection " - << "to " << label << " server.\n"; - - return -1; -} - -int Proxy::handleNewGenericConnectionFromProxyTCP(int channelId, T_channel_type type, - const char *hostname, long port, const char *label) - -{ - if (port <= 0) - { - // - // This happens when user has disabled - // forwarding of the specific service. - // - - #ifdef WARNING - *logofs << "Proxy: WARNING! Refusing attempted connection " - << "to " << label << " server.\n" << logofs_flush; - #endif - - cerr << "Warning" << ": Refusing attempted connection " - << "to " << label << " server.\n"; - - return -1; - } - - const char *serverHost = hostname; - int serverAddrFamily = AF_INET; - sockaddr *serverAddr = NULL; - unsigned int serverAddrLength = 0; - - #ifdef TEST - *logofs << "Proxy: Connecting to " << label - << " server '" << serverHost << "' on TCP port '" - << port << "'.\n" << logofs_flush; - #endif - - int serverIPAddr = GetHostAddress(serverHost); - - if (serverIPAddr == 0) - { - #ifdef PANIC - *logofs << "Proxy: PANIC! Unknown " << label - << " server host '" << serverHost << "'.\n" - << logofs_flush; - #endif - - cerr << "Error" << ": Unknown " << label - << " server host '" << serverHost - << "'.\n"; - - return -1; - } - - sockaddr_in *serverAddrTCP = new sockaddr_in; - - serverAddrTCP -> sin_family = AF_INET; - serverAddrTCP -> sin_port = htons(port); - serverAddrTCP -> sin_addr.s_addr = serverIPAddr; - - serverAddr = (sockaddr *) serverAddrTCP; - serverAddrLength = sizeof(sockaddr_in); - - // - // Connect to the requested server. - // - - int serverFd = socket(serverAddrFamily, SOCK_STREAM, PF_UNSPEC); - - if (serverFd < 0) - { - #ifdef PANIC - *logofs << "Proxy: PANIC! Call to socket failed. " - << "Error is " << EGET() << " '" << ESTR() - << "'.\n" << logofs_flush; - #endif - - cerr << "Error" << ": Call to socket failed. " - << "Error is " << EGET() << " '" << ESTR() - << "'.\n"; - - delete serverAddrTCP; - - return -1; - } - else if (connect(serverFd, serverAddr, serverAddrLength) < 0) - { - #ifdef WARNING - *logofs << "Proxy: WARNING! Connection to " << label - << " server '" << serverHost << ":" << port - << "' failed with error '" << ESTR() << "'.\n" - << logofs_flush; - #endif - - cerr << "Warning" << ": Connection to " << label - << " server '" << serverHost << ":" << port - << "' failed with error '" << ESTR() << "'.\n"; - - close(serverFd); - - delete serverAddrTCP; - - return -1; - } - - delete serverAddrTCP; - - if (handlePostConnectionFromProxy(channelId, serverFd, type, label) < 0) - { - return -1; - } - - #ifdef TEST - *logofs << "Proxy: Forwarded new connection to " - << label << " server on port '" << port - << "'.\n" << logofs_flush; - #endif - - cerr << "Info" << ": Forwarded new connection to " - << label << " server on port '" << port - << "'.\n"; - - return 1; -} - -int Proxy::handleNewGenericConnectionFromProxyUnix(int channelId, T_channel_type type, - const char *path, const char *label) -{ - if (path == NULL || *path == '\0' ) - { - // - // This happens when user has disabled - // forwarding of the specific service. - // - - #ifdef WARNING - *logofs << "Proxy: WARNING! Refusing attempted connection " - << "to " << label << " server.\n" << logofs_flush; - #endif - - cerr << "Warning" << ": Refusing attempted connection " - << "to " << label << " server.\n"; - - return -1; - } - - sockaddr_un serverAddrUnix; - - unsigned int serverAddrLength = sizeof(sockaddr_un); - - int serverAddrFamily = AF_UNIX; - - serverAddrUnix.sun_family = AF_UNIX; - - const int serverAddrNameLength = 108; - - strncpy(serverAddrUnix.sun_path, path, serverAddrNameLength); - - *(serverAddrUnix.sun_path + serverAddrNameLength - 1) = '\0'; - - #ifdef TEST - *logofs << "Proxy: Connecting to " << label << " server " - << "on Unix port '" << path << "'.\n" << logofs_flush; - #endif - - // - // Connect to the requested server. - // - - int serverFd = socket(serverAddrFamily, SOCK_STREAM, PF_UNSPEC); - - if (serverFd < 0) - { - #ifdef PANIC - *logofs << "Proxy: PANIC! Call to socket failed. " - << "Error is " << EGET() << " '" << ESTR() - << "'.\n" << logofs_flush; - #endif - - cerr << "Error" << ": Call to socket failed. " - << "Error is " << EGET() << " '" << ESTR() - << "'.\n"; - - return -1; - } - else if (connect(serverFd, (sockaddr *) &serverAddrUnix, serverAddrLength) < 0) - { - #ifdef WARNING - *logofs << "Proxy: WARNING! Connection to " << label - << " server on Unix port '" << path << "' failed " - << "with error " << EGET() << ", '" << ESTR() << "'.\n" - << logofs_flush; - #endif - - cerr << "Warning" << ": Connection to " << label - << " server on Unix port '" << path << "' failed " - << "with error " << EGET() << ", '" << ESTR() << "'.\n"; - - close(serverFd); - - return -1; - } - - if (handlePostConnectionFromProxy(channelId, serverFd, type, label) < 0) - { - return -1; - } - - #ifdef TEST - *logofs << "Proxy: Forwarded new connection to " - << label << " server on Unix port '" << path - << "'.\n" << logofs_flush; - #endif - - cerr << "Info" << ": Forwarded new connection to " - << label << " server on Unix port '" << path - << "'.\n"; - - return 1; -} - -int Proxy::handleNewSlaveConnectionFromProxy(int channelId) -{ - - cerr << "Info" << ": New slave connection on " - << "channel ID#" << channelId << "\n"; - - char *nx_slave_cmd = getenv("NX_SLAVE_CMD"); - if (nx_slave_cmd == NULL) { - return -1; - } - - int spair[2]; - if (socketpair(AF_UNIX, SOCK_STREAM, 0, spair) == -1) { - perror("socketpair"); - return -1; - } - - int serverFd = spair[0]; - int clientFd = spair[1]; - - if (handlePostConnectionFromProxy(channelId, serverFd, channel_slave, "slave") < 0) - { - close(serverFd); - close(clientFd); - return -1; - } - - - int pid = fork(); - if (pid == 0) - { - - if (dup2(clientFd, 0) == -1) - { - perror("dup2"); - exit(1); - } - - if (dup2(clientFd, 1) == -1) - { - perror("dup2"); - exit(1); - } - - close(serverFd); - close(clientFd); - - /* Close FDs used by NX, QVD #1208 */ - for (int fd = 3; fd < 256; fd++) { - close(fd); - } - - char *const argv[2] = {nx_slave_cmd, NULL}; - - if (execv(nx_slave_cmd, argv) == -1) - { - perror("execv"); - } - exit(1); - - } - else if (pid == -1) - { - // TODO Test this! - perror("fork"); - close(serverFd); - close(clientFd); - return -1; - } - - close(clientFd); - slavePidMap_[channelId] = pid; - - cerr << "Info" << ": slave channel ID#" << channelId << " handler has PID " << pid << endl; - - return 1; -} - -void Proxy::checkSlaves() -{ - for (int channelId = 0; channelId<CONNECTIONS_LIMIT; channelId++) - { - int pid = slavePidMap_[channelId]; - - if (pid > 1 && HandleChild(pid)) - { - slavePidMap_[channelId] = nothing; - cerr << "Info:" << " Handled death of slave with pid " << pid << endl; - } - } -} - -int Proxy::handlePostConnectionFromProxy(int channelId, int serverFd, - T_channel_type type, const char *label) -{ - // - // Turn queuing off for path proxy-to-server. - // - - SetNoDelay(serverFd, 1); - - assignChannelMap(channelId, serverFd); - - #ifdef TEST - *logofs << "Proxy: Descriptor FD#" << serverFd - << " mapped to channel ID#" << channelId << ".\n" - << logofs_flush; - #endif - - if (allocateTransport(serverFd, channelId) < 0) - { - return -1; - } - - switch (type) - { - case channel_cups: - { - channels_[channelId] = new CupsChannel(transports_[channelId], compressor_); - break; - } - case channel_smb: - { - channels_[channelId] = new SmbChannel(transports_[channelId], compressor_); - - break; - } - case channel_media: - { - channels_[channelId] = new MediaChannel(transports_[channelId], compressor_); - - break; - } - case channel_http: - { - channels_[channelId] = new HttpChannel(transports_[channelId], compressor_); - - break; - } - case channel_font: - { - channels_[channelId] = new FontChannel(transports_[channelId], compressor_); - - break; - } - default: - { - channels_[channelId] = new SlaveChannel(transports_[channelId], compressor_); - - break; - } - } - - if (channels_[channelId] == NULL) - { - deallocateTransport(channelId); - - return -1; - } - - increaseChannels(channelId); - - channels_[channelId] -> handleConfiguration(); - - return 1; -} |